Skip to main content

mz_repr/
row.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Borrow;
11use std::cell::{Cell, RefCell, RefMut};
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::hash::{Hash, Hasher};
16use std::marker::PhantomData;
17use std::mem::{size_of, transmute};
18use std::ops::Deref;
19use std::str;
20
21use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
22use compact_bytes::CompactBytes;
23use mz_ore::cast::{CastFrom, ReinterpretCast};
24use mz_ore::soft_assert_no_log;
25use mz_ore::vec::Vector;
26use mz_persist_types::Codec64;
27use num_enum::{IntoPrimitive, TryFromPrimitive};
28use ordered_float::OrderedFloat;
29#[cfg(any(test, feature = "proptest"))]
30use proptest::prelude::*;
31#[cfg(any(test, feature = "proptest"))]
32use proptest::strategy::{BoxedStrategy, Strategy};
33use serde::{Deserialize, Serialize};
34use uuid::Uuid;
35
36use crate::adt::array::{
37    Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
38};
39use crate::adt::date::Date;
40use crate::adt::interval::Interval;
41use crate::adt::mz_acl_item::{AclItem, MzAclItem};
42use crate::adt::numeric;
43use crate::adt::numeric::Numeric;
44use crate::adt::range::{
45    self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
46};
47use crate::adt::timestamp::CheckedTimestamp;
48#[cfg(any(test, feature = "proptest"))]
49use crate::scalar::arb_datum;
50use crate::scalar::{DatumKind, SqlScalarType};
51use crate::{Datum, RelationDesc, Timestamp};
52
53pub(crate) mod encode;
54pub mod iter;
55
56include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
57
/// A packed representation for `Datum`s.
///
/// `Datum` is easy to work with but very space inefficient. A `Datum::Int32(42)`
/// is laid out in memory like this:
///
///   tag: 3
///   padding: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
///   data: 0 0 0 42
///   padding: 0 0 0 0 0 0 0 0 0 0 0 0
///
/// For a total of 32 bytes! The second set of padding is needed in case we were
/// to write a 16-byte datum into this location. The first set of padding is
/// needed to align that hypothetical 16-byte datum to a 16-byte boundary.
///
/// A `Row` stores zero or more `Datum`s without any padding. We avoid the need
/// for the first set of padding by only providing access to the `Datum`s via
/// calls to `ptr::read_unaligned`, which on modern x86 is barely penalized. We
/// avoid the need for the second set of padding by not providing mutable access
/// to the `Datum`. Instead, `Row` is append-only.
///
/// A `Row` can be built from a collection of `Datum`s using `Row::pack`, but it
/// is more efficient to use `Row::pack_slice` so that a right-sized allocation
/// can be created. If that is not possible, consider using the row buffer
/// pattern: allocate one row, pack into it, and then call [`Row::clone`] to
/// receive a copy of that row, leaving behind the original allocation to pack
/// future rows.
///
/// Creating a row via [`Row::pack_slice`]:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// assert_eq!(row.unpack(), vec![Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)])
/// ```
///
/// `Row`s can be unpacked by iterating over them:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// assert_eq!(row.iter().nth(1).unwrap(), Datum::Int32(1));
/// ```
///
/// If you want random access to the `Datum`s in a `Row`, use `Row::unpack` to
/// create a `Vec<Datum>`:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// let datums = row.unpack();
/// assert_eq!(datums[1], Datum::Int32(1));
/// ```
///
/// # Performance
///
/// Rows are dynamically sized, but up to a fixed size their data is stored in-line.
/// It is best to re-use a `Row` across multiple `Row` creation calls (see
/// [`Row::packer`] and [`Row::pack_using`]), as packing into an existing row
/// retains its allocation instead of creating a new one for every row.
#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
    /// The encoded datums, stored back to back without padding.
    data: CompactBytes,
}
118
119impl Row {
120    const SIZE: usize = CompactBytes::MAX_INLINE;
121
122    /// A variant of `Row::from_proto` that allows for reuse of internal allocs
123    /// and validates the decoding against a provided [`RelationDesc`].
124    pub fn decode_from_proto(
125        &mut self,
126        proto: &ProtoRow,
127        desc: &RelationDesc,
128    ) -> Result<(), String> {
129        let mut packer = self.packer();
130        for (col_idx, _, _) in desc.iter_all() {
131            let d = match proto.datums.get(col_idx.to_raw()) {
132                Some(x) => x,
133                None => {
134                    packer.push(Datum::Null);
135                    continue;
136                }
137            };
138            packer.try_push_proto(d)?;
139        }
140
141        Ok(())
142    }
143
144    /// Allocate an empty `Row` with a pre-allocated capacity.
145    #[inline]
146    pub fn with_capacity(cap: usize) -> Self {
147        Self {
148            data: CompactBytes::with_capacity(cap),
149        }
150    }
151
152    /// Create an empty `Row`.
153    #[inline]
154    pub const fn empty() -> Self {
155        Self {
156            data: CompactBytes::empty(),
157        }
158    }
159
160    /// Creates a new row from supplied bytes.
161    ///
162    /// # Safety
163    ///
164    /// This method relies on `data` being an appropriate row encoding, and can
165    /// result in unsafety if this is not the case.
166    pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
167        Row {
168            data: CompactBytes::new(data),
169        }
170    }
171
172    /// Constructs a [`RowPacker`] that will pack datums into this row's
173    /// allocation.
174    ///
175    /// This method clears the existing contents of the row, but retains the
176    /// allocation.
177    pub fn packer(&mut self) -> RowPacker<'_> {
178        self.clear();
179        RowPacker { row: self }
180    }
181
182    /// Take some `Datum`s and pack them into a `Row`.
183    ///
184    /// This method builds a `Row` by repeatedly increasing the backing
185    /// allocation. If the contents of the iterator are known ahead of
186    /// time, consider [`Row::with_capacity`] to right-size the allocation
187    /// first, and then [`RowPacker::extend`] to populate it with `Datum`s.
188    /// This avoids the repeated allocation resizing and copying.
189    pub fn pack<'a, I, D>(iter: I) -> Row
190    where
191        I: IntoIterator<Item = D>,
192        D: Borrow<Datum<'a>>,
193    {
194        let mut row = Row::default();
195        row.packer().extend(iter);
196        row
197    }
198
199    /// Use `self` to pack `iter`, and then clone the result.
200    ///
201    /// This is a convenience method meant to reduce boilerplate around row
202    /// formation.
203    pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
204    where
205        I: IntoIterator<Item = D>,
206        D: Borrow<Datum<'a>>,
207    {
208        self.packer().extend(iter);
209        self.clone()
210    }
211
212    /// Like [`Row::pack`], but the provided iterator is allowed to produce an
213    /// error, in which case the packing operation is aborted and the error
214    /// returned.
215    pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
216    where
217        I: IntoIterator<Item = Result<D, E>>,
218        D: Borrow<Datum<'a>>,
219    {
220        let mut row = Row::default();
221        row.packer().try_extend(iter)?;
222        Ok(row)
223    }
224
225    /// Pack a slice of `Datum`s into a `Row`.
226    ///
227    /// This method has the advantage over `pack` that it can determine the required
228    /// allocation before packing the elements, ensuring only one allocation and no
229    /// redundant copies required.
230    pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
231        // Pre-allocate the needed number of bytes.
232        let mut row = Row::with_capacity(datums_size(slice.iter()));
233        row.packer().extend(slice.iter());
234        row
235    }
236
237    /// Returns the total amount of bytes used by this row.
238    pub fn byte_len(&self) -> usize {
239        let heap_size = if self.data.spilled() {
240            self.data.len()
241        } else {
242            0
243        };
244        let inline_size = std::mem::size_of::<Self>();
245        inline_size.saturating_add(heap_size)
246    }
247
248    /// The length of the encoded row in bytes. Does not include the size of the `Row` struct itself.
249    pub fn data_len(&self) -> usize {
250        self.data.len()
251    }
252
253    /// Returns the total capacity in bytes used by this row.
254    pub fn byte_capacity(&self) -> usize {
255        self.data.capacity()
256    }
257
258    /// Extracts a Row slice containing the entire [`Row`].
259    #[inline]
260    pub fn as_row_ref(&self) -> &RowRef {
261        // SAFETY: `Row` contains valid row data, by construction.
262        unsafe { RowRef::from_slice(self.data.as_slice()) }
263    }
264
265    /// Clear the contents of the [`Row`], leaving any allocation in place.
266    #[inline]
267    fn clear(&mut self) {
268        self.data.clear();
269    }
270}
271
272impl Borrow<RowRef> for Row {
273    #[inline]
274    fn borrow(&self) -> &RowRef {
275        self.as_row_ref()
276    }
277}
278
279impl AsRef<RowRef> for Row {
280    #[inline]
281    fn as_ref(&self) -> &RowRef {
282        self.as_row_ref()
283    }
284}
285
286impl Deref for Row {
287    type Target = RowRef;
288
289    #[inline]
290    fn deref(&self) -> &Self::Target {
291        self.as_row_ref()
292    }
293}
294
// Compile-time check that `Row` is 24 bytes. Nothing depends on `Row` being
// exactly 24 bytes; the assertion only makes changes to its size visible.
static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
297
298impl Clone for Row {
299    fn clone(&self) -> Self {
300        Row {
301            data: self.data.clone(),
302        }
303    }
304
305    fn clone_from(&mut self, source: &Self) {
306        self.data.clone_from(&source.data);
307    }
308}
309
310// Row's `Hash` implementation defers to `RowRef` to ensure they hash equivalently.
311impl std::hash::Hash for Row {
312    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
313        self.as_row_ref().hash(state)
314    }
315}
316
#[cfg(any(test, feature = "proptest"))]
impl Arbitrary for Row {
    type Parameters = prop::collection::SizeRange;
    type Strategy = BoxedStrategy<Row>;

    /// Generates rows by packing a vector of arbitrary datums whose length is
    /// drawn from `size`.
    fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
        let datum_vecs = prop::collection::vec(arb_datum(true), size);
        datum_vecs
            .prop_map(|items| {
                let mut row = Row::default();
                let mut packer = row.packer();
                for item in &items {
                    // Each generated item converts into a borrowed `Datum`.
                    let datum: Datum<'_> = item.into();
                    packer.push(datum);
                }
                row
            })
            .boxed()
    }
}
336
/// `Row`s are totally ordered; the partial order simply defers to [`Ord`].
impl PartialOrd for Row {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Canonical form: always `Some`, delegating to `Ord::cmp`.
        Some(self.cmp(other))
    }
}
342
343impl Ord for Row {
344    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
345        self.as_ref().cmp(other.as_ref())
346    }
347}
348
349#[allow(missing_debug_implementations)]
350mod columnation {
351    use columnation::{Columnation, Region};
352    use mz_ore::region::LgAllocRegion;
353
354    use crate::Row;
355
    /// Region allocation for `Row` data.
    ///
    /// Content bytes are stored in stable contiguous memory locations,
    /// and then a `Row` referencing them is falsified.
    pub struct RowStack {
        /// Region into which the bytes of copied rows are written.
        region: LgAllocRegion<u8>,
    }
363
    impl RowStack {
        /// 2 MiB, in bytes. Used as the region's allocation limit and as the
        /// upper bound for the amount reserved at once.
        const LIMIT: usize = 2 << 20;
    }
367
368    // Implement `Default` manually to specify a region allocation limit.
369    impl Default for RowStack {
370        fn default() -> Self {
371            Self {
372                // Limit the region size to 2MiB.
373                region: LgAllocRegion::with_limit(Self::LIMIT),
374            }
375        }
376    }
377
    /// `Row`s are stored in columnation containers using a [`RowStack`] region.
    impl Columnation for Row {
        type InnerRegion = RowStack;
    }
381
382    impl Region for RowStack {
383        type Item = Row;
384        #[inline]
385        fn clear(&mut self) {
386            self.region.clear();
387        }
388        #[inline(always)]
389        unsafe fn copy(&mut self, item: &Row) -> Row {
390            if item.data.spilled() {
391                let bytes = self.region.copy_slice(&item.data[..]);
392                Row {
393                    data: compact_bytes::CompactBytes::from_raw_parts(
394                        bytes.as_mut_ptr(),
395                        item.data.len(),
396                        item.data.capacity(),
397                    ),
398                }
399            } else {
400                item.clone()
401            }
402        }
403
404        fn reserve_items<'a, I>(&mut self, items: I)
405        where
406            Self: 'a,
407            I: Iterator<Item = &'a Self::Item> + Clone,
408        {
409            let size = items
410                .filter(|row| row.data.spilled())
411                .map(|row| row.data.len())
412                .sum();
413            let size = std::cmp::min(size, Self::LIMIT);
414            self.region.reserve(size);
415        }
416
417        fn reserve_regions<'a, I>(&mut self, regions: I)
418        where
419            Self: 'a,
420            I: Iterator<Item = &'a Self> + Clone,
421        {
422            let size = regions.map(|r| r.region.len()).sum();
423            let size = std::cmp::min(size, Self::LIMIT);
424            self.region.reserve(size);
425        }
426
427        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
428            self.region.heap_size(callback)
429        }
430    }
431}
432
433mod columnar {
434    use columnar::common::PushIndexAs;
435    use columnar::{
436        AsBytes, Borrow, Clear, Columnar, Container, FromBytes, Index, IndexAs, Len, Push,
437    };
438    use mz_ore::cast::CastFrom;
439    use std::ops::Range;
440
441    use crate::{Row, RowRef};
442
    /// A columnar container of rows.
    ///
    /// The encoded bytes of all rows are concatenated in `values`; `bounds`
    /// holds one offset per row, namely the position in `values` at which that
    /// row's bytes end.
    #[derive(
        Copy,
        Clone,
        Debug,
        Default,
        PartialEq,
        serde::Serialize,
        serde::Deserialize
    )]
    pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
        /// Bounds container; provides indexed access to offsets.
        bounds: BC,
        /// Values container; provides slice access to bytes.
        values: VC,
    }
458
459    impl Columnar for Row {
460        #[inline(always)]
461        fn copy_from(&mut self, other: columnar::Ref<'_, Self>) {
462            self.clear();
463            self.data.extend_from_slice(other.data());
464        }
465        #[inline(always)]
466        fn into_owned(other: columnar::Ref<'_, Self>) -> Self {
467            other.to_owned()
468        }
469        type Container = Rows;
470        #[inline(always)]
471        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
472        where
473            Self: 'a,
474        {
475            thing
476        }
477    }
478
479    impl<BC: PushIndexAs<u64>> Borrow for Rows<BC, Vec<u8>> {
480        type Ref<'a> = &'a RowRef;
481        type Borrowed<'a>
482            = Rows<BC::Borrowed<'a>, &'a [u8]>
483        where
484            Self: 'a;
485        #[inline(always)]
486        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
487            Rows {
488                bounds: self.bounds.borrow(),
489                values: self.values.borrow(),
490            }
491        }
492        #[inline(always)]
493        fn reborrow<'c, 'a: 'c>(item: Self::Borrowed<'a>) -> Self::Borrowed<'c>
494        where
495            Self: 'a,
496        {
497            Rows {
498                bounds: BC::reborrow(item.bounds),
499                values: item.values,
500            }
501        }
502
503        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
504        where
505            Self: 'a,
506        {
507            item
508        }
509    }
510
511    impl<BC: PushIndexAs<u64>> Container for Rows<BC, Vec<u8>> {
512        fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
513            if !range.is_empty() {
514                // Imported bounds will be relative to this starting offset.
515                let values_len: u64 = self.values.len().try_into().expect("must fit");
516
517                // Push all bytes that we can, all at once.
518                let other_lower = if range.start == 0 {
519                    0
520                } else {
521                    other.bounds.index_as(range.start - 1)
522                };
523                let other_upper = other.bounds.index_as(range.end - 1);
524                self.values.extend_from_self(
525                    other.values,
526                    usize::try_from(other_lower).expect("must fit")
527                        ..usize::try_from(other_upper).expect("must fit"),
528                );
529
530                // Each bound needs to be shifted by `values_len - other_lower`.
531                if values_len == other_lower {
532                    self.bounds.extend_from_self(other.bounds, range);
533                } else {
534                    for index in range {
535                        let shifted = other.bounds.index_as(index) - other_lower + values_len;
536                        self.bounds.push(&shifted)
537                    }
538                }
539            }
540        }
541        fn reserve_for<'a, I>(&mut self, selves: I)
542        where
543            Self: 'a,
544            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
545        {
546            self.bounds.reserve_for(selves.clone().map(|r| r.bounds));
547            self.values.reserve_for(selves.map(|r| r.values));
548        }
549    }
550
551    impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
552        const SLICE_COUNT: usize = BC::SLICE_COUNT + VC::SLICE_COUNT;
553        #[inline(always)]
554        fn get_byte_slice(&self, index: usize) -> (u64, &'a [u8]) {
555            debug_assert!(index < Self::SLICE_COUNT);
556            if index < BC::SLICE_COUNT {
557                self.bounds.get_byte_slice(index)
558            } else {
559                self.values.get_byte_slice(index - BC::SLICE_COUNT)
560            }
561        }
562    }
563    impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
564        const SLICE_COUNT: usize = BC::SLICE_COUNT + VC::SLICE_COUNT;
565        #[inline(always)]
566        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
567            Self {
568                bounds: FromBytes::from_bytes(bytes),
569                values: FromBytes::from_bytes(bytes),
570            }
571        }
572    }
573
574    impl<BC: Len, VC> Len for Rows<BC, VC> {
575        #[inline(always)]
576        fn len(&self) -> usize {
577            self.bounds.len()
578        }
579    }
580
581    impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
582        type Ref = &'a RowRef;
583        #[inline(always)]
584        fn get(&self, index: usize) -> Self::Ref {
585            let lower = if index == 0 {
586                0
587            } else {
588                self.bounds.index_as(index - 1)
589            };
590            let upper = self.bounds.index_as(index);
591            let lower = usize::cast_from(lower);
592            let upper = usize::cast_from(upper);
593            // SAFETY: self.values contains only valid row data, and self.metadata delimits only ranges
594            // that correspond to the original rows.
595            unsafe { RowRef::from_slice(&self.values[lower..upper]) }
596        }
597    }
598    impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
599        type Ref = &'a RowRef;
600        #[inline(always)]
601        fn get(&self, index: usize) -> Self::Ref {
602            let lower = if index == 0 {
603                0
604            } else {
605                self.bounds.index_as(index - 1)
606            };
607            let upper = self.bounds.index_as(index);
608            let lower = usize::cast_from(lower);
609            let upper = usize::cast_from(upper);
610            // SAFETY: self.values contains only valid row data, and self.metadata delimits only ranges
611            // that correspond to the original rows.
612            unsafe { RowRef::from_slice(&self.values[lower..upper]) }
613        }
614    }
615
616    impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
617        #[inline(always)]
618        fn push(&mut self, item: &Row) {
619            self.values.extend_from_slice(item.data.as_slice());
620            self.bounds.push(u64::cast_from(self.values.len()));
621        }
622    }
623    impl<BC: for<'a> Push<&'a u64>> Push<&RowRef> for Rows<BC> {
624        #[inline(always)]
625        fn push(&mut self, item: &RowRef) {
626            self.values.extend_from_slice(item.data());
627            self.bounds.push(&u64::cast_from(self.values.len()));
628        }
629    }
630    impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
631        #[inline(always)]
632        fn clear(&mut self) {
633            self.bounds.clear();
634            self.values.clear();
635        }
636    }
637}
638
/// A contiguous slice of bytes that are row data.
///
/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
///
/// The type is a `#[repr(transparent)]` wrapper around `[u8]`, which is what
/// allows [`RowRef::from_slice`] to reinterpret a byte slice as a `RowRef`.
#[derive(PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct RowRef([u8]);
645
646impl RowRef {
647    /// Create a [`RowRef`] from a slice of data.
648    ///
649    /// # Safety
650    ///
651    /// We do not check that the provided slice is valid [`Row`] data; the caller is required to
652    /// ensure this.
653    pub unsafe fn from_slice(row: &[u8]) -> &RowRef {
654        #[allow(clippy::as_conversions)]
655        let ptr = row as *const [u8] as *const RowRef;
656        // SAFETY: We know `ptr` is non-null and aligned because it came from a &[u8].
657        unsafe { &*ptr }
658    }
659
660    /// Unpack `self` into a `Vec<Datum>` for efficient random access.
661    pub fn unpack(&self) -> Vec<Datum<'_>> {
662        // It's usually cheaper to unpack twice to figure out the right length than it is to grow the vec as we go
663        let len = self.iter().count();
664        let mut vec = Vec::with_capacity(len);
665        vec.extend(self.iter());
666        vec
667    }
668
669    /// Return the first [`Datum`] in `self`
670    ///
671    /// Panics if the [`RowRef`] is empty.
672    pub fn unpack_first(&self) -> Datum<'_> {
673        self.iter().next().unwrap()
674    }
675
676    /// Iterate the [`Datum`] elements of the [`RowRef`].
677    pub fn iter(&self) -> DatumListIter<'_> {
678        DatumListIter { data: &self.0 }
679    }
680
681    /// Return the byte length of this [`RowRef`].
682    pub fn byte_len(&self) -> usize {
683        self.0.len()
684    }
685
686    /// For debugging only.
687    pub fn data(&self) -> &[u8] {
688        &self.0
689    }
690
691    /// True iff there is no data in this [`RowRef`].
692    pub fn is_empty(&self) -> bool {
693        self.0.is_empty()
694    }
695}
696
697impl ToOwned for RowRef {
698    type Owned = Row;
699
700    fn to_owned(&self) -> Self::Owned {
701        // SAFETY: RowRef has the invariant that the wrapped data must be a valid Row encoding.
702        unsafe { Row::from_bytes_unchecked(&self.0) }
703    }
704}
705
706impl<'a> IntoIterator for &'a RowRef {
707    type Item = Datum<'a>;
708    type IntoIter = DatumListIter<'a>;
709
710    fn into_iter(self) -> DatumListIter<'a> {
711        DatumListIter { data: &self.0 }
712    }
713}
714
/// The `PartialOrd` and `Ord` implementations for `RowRef` order first by
/// length, and then by slice contents.
/// This allows many comparisons to complete without dereferencing memory.
/// Warning: These order by the u8 array representation, and NOT by Datum::cmp.
impl PartialOrd for RowRef {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Canonical form: always `Some`, delegating to `Ord::cmp`.
        Some(self.cmp(other))
    }
}
723
724impl Ord for RowRef {
725    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
726        match self.0.len().cmp(&other.0.len()) {
727            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
728            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
729            std::cmp::Ordering::Equal => self.0.cmp(&other.0),
730        }
731    }
732}
733
734impl fmt::Debug for RowRef {
735    /// Debug representation using the internal datums
736    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
737        f.write_str("RowRef{")?;
738        f.debug_list().entries(&*self).finish()?;
739        f.write_str("}")
740    }
741}
742
/// Packs datums into a [`Row`].
///
/// Creating a `RowPacker` via [`Row::packer`] starts a packing operation on the
/// row. A packing operation always starts from scratch: the existing contents
/// of the underlying row are cleared.
///
/// To complete a packing operation, drop the `RowPacker`.
#[derive(Debug)]
pub struct RowPacker<'a> {
    /// The row being packed into, borrowed mutably for the whole operation.
    row: &'a mut Row,
}
754
/// Infallible conversion from a [`Datum`] to a typed value.
///
/// Used by [`DatumList::typed_iter`] to yield elements as `T` rather than
/// raw `Datum`s. At runtime, `T` is always `Datum<'a>`, so the conversion
/// is identity.
///
/// See `doc/developer/design/20260311_sqlfunc_generic.md` for the design
/// behind the generic type parameter and type erasure.
///
/// This trait is sealed and cannot be implemented outside of this crate.
pub trait FromDatum<'a>:
    Sized + PartialEq + std::borrow::Borrow<Datum<'a>> + sealed::Sealed
{
    /// Converts `datum` into `Self`.
    fn from_datum(datum: Datum<'a>) -> Self;
}
770
/// Private module holding the `Sealed` supertrait of [`FromDatum`]. Because
/// the module is not public, code outside this crate cannot name `Sealed`
/// and therefore cannot implement `FromDatum`.
mod sealed {
    use crate::Datum;

    /// Marker supertrait that restricts who can implement `FromDatum`.
    pub trait Sealed {}
    // `Datum` is the only type marked as `Sealed` in this module.
    impl<'a> Sealed for Datum<'a> {}
}
777
/// Converting a `Datum` into a `Datum` is the identity.
impl<'a> FromDatum<'a> for Datum<'a> {
    #[inline]
    fn from_datum(datum: Datum<'a>) -> Self {
        // Nothing to convert: the datum is returned unchanged.
        datum
    }
}
784
/// An iterator over the [`Datum`]s encoded in a slice of bytes.
#[derive(Debug, Clone)]
pub struct DatumListIter<'a> {
    /// The encoded bytes being iterated over.
    data: &'a [u8],
}
789
/// A [`DatumListIter`] tagged with an element type `T`.
///
/// `T` is carried only as a phantom type parameter; no value of type `T` is
/// stored in the iterator.
#[derive(Debug, Clone)]
pub struct DatumListTypedIter<'a, T> {
    /// The underlying untyped iterator.
    inner: DatumListIter<'a>,
    /// Marker tying the iterator to `T` without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
795
/// Iterator state over the encoded entries of a dictionary of datums.
#[derive(Debug, Clone)]
pub struct DatumDictIter<'a> {
    /// The encoded bytes being iterated over.
    data: &'a [u8],
    // NOTE(review): presumably the key yielded by the previous step (e.g. to
    // check key ordering) — confirm against the `Iterator` implementation.
    prev_key: Option<&'a str>,
}
801
/// A [`DatumDictIter`] tagged with a value type `T`.
///
/// `T` is carried only as a phantom type parameter; no value of type `T` is
/// stored in the iterator.
#[derive(Debug, Clone)]
pub struct DatumDictTypedIter<'a, T> {
    /// The underlying untyped iterator.
    inner: DatumDictIter<'a>,
    /// Marker tying the iterator to `T` without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
807
/// `RowArena` is used to hold on to temporary `Row`s for functions like `eval`
/// that need to create complex `Datum`s but don't have a `Row` to put them in
/// yet.
#[derive(Debug)]
pub struct RowArena {
    // A stack of byte regions, used as a bump allocator. Bytes handed to
    // `push_bytes` are *copied* into the active (last) region and a reference
    // into that region is returned.
    //
    // The invariant that keeps returned references valid for the arena's
    // lifetime is that a region is never reallocated once it holds data: when
    // the active region lacks spare capacity for a push we allocate a *new*,
    // larger region rather than growing the current one (which would move its
    // bytes and dangle outstanding references). The outer `Vec` may itself
    // reallocate as regions are added, but that only moves the `Vec<u8>`
    // headers, not the heap buffers they own, so references remain valid.
    //
    // `clear` retains only the largest region (emptied) to right-size the arena
    // for reuse; reusing one region across `clear` cycles makes a steady-state
    // workload (e.g. decoding rows one at a time) allocation-free.
    inner: RefCell<Vec<Vec<u8>>>,
    // A single reusable scratch buffer backing `RowArena::writer`. A writer
    // builds into it and `finish` copies the result into a region (via
    // `push_bytes`); the buffer is then retained (cleared, capacity kept) for
    // the next writer, so building values incrementally does not allocate per
    // use once it reaches its high-water mark.
    scratch: RefCell<Vec<u8>>,
}
833
// `DatumList` and `DatumMap` are defined here rather than near `Datum` because we need private access to the unsafe `data` field.
835
/// A sequence of Datums
///
/// The type parameter `T` represents the element type of the list. It is a
/// phantom parameter that carries no runtime data — the actual elements are
/// stored as serialized bytes and `T` is not enforced at runtime. It is up
/// to the caller to ensure `T` matches the actual element type. The default
/// `T = Datum<'a>` means existing code that writes `DatumList<'a>` continues
/// to work unchanged.
///
/// See `doc/developer/design/20260311_sqlfunc_generic.md` for the design
/// behind the generic type parameter.
pub struct DatumList<'a, T = Datum<'a>> {
    /// Points at the serialized datums
    data: &'a [u8],
    /// Marker tying the list to `T` without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
852
853impl<'a, T> DatumList<'a, T> {
854    /// Private constructor. All `DatumList` values should be created through
855    /// this function to keep the `PhantomData` bookkeeping in one place.
856    pub(crate) fn new(data: &'a [u8]) -> Self {
857        DatumList {
858            data,
859            _phantom: PhantomData,
860        }
861    }
862}
863
// `Clone` is implemented by hand (rather than derived) so that it is
// available for every `T`, without requiring `T: Clone`.
impl<'a, T> Clone for DatumList<'a, T> {
    fn clone(&self) -> Self {
        // `DatumList` is `Copy`, so cloning is a plain copy.
        *self
    }
}
869
// A `DatumList` is just a borrowed byte slice plus a zero-sized marker, so it
// is `Copy` for every `T`.
impl<'a, T> Copy for DatumList<'a, T> {}
871
872impl<'a, T> Debug for DatumList<'a, T> {
873    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
874        f.debug_list().entries(self.iter()).finish()
875    }
876}
877
878impl<'a, T> PartialEq for DatumList<'a, T> {
879    #[inline(always)]
880    fn eq(&self, other: &DatumList<'a, T>) -> bool {
881        self.iter().eq(other.iter())
882    }
883}
884
// Marks the element-wise equality of `DatumList` as a full equivalence relation.
impl<'a, T> Eq for DatumList<'a, T> {}
886
887impl<'a, T> Hash for DatumList<'a, T> {
888    #[inline(always)]
889    fn hash<H: Hasher>(&self, state: &mut H) {
890        for d in self.iter() {
891            d.hash(state);
892        }
893    }
894}
895
896impl<T> Ord for DatumList<'_, T> {
897    #[inline(always)]
898    fn cmp(&self, other: &DatumList<'_, T>) -> Ordering {
899        self.iter().cmp(other.iter())
900    }
901}
902
/// `DatumList`s are totally ordered; the partial order defers to [`Ord`].
impl<T> PartialOrd for DatumList<'_, T> {
    #[inline(always)]
    fn partial_cmp(&self, other: &DatumList<'_, T>) -> Option<Ordering> {
        // Canonical form: always `Some`, delegating to `Ord::cmp`.
        Some(self.cmp(other))
    }
}
909
/// A mapping from string keys to Datums
///
/// The type parameter `T` represents the value type of the map. It is a
/// phantom parameter — the actual values are stored as serialized bytes and
/// `T` is not enforced at runtime. It is up to the caller to ensure `T`
/// matches the actual value type. The default `T = Datum<'a>` means existing
/// code that writes `DatumMap<'a>` continues to work unchanged.
///
/// See `doc/developer/design/20260311_sqlfunc_generic.md` for the design
/// behind the generic type parameter.
pub struct DatumMap<'a, T = Datum<'a>> {
    /// Points at the serialized datums, which should be sorted in key order
    data: &'a [u8],
    /// Marker tying the map to `T` without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
925
926impl<'a, T> DatumMap<'a, T> {
927    /// Private constructor. All `DatumMap` values should be created through
928    /// this function to keep the `PhantomData` bookkeeping in one place.
929    pub(crate) fn new(data: &'a [u8]) -> Self {
930        DatumMap {
931            data,
932            _phantom: PhantomData,
933        }
934    }
935}
936
937impl<'a, T> Clone for DatumMap<'a, T> {
938    fn clone(&self) -> Self {
939        *self
940    }
941}
942
943impl<'a, T> Copy for DatumMap<'a, T> {}
944
945impl<'a, T> PartialEq for DatumMap<'a, T> {
946    #[inline(always)]
947    fn eq(&self, other: &DatumMap<'a, T>) -> bool {
948        self.iter().eq(other.iter())
949    }
950}
951
952impl<'a, T> Eq for DatumMap<'a, T> {}
953
954impl<'a, T> Hash for DatumMap<'a, T> {
955    #[inline(always)]
956    fn hash<H: Hasher>(&self, state: &mut H) {
957        for (k, v) in self.iter() {
958            k.hash(state);
959            v.hash(state);
960        }
961    }
962}
963
964impl<'a, T> Ord for DatumMap<'a, T> {
965    #[inline(always)]
966    fn cmp(&self, other: &DatumMap<'a, T>) -> Ordering {
967        self.iter().cmp(other.iter())
968    }
969}
970
971impl<'a, T> PartialOrd for DatumMap<'a, T> {
972    #[inline(always)]
973    fn partial_cmp(&self, other: &DatumMap<'a, T>) -> Option<Ordering> {
974        Some(self.cmp(other))
975    }
976}
977
978impl<'a> crate::scalar::SqlContainerType for DatumList<'a, Datum<'a>> {
979    fn unwrap_element_type(container: &SqlScalarType) -> &SqlScalarType {
980        container.unwrap_list_element_type()
981    }
982    fn wrap_element_type(element: SqlScalarType) -> SqlScalarType {
983        SqlScalarType::List {
984            element_type: Box::new(element),
985            custom_id: None,
986        }
987    }
988}
989
990impl<'a> crate::scalar::SqlContainerType for DatumMap<'a, Datum<'a>> {
991    fn unwrap_element_type(container: &SqlScalarType) -> &SqlScalarType {
992        container.unwrap_map_value_type()
993    }
994    fn wrap_element_type(element: SqlScalarType) -> SqlScalarType {
995        SqlScalarType::Map {
996            value_type: Box::new(element),
997            custom_id: None,
998        }
999    }
1000}
1001
/// Represents a single `Datum`, appropriate to be nested inside other
/// `Datum`s.
///
/// The datum is kept in its encoded form and decoded on demand by `datum()`.
///
/// Note that the derived `Eq`/`PartialEq`/`Hash` operate on the raw encoded
/// bytes, whereas the hand-written `Ord`/`PartialOrd` impls compare the
/// decoded datums.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumNested<'a> {
    /// The encoded bytes of the nested datum, exactly the span that
    /// `read_datum` consumed when the value was extracted.
    val: &'a [u8],
}
1008
1009impl<'a> std::fmt::Display for DatumNested<'a> {
1010    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1011        std::fmt::Display::fmt(&self.datum(), f)
1012    }
1013}
1014
1015impl<'a> std::fmt::Debug for DatumNested<'a> {
1016    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1017        f.debug_struct("DatumNested")
1018            .field("val", &self.datum())
1019            .finish()
1020    }
1021}
1022
1023impl<'a> DatumNested<'a> {
1024    // Figure out which bytes `read_datum` returns (e.g. including the tag),
1025    // and then store a reference to those bytes, so we can "replay" this same
1026    // call later on without storing the datum itself.
1027    pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
1028        let prev = *data;
1029        let _ = unsafe { read_datum(data) };
1030        DatumNested {
1031            val: &prev[..(prev.len() - data.len())],
1032        }
1033    }
1034
1035    /// Returns the datum `self` contains.
1036    pub fn datum(&self) -> Datum<'a> {
1037        let mut temp = self.val;
1038        unsafe { read_datum(&mut temp) }
1039    }
1040}
1041
1042impl<'a> Ord for DatumNested<'a> {
1043    fn cmp(&self, other: &Self) -> Ordering {
1044        self.datum().cmp(&other.datum())
1045    }
1046}
1047
1048impl<'a> PartialOrd for DatumNested<'a> {
1049    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1050        Some(self.cmp(other))
1051    }
1052}
1053
// Prefer adding new tags to the end of the enum. Certain behavior, like row ordering and EXPLAIN
// PHYSICAL PLAN, rely on the ordering of this enum. Neither of these are breaking changes, but
// it's annoying when they change.
/// The one-byte marker that precedes every encoded datum.
///
/// `read_datum` turns the leading byte of an encoded datum back into a `Tag`
/// via `Tag::try_from_primitive` and uses it to decide how to decode the
/// bytes that follow. Because the enum is `#[repr(u8)]` with implicit
/// discriminants, the position of each variant determines its byte value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
enum Tag {
    Null,
    False,
    True,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt32,
    Float32,
    Float64,
    Date,
    Time,
    Timestamp,
    TimestampTz,
    Interval,
    BytesTiny,
    BytesShort,
    BytesLong,
    BytesHuge,
    StringTiny,
    StringShort,
    StringLong,
    StringHuge,
    Uuid,
    Array,
    ListTiny,
    ListShort,
    ListLong,
    ListHuge,
    Dict,
    JsonNull,
    Dummy,
    Numeric,
    UInt16,
    UInt64,
    MzTimestamp,
    Range,
    MzAclItem,
    AclItem,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestamp,
    // Same restrictions as `CheapTimestamp`, but decodes to a timestamp with
    // time zone: everything except leap seconds and times beyond the range of
    // i64 nanoseconds.
    CheapTimestampTz,
    // The next several tags are for variable-length signed integer encoding.
    // The basic idea is that `NonNegativeIntN_K` is used to encode a datum of type
    // IntN whose actual value is positive or zero and fits in K bits, and similarly for
    // NegativeIntN_K with negative values.
    //
    // The order of these tags matters, because we want to be able to choose the
    // tag for a given datum quickly, with arithmetic, rather than slowly, with a
    // stack of `if` statements.
    //
    // Separate tags for non-negative and negative numbers are used to avoid having to
    // waste one bit in the actual data space to encode the sign.
    NonNegativeInt16_0, // i.e., 0
    NonNegativeInt16_8,
    NonNegativeInt16_16,

    NonNegativeInt32_0,
    NonNegativeInt32_8,
    NonNegativeInt32_16,
    NonNegativeInt32_24,
    NonNegativeInt32_32,

    NonNegativeInt64_0,
    NonNegativeInt64_8,
    NonNegativeInt64_16,
    NonNegativeInt64_24,
    NonNegativeInt64_32,
    NonNegativeInt64_40,
    NonNegativeInt64_48,
    NonNegativeInt64_56,
    NonNegativeInt64_64,

    NegativeInt16_0, // i.e., -1
    NegativeInt16_8,
    NegativeInt16_16,

    NegativeInt32_0,
    NegativeInt32_8,
    NegativeInt32_16,
    NegativeInt32_24,
    NegativeInt32_32,

    NegativeInt64_0,
    NegativeInt64_8,
    NegativeInt64_16,
    NegativeInt64_24,
    NegativeInt64_32,
    NegativeInt64_40,
    NegativeInt64_48,
    NegativeInt64_56,
    NegativeInt64_64,

    // These are like the ones above, but for unsigned types. The
    // situation is slightly simpler as we don't have negatives.
    UInt8_0, // i.e., 0
    UInt8_8,

    UInt16_0,
    UInt16_8,
    UInt16_16,

    UInt32_0,
    UInt32_8,
    UInt32_16,
    UInt32_24,
    UInt32_32,

    UInt64_0,
    UInt64_8,
    UInt64_16,
    UInt64_24,
    UInt64_32,
    UInt64_40,
    UInt64_48,
    UInt64_56,
    UInt64_64,
}
1183
1184impl Tag {
1185    fn actual_int_length(self) -> Option<usize> {
1186        use Tag::*;
1187        let val = match self {
1188            NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
1189            | UInt32_0 | UInt64_0 => 0,
1190            NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
1191            | UInt32_8 | UInt64_8 => 1,
1192            NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
1193            | UInt32_16 | UInt64_16 => 2,
1194            NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
1195            NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
1196            NonNegativeInt64_40 | UInt64_40 => 5,
1197            NonNegativeInt64_48 | UInt64_48 => 6,
1198            NonNegativeInt64_56 | UInt64_56 => 7,
1199            NonNegativeInt64_64 | UInt64_64 => 8,
1200            NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
1201            NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
1202            NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
1203            NegativeInt32_24 | NegativeInt64_24 => 3,
1204            NegativeInt32_32 | NegativeInt64_32 => 4,
1205            NegativeInt64_40 => 5,
1206            NegativeInt64_48 => 6,
1207            NegativeInt64_56 => 7,
1208            NegativeInt64_64 => 8,
1209
1210            _ => return None,
1211        };
1212        Some(val)
1213    }
1214}
1215
1216// --------------------------------------------------------------------------------
1217// reading data
1218
1219/// Read a byte slice starting at byte `offset`.
1220///
1221/// Updates `offset` to point to the first byte after the end of the read region.
1222fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
1223    let len = u64::from_le_bytes(read_byte_array(data));
1224    let len = usize::cast_from(len);
1225    let (bytes, next) = data.split_at(len);
1226    *data = next;
1227    bytes
1228}
1229
1230/// Read a data whose length is encoded in the row before its contents.
1231///
1232/// Updates `offset` to point to the first byte after the end of the read region.
1233///
1234/// # Safety
1235///
1236/// This function is safe if the datum's length and contents were previously written by `push_lengthed_bytes`,
1237/// and it was only written with a `String` tag if it was indeed UTF-8.
1238unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
1239    let len = match tag {
1240        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
1241        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1242            usize::from(u16::from_le_bytes(read_byte_array(data)))
1243        }
1244        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1245            usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
1246        }
1247        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1248            usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
1249        }
1250        _ => unreachable!(),
1251    };
1252    let (bytes, next) = data.split_at(len);
1253    *data = next;
1254    match tag {
1255        Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
1256        Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
1257            Datum::String(str::from_utf8_unchecked(bytes))
1258        }
1259        Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
1260            Datum::List(DatumList::new(bytes))
1261        }
1262        _ => unreachable!(),
1263    }
1264}
1265
/// Pops the first byte off `data` and returns it.
///
/// Panics if `data` is empty.
fn read_byte(data: &mut &[u8]) -> u8 {
    let remaining: &[u8] = *data;
    let first = remaining[0];
    *data = &remaining[1..];
    first
}
1271
/// Reads `length` bytes from the front of `data`, advancing `data` past them,
/// and widens them to an array of `N` bytes by placing `FILL` in the k most
/// significant (trailing, since the encoding is little-endian) bytes, where
/// k = N - length.
///
/// # Panics
///
/// Panics unless both of the following hold:
///   * length <= N
///   * length <= data.len()
fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    let (head, tail) = data.split_at(length);
    // Start from an all-`FILL` buffer and overwrite its low-order bytes.
    let mut widened = [FILL; N];
    widened[..head.len()].copy_from_slice(head);
    *data = tail;
    widened
}
1289/// Read `length` bytes from `data` at `offset`, updating the
1290/// latter. Extend the resulting buffer to a negative `N`-byte
1291/// twos complement integer by filling the remaining bits with 1.
1292///
1293/// SAFETY:
1294///   * length <= N
1295///   * offset + length <= data.len()
1296fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
1297    read_byte_array_sign_extending::<N, 255>(data, length)
1298}
1299
1300/// Read `length` bytes from `data` at `offset`, updating the
1301/// latter. Extend the resulting buffer to a positive or zero `N`-byte
1302/// twos complement integer by filling the remaining bits with 0.
1303///
1304/// SAFETY:
1305///   * length <= N
1306///   * offset + length <= data.len()
1307fn read_byte_array_extending_nonnegative<const N: usize>(
1308    data: &mut &[u8],
1309    length: usize,
1310) -> [u8; N] {
1311    read_byte_array_sign_extending::<N, 0>(data, length)
1312}
1313
1314pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1315    let (prev, next) = data.split_first_chunk().unwrap();
1316    *data = next;
1317    *prev
1318}
1319
1320pub(super) fn read_date(data: &mut &[u8]) -> Date {
1321    let days = i32::from_le_bytes(read_byte_array(data));
1322    Date::from_pg_epoch(days).expect("unexpected date")
1323}
1324
1325pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1326    let year = i32::from_le_bytes(read_byte_array(data));
1327    let ordinal = u32::from_le_bytes(read_byte_array(data));
1328    NaiveDate::from_yo_opt(year, ordinal).unwrap()
1329}
1330
1331pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1332    let secs = u32::from_le_bytes(read_byte_array(data));
1333    let nanos = u32::from_le_bytes(read_byte_array(data));
1334    NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1335}
1336
/// Read a single datum from the front of `data`.
///
/// The first byte is interpreted as a `Tag`, which selects how the following
/// bytes are decoded. Updates `data` to point to the first byte after the end
/// of the read region.
///
/// # Panics
///
/// Panics if the first byte is not a known tag, if `data` is shorter than the
/// tag's encoding requires, or if the decoded parts do not form a valid value
/// (for example an out-of-range date).
///
/// # Safety
///
/// This function is safe if a `Datum` was previously written at this offset by `push_datum`.
/// Otherwise it could return invalid values, which is Undefined Behavior.
pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
    let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
    match tag {
        Tag::Null => Datum::Null,
        Tag::False => Datum::False,
        Tag::True => Datum::True,
        // Variable-length unsigned 8-bit integer: zero or one payload byte.
        Tag::UInt8_0 | Tag::UInt8_8 => {
            let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt8(i)
        }
        Tag::Int16 => {
            let i = i16::from_le_bytes(read_byte_array(data));
            Datum::Int16(i)
        }
        Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
            // `tag.actual_int_length()` is at most 2 bytes for these tags, and
            // `data` is big enough if it was encoded validly. If either
            // assumption is violated, the slice operations in the helper panic.
            let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int16(i)
        }
        Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
            let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt16(i)
        }
        Tag::Int32 => {
            let i = i32::from_le_bytes(read_byte_array(data));
            Datum::Int32(i)
        }
        Tag::NonNegativeInt32_0
        | Tag::NonNegativeInt32_32
        | Tag::NonNegativeInt32_8
        | Tag::NonNegativeInt32_16
        | Tag::NonNegativeInt32_24 => {
            // `tag.actual_int_length()` is at most 4 bytes for these tags, and
            // `data` is big enough if it was encoded validly. If either
            // assumption is violated, the slice operations in the helper panic.
            let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int32(i)
        }
        Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
            let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt32(i)
        }
        Tag::Int64 => {
            let i = i64::from_le_bytes(read_byte_array(data));
            Datum::Int64(i)
        }
        Tag::NonNegativeInt64_0
        | Tag::NonNegativeInt64_64
        | Tag::NonNegativeInt64_8
        | Tag::NonNegativeInt64_16
        | Tag::NonNegativeInt64_24
        | Tag::NonNegativeInt64_32
        | Tag::NonNegativeInt64_40
        | Tag::NonNegativeInt64_48
        | Tag::NonNegativeInt64_56 => {
            // `tag.actual_int_length()` is at most 8 bytes for these tags, and
            // `data` is big enough if it was encoded validly. If either
            // assumption is violated, the slice operations in the helper panic.

            let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int64(i)
        }
        Tag::UInt64_0
        | Tag::UInt64_8
        | Tag::UInt64_16
        | Tag::UInt64_24
        | Tag::UInt64_32
        | Tag::UInt64_40
        | Tag::UInt64_48
        | Tag::UInt64_56
        | Tag::UInt64_64 => {
            let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt64(i)
        }
        Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
            // `tag.actual_int_length()` is at most 2 bytes for these tags, and
            // `data` is big enough if it was encoded validly. If either
            // assumption is violated, the slice operations in the helper panic.
            let i = i16::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int16(i)
        }
        Tag::NegativeInt32_0
        | Tag::NegativeInt32_32
        | Tag::NegativeInt32_8
        | Tag::NegativeInt32_16
        | Tag::NegativeInt32_24 => {
            // `tag.actual_int_length()` is at most 4 bytes for these tags, and
            // `data` is big enough if it was encoded validly. If either
            // assumption is violated, the slice operations in the helper panic.
            let i = i32::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int32(i)
        }
        Tag::NegativeInt64_0
        | Tag::NegativeInt64_64
        | Tag::NegativeInt64_8
        | Tag::NegativeInt64_16
        | Tag::NegativeInt64_24
        | Tag::NegativeInt64_32
        | Tag::NegativeInt64_40
        | Tag::NegativeInt64_48
        | Tag::NegativeInt64_56 => {
            // `tag.actual_int_length()` is at most 8 bytes for these tags, and
            // `data` is big enough if the row was encoded validly. If either
            // assumption is violated, the slice operations in the helper panic.
            let i = i64::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int64(i)
        }

        // Fixed-width unsigned integers.
        Tag::UInt8 => {
            let i = u8::from_le_bytes(read_byte_array(data));
            Datum::UInt8(i)
        }
        Tag::UInt16 => {
            let i = u16::from_le_bytes(read_byte_array(data));
            Datum::UInt16(i)
        }
        Tag::UInt32 => {
            let i = u32::from_le_bytes(read_byte_array(data));
            Datum::UInt32(i)
        }
        Tag::UInt64 => {
            let i = u64::from_le_bytes(read_byte_array(data));
            Datum::UInt64(i)
        }
        // Floats are stored as the little-endian bytes of their bit pattern.
        Tag::Float32 => {
            let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
            Datum::Float32(OrderedFloat::from(f))
        }
        Tag::Float64 => {
            let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
            Datum::Float64(OrderedFloat::from(f))
        }
        Tag::Date => Datum::Date(read_date(data)),
        Tag::Time => Datum::Time(read_time(data)),
        // Compact timestamp encoding: a single i64 count of nanoseconds that is
        // split into whole seconds and sub-second nanoseconds.
        Tag::CheapTimestamp => {
            let ts = i64::from_le_bytes(read_byte_array(data));
            let secs = ts.div_euclid(1_000_000_000);
            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
            let ndt = DateTime::from_timestamp(secs, nsecs)
                .expect("We only write round-trippable timestamps")
                .naive_utc();
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
            )
        }
        Tag::CheapTimestampTz => {
            let ts = i64::from_le_bytes(read_byte_array(data));
            let secs = ts.div_euclid(1_000_000_000);
            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
            let dt = DateTime::from_timestamp(secs, nsecs)
                .expect("We only write round-trippable timestamps");
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
            )
        }
        // Full timestamp encoding: a naive date followed by a time of day.
        Tag::Timestamp => {
            let date = read_naive_date(data);
            let time = read_time(data);
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(date.and_time(time))
                    .expect("unexpected timestamp"),
            )
        }
        Tag::TimestampTz => {
            let date = read_naive_date(data);
            let time = read_time(data);
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
                    date.and_time(time),
                    Utc,
                ))
                .expect("unexpected timestamptz"),
            )
        }
        Tag::Interval => {
            let months = i32::from_le_bytes(read_byte_array(data));
            let days = i32::from_le_bytes(read_byte_array(data));
            let micros = i64::from_le_bytes(read_byte_array(data));
            Datum::Interval(Interval {
                months,
                days,
                micros,
            })
        }
        // Length-prefixed payloads share one decoder that dispatches on the tag.
        Tag::BytesTiny
        | Tag::BytesShort
        | Tag::BytesLong
        | Tag::BytesHuge
        | Tag::StringTiny
        | Tag::StringShort
        | Tag::StringLong
        | Tag::StringHuge
        | Tag::ListTiny
        | Tag::ListShort
        | Tag::ListLong
        | Tag::ListHuge => read_lengthed_datum(data, tag),
        Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
        Tag::Array => {
            // See the comment in `Row::push_array` for details on the encoding
            // of arrays.
            let ndims = read_byte(data);
            // Each dimension occupies two `u64`-sized fields.
            let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
            let (dims, next) = data.split_at(dims_size);
            *data = next;
            let bytes = read_untagged_bytes(data);
            Datum::Array(Array {
                dims: ArrayDimensions { data: dims },
                elements: DatumList::new(bytes),
            })
        }
        Tag::Dict => {
            let bytes = read_untagged_bytes(data);
            Datum::Map(DatumMap::new(bytes))
        }
        Tag::JsonNull => Datum::JsonNull,
        Tag::Dummy => Datum::Dummy,
        Tag::Numeric => {
            // Three header bytes (digits, exponent, bits) precede the
            // coefficient units.
            let digits = read_byte(data).into();
            let exponent = i8::reinterpret_cast(read_byte(data));
            let bits = read_byte(data);

            // Only as many 16-bit units as `digits` requires are stored.
            let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
            let lsu_u8_len = lsu_u16_len * 2;
            let (lsu_u8, next) = data.split_at(lsu_u8_len);
            *data = next;

            // TODO: if we refactor the decimal library to accept the owned
            // array as a parameter to `from_raw_parts` below, we could likely
            // avoid a copy because it is exactly the value we want
            let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
            for (i, c) in lsu_u8.chunks(2).enumerate() {
                lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
            }

            let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
            Datum::from(d)
        }
        Tag::MzTimestamp => {
            let t = Timestamp::decode(read_byte_array(data));
            Datum::MzTimestamp(t)
        }
        Tag::Range => {
            // See notes on `push_range_with` for details about encoding.
            let flag_byte = read_byte(data);
            let flags = range::InternalFlags::from_bits(flag_byte)
                .expect("range flags must be encoded validly");

            if flags.contains(range::InternalFlags::EMPTY) {
                assert!(
                    flags == range::InternalFlags::EMPTY,
                    "empty ranges contain only RANGE_EMPTY flag"
                );

                return Datum::Range(Range { inner: None });
            }

            // A finite bound is stored inline as a nested datum; an infinite
            // bound stores nothing.
            let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
                None
            } else {
                Some(DatumNested::extract(data))
            };

            let lower = RangeBound {
                inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
                bound: lower_bound,
            };

            let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
                None
            } else {
                Some(DatumNested::extract(data))
            };

            let upper = RangeBound {
                inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
                bound: upper_bound,
            };

            Datum::Range(Range {
                inner: Some(RangeInner { lower, upper }),
            })
        }
        Tag::MzAclItem => {
            const N: usize = MzAclItem::binary_size();
            let mz_acl_item =
                MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
            Datum::MzAclItem(mz_acl_item)
        }
        Tag::AclItem => {
            const N: usize = AclItem::binary_size();
            let acl_item =
                AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
            Datum::AclItem(acl_item)
        }
    }
}
1683
1684// --------------------------------------------------------------------------------
1685// writing data
1686
1687fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1688where
1689    D: Vector<u8>,
1690{
1691    let len = u64::cast_from(bytes.len());
1692    data.extend_from_slice(&len.to_le_bytes());
1693    data.extend_from_slice(bytes);
1694}
1695
1696fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1697where
1698    D: Vector<u8>,
1699{
1700    match tag {
1701        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1702            let len = bytes.len().to_le_bytes();
1703            data.push(len[0]);
1704        }
1705        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1706            let len = bytes.len().to_le_bytes();
1707            data.extend_from_slice(&len[0..2]);
1708        }
1709        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1710            let len = bytes.len().to_le_bytes();
1711            data.extend_from_slice(&len[0..4]);
1712        }
1713        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1714            let len = bytes.len().to_le_bytes();
1715            data.extend_from_slice(&len);
1716        }
1717        _ => unreachable!(),
1718    }
1719    data.extend_from_slice(bytes);
1720}
1721
1722pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1723    i32::to_le_bytes(date.pg_epoch_days())
1724}
1725
1726fn push_date<D>(data: &mut D, date: Date)
1727where
1728    D: Vector<u8>,
1729{
1730    data.extend_from_slice(&date_to_array(date));
1731}
1732
1733pub(super) fn naive_date_to_arrays(
1734    date: NaiveDate,
1735) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1736    (
1737        i32::to_le_bytes(date.year()),
1738        u32::to_le_bytes(date.ordinal()),
1739    )
1740}
1741
1742fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1743where
1744    D: Vector<u8>,
1745{
1746    let (ds1, ds2) = naive_date_to_arrays(date);
1747    data.extend_from_slice(&ds1);
1748    data.extend_from_slice(&ds2);
1749}
1750
1751pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1752    (
1753        u32::to_le_bytes(time.num_seconds_from_midnight()),
1754        u32::to_le_bytes(time.nanosecond()),
1755    )
1756}
1757
1758fn push_time<D>(data: &mut D, time: NaiveTime)
1759where
1760    D: Vector<u8>,
1761{
1762    let (ts1, ts2) = time_to_arrays(time);
1763    data.extend_from_slice(&ts1);
1764    data.extend_from_slice(&ts2);
1765}
1766
1767/// Returns an i64 representing a `NaiveDateTime`, if
1768/// said i64 can be round-tripped back to a `NaiveDateTime`.
1769///
1770/// The only exotic NDTs for which this can't happen are those that
1771/// are hundreds of years in the future or past, or those that
1772/// represent a leap second. (Note that Materialize does not support
1773/// leap seconds, but this module does).
1774// This function is inspired by `NaiveDateTime::timestamp_nanos`,
1775// with extra checking.
1776fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1777    let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1778    if subsec_nanos >= 1_000_000_000 {
1779        return None;
1780    }
1781    let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1782    as_ns.checked_add(i64::from(subsec_nanos))
1783}
1784
// Computes how many low-order bytes of a signed integer must be stored so
// that the value can be rebuilt once its sign is known.
//
// This function is extremely hot, so the bit count is narrowed with `as`
// rather than `try_into` followed by `unwrap`. `leading_ones` and
// `leading_zeros` never return more than 64, so the narrowing cannot
// truncate.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_signed<T>(i: T) -> u8
where
    T: Into<i64>,
{
    let value: i64 = i.into();

    // Leading bits that merely repeat the sign carry no information.
    let redundant_bits = if value < 0 {
        value.leading_ones()
    } else {
        value.leading_zeros()
    };
    let significant_bits = (i64::BITS - redundant_bits) as u8;

    // Round the remaining bit count up to whole bytes.
    (significant_bits + 7) >> 3
}
1809
// Computes how many low-order bytes of an unsigned integer must be stored so
// that the value can be rebuilt.
//
// In principle `min_bytes_signed` could serve here too if it accepted
// `T: Into<i128>`, but LLVM doesn't seem smart enough to realize that that
// function is the same as the current version, and generates worse code.
//
// The `as` narrowing is justified the same way as in `min_bytes_signed`:
// `leading_zeros` never returns more than 64.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_unsigned<T>(i: T) -> u8
where
    T: Into<u64>,
{
    let value: u64 = i.into();

    let significant_bits = (u64::BITS - value.leading_zeros()) as u8;

    // Round the bit count up to whole bytes.
    (significant_bits + 7) >> 3
}
1829
// Exclusive upper bounds on the payload length of length-prefixed values
// (bytes, strings, lists). They select between the `*Tiny`, `*Short`, `*Long`
// and `*Huge` tags, whose length prefixes are 1, 2, 4 and 8 bytes wide.
// NOTE(review): `1 << 32` presumably assumes a 64-bit `usize` — confirm that
// 32-bit targets are out of scope.
const TINY: usize = 1 << 8;
const SHORT: usize = 1 << 16;
const LONG: usize = 1 << 32;
1833
/// Appends the tagged binary encoding of `datum` to `data`.
///
/// Every encoding starts with a single `Tag` byte identifying the variant,
/// followed by that variant's payload. For integers the tag also carries the
/// number of payload bytes; for bytes, strings and lists it selects the width
/// of the length prefix.
///
/// # Panics
///
/// Panics if a `Datum::Range` has a bound whose datum is `Datum::Null`, or if
/// a reduced `Datum::Numeric` has a digit count or exponent that does not fit
/// in a `u8` / `i8`.
fn push_datum<D>(data: &mut D, datum: Datum)
where
    D: Vector<u8>,
{
    match datum {
        Datum::Null => data.push(Tag::Null.into()),
        Datum::False => data.push(Tag::False.into()),
        Datum::True => data.push(Tag::True.into()),
        // Integers are stored with the minimum number of little-endian bytes.
        // That byte count is added to a base tag (`..._0`), so the tag alone
        // tells a reader the sign, the type, and how many bytes follow.
        Datum::Int16(i) => {
            let mbs = min_bytes_signed(i);
            let tag = u8::from(if i.is_negative() {
                Tag::NegativeInt16_0
            } else {
                Tag::NonNegativeInt16_0
            }) + mbs;

            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
        }
        Datum::Int32(i) => {
            let mbs = min_bytes_signed(i);
            let tag = u8::from(if i.is_negative() {
                Tag::NegativeInt32_0
            } else {
                Tag::NonNegativeInt32_0
            }) + mbs;

            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
        }
        Datum::Int64(i) => {
            let mbs = min_bytes_signed(i);
            let tag = u8::from(if i.is_negative() {
                Tag::NegativeInt64_0
            } else {
                Tag::NonNegativeInt64_0
            }) + mbs;

            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
        }
        Datum::UInt8(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt8_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        Datum::UInt16(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt16_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        Datum::UInt32(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt32_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        Datum::UInt64(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt64_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        // Floats are stored as the little-endian bytes of their bit pattern.
        Datum::Float32(f) => {
            data.push(Tag::Float32.into());
            data.extend_from_slice(&f.to_bits().to_le_bytes());
        }
        Datum::Float64(f) => {
            data.push(Tag::Float64.into());
            data.extend_from_slice(&f.to_bits().to_le_bytes());
        }
        Datum::Date(d) => {
            data.push(Tag::Date.into());
            push_date(data, d);
        }
        Datum::Time(t) => {
            data.push(Tag::Time.into());
            push_time(data, t);
        }
        // Timestamps use the compact 8-byte nanosecond form when
        // `checked_timestamp_nanos` succeeds, and fall back to a 16-byte
        // date + time form otherwise.
        Datum::Timestamp(t) => {
            let datetime = t.to_naive();
            if let Some(nanos) = checked_timestamp_nanos(datetime) {
                data.push(Tag::CheapTimestamp.into());
                data.extend_from_slice(&nanos.to_le_bytes());
            } else {
                data.push(Tag::Timestamp.into());
                push_naive_date(data, datetime.date());
                push_time(data, datetime.time());
            }
        }
        Datum::TimestampTz(t) => {
            let datetime = t.to_naive();
            if let Some(nanos) = checked_timestamp_nanos(datetime) {
                data.push(Tag::CheapTimestampTz.into());
                data.extend_from_slice(&nanos.to_le_bytes());
            } else {
                data.push(Tag::TimestampTz.into());
                push_naive_date(data, datetime.date());
                push_time(data, datetime.time());
            }
        }
        Datum::Interval(i) => {
            data.push(Tag::Interval.into());
            data.extend_from_slice(&i.months.to_le_bytes());
            data.extend_from_slice(&i.days.to_le_bytes());
            data.extend_from_slice(&i.micros.to_le_bytes());
        }
        // Bytes, strings and lists pick their tag from the payload length;
        // the tag in turn determines the width of the length prefix written
        // by `push_lengthed_bytes`.
        Datum::Bytes(bytes) => {
            let tag = match bytes.len() {
                0..TINY => Tag::BytesTiny,
                TINY..SHORT => Tag::BytesShort,
                SHORT..LONG => Tag::BytesLong,
                _ => Tag::BytesHuge,
            };
            data.push(tag.into());
            push_lengthed_bytes(data, bytes, tag);
        }
        Datum::String(string) => {
            let tag = match string.len() {
                0..TINY => Tag::StringTiny,
                TINY..SHORT => Tag::StringShort,
                SHORT..LONG => Tag::StringLong,
                _ => Tag::StringHuge,
            };
            data.push(tag.into());
            push_lengthed_bytes(data, string.as_bytes(), tag);
        }
        Datum::List(list) => {
            let tag = match list.data.len() {
                0..TINY => Tag::ListTiny,
                TINY..SHORT => Tag::ListShort,
                SHORT..LONG => Tag::ListLong,
                _ => Tag::ListHuge,
            };
            data.push(tag.into());
            push_lengthed_bytes(data, list.data, tag);
        }
        Datum::Uuid(u) => {
            data.push(Tag::Uuid.into());
            data.extend_from_slice(u.as_bytes());
        }
        Datum::Array(array) => {
            // See the comment in `RowPacker::push_array_with_unchecked` for
            // details on the encoding of arrays.
            data.push(Tag::Array.into());
            data.push(array.dims.ndims());
            data.extend_from_slice(array.dims.data);
            push_untagged_bytes(data, array.elements.data);
        }
        Datum::Map(dict) => {
            data.push(Tag::Dict.into());
            push_untagged_bytes(data, dict.data);
        }
        Datum::JsonNull => data.push(Tag::JsonNull.into()),
        Datum::MzTimestamp(t) => {
            data.push(Tag::MzTimestamp.into());
            data.extend_from_slice(&t.encode());
        }
        Datum::Dummy => data.push(Tag::Dummy.into()),
        Datum::Numeric(mut n) => {
            // Pseudo-canonical representation of decimal values with
            // insignificant zeroes trimmed. This compresses the number further
            // than `Numeric::trim` by removing all zeroes, and not only those in
            // the fractional component.
            numeric::cx_datum().reduce(&mut n.0);
            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
            data.push(Tag::Numeric.into());
            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
            data.push(
                i8::try_from(exponent)
                    .expect("exponent to fit within i8; should not exceed +/- 39")
                    .to_le_bytes()[0],
            );
            data.push(bits);

            // Only the coefficient units needed for `digits` digits are stored.
            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];

            // Little endian machines can take the lsu directly from u16 to u8.
            if cfg!(target_endian = "little") {
                // SAFETY: `lsu` (returned by `coefficient_units()`) is a `&[u16]`, so
                // each element can safely be transmuted into two `u8`s.
                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
                // The `u8` aligned version of the `lsu` should have twice as many
                // elements as we expect for the `u16` version.
                soft_assert_no_log!(
                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
                    Numeric::digits_to_lsu_elements_len(digits) * 2,
                    lsu_bytes.len()
                );
                // There should be no unaligned elements in the prefix or suffix.
                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
                data.extend_from_slice(lsu_bytes);
            } else {
                // On other targets, convert each unit explicitly so the stored
                // bytes are little-endian regardless of the host byte order.
                for u in lsu {
                    data.extend_from_slice(&u.to_le_bytes());
                }
            }
        }
        Datum::Range(range) => {
            // See notes on `push_range_with` for details about encoding.
            data.push(Tag::Range.into());
            data.push(range.internal_flag_bits());

            // Each bound that is present is written as a regular tagged
            // datum, lower bound first.
            if let Some(RangeInner { lower, upper }) = range.inner {
                for bound in [lower.bound, upper.bound] {
                    if let Some(bound) = bound {
                        match bound.datum() {
                            Datum::Null => panic!("cannot push Datum::Null into range"),
                            d => push_datum::<D>(data, d),
                        }
                    }
                }
            }
        }
        Datum::MzAclItem(mz_acl_item) => {
            data.push(Tag::MzAclItem.into());
            data.extend_from_slice(&mz_acl_item.encode_binary());
        }
        Datum::AclItem(acl_item) => {
            data.push(Tag::AclItem.into());
            data.extend_from_slice(&acl_item.encode_binary());
        }
    }
}
2061
2062/// Return the number of bytes these Datums would use if packed as a Row.
2063pub fn row_size<'a, I>(a: I) -> usize
2064where
2065    I: IntoIterator<Item = Datum<'a>>,
2066{
2067    // Using datums_size instead of a.data().len() here is safer because it will
2068    // return the size of the datums if they were packed into a Row. Although
2069    // a.data().len() happens to give the correct answer (and is faster), data()
2070    // is documented as for debugging only.
2071    let sz = datums_size::<_, _>(a);
2072    let size_of_row = std::mem::size_of::<Row>();
2073    // The Row struct attempts to inline data until it can't fit in the
2074    // preallocated size. Otherwise it spills to heap, and uses the Row to point
2075    // to that.
2076    if sz > Row::SIZE {
2077        sz + size_of_row
2078    } else {
2079        size_of_row
2080    }
2081}
2082
2083/// Number of bytes required by the datum.
2084/// This is used to optimistically pre-allocate buffers for packing rows.
2085pub fn datum_size(datum: &Datum) -> usize {
2086    match datum {
2087        Datum::Null => 1,
2088        Datum::False => 1,
2089        Datum::True => 1,
2090        Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
2091        Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
2092        Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
2093        Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
2094        Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
2095        Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
2096        Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
2097        Datum::Float32(_) => 1 + size_of::<f32>(),
2098        Datum::Float64(_) => 1 + size_of::<f64>(),
2099        Datum::Date(_) => 1 + size_of::<i32>(),
2100        Datum::Time(_) => 1 + 8,
2101        Datum::Timestamp(t) => {
2102            1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
2103                8
2104            } else {
2105                16
2106            }
2107        }
2108        Datum::TimestampTz(t) => {
2109            1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
2110                8
2111            } else {
2112                16
2113            }
2114        }
2115        Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
2116        Datum::Bytes(bytes) => {
2117            // We use a variable length representation of slice length.
2118            let bytes_for_length = match bytes.len() {
2119                0..TINY => 1,
2120                TINY..SHORT => 2,
2121                SHORT..LONG => 4,
2122                _ => 8,
2123            };
2124            1 + bytes_for_length + bytes.len()
2125        }
2126        Datum::String(string) => {
2127            // We use a variable length representation of slice length.
2128            let bytes_for_length = match string.len() {
2129                0..TINY => 1,
2130                TINY..SHORT => 2,
2131                SHORT..LONG => 4,
2132                _ => 8,
2133            };
2134            1 + bytes_for_length + string.len()
2135        }
2136        Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
2137        Datum::Array(array) => {
2138            1 + size_of::<u8>()
2139                + array.dims.data.len()
2140                + size_of::<u64>()
2141                + array.elements.data.len()
2142        }
2143        Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
2144        Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
2145        Datum::JsonNull => 1,
2146        Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
2147        Datum::Dummy => 1,
2148        Datum::Numeric(d) => {
2149            let mut d = d.0.clone();
2150            // Values must be reduced to determine appropriate number of
2151            // coefficient units.
2152            numeric::cx_datum().reduce(&mut d);
2153            // 4 = 1 bit each for tag, digits, exponent, bits
2154            4 + (d.coefficient_units().len() * 2)
2155        }
2156        Datum::Range(Range { inner }) => {
2157            // Tag + flags
2158            2 + match inner {
2159                None => 0,
2160                Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
2161                    .iter()
2162                    .map(|bound| match bound {
2163                        None => 0,
2164                        Some(bound) => bound.val.len(),
2165                    })
2166                    .sum(),
2167            }
2168        }
2169        Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
2170        Datum::AclItem(_) => 1 + AclItem::binary_size(),
2171    }
2172}
2173
2174/// Number of bytes required by a sequence of datums.
2175///
2176/// This method can be used to right-size the allocation for a `Row`
2177/// before calling [`RowPacker::extend`].
2178pub fn datums_size<'a, I, D>(iter: I) -> usize
2179where
2180    I: IntoIterator<Item = D>,
2181    D: Borrow<Datum<'a>>,
2182{
2183    iter.into_iter().map(|d| datum_size(d.borrow())).sum()
2184}
2185
2186/// Number of bytes required by a list of datums. This computes the size that would be required if
2187/// the given datums were packed into a list.
2188///
2189/// This is used to optimistically pre-allocate buffers for packing rows.
2190pub fn datum_list_size<'a, I, D>(iter: I) -> usize
2191where
2192    I: IntoIterator<Item = D>,
2193    D: Borrow<Datum<'a>>,
2194{
2195    1 + size_of::<u64>() + datums_size(iter)
2196}
2197
2198impl RowPacker<'_> {
    /// Constructs a row packer that will pack additional datums into the
    /// provided row.
    ///
    /// The row is used as-is: nothing is cleared or truncated here, so the
    /// packer's methods append after whatever the row already contains.
    ///
    /// This function is intentionally somewhat inconvenient to call. You
    /// usually want to call [`Row::packer`] instead to start packing from
    /// scratch.
    pub fn for_existing_row(row: &mut Row) -> RowPacker<'_> {
        RowPacker { row }
    }
2208
2209    /// Extend an existing `Row` with a `Datum`.
2210    #[inline]
2211    pub fn push<'a, D>(&mut self, datum: D)
2212    where
2213        D: Borrow<Datum<'a>>,
2214    {
2215        push_datum(&mut self.row.data, *datum.borrow());
2216    }
2217
2218    /// Extend an existing `Row` with additional `Datum`s.
2219    #[inline]
2220    pub fn extend<'a, I, D>(&mut self, iter: I)
2221    where
2222        I: IntoIterator<Item = D>,
2223        D: Borrow<Datum<'a>>,
2224    {
2225        for datum in iter {
2226            push_datum(&mut self.row.data, *datum.borrow())
2227        }
2228    }
2229
2230    /// Extend an existing `Row` with additional `Datum`s.
2231    ///
2232    /// In the case the iterator produces an error, the pushing of
2233    /// datums in terminated and the error returned. The `Row` will
2234    /// be incomplete, but it will be safe to read datums from it.
2235    #[inline]
2236    pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
2237    where
2238        I: IntoIterator<Item = Result<D, E>>,
2239        D: Borrow<Datum<'a>>,
2240    {
2241        for datum in iter {
2242            push_datum(&mut self.row.data, *datum?.borrow());
2243        }
2244        Ok(())
2245    }
2246
2247    /// Appends the datums of an entire `Row`.
2248    pub fn extend_by_row(&mut self, row: &Row) {
2249        self.row.data.extend_from_slice(row.data.as_slice());
2250    }
2251
2252    /// Appends the datums of an entire `Row`.
2253    pub fn extend_by_row_ref(&mut self, row: &RowRef) {
2254        self.row.data.extend_from_slice(row.data());
2255    }
2256
2257    /// Appends the slice of data representing an entire `Row`. The data is not validated.
2258    ///
2259    /// # Safety
2260    ///
2261    /// The requirements from [`Row::from_bytes_unchecked`] apply here, too:
2262    /// This method relies on `data` being an appropriate row encoding, and can
2263    /// result in unsafety if this is not the case.
2264    #[inline]
2265    pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
2266        self.row.data.extend_from_slice(data)
2267    }
2268
    /// Pushes a [`DatumList`] that is built from a closure.
    ///
    /// The supplied closure will be invoked once with a `RowPacker` that can
    /// be used to populate the list. It is valid to call any method on the
    /// [`RowPacker`] except for [`RowPacker::clear`], [`RowPacker::truncate`],
    /// or [`RowPacker::truncate_datums`].
    ///
    /// The list is first written with a 1-byte length prefix. If the closure
    /// pushes 256 bytes or more, the tag is rewritten and the element data is
    /// shifted to make room for a wider (2, 4 or 8 byte) length.
    ///
    /// Returns the value returned by the closure, if any.
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::default();
    /// row.packer().push_list_with(|row| {
    ///     row.push(Datum::String("age"));
    ///     row.push(Datum::Int64(42));
    /// });
    /// assert_eq!(
    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
    ///     vec![Datum::String("age"), Datum::Int64(42)],
    /// );
    /// ```
    #[inline]
    pub fn push_list_with<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&mut RowPacker) -> R,
    {
        // First, assume that the list will fit in 255 bytes, and thus the length will fit in
        // 1 byte. If not, we'll fix it up later.
        let start = self.row.data.len();
        self.row.data.push(Tag::ListTiny.into());
        // Write a dummy len, will fix it up later.
        self.row.data.push(0);

        let out = f(self);

        // The `- 1 - 1` is for the tag and the len.
        let len = self.row.data.len() - start - 1 - 1;
        // We now know the real len.
        if len < TINY {
            // If the len fits in 1 byte, we just need to fix up the len.
            self.row.data[start + 1] = len.to_le_bytes()[0];
        } else {
            // Note: We move this code path into its own function, so that the common case can be
            // inlined.
            long_list(&mut self.row.data, start, len);
        }

        /// Rewrites a list that turned out to need a length prefix wider than 1 byte:
        ///
        /// 1. Fix up the tag.
        /// 2. Move the actual data a bit (for which we also need to make room at the end).
        /// 3. Fix up the len.
        ///
        /// `data`: The row's backing data.
        /// `start`: where `push_list_with` started writing in `data`.
        /// `len`: the length of the data, excluding the tag and the length.
        #[cold]
        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
            // `len_len`: the length of the length. (Possible values are: 2, 4, 8. 1 is handled
            // elsewhere.) The other parameters are the same as for `long_list`.
            let long_list_inner = |data: &mut CompactBytes, len_len| {
                // We'll need memory for the new, bigger length, so make the `CompactBytes` bigger.
                // The `- 1` is because the old length was 1 byte.
                const ZEROS: [u8; 8] = [0; 8];
                data.extend_from_slice(&ZEROS[0..len_len - 1]);
                // Move the data to the end of the `CompactBytes`, to make space for the new length.
                // Originally, it started after the 1-byte tag and the 1-byte length, now it will
                // start after the 1-byte tag and the len_len-byte length.
                //
                // Note that this is the only operation in `long_list` whose cost is proportional
                // to `len`. Since `len` is at least 256 here, the other operations' cost are
                // negligible. `copy_within` is a memmove, which is probably a fair bit faster per
                // Datum than a Datum encoding in the `f` closure.
                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
                // Write the new length.
                data[start + 1..start + 1 + len_len]
                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
            };
            match len {
                // The caller only takes this path when `len >= TINY`.
                0..TINY => {
                    unreachable!()
                }
                TINY..SHORT => {
                    data[start] = Tag::ListShort.into();
                    long_list_inner(data, 2);
                }
                SHORT..LONG => {
                    data[start] = Tag::ListLong.into();
                    long_list_inner(data, 4);
                }
                _ => {
                    data[start] = Tag::ListHuge.into();
                    long_list_inner(data, 8);
                }
            };
        }

        out
    }
2365
2366    /// Pushes a [`DatumMap`] that is built from a closure.
2367    ///
2368    /// The supplied closure will be invoked once with a `Row` that can be used
2369    /// to populate the dict.
2370    ///
2371    /// The closure **must** alternate pushing string keys and arbitrary values,
2372    /// otherwise reading the dict will cause a panic.
2373    ///
2374    /// The closure **must** push keys in ascending order, otherwise equality
2375    /// checks on the resulting `Row` may be wrong and reading the dict IN DEBUG
2376    /// MODE will cause a panic.
2377    ///
2378    /// The closure **must not** call [`RowPacker::clear`],
2379    /// [`RowPacker::truncate`], or [`RowPacker::truncate_datums`].
2380    ///
2381    /// # Example
2382    ///
2383    /// ```
2384    /// # use mz_repr::{Row, Datum};
2385    /// let mut row = Row::default();
2386    /// row.packer().push_dict_with(|row| {
2387    ///
2388    ///     // key
2389    ///     row.push(Datum::String("age"));
2390    ///     // value
2391    ///     row.push(Datum::Int64(42));
2392    ///
2393    ///     // key
2394    ///     row.push(Datum::String("name"));
2395    ///     // value
2396    ///     row.push(Datum::String("bob"));
2397    /// });
2398    /// assert_eq!(
2399    ///     row.unpack_first().unwrap_map().iter().collect::<Vec<_>>(),
2400    ///     vec![("age", Datum::Int64(42)), ("name", Datum::String("bob"))]
2401    /// );
2402    /// ```
2403    pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2404    where
2405        F: FnOnce(&mut RowPacker) -> R,
2406    {
2407        self.row.data.push(Tag::Dict.into());
2408        let start = self.row.data.len();
2409        // write a dummy len, will fix it up later
2410        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2411
2412        let res = f(self);
2413
2414        let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2415        // fix up the len
2416        self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2417
2418        res
2419    }
2420
2421    /// Like [`RowPacker::push_dict_with`], but accepts a fallible closure.
2422    pub fn try_push_dict_with<F, E>(&mut self, f: F) -> Result<(), E>
2423    where
2424        F: FnOnce(&mut RowPacker) -> Result<(), E>,
2425    {
2426        self.push_dict_with(f)
2427    }
2428
2429    /// Convenience function to construct an array from an iter of `Datum`s.
2430    ///
2431    /// Returns an error if the number of elements in `iter` does not match
2432    /// the cardinality of the array as described by `dims`, or if the
2433    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2434    /// occurs, the packer's state will be unchanged.
2435    pub fn try_push_array<'a, I, D>(
2436        &mut self,
2437        dims: &[ArrayDimension],
2438        iter: I,
2439    ) -> Result<(), InvalidArrayError>
2440    where
2441        I: IntoIterator<Item = D>,
2442        D: Borrow<Datum<'a>>,
2443    {
2444        // SAFETY: The function returns the exact number of elements pushed into the array.
2445        unsafe {
2446            self.push_array_with_unchecked(dims, |packer| {
2447                let mut nelements = 0;
2448                for datum in iter {
2449                    packer.push(datum);
2450                    nelements += 1;
2451                }
2452                Ok::<_, InvalidArrayError>(nelements)
2453            })
2454        }
2455    }
2456
2457    /// Like [`RowPacker::try_push_array`], but accepts a fallible iterator of
2458    /// elements.
2459    pub fn try_push_array_fallible<'a, I, D, E>(
2460        &mut self,
2461        dims: &[ArrayDimension],
2462        iter: I,
2463    ) -> Result<Result<(), E>, InvalidArrayError>
2464    where
2465        I: IntoIterator<Item = Result<D, E>>,
2466        D: Borrow<Datum<'a>>,
2467    {
2468        enum Error<E> {
2469            Usage(InvalidArrayError),
2470            Inner(E),
2471        }
2472
2473        impl<E> From<InvalidArrayError> for Error<E> {
2474            fn from(e: InvalidArrayError) -> Self {
2475                Self::Usage(e)
2476            }
2477        }
2478
2479        // SAFETY: The function returns the exact number of elements pushed into the array.
2480        let result = unsafe {
2481            self.push_array_with_unchecked(dims, |packer| {
2482                let mut nelements = 0;
2483                for datum in iter {
2484                    packer.push(datum.map_err(Error::Inner)?);
2485                    nelements += 1;
2486                }
2487                Ok(nelements)
2488            })
2489        };
2490        match result {
2491            Ok(()) => Ok(Ok(())),
2492            Err(Error::Usage(e)) => Err(e),
2493            Err(Error::Inner(e)) => Ok(Err(e)),
2494        }
2495    }
2496
    /// Convenience function to construct an array from a function. The function must return the
    /// number of elements it pushed into the array.
    ///
    /// # Safety
    ///
    /// It is undefined behavior if the function returns a number different to the number of
    /// elements it pushed.
    ///
    /// # Errors
    ///
    /// Returns an error if the number of elements pushed by `f` does not match
    /// the cardinality of the array as described by `dims`, or if the
    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`], or if `f` errors. If an error
    /// occurs, the packer's state will be unchanged.
    pub unsafe fn push_array_with_unchecked<F, E>(
        &mut self,
        dims: &[ArrayDimension],
        f: F,
    ) -> Result<(), E>
    where
        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
        E: From<InvalidArrayError>,
    {
        // Arrays are encoded as follows (after the `Tag::Array` byte).
        //
        // u8    ndims
        // i64   dim_0 lower bound
        // u64   dim_0 length
        // ...
        // i64   dim_n lower bound
        // u64   dim_n length
        // u64   element data size in bytes
        // u8    element data, where elements are encoded in row-major order

        // Reject invalid dimensions before anything is written.
        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
        }

        // Remember where the array starts so it can be rolled back on error.
        let start = self.row.data.len();
        self.row.data.push(Tag::Array.into());

        // Write dimension information.
        self.row
            .data
            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
        for dim in dims {
            self.row
                .data
                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
            self.row
                .data
                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
        }

        // Write elements.
        // The element data size is not known yet: write a zeroed placeholder
        // and fill it in once `f` has pushed the elements.
        let off = self.row.data.len();
        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
        let nelements = match f(self) {
            Ok(nelements) => nelements,
            Err(e) => {
                // Discard everything written for this array.
                self.row.data.truncate(start);
                return Err(e);
            }
        };
        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());

        // Check that the number of elements written matches the dimension
        // information.
        let cardinality = match dims {
            [] => 0,
            // Saturate the product: a cardinality that overflows `usize` is
            // impossibly large (no array can hold that many elements), so it can
            // never equal the actual `nelements` and the check below rejects it as
            // `WrongCardinality`. A plain `product()` would panic under overflow
            // checks (debug/fuzz) and silently wrap in release — and a wrapped
            // value could even spuriously match `nelements`, accepting a corrupt
            // array (e.g. dims claiming `[2^32, 2^32]` wrap to 0 elements).
            dims => dims
                .iter()
                .map(|d| d.length)
                .fold(1usize, usize::saturating_mul),
        };
        if nelements != cardinality {
            // Discard everything written for this array.
            self.row.data.truncate(start);
            return Err(InvalidArrayError::WrongCardinality {
                actual: nelements,
                expected: cardinality,
            }
            .into());
        }

        Ok(())
    }
2585
2586    /// Pushes an [`Array`] that is built from a closure.
2587    ///
2588    /// __WARNING__: This is fairly "sharp" tool that is easy to get wrong. You
2589    /// should prefer [`RowPacker::try_push_array`] when possible.
2590    ///
2591    /// Returns an error if the number of elements pushed does not match
2592    /// the cardinality of the array as described by `dims`, or if the
2593    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2594    /// occurs, the packer's state will be unchanged.
2595    pub fn push_array_with_row_major<F, I>(
2596        &mut self,
2597        dims: I,
2598        f: F,
2599    ) -> Result<(), InvalidArrayError>
2600    where
2601        I: IntoIterator<Item = ArrayDimension>,
2602        F: FnOnce(&mut RowPacker) -> usize,
2603    {
2604        let start = self.row.data.len();
2605        self.row.data.push(Tag::Array.into());
2606
2607        // Write dummy dimension length for now, we'll fix it up.
2608        let dims_start = self.row.data.len();
2609        self.row.data.push(42);
2610
2611        let mut num_dims: u8 = 0;
2612        let mut cardinality: usize = 1;
2613        for dim in dims {
2614            num_dims += 1;
2615            // Saturate: an overflowing cardinality is impossibly large and is
2616            // rejected by the `nelements` check below. See the matching note in
2617            // `push_array_with_unchecked`.
2618            cardinality = cardinality.saturating_mul(dim.length);
2619
2620            self.row
2621                .data
2622                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2623            self.row
2624                .data
2625                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2626        }
2627
2628        if num_dims > MAX_ARRAY_DIMENSIONS {
2629            // Reset the packer state so we don't have invalid data.
2630            self.row.data.truncate(start);
2631            return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2632        }
2633        // Fix up our dimension length.
2634        self.row.data[dims_start..dims_start + size_of::<u8>()]
2635            .copy_from_slice(&num_dims.to_le_bytes());
2636
2637        // Write elements.
2638        let off = self.row.data.len();
2639        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2640
2641        let nelements = f(self);
2642
2643        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2644        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2645
2646        // Check that the number of elements written matches the dimension
2647        // information.
2648        let cardinality = match num_dims {
2649            0 => 0,
2650            _ => cardinality,
2651        };
2652        if nelements != cardinality {
2653            self.row.data.truncate(start);
2654            return Err(InvalidArrayError::WrongCardinality {
2655                actual: nelements,
2656                expected: cardinality,
2657            });
2658        }
2659
2660        Ok(())
2661    }
2662
2663    /// Convenience function to push a `DatumList` from an iter of `Datum`s
2664    ///
2665    /// See [`RowPacker::push_dict_with`] if you need to be able to handle errors
2666    pub fn push_list<'a, I, D>(&mut self, iter: I)
2667    where
2668        I: IntoIterator<Item = D>,
2669        D: Borrow<Datum<'a>>,
2670    {
2671        self.push_list_with(|packer| {
2672            for elem in iter {
2673                packer.push(*elem.borrow())
2674            }
2675        });
2676    }
2677
2678    /// Convenience function to push a `DatumMap` from an iter of `(&str, Datum)` pairs
2679    pub fn push_dict<'a, I, D>(&mut self, iter: I)
2680    where
2681        I: IntoIterator<Item = (&'a str, D)>,
2682        D: Borrow<Datum<'a>>,
2683    {
2684        self.push_dict_with(|packer| {
2685            for (k, v) in iter {
2686                packer.push(Datum::String(k));
2687                packer.push(*v.borrow())
2688            }
2689        })
2690    }
2691
2692    /// Pushes a `Datum::Range` derived from the `Range<Datum<'a>`.
2693    ///
2694    /// # Panics
2695    /// - If lower and upper express finite values and they are datums of
2696    ///   different types.
2697    /// - If lower or upper express finite values and are equal to
2698    ///   `Datum::Null`. To handle `Datum::Null` properly, use
2699    ///   [`RangeBound::new`].
2700    ///
2701    /// # Notes
2702    /// - This function canonicalizes the range before pushing it to the row.
2703    /// - Prefer this function over `push_range_with` because of its
2704    ///   canonicaliztion.
2705    /// - Prefer creating [`RangeBound`]s using [`RangeBound::new`], which
2706    ///   handles `Datum::Null` in a SQL-friendly way.
2707    pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
2708        range.canonicalize()?;
2709        match range.inner {
2710            None => {
2711                self.row.data.push(Tag::Range.into());
2712                // Untagged bytes only contains the `RANGE_EMPTY` flag value.
2713                self.row.data.push(range::InternalFlags::EMPTY.bits());
2714                Ok(())
2715            }
2716            Some(inner) => self.push_range_with(
2717                RangeLowerBound {
2718                    inclusive: inner.lower.inclusive,
2719                    bound: inner
2720                        .lower
2721                        .bound
2722                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2723                },
2724                RangeUpperBound {
2725                    inclusive: inner.upper.inclusive,
2726                    bound: inner
2727                        .upper
2728                        .bound
2729                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2730                },
2731            ),
2732        }
2733    }
2734
2735    /// Pushes a `DatumRange` built from the specified arguments.
2736    ///
2737    /// # Warning
2738    /// Unlike `push_range`, `push_range_with` _does not_ canonicalize its
2739    /// inputs. Consequentially, this means it's possible to generate ranges
2740    /// that will not reflect the proper ordering and equality.
2741    ///
2742    /// # Panics
2743    /// - If lower or upper expresses a finite value and does not push exactly
2744    ///   one value into the `RowPacker`.
2745    /// - If lower and upper express finite values and they are datums of
2746    ///   different types.
2747    /// - If lower or upper express finite values and push `Datum::Null`.
2748    ///
2749    /// # Notes
2750    /// - Prefer `push_range_with` over this function. This function should be
2751    ///   used only when you are not pushing `Datum`s to the inner row.
2752    /// - Range encoding is `[<flag bytes>,<lower>?,<upper>?]`, where `lower`
2753    ///   and `upper` are optional, contingent on the flag value expressing an
2754    ///   empty range (where neither will be present) or infinite bounds (where
2755    ///   each infinite bound will be absent).
2756    /// - To push an emtpy range, use `push_range` using `Range { inner: None }`.
2757    pub fn push_range_with<L, U, E>(
2758        &mut self,
2759        lower: RangeLowerBound<L>,
2760        upper: RangeUpperBound<U>,
2761    ) -> Result<(), E>
2762    where
2763        L: FnOnce(&mut RowPacker) -> Result<(), E>,
2764        U: FnOnce(&mut RowPacker) -> Result<(), E>,
2765        E: From<InvalidRangeError>,
2766    {
2767        let start = self.row.data.len();
2768        self.row.data.push(Tag::Range.into());
2769
2770        let mut flags = range::InternalFlags::empty();
2771
2772        flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
2773        flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
2774        flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
2775        flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);
2776
2777        let mut expected_datums = 0;
2778
2779        self.row.data.push(flags.bits());
2780
2781        let datum_check = self.row.data.len();
2782
2783        if let Some(value) = lower.bound {
2784            let start = self.row.data.len();
2785            value(self)?;
2786            assert!(
2787                start < self.row.data.len(),
2788                "finite values must each push exactly one value; expected 1 but got 0"
2789            );
2790            expected_datums += 1;
2791        }
2792
2793        if let Some(value) = upper.bound {
2794            let start = self.row.data.len();
2795            value(self)?;
2796            assert!(
2797                start < self.row.data.len(),
2798                "finite values must each push exactly one value; expected 1 but got 0"
2799            );
2800            expected_datums += 1;
2801        }
2802
2803        // Validate the invariants that 0, 1, or 2 elements were pushed, none are Null,
2804        // and if two are pushed then the second is not less than the first. Panic in
2805        // some cases and error in others.
2806        let mut actual_datums = 0;
2807        let mut seen = None;
2808        let mut dataz = &self.row.data[datum_check..];
2809        while !dataz.is_empty() {
2810            let d = unsafe { read_datum(&mut dataz) };
2811            assert!(d != Datum::Null, "cannot push Datum::Null into range");
2812
2813            match seen {
2814                None => seen = Some(d),
2815                Some(seen) => {
2816                    let seen_kind = DatumKind::from(seen);
2817                    let d_kind = DatumKind::from(d);
2818                    assert!(
2819                        seen_kind == d_kind,
2820                        "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
2821                    );
2822
2823                    if seen > d {
2824                        self.row.data.truncate(start);
2825                        return Err(InvalidRangeError::MisorderedRangeBounds.into());
2826                    }
2827                }
2828            }
2829            actual_datums += 1;
2830        }
2831
2832        assert!(
2833            actual_datums == expected_datums,
2834            "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
2835        );
2836
2837        Ok(())
2838    }
2839
2840    /// Clears the contents of the packer without de-allocating its backing memory.
2841    pub fn clear(&mut self) {
2842        self.row.data.clear();
2843    }
2844
2845    /// Truncates the underlying storage to the specified byte position.
2846    ///
2847    /// # Safety
2848    ///
2849    /// `pos` MUST specify a byte offset that lies on a datum boundary.
2850    /// If `pos` specifies a byte offset that is *within* a datum, the row
2851    /// packer will produce an invalid row, the unpacking of which may
2852    /// trigger undefined behavior!
2853    ///
2854    /// To find the byte offset of a datum boundary, inspect the packer's
2855    /// byte length by calling `packer.data().len()` after pushing the desired
2856    /// number of datums onto the packer.
2857    pub unsafe fn truncate(&mut self, pos: usize) {
2858        self.row.data.truncate(pos)
2859    }
2860
2861    /// Truncates the underlying row to contain at most the first `n` datums.
2862    pub fn truncate_datums(&mut self, n: usize) {
2863        let prev_len = self.row.data.len();
2864        let mut iter = self.row.iter();
2865        for _ in iter.by_ref().take(n) {}
2866        let next_len = iter.data.len();
2867        // SAFETY: iterator offsets always lie on a datum boundary.
2868        unsafe { self.truncate(prev_len - next_len) }
2869    }
2870
2871    /// Returns the total amount of bytes used by the underlying row.
2872    pub fn byte_len(&self) -> usize {
2873        self.row.byte_len()
2874    }
2875}
2876
2877impl<'a> IntoIterator for &'a Row {
2878    type Item = Datum<'a>;
2879    type IntoIter = DatumListIter<'a>;
2880    fn into_iter(self) -> DatumListIter<'a> {
2881        self.iter()
2882    }
2883}
2884
2885impl fmt::Debug for Row {
2886    /// Debug representation using the internal datums
2887    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2888        f.write_str("Row{")?;
2889        f.debug_list().entries(self.iter()).finish()?;
2890        f.write_str("}")
2891    }
2892}
2893
2894impl fmt::Display for Row {
2895    /// Display representation using the internal datums
2896    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2897        f.write_str("(")?;
2898        for (i, datum) in self.iter().enumerate() {
2899            if i != 0 {
2900                f.write_str(", ")?;
2901            }
2902            write!(f, "{}", datum)?;
2903        }
2904        f.write_str(")")
2905    }
2906}
2907
2908impl<'a, T> DatumList<'a, T> {
2909    pub fn iter(&self) -> DatumListIter<'a> {
2910        DatumListIter { data: self.data }
2911    }
2912
2913    /// Iterate elements as typed `T` values rather than raw `Datum`s.
2914    ///
2915    /// Each datum is decoded and converted via [`FromDatum`]. Since generic
2916    /// type parameters in `#[sqlfunc]` are erased to `Datum<'a>` before code
2917    /// generation, this is monomorphized to an identity conversion at runtime.
2918    pub fn typed_iter(&self) -> DatumListTypedIter<'a, T>
2919    where
2920        T: FromDatum<'a>,
2921    {
2922        DatumListTypedIter {
2923            inner: self.iter(),
2924            _phantom: PhantomData,
2925        }
2926    }
2927
2928    /// For debugging only
2929    pub fn data(&self) -> &'a [u8] {
2930        self.data
2931    }
2932}
2933
2934impl<T> DatumList<'static, T> {
2935    pub fn empty() -> Self {
2936        DatumList::new(&[])
2937    }
2938}
2939
2940impl<'a> IntoIterator for DatumList<'a> {
2941    type Item = Datum<'a>;
2942    type IntoIter = DatumListIter<'a>;
2943    fn into_iter(self) -> DatumListIter<'a> {
2944        self.iter()
2945    }
2946}
2947
2948impl<'a> Iterator for DatumListIter<'a> {
2949    type Item = Datum<'a>;
2950    fn next(&mut self) -> Option<Self::Item> {
2951        if self.data.is_empty() {
2952            None
2953        } else {
2954            Some(unsafe { read_datum(&mut self.data) })
2955        }
2956    }
2957}
2958
2959impl<'a, T: FromDatum<'a>> Iterator for DatumListTypedIter<'a, T> {
2960    type Item = T;
2961    fn next(&mut self) -> Option<Self::Item> {
2962        self.inner.next().map(T::from_datum)
2963    }
2964}
2965
2966impl<'a, T> DatumMap<'a, T> {
2967    pub fn iter(&self) -> DatumDictIter<'a> {
2968        DatumDictIter {
2969            data: self.data,
2970            prev_key: None,
2971        }
2972    }
2973
2974    /// Iterate entries as `(&str, T)` pairs rather than `(&str, Datum)`.
2975    ///
2976    /// Each value datum is converted via [`FromDatum`]. Since generic type
2977    /// parameters in `#[sqlfunc]` are erased to `Datum<'a>` before code
2978    /// generation, this is monomorphized to an identity conversion at runtime.
2979    pub fn typed_iter(&self) -> DatumDictTypedIter<'a, T>
2980    where
2981        T: FromDatum<'a>,
2982    {
2983        DatumDictTypedIter {
2984            inner: self.iter(),
2985            _phantom: PhantomData,
2986        }
2987    }
2988
2989    /// For debugging only
2990    pub fn data(&self) -> &'a [u8] {
2991        self.data
2992    }
2993}
2994
2995impl<T> DatumMap<'static, T> {
2996    pub fn empty() -> Self {
2997        DatumMap::new(&[])
2998    }
2999}
3000
3001impl<'a, T> Debug for DatumMap<'a, T> {
3002    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3003        f.debug_map().entries(self.iter()).finish()
3004    }
3005}
3006
3007impl<'a> IntoIterator for &'a DatumMap<'a> {
3008    type Item = (&'a str, Datum<'a>);
3009    type IntoIter = DatumDictIter<'a>;
3010    fn into_iter(self) -> DatumDictIter<'a> {
3011        self.iter()
3012    }
3013}
3014
3015impl<'a> Iterator for DatumDictIter<'a> {
3016    type Item = (&'a str, Datum<'a>);
3017    fn next(&mut self) -> Option<Self::Item> {
3018        if self.data.is_empty() {
3019            None
3020        } else {
3021            let key_tag =
3022                Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
3023            assert!(
3024                key_tag == Tag::StringTiny
3025                    || key_tag == Tag::StringShort
3026                    || key_tag == Tag::StringLong
3027                    || key_tag == Tag::StringHuge,
3028                "Dict keys must be strings, got {:?}",
3029                key_tag
3030            );
3031            let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
3032            let val = unsafe { read_datum(&mut self.data) };
3033
3034            // if in debug mode, sanity check keys
3035            if cfg!(debug_assertions) {
3036                if let Some(prev_key) = self.prev_key {
3037                    debug_assert!(
3038                        prev_key < key,
3039                        "Dict keys must be unique and given in ascending order: {} came before {}",
3040                        prev_key,
3041                        key
3042                    );
3043                }
3044                self.prev_key = Some(key);
3045            }
3046
3047            Some((key, val))
3048        }
3049    }
3050}
3051
3052impl<'a, T: FromDatum<'a>> Iterator for DatumDictTypedIter<'a, T> {
3053    type Item = (&'a str, T);
3054    fn next(&mut self) -> Option<Self::Item> {
3055        self.inner.next().map(|(k, v)| (k, T::from_datum(v)))
3056    }
3057}
3058
3059impl RowArena {
3060    pub fn new() -> Self {
3061        RowArena {
3062            inner: RefCell::new(vec![]),
3063            scratch: RefCell::new(Vec::new()),
3064        }
3065    }
3066
3067    /// Creates a `RowArena` with an initial region sized to hold `capacity` bytes, to avoid
3068    /// reallocations as the first datums are created in the arena.
3069    pub fn with_capacity(capacity: usize) -> Self {
3070        let mut inner = Vec::new();
3071        if capacity > 0 {
3072            inner.push(Vec::with_capacity(capacity));
3073        }
3074        RowArena {
3075            inner: RefCell::new(inner),
3076            scratch: RefCell::new(Vec::new()),
3077        }
3078    }
3079
3080    /// Ensures the active region can hold at least `additional` more bytes without allocating a
3081    /// new region. Call this when you expect to push roughly `additional` bytes next.
3082    pub fn reserve(&self, additional: usize) {
3083        if additional == 0 {
3084            return;
3085        }
3086        let mut inner = self.inner.borrow_mut();
3087        match inner.last_mut() {
3088            // The active region is empty, so nothing references it yet and it is safe to grow it
3089            // in place (a reallocation cannot dangle a live reference).
3090            Some(active) if active.is_empty() => {
3091                if active.capacity() < additional {
3092                    active.reserve_exact(additional);
3093                }
3094            }
3095            // The active region holds live data; we cannot grow it without moving those bytes, so
3096            // stage a fresh region. Size it like `push_bytes` does (at least double the current
3097            // region) so a sequence of small `reserve`s still yields at most log-many regions
3098            // rather than many small ones.
3099            Some(active) => {
3100                let new_cap = std::cmp::max(additional, active.capacity().saturating_mul(2));
3101                inner.push(Vec::with_capacity(new_cap));
3102            }
3103            None => inner.push(Vec::with_capacity(additional)),
3104        }
3105    }
3106
3107    /// Copies `bytes` into the arena and returns a reference valid for its lifetime.
3108    ///
3109    /// Accepts anything that derefs to `[u8]` (e.g. `Vec<u8>`, `&[u8]`); the bytes are copied, so
3110    /// the caller's allocation is not retained.
3111    #[allow(clippy::transmute_ptr_to_ptr)]
3112    pub fn push_bytes<'a, B: Deref<Target = [u8]>>(&'a self, bytes: B) -> &'a [u8] {
3113        let bytes: &[u8] = &bytes;
3114        let need = bytes.len();
3115        if need == 0 {
3116            return &[];
3117        }
3118        let mut inner = self.inner.borrow_mut();
3119
3120        // Find or create a region with spare capacity for `need` bytes, never growing a region
3121        // that already holds data (see the type-level comment for why this preserves references).
3122        let has_room = inner
3123            .last()
3124            .map_or(false, |region| region.capacity() - region.len() >= need);
3125        if !has_room {
3126            let last_cap = inner.last().map_or(0, |region| region.capacity());
3127            let new_cap = std::cmp::max(need, last_cap.saturating_mul(2));
3128            inner.push(Vec::with_capacity(new_cap));
3129        }
3130
3131        let region = inner.last_mut().expect("region present");
3132        let start = region.len();
3133        region.extend_from_slice(bytes);
3134        let copied = &region[start..];
3135        unsafe {
3136            // This is safe because:
3137            //   * `copied` references bytes inside `region`'s heap buffer, which we just sized to
3138            //     fit without reallocating; that buffer is never resized again while it holds data
3139            //     (we allocate a new region instead), so the reference stays valid.
3140            //   * The buffer lives as long as the arena: regions are only dropped by `clear`/`drop`,
3141            //     both of which take `&mut`/ownership, so no `&'a self`-tied reference can outlive
3142            //     them.
3143            //   * Pushing further regions may reallocate `self.inner`, but that moves only the
3144            //     `Vec<u8>` headers, not the heap buffers they own.
3145            transmute::<&[u8], &'a [u8]>(copied)
3146        }
3147    }
3148
3149    /// Copies `string` into the arena and returns a reference valid for its lifetime.
3150    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
3151        let copied = self.push_bytes(string.as_bytes());
3152        unsafe {
3153            // This is safe because we just copied the bytes of a valid `String`.
3154            std::str::from_utf8_unchecked(copied)
3155        }
3156    }
3157
3158    /// Returns a growable, writeable byte buffer for assembling a value incrementally.
3159    ///
3160    /// Write into it with [`RowArenaBuf::push`], [`RowArenaBuf::extend_from_slice`], or
3161    /// [`std::io::Write`], then call [`RowArenaBuf::finish`] to copy the result into the arena and
3162    /// obtain a reference valid for the arena's lifetime. The backing buffer is a single scratch
3163    /// allocation reused across writers, so this lets a producer that builds bytes piecewise (e.g.
3164    /// decoding a row) avoid managing its own scratch. Only one writer may be live at a time.
3165    pub fn writer(&self) -> RowArenaBuf<'_> {
3166        let mut scratch = self.scratch.borrow_mut();
3167        scratch.clear();
3168        RowArenaBuf {
3169            arena: self,
3170            scratch,
3171        }
3172    }
3173
3174    /// Take ownership of `row` for the lifetime of the arena, returning a
3175    /// reference to the first datum in the row.
3176    ///
3177    /// If we had an owned datum type, this method would be much clearer, and
3178    /// would be called `push_owned_datum`.
3179    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
3180        let copied = self.push_bytes(row.data());
3181        unsafe {
3182            // This is safe because `copied` is a valid encoding of a single datum (we just packed
3183            // it into `row`), backed by the arena for the lifetime `'a`. Copying the bytes also
3184            // sidesteps the `Row`'s inline (`SmallVec`) storage entirely.
3185            let datum = read_datum(&mut &copied[..]);
3186            transmute::<Datum<'_>, Datum<'a>>(datum)
3187        }
3188    }
3189
3190    /// Equivalent to `push_unary_row` but returns a `DatumNested` rather than a
3191    /// `Datum`.
3192    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
3193        let copied = self.push_bytes(row.data());
3194        unsafe {
3195            // Safe for the same reasons as `push_unary_row`.
3196            let nested = DatumNested::extract(&mut &copied[..]);
3197            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
3198        }
3199    }
3200
3201    /// Convenience function to make a new `Row` containing a single datum, and
3202    /// take ownership of it for the lifetime of the arena
3203    ///
3204    /// ```
3205    /// # use mz_repr::{RowArena, Datum};
3206    /// let arena = RowArena::new();
3207    /// let datum = arena.make_datum(|packer| {
3208    ///   packer.push_list(&[Datum::String("hello"), Datum::String("world")]);
3209    /// });
3210    /// assert_eq!(datum.unwrap_list().iter().collect::<Vec<_>>(), vec![Datum::String("hello"), Datum::String("world")]);
3211    /// ```
3212    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
3213    where
3214        F: FnOnce(&mut RowPacker),
3215    {
3216        let mut row = Row::default();
3217        f(&mut row.packer());
3218        self.push_unary_row(row)
3219    }
3220
3221    /// Convenience function to build a list datum from an iterator of typed
3222    /// elements and return it as a `DatumList<'a, T>`.
3223    ///
3224    /// By accepting an iterator of `T: Borrow<Datum>` instead of a raw
3225    /// `RowPacker` closure, this guarantees that only elements of type `T`
3226    /// are pushed.
3227    pub fn make_datum_list<'a, T: std::borrow::Borrow<Datum<'a>>>(
3228        &'a self,
3229        iter: impl IntoIterator<Item = T>,
3230    ) -> DatumList<'a, T> {
3231        let datum = self.make_datum(|packer| {
3232            packer.push_list_with(|packer| {
3233                for elem in iter {
3234                    packer.push(*elem.borrow());
3235                }
3236            });
3237        });
3238        DatumList::new(datum.unwrap_list().data())
3239    }
3240
3241    /// Convenience function identical to `make_datum` but instead returns a
3242    /// `DatumNested`.
3243    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
3244    where
3245        F: FnOnce(&mut RowPacker),
3246    {
3247        let mut row = Row::default();
3248        f(&mut row.packer());
3249        self.push_unary_row_datum_nested(row)
3250    }
3251
3252    /// Like [`RowArena::make_datum`], but the provided closure can return an error.
3253    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
3254    where
3255        F: FnOnce(&mut RowPacker) -> Result<(), E>,
3256    {
3257        let mut row = Row::default();
3258        f(&mut row.packer())?;
3259        Ok(self.push_unary_row(row))
3260    }
3261
3262    /// Clear the contents of the arena.
3263    ///
3264    /// Retains the single largest region (emptied) so the arena can be reused without
3265    /// reallocating; a workload that clears between uses of similar size becomes allocation-free.
3266    pub fn clear(&mut self) {
3267        let inner = self.inner.get_mut();
3268        // Keep only the largest-capacity region, reset to empty, and drop the rest. Because region
3269        // capacities only ever grow (each new region at least doubles the previous), the largest is
3270        // normally the last; we scan for it defensively, which is cheap given log-many regions.
3271        if let Some(largest) = (0..inner.len()).max_by_key(|&i| inner[i].capacity()) {
3272            inner.swap(0, largest);
3273            inner.truncate(1);
3274            inner[0].clear();
3275        }
3276    }
3277}
3278
3279impl Default for RowArena {
3280    fn default() -> RowArena {
3281        RowArena::new()
3282    }
3283}
3284
/// A growable, writeable byte buffer that builds a value into a [`RowArena`]'s scratch space.
///
/// Obtained from [`RowArena::writer`]. Behaves like a writeable byte slice (push/extend bytes,
/// read back as `&[u8]`); [`RowArenaBuf::finish`] copies the assembled bytes into the arena and
/// returns a reference valid for the arena's lifetime. The scratch allocation is held for the
/// writer's lifetime and reused by later writers.
#[derive(Debug)]
pub struct RowArenaBuf<'a> {
    // The arena that `finish`/`finish_str` copy the assembled bytes into.
    arena: &'a RowArena,
    // Exclusive borrow of the arena's scratch buffer; holds the bytes written so far.
    scratch: RefMut<'a, Vec<u8>>,
}
3296
3297impl<'a> RowArenaBuf<'a> {
3298    /// Appends a single byte.
3299    pub fn push(&mut self, byte: u8) {
3300        self.scratch.push(byte);
3301    }
3302
3303    /// Appends a slice of bytes.
3304    pub fn extend_from_slice(&mut self, bytes: &[u8]) {
3305        self.scratch.extend_from_slice(bytes);
3306    }
3307
3308    /// The bytes written so far.
3309    pub fn as_slice(&self) -> &[u8] {
3310        &self.scratch
3311    }
3312
3313    /// The number of bytes written so far.
3314    pub fn len(&self) -> usize {
3315        self.scratch.len()
3316    }
3317
3318    /// Whether no bytes have been written.
3319    pub fn is_empty(&self) -> bool {
3320        self.scratch.is_empty()
3321    }
3322
3323    /// Copies the written bytes into the arena, returning a reference valid for its lifetime.
3324    pub fn finish(self) -> &'a [u8] {
3325        self.arena.push_bytes(self.scratch.as_slice())
3326    }
3327
3328    /// Like [`RowArenaBuf::finish`], but returns the bytes as a `&str`.
3329    ///
3330    /// Intended for buffers written via [`std::fmt::Write`] (e.g. `write!`), whose contents are
3331    /// valid UTF-8. Panics if the bytes are not valid UTF-8.
3332    pub fn finish_str(self) -> &'a str {
3333        let bytes = self.arena.push_bytes(self.scratch.as_slice());
3334        std::str::from_utf8(bytes).expect("RowArenaBuf::finish_str on non-UTF-8 contents")
3335    }
3336}
3337
3338impl<'a> std::ops::Deref for RowArenaBuf<'a> {
3339    type Target = [u8];
3340    fn deref(&self) -> &[u8] {
3341        &self.scratch
3342    }
3343}
3344
3345impl<'a> std::io::Write for RowArenaBuf<'a> {
3346    fn write(&mut self, bytes: &[u8]) -> std::io::Result<usize> {
3347        self.scratch.extend_from_slice(bytes);
3348        Ok(bytes.len())
3349    }
3350
3351    fn flush(&mut self) -> std::io::Result<()> {
3352        Ok(())
3353    }
3354}
3355
3356impl<'a> std::fmt::Write for RowArenaBuf<'a> {
3357    fn write_str(&mut self, s: &str) -> std::fmt::Result {
3358        self.scratch.extend_from_slice(s.as_bytes());
3359        Ok(())
3360    }
3361}
3362
/// A thread-local row, which can be borrowed and returned.
///
/// # Example
///
/// Use this type instead of creating a new row:
/// ```
/// use mz_repr::SharedRow;
///
/// let mut row_builder = SharedRow::get();
/// ```
///
/// This allows us to reuse an existing row allocation instead of creating a new one or retaining
/// an allocation locally. Additionally, we can observe the size of the local row in a central
/// place and potentially reallocate to reduce memory needs.
///
/// Dropping a `SharedRow` puts its row back into the thread-local slot for the next user.
///
/// # Panics
///
/// [`SharedRow::get`] panics when trying to obtain multiple references to the shared row.
#[derive(Debug)]
pub struct SharedRow(Row);
3382
3383impl SharedRow {
3384    thread_local! {
3385        /// A thread-local slot containing a shared Row that can be temporarily used by a function.
3386        /// There can be at most one active user of this Row, which is tracked by the state of the
3387        /// `Option<_>` wrapper. When it is `Some(..)`, the row is available for using. When it
3388        /// is `None`, it is not, and the constructor will panic if a thread attempts to use it.
3389        static SHARED_ROW: Cell<Option<Row>> = const { Cell::new(Some(Row::empty())) }
3390    }
3391
3392    /// Get the shared row.
3393    ///
3394    /// The row's contents are cleared before returning it.
3395    ///
3396    /// # Panic
3397    ///
3398    /// Panics when the row is already borrowed elsewhere.
3399    pub fn get() -> Self {
3400        let mut row = Self::SHARED_ROW
3401            .take()
3402            .expect("attempted to borrow already borrowed SharedRow");
3403        // Clear row
3404        row.packer();
3405        Self(row)
3406    }
3407
3408    /// Gets the shared row and uses it to pack `iter`.
3409    pub fn pack<'a, I, D>(iter: I) -> Row
3410    where
3411        I: IntoIterator<Item = D>,
3412        D: Borrow<Datum<'a>>,
3413    {
3414        let mut row_builder = Self::get();
3415        let mut row_packer = row_builder.packer();
3416        row_packer.extend(iter);
3417        row_builder.clone()
3418    }
3419}
3420
3421impl std::ops::Deref for SharedRow {
3422    type Target = Row;
3423
3424    fn deref(&self) -> &Self::Target {
3425        &self.0
3426    }
3427}
3428
3429impl std::ops::DerefMut for SharedRow {
3430    fn deref_mut(&mut self) -> &mut Self::Target {
3431        &mut self.0
3432    }
3433}
3434
3435impl Drop for SharedRow {
3436    fn drop(&mut self) {
3437        // Take the Row allocation from this instance and put it back in the thread local slot for
3438        // the next user. The Row in `self` is replaced with an empty Row which does not allocate.
3439        Self::SHARED_ROW.set(Some(std::mem::take(&mut self.0)))
3440    }
3441}
3442
3443#[cfg(test)]
3444mod tests {
3445    use std::cmp::Ordering;
3446    use std::collections::hash_map::DefaultHasher;
3447    use std::hash::{Hash, Hasher};
3448
3449    use chrono::{DateTime, NaiveDate};
3450    use itertools::Itertools;
3451    use mz_ore::{assert_err, assert_none};
3452    use ordered_float::OrderedFloat;
3453
3454    use crate::SqlScalarType;
3455
3456    use super::*;
3457
3458    fn hash<T: Hash>(t: &T) -> u64 {
3459        let mut hasher = DefaultHasher::new();
3460        t.hash(&mut hasher);
3461        hasher.finish()
3462    }
3463
3464    #[mz_ore::test]
3465    fn test_assumptions() {
3466        assert_eq!(size_of::<Tag>(), 1);
3467        #[cfg(target_endian = "big")]
3468        {
3469            // if you want to run this on a big-endian cpu, we'll need big-endian versions of the serialization code
3470            assert!(false);
3471        }
3472    }
3473
3474    #[mz_ore::test]
3475    fn miri_test_arena() {
3476        let arena = RowArena::new();
3477
3478        assert_eq!(arena.push_string("".to_owned()), "");
3479        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");
3480
3481        let empty: &[u8] = &[];
3482        assert_eq!(arena.push_bytes(vec![]), empty);
3483        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);
3484
3485        let mut row = Row::default();
3486        let mut packer = row.packer();
3487        packer.push_dict_with(|row| {
3488            row.push(Datum::String("a"));
3489            row.push_list_with(|row| {
3490                row.push(Datum::String("one"));
3491                row.push(Datum::String("two"));
3492                row.push(Datum::String("three"));
3493            });
3494            row.push(Datum::String("b"));
3495            row.push(Datum::String("c"));
3496        });
3497        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
3498    }
3499
3500    #[mz_ore::test]
3501    fn miri_test_arena_growth_keeps_references() {
3502        // References returned by `push_bytes` must stay valid as later pushes allocate new
3503        // regions; this exercises the "never resize a region that holds data" invariant.
3504        let arena = RowArena::new();
3505        let chunks: Vec<Vec<u8>> = (0..128u16)
3506            .map(|i| vec![u8::try_from(i % 256).unwrap(); usize::from(i % 13) + 1])
3507            .collect();
3508        let refs: Vec<&[u8]> = chunks
3509            .iter()
3510            .map(|c| arena.push_bytes(c.as_slice()))
3511            .collect();
3512        for (i, r) in refs.iter().enumerate() {
3513            assert_eq!(*r, chunks[i].as_slice());
3514        }
3515    }
3516
3517    #[mz_ore::test]
3518    fn miri_test_arena_unary_row_at_offset() {
3519        // A row pushed after other bytes lands at a non-zero offset within a region; reading it
3520        // back must not depend on the row starting at offset zero or on any alignment.
3521        let arena = RowArena::new();
3522        arena.reserve(4096);
3523        let _pad = arena.push_bytes(vec![0xAB; 5]);
3524        let row = Row::pack_slice(&[Datum::String("hello"), Datum::Int64(42), Datum::True]);
3525        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
3526    }
3527
3528    #[mz_ore::test]
3529    fn miri_test_arena_clear_reuse() {
3530        // After `clear` the arena retains a region and remains usable across cycles.
3531        let mut arena = RowArena::new();
3532        for i in 0..100u8 {
3533            let _ = arena.push_bytes(vec![i; 16]);
3534        }
3535        arena.clear();
3536        assert_eq!(arena.push_bytes(vec![7u8; 8]), &[7u8; 8]);
3537        assert_eq!(arena.push_string("after clear".to_owned()), "after clear");
3538        arena.clear();
3539        let empty: &[u8] = &[];
3540        assert_eq!(arena.push_bytes(Vec::<u8>::new()), empty);
3541    }
3542
3543    #[mz_ore::test]
3544    fn miri_test_arena_writer() {
3545        use std::io::Write;
3546
3547        let arena = RowArena::new();
3548
3549        // Build a value incrementally and commit it.
3550        let mut w = arena.writer();
3551        let mut expected = Vec::new();
3552        for i in 0..1000u16 {
3553            let byte = u8::try_from(i % 256).unwrap();
3554            w.push(byte);
3555            expected.push(byte);
3556            w.extend_from_slice(&[byte, byte]);
3557            expected.extend_from_slice(&[byte, byte]);
3558        }
3559        assert_eq!(w.as_slice(), expected.as_slice());
3560        assert_eq!(w.len(), expected.len());
3561        let first = w.finish();
3562        assert_eq!(first, expected.as_slice());
3563
3564        // A second writer reuses the scratch; its result is independent of the first, which stays
3565        // valid because `finish` copied it into the arena.
3566        let mut w2 = arena.writer();
3567        write!(w2, "hello").unwrap();
3568        let second = w2.finish();
3569        assert_eq!(second, b"hello");
3570        assert_eq!(first, expected.as_slice());
3571
3572        // An empty writer commits to an empty slice.
3573        let empty: &[u8] = &[];
3574        assert_eq!(arena.writer().finish(), empty);
3575
3576        // Abandoning a writer without finishing is fine; the next writer starts empty.
3577        {
3578            let mut w3 = arena.writer();
3579            w3.extend_from_slice(b"discarded");
3580        }
3581        assert_eq!(arena.writer().as_slice(), empty);
3582    }
3583
3584    #[mz_ore::test]
3585    fn miri_test_arena_writer_fmt() {
3586        use std::fmt::Write;
3587
3588        // Format text into the writer (e.g. building a cast-to-string result) and commit as `&str`.
3589        let arena = RowArena::new();
3590        let mut w = arena.writer();
3591        for i in 0..5 {
3592            write!(w, "{i},").unwrap();
3593        }
3594        assert_eq!(w.finish_str(), "0,1,2,3,4,");
3595    }
3596
3597    #[mz_ore::test]
3598    fn miri_test_round_trip() {
3599        fn round_trip(datums: Vec<Datum>) {
3600            let row = Row::pack(datums.clone());
3601
3602            // When run under miri this catches undefined bytes written to data
3603            // eg by calling push_copy! on a type which contains undefined padding values
3604            println!("{:?}", row.data());
3605
3606            let datums2 = row.iter().collect::<Vec<_>>();
3607            let datums3 = row.unpack();
3608            assert_eq!(datums, datums2);
3609            assert_eq!(datums, datums3);
3610        }
3611
3612        round_trip(vec![]);
3613        round_trip(
3614            SqlScalarType::enumerate()
3615                .iter()
3616                .flat_map(|r#type| r#type.interesting_datums())
3617                .collect(),
3618        );
3619        round_trip(vec![
3620            Datum::Null,
3621            Datum::Null,
3622            Datum::False,
3623            Datum::True,
3624            Datum::Int16(-21),
3625            Datum::Int32(-42),
3626            Datum::Int64(-2_147_483_648 - 42),
3627            Datum::UInt8(0),
3628            Datum::UInt8(1),
3629            Datum::UInt16(0),
3630            Datum::UInt16(1),
3631            Datum::UInt16(1 << 8),
3632            Datum::UInt32(0),
3633            Datum::UInt32(1),
3634            Datum::UInt32(1 << 8),
3635            Datum::UInt32(1 << 16),
3636            Datum::UInt32(1 << 24),
3637            Datum::UInt64(0),
3638            Datum::UInt64(1),
3639            Datum::UInt64(1 << 8),
3640            Datum::UInt64(1 << 16),
3641            Datum::UInt64(1 << 24),
3642            Datum::UInt64(1 << 32),
3643            Datum::UInt64(1 << 40),
3644            Datum::UInt64(1 << 48),
3645            Datum::UInt64(1 << 56),
3646            Datum::Float32(OrderedFloat::from(-42.12)),
3647            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
3648            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
3649            Datum::Timestamp(
3650                CheckedTimestamp::from_timestamplike(
3651                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
3652                        .unwrap()
3653                        .and_hms_opt(14, 32, 11)
3654                        .unwrap(),
3655                )
3656                .unwrap(),
3657            ),
3658            Datum::TimestampTz(
3659                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
3660                    .unwrap(),
3661            ),
3662            Datum::Interval(Interval {
3663                months: 312,
3664                ..Default::default()
3665            }),
3666            Datum::Interval(Interval::new(0, 0, 1_012_312)),
3667            Datum::Bytes(&[]),
3668            Datum::Bytes(&[0, 2, 1, 255]),
3669            Datum::String(""),
3670            Datum::String("العَرَبِيَّة"),
3671        ]);
3672    }
3673
3674    #[mz_ore::test]
3675    fn test_array() {
3676        // Construct an array using `Row::push_array` and verify that it unpacks
3677        // correctly.
3678        const DIM: ArrayDimension = ArrayDimension {
3679            lower_bound: 2,
3680            length: 2,
3681        };
3682        let mut row = Row::default();
3683        let mut packer = row.packer();
3684        packer
3685            .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3686            .unwrap();
3687        let arr1 = row.unpack_first().unwrap_array();
3688        assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3689        assert_eq!(
3690            arr1.elements().into_iter().collect::<Vec<_>>(),
3691            vec![Datum::Int32(1), Datum::Int32(2)]
3692        );
3693
3694        // Pack a previously-constructed `Datum::Array` and verify that it
3695        // unpacks correctly.
3696        let row = Row::pack_slice(&[Datum::Array(arr1)]);
3697        let arr2 = row.unpack_first().unwrap_array();
3698        assert_eq!(arr1, arr2);
3699    }
3700
3701    #[mz_ore::test]
3702    fn test_multidimensional_array() {
3703        let datums = vec![
3704            Datum::Int32(1),
3705            Datum::Int32(2),
3706            Datum::Int32(3),
3707            Datum::Int32(4),
3708            Datum::Int32(5),
3709            Datum::Int32(6),
3710            Datum::Int32(7),
3711            Datum::Int32(8),
3712        ];
3713
3714        let mut row = Row::default();
3715        let mut packer = row.packer();
3716        packer
3717            .try_push_array(
3718                &[
3719                    ArrayDimension {
3720                        lower_bound: 1,
3721                        length: 1,
3722                    },
3723                    ArrayDimension {
3724                        lower_bound: 1,
3725                        length: 4,
3726                    },
3727                    ArrayDimension {
3728                        lower_bound: 1,
3729                        length: 2,
3730                    },
3731                ],
3732                &datums,
3733            )
3734            .unwrap();
3735        let array = row.unpack_first().unwrap_array();
3736        assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3737    }
3738
3739    #[mz_ore::test]
3740    fn test_array_max_dimensions() {
3741        let mut row = Row::default();
3742        let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3743
3744        // An array with one too many dimensions should be rejected.
3745        let res = row.packer().try_push_array(
3746            &vec![
3747                ArrayDimension {
3748                    lower_bound: 1,
3749                    length: 1
3750                };
3751                max_dims + 1
3752            ],
3753            vec![Datum::Int32(4)],
3754        );
3755        assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3756        assert!(row.data.is_empty());
3757
3758        // An array with exactly the maximum allowable dimensions should be
3759        // accepted.
3760        row.packer()
3761            .try_push_array(
3762                &vec![
3763                    ArrayDimension {
3764                        lower_bound: 1,
3765                        length: 1
3766                    };
3767                    max_dims
3768                ],
3769                vec![Datum::Int32(4)],
3770            )
3771            .unwrap();
3772    }
3773
3774    #[mz_ore::test]
3775    fn test_array_wrong_cardinality() {
3776        let mut row = Row::default();
3777        let res = row.packer().try_push_array(
3778            &[
3779                ArrayDimension {
3780                    lower_bound: 1,
3781                    length: 2,
3782                },
3783                ArrayDimension {
3784                    lower_bound: 1,
3785                    length: 3,
3786                },
3787            ],
3788            vec![Datum::Int32(1), Datum::Int32(2)],
3789        );
3790        assert_eq!(
3791            res,
3792            Err(InvalidArrayError::WrongCardinality {
3793                actual: 2,
3794                expected: 6,
3795            })
3796        );
3797        assert!(row.data.is_empty());
3798    }
3799
3800    #[mz_ore::test]
3801    fn test_array_cardinality_overflow() {
3802        // Dimension lengths whose product overflows `usize` must be rejected as
3803        // a `WrongCardinality` error, not panic (under overflow checks) or wrap
3804        // (in release, which could spuriously accept a corrupt array). The
3805        // product saturates to `usize::MAX`, which no real element count matches.
3806        let mut row = Row::default();
3807        let res = row.packer().try_push_array(
3808            &[
3809                ArrayDimension {
3810                    lower_bound: 1,
3811                    length: usize::MAX,
3812                },
3813                ArrayDimension {
3814                    lower_bound: 1,
3815                    length: 2,
3816                },
3817            ],
3818            vec![Datum::Int32(1), Datum::Int32(2)],
3819        );
3820        assert_eq!(
3821            res,
3822            Err(InvalidArrayError::WrongCardinality {
3823                actual: 2,
3824                expected: usize::MAX,
3825            })
3826        );
3827        assert!(row.data.is_empty());
3828    }
3829
3830    #[mz_ore::test]
3831    fn test_nesting() {
3832        let mut row = Row::default();
3833        row.packer().push_dict_with(|row| {
3834            row.push(Datum::String("favourites"));
3835            row.push_list_with(|row| {
3836                row.push(Datum::String("ice cream"));
3837                row.push(Datum::String("oreos"));
3838                row.push(Datum::String("cheesecake"));
3839            });
3840            row.push(Datum::String("name"));
3841            row.push(Datum::String("bob"));
3842        });
3843
3844        let mut iter = row.unpack_first().unwrap_map().iter();
3845
3846        let (k, v) = iter.next().unwrap();
3847        assert_eq!(k, "favourites");
3848        assert_eq!(
3849            v.unwrap_list().iter().collect::<Vec<_>>(),
3850            vec![
3851                Datum::String("ice cream"),
3852                Datum::String("oreos"),
3853                Datum::String("cheesecake"),
3854            ]
3855        );
3856
3857        let (k, v) = iter.next().unwrap();
3858        assert_eq!(k, "name");
3859        assert_eq!(v, Datum::String("bob"));
3860    }
3861
3862    #[mz_ore::test]
3863    fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3864        let pack = |ok| {
3865            let mut row = Row::default();
3866            row.packer().push_dict_with(|row| {
3867                if ok {
3868                    row.push(Datum::String("key"));
3869                    row.push(Datum::Int32(42));
3870                    Ok(7)
3871                } else {
3872                    Err("fail")
3873                }
3874            })?;
3875            Ok(row)
3876        };
3877
3878        assert_eq!(pack(false), Err("fail"));
3879
3880        let row = pack(true)?;
3881        let mut dict = row.unpack_first().unwrap_map().iter();
3882        assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3883        assert_eq!(dict.next(), None);
3884
3885        Ok(())
3886    }
3887
3888    #[mz_ore::test]
3889    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
3890    fn test_datum_sizes() {
3891        let arena = RowArena::new();
3892
3893        // Test the claims about various datum sizes.
3894        let values_of_interest = vec![
3895            Datum::Null,
3896            Datum::False,
3897            Datum::Int16(0),
3898            Datum::Int32(0),
3899            Datum::Int64(0),
3900            Datum::UInt8(0),
3901            Datum::UInt8(1),
3902            Datum::UInt16(0),
3903            Datum::UInt16(1),
3904            Datum::UInt16(1 << 8),
3905            Datum::UInt32(0),
3906            Datum::UInt32(1),
3907            Datum::UInt32(1 << 8),
3908            Datum::UInt32(1 << 16),
3909            Datum::UInt32(1 << 24),
3910            Datum::UInt64(0),
3911            Datum::UInt64(1),
3912            Datum::UInt64(1 << 8),
3913            Datum::UInt64(1 << 16),
3914            Datum::UInt64(1 << 24),
3915            Datum::UInt64(1 << 32),
3916            Datum::UInt64(1 << 40),
3917            Datum::UInt64(1 << 48),
3918            Datum::UInt64(1 << 56),
3919            Datum::Float32(OrderedFloat(0.0)),
3920            Datum::Float64(OrderedFloat(0.0)),
3921            Datum::from(numeric::Numeric::from(0)),
3922            Datum::from(numeric::Numeric::from(1000)),
3923            Datum::from(numeric::Numeric::from(9999)),
3924            Datum::Date(
3925                NaiveDate::from_ymd_opt(1, 1, 1)
3926                    .unwrap()
3927                    .try_into()
3928                    .unwrap(),
3929            ),
3930            Datum::Timestamp(
3931                CheckedTimestamp::from_timestamplike(
3932                    DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
3933                )
3934                .unwrap(),
3935            ),
3936            Datum::TimestampTz(
3937                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
3938                    .unwrap(),
3939            ),
3940            Datum::Interval(Interval::default()),
3941            Datum::Bytes(&[]),
3942            Datum::String(""),
3943            Datum::JsonNull,
3944            Datum::Range(Range { inner: None }),
3945            arena.make_datum(|packer| {
3946                packer
3947                    .push_range(Range::new(Some((
3948                        RangeLowerBound::new(Datum::Int32(-1), true),
3949                        RangeUpperBound::new(Datum::Int32(1), true),
3950                    ))))
3951                    .unwrap();
3952            }),
3953        ];
3954        for value in values_of_interest {
3955            if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
3956                panic!("Disparity in claimed size for {:?}", value);
3957            }
3958        }
3959    }
3960
    /// Checks how `push_range_with` behaves on invalid bounds: some inputs are expected to
    /// panic, and misordered bounds are expected to return `MisorderedRangeBounds`.
    #[mz_ore::test]
    fn test_range_errors() {
        /// Attempts to pack a range whose lower bound is written from `datums[0]` and whose
        /// upper bound is written from `datums[1]`, both inclusive, and returns the result of
        /// `push_range_with`.
        fn test_range_errors_inner<'a>(
            datums: Vec<Vec<Datum<'a>>>,
        ) -> Result<(), InvalidRangeError> {
            let mut row = Row::default();
            // Remember the length up front to compare against after the attempted push.
            let row_len = row.byte_len();
            let mut packer = row.packer();
            let r = packer.push_range_with(
                RangeLowerBound {
                    inclusive: true,
                    bound: Some(|row: &mut RowPacker| {
                        for d in &datums[0] {
                            row.push(d);
                        }
                        Ok(())
                    }),
                },
                RangeUpperBound {
                    inclusive: true,
                    bound: Some(|row: &mut RowPacker| {
                        for d in &datums[1] {
                            row.push(d);
                        }
                        Ok(())
                    }),
                },
            );

            // The row's byte length must be unchanged once `push_range_with` has returned.
            // Every input this test reaches this line with is expected to be an error.
            assert_eq!(row_len, row.byte_len());

            r
        }

        // Each of these inputs is expected to panic inside the helper rather than return.
        for panicking_case in [
            vec![vec![Datum::Int32(1)], vec![]],
            vec![
                vec![Datum::Int32(1), Datum::Int32(2)],
                vec![Datum::Int32(3)],
            ],
            vec![
                vec![Datum::Int32(1)],
                vec![Datum::Int32(2), Datum::Int32(3)],
            ],
            vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
            vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
            vec![vec![Datum::Null], vec![Datum::Int32(2)]],
            vec![vec![Datum::Int32(1)], vec![Datum::Null]],
        ] {
            #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
            let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
            // `catch_unwind` returns `Err` when the closure panicked.
            assert_err!(result);
        }

        // A lower bound greater than the upper bound is reported as an error, not a panic.
        let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
        assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
    }
4018
4019    /// Lists have a variable-length encoding for their lengths. We test each case here.
4020    #[mz_ore::test]
4021    #[cfg_attr(miri, ignore)] // slow
4022    fn test_list_encoding() {
4023        fn test_list_encoding_inner(len: usize) {
4024            let list_elem = |i: usize| {
4025                if i % 2 == 0 {
4026                    Datum::False
4027                } else {
4028                    Datum::True
4029                }
4030            };
4031            let mut row = Row::default();
4032            {
4033                // Push some stuff.
4034                let mut packer = row.packer();
4035                packer.push(Datum::String("start"));
4036                packer.push_list_with(|packer| {
4037                    for i in 0..len {
4038                        packer.push(list_elem(i));
4039                    }
4040                });
4041                packer.push(Datum::String("end"));
4042            }
4043            // Check that we read back exactly what we pushed.
4044            let mut row_it = row.iter();
4045            assert_eq!(row_it.next().unwrap(), Datum::String("start"));
4046            match row_it.next().unwrap() {
4047                Datum::List(list) => {
4048                    let mut list_it = list.iter();
4049                    for i in 0..len {
4050                        assert_eq!(list_it.next().unwrap(), list_elem(i));
4051                    }
4052                    assert_none!(list_it.next());
4053                }
4054                _ => panic!("expected Datum::List"),
4055            }
4056            assert_eq!(row_it.next().unwrap(), Datum::String("end"));
4057            assert_none!(row_it.next());
4058        }
4059
4060        test_list_encoding_inner(0);
4061        test_list_encoding_inner(1);
4062        test_list_encoding_inner(10);
4063        test_list_encoding_inner(TINY - 1); // tiny
4064        test_list_encoding_inner(TINY + 1); // short
4065        test_list_encoding_inner(SHORT + 1); // long
4066
4067        // The biggest one takes 40 s on my laptop, probably not worth it.
4068        //test_list_encoding_inner(LONG + 1); // huge
4069    }
4070
4071    /// Demonstrates that DatumList's Eq (bytewise) and Ord (datum-by-datum) are now consistent.
4072    /// A list containing -0.0 and one containing +0.0 have different byte representations
4073    /// (IEEE 754 distinguishes them), originally Eq says they are not equal. But after
4074    /// using the new Datum::cmp, Eq says they are equal, which matches what Ord
4075    /// compares via iter().cmp(other.iter()), and them as equal.
4076    #[mz_ore::test]
4077    fn test_datum_list_eq_ord_consistency() {
4078        // Build list containing +0.0
4079        let mut row_pos = Row::default();
4080        row_pos.packer().push_list_with(|p| {
4081            p.push(Datum::Float64(OrderedFloat::from(0.0)));
4082        });
4083        let list_pos = row_pos.unpack_first().unwrap_list();
4084
4085        // Build list containing -0.0 (distinct bit pattern from +0.0)
4086        let mut row_neg = Row::default();
4087        row_neg.packer().push_list_with(|p| {
4088            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
4089        });
4090        let list_neg = row_neg.unpack_first().unwrap_list();
4091
4092        // Eq is bytewise: different encodings => not equal
4093        // This was a bug in the past, so we test it.
4094        assert_eq!(
4095            list_pos, list_neg,
4096            "Eq should see different encodings as equal"
4097        );
4098
4099        // Ord is datum-by-datum: -0.0 and +0.0 compare equal as Datums
4100        assert_eq!(
4101            list_pos.cmp(&list_neg),
4102            Ordering::Equal,
4103            "Ord (datum-by-datum) should see -0.0 and +0.0 as equal"
4104        );
4105    }
4106
4107    /// Demonstrates that DatumMap's derived Eq (bytewise) can make maps with equal keys and
4108    /// values compare equal when values have different encodings (e.g. -0.0 vs +0.0).
4109    #[mz_ore::test]
4110    fn test_datum_map_eq_bytewise_consistency() {
4111        // Build map {"k": +0.0}
4112        let mut row_pos = Row::default();
4113        row_pos.packer().push_dict_with(|p| {
4114            p.push(Datum::String("k"));
4115            p.push(Datum::Float64(OrderedFloat::from(0.0)));
4116        });
4117        let map_pos = row_pos.unpack_first().unwrap_map();
4118
4119        // Build map {"k": -0.0}
4120        let mut row_neg = Row::default();
4121        row_neg.packer().push_dict_with(|p| {
4122            p.push(Datum::String("k"));
4123            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
4124        });
4125        let map_neg = row_neg.unpack_first().unwrap_map();
4126
4127        // Same keys and semantically equal values, but Eq (bytewise) says not equal
4128        assert_eq!(
4129            map_pos, map_neg,
4130            "DatumMap Eq is semantic; -0.0 and +0.0 have different encodings but are equal"
4131        );
4132        // Verify they have the same logical content
4133        let entries_pos: Vec<_> = map_pos.iter().collect();
4134        let entries_neg: Vec<_> = map_neg.iter().collect();
4135        assert_eq!(entries_pos.len(), entries_neg.len());
4136        for ((k1, v1), (k2, v2)) in entries_pos.iter().zip_eq(entries_neg.iter()) {
4137            assert_eq!(k1, k2);
4138            assert_eq!(
4139                v1, v2,
4140                "Datum-level comparison treats -0.0 and +0.0 as equal"
4141            );
4142        }
4143    }
4144
4145    /// Hash must agree with Eq: equal lists must have the same hash.
4146    #[mz_ore::test]
4147    fn test_datum_list_hash_consistency() {
4148        // Equal lists (including -0.0 vs +0.0) must hash the same
4149        let mut row_pos = Row::default();
4150        row_pos.packer().push_list_with(|p| {
4151            p.push(Datum::Float64(OrderedFloat::from(0.0)));
4152        });
4153        let list_pos = row_pos.unpack_first().unwrap_list();
4154
4155        let mut row_neg = Row::default();
4156        row_neg.packer().push_list_with(|p| {
4157            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
4158        });
4159        let list_neg = row_neg.unpack_first().unwrap_list();
4160
4161        assert_eq!(list_pos, list_neg);
4162        assert_eq!(
4163            hash(&list_pos),
4164            hash(&list_neg),
4165            "equal lists must have same hash"
4166        );
4167
4168        // Unequal lists should have different hashes (with asymptotic probability 1)
4169        let mut row_a = Row::default();
4170        row_a.packer().push_list_with(|p| {
4171            p.push(Datum::Int32(1));
4172            p.push(Datum::Int32(2));
4173        });
4174        let list_a = row_a.unpack_first().unwrap_list();
4175
4176        let mut row_b = Row::default();
4177        row_b.packer().push_list_with(|p| {
4178            p.push(Datum::Int32(1));
4179            p.push(Datum::Int32(3));
4180        });
4181        let list_b = row_b.unpack_first().unwrap_list();
4182
4183        assert_ne!(list_a, list_b);
4184        assert_ne!(
4185            hash(&list_a),
4186            hash(&list_b),
4187            "unequal lists must have different hashes"
4188        );
4189    }
4190
4191    /// Ord/PartialOrd for DatumList: less, equal, greater.
4192    #[mz_ore::test]
4193    fn test_datum_list_ordering() {
4194        let mut row_12 = Row::default();
4195        row_12.packer().push_list_with(|p| {
4196            p.push(Datum::Int32(1));
4197            p.push(Datum::Int32(2));
4198        });
4199        let list_12 = row_12.unpack_first().unwrap_list();
4200
4201        let mut row_13 = Row::default();
4202        row_13.packer().push_list_with(|p| {
4203            p.push(Datum::Int32(1));
4204            p.push(Datum::Int32(3));
4205        });
4206        let list_13 = row_13.unpack_first().unwrap_list();
4207
4208        let mut row_123 = Row::default();
4209        row_123.packer().push_list_with(|p| {
4210            p.push(Datum::Int32(1));
4211            p.push(Datum::Int32(2));
4212            p.push(Datum::Int32(3));
4213        });
4214        let list_123 = row_123.unpack_first().unwrap_list();
4215
4216        // [1, 2] < [1, 3] due to the second element being different
4217        assert_eq!(list_12.cmp(&list_13), Ordering::Less);
4218        assert_eq!(list_13.cmp(&list_12), Ordering::Greater);
4219        assert_eq!(list_12.cmp(&list_12), Ordering::Equal);
4220        // shorter prefix compares less
4221        assert_eq!(list_12.cmp(&list_123), Ordering::Less);
4222    }
4223
4224    /// Hash must agree with Eq: equal maps must have the same hash.
4225    #[mz_ore::test]
4226    fn test_datum_map_hash_consistency() {
4227        let mut row_pos = Row::default();
4228        row_pos.packer().push_dict_with(|p| {
4229            p.push(Datum::String("x"));
4230            p.push(Datum::Float64(OrderedFloat::from(0.0)));
4231        });
4232        let map_pos = row_pos.unpack_first().unwrap_map();
4233
4234        let mut row_neg = Row::default();
4235        row_neg.packer().push_dict_with(|p| {
4236            p.push(Datum::String("x"));
4237            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
4238        });
4239        let map_neg = row_neg.unpack_first().unwrap_map();
4240
4241        assert_eq!(map_pos, map_neg);
4242        assert_eq!(
4243            hash(&map_pos),
4244            hash(&map_neg),
4245            "equal maps must have same hash"
4246        );
4247
4248        let mut row_a = Row::default();
4249        row_a.packer().push_dict_with(|p| {
4250            p.push(Datum::String("a"));
4251            p.push(Datum::Int32(1));
4252        });
4253        let map_a = row_a.unpack_first().unwrap_map();
4254
4255        let mut row_b = Row::default();
4256        row_b.packer().push_dict_with(|p| {
4257            p.push(Datum::String("a"));
4258            p.push(Datum::Int32(2));
4259        });
4260        let map_b = row_b.unpack_first().unwrap_map();
4261
4262        assert_ne!(map_a, map_b);
4263        assert_ne!(
4264            hash(&map_a),
4265            hash(&map_b),
4266            "unequal maps must have different hashes"
4267        );
4268    }
4269
4270    /// Ord/PartialOrd for DatumMap: less, equal, greater (by key then value).
4271    #[mz_ore::test]
4272    fn test_datum_map_ordering() {
4273        let mut row_a1 = Row::default();
4274        row_a1.packer().push_dict_with(|p| {
4275            p.push(Datum::String("a"));
4276            p.push(Datum::Int32(1));
4277        });
4278        let map_a1 = row_a1.unpack_first().unwrap_map();
4279
4280        let mut row_a2 = Row::default();
4281        row_a2.packer().push_dict_with(|p| {
4282            p.push(Datum::String("a"));
4283            p.push(Datum::Int32(2));
4284        });
4285        let map_a2 = row_a2.unpack_first().unwrap_map();
4286
4287        let mut row_b1 = Row::default();
4288        row_b1.packer().push_dict_with(|p| {
4289            p.push(Datum::String("b"));
4290            p.push(Datum::Int32(1));
4291        });
4292        let map_b1 = row_b1.unpack_first().unwrap_map();
4293
4294        assert_eq!(map_a1.cmp(&map_a2), Ordering::Less);
4295        assert_eq!(map_a2.cmp(&map_a1), Ordering::Greater);
4296        assert_eq!(map_a1.cmp(&map_a1), Ordering::Equal);
4297        assert_eq!(map_a1.cmp(&map_b1), Ordering::Less); // "a" < "b"
4298    }
4299
4300    /// Datum puts Null last in the enum so that nulls sort last (PostgreSQL default).
4301    /// This ordering is used when comparing DatumList/DatumMap (e.g. jsonb_agg tiebreaker).
4302    #[mz_ore::test]
4303    fn test_datum_list_and_map_null_sorts_last() {
4304        // DatumList: [1] < [null] so non-null sorts before null
4305        let mut row_list_1 = Row::default();
4306        row_list_1
4307            .packer()
4308            .push_list_with(|p| p.push(Datum::Int32(1)));
4309        let list_1 = row_list_1.unpack_first().unwrap_list();
4310
4311        let mut row_list_null = Row::default();
4312        row_list_null
4313            .packer()
4314            .push_list_with(|p| p.push(Datum::Null));
4315        let list_null = row_list_null.unpack_first().unwrap_list();
4316
4317        assert_eq!(list_1.cmp(&list_null), Ordering::Less);
4318        assert_eq!(list_null.cmp(&list_1), Ordering::Greater);
4319
4320        // DatumMap: {"k": 1} < {"k": null} so non-null sorts before null (same as jsonb_agg)
4321        let mut row_map_1 = Row::default();
4322        row_map_1.packer().push_dict_with(|p| {
4323            p.push(Datum::String("k"));
4324            p.push(Datum::Int32(1));
4325        });
4326        let map_1 = row_map_1.unpack_first().unwrap_map();
4327
4328        let mut row_map_null = Row::default();
4329        row_map_null.packer().push_dict_with(|p| {
4330            p.push(Datum::String("k"));
4331            p.push(Datum::Null);
4332        });
4333        let map_null = row_map_null.unpack_first().unwrap_map();
4334
4335        assert_eq!(map_1.cmp(&map_null), Ordering::Less);
4336        assert_eq!(map_null.cmp(&map_1), Ordering::Greater);
4337    }
4338}