// mz_repr/row.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
10use std::borrow::Borrow;
11use std::cell::{Cell, RefCell};
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::hash::{Hash, Hasher};
16use std::marker::PhantomData;
17use std::mem::{size_of, transmute};
18use std::ops::Deref;
19use std::str;
20
21use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
22use compact_bytes::CompactBytes;
23use mz_ore::cast::{CastFrom, ReinterpretCast};
24use mz_ore::soft_assert_no_log;
25use mz_ore::vec::Vector;
26use mz_persist_types::Codec64;
27use num_enum::{IntoPrimitive, TryFromPrimitive};
28use ordered_float::OrderedFloat;
29use proptest::prelude::*;
30use proptest::strategy::{BoxedStrategy, Strategy};
31use serde::{Deserialize, Serialize};
32use uuid::Uuid;
33
34use crate::adt::array::{
35    Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
36};
37use crate::adt::date::Date;
38use crate::adt::interval::Interval;
39use crate::adt::mz_acl_item::{AclItem, MzAclItem};
40use crate::adt::numeric;
41use crate::adt::numeric::Numeric;
42use crate::adt::range::{
43    self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
44};
45use crate::adt::timestamp::CheckedTimestamp;
46use crate::scalar::{DatumKind, SqlScalarType, arb_datum};
47use crate::{Datum, RelationDesc, Timestamp};
48
49pub(crate) mod encode;
50pub mod iter;
51
52include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
53
/// A packed representation for `Datum`s.
///
/// `Datum` is easy to work with but very space inefficient. A `Datum::Int32(42)`
/// is laid out in memory like this:
///
///   tag: 3
///   padding: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
///   data: 0 0 0 42
///   padding: 0 0 0 0 0 0 0 0 0 0 0 0
///
/// For a total of 32 bytes! The second set of padding is needed in case we were
/// to write a 16-byte datum into this location. The first set of padding is
/// needed to align that hypothetical decimal to a 16 bytes boundary.
///
/// A `Row` stores zero or more `Datum`s without any padding. We avoid the need
/// for the first set of padding by only providing access to the `Datum`s via
/// calls to `ptr::read_unaligned`, which on modern x86 is barely penalized. We
/// avoid the need for the second set of padding by not providing mutable access
/// to the `Datum`. Instead, `Row` is append-only.
///
/// A `Row` can be built from a collection of `Datum`s using `Row::pack`, but it
/// is more efficient to use `Row::pack_slice` so that a right-sized allocation
/// can be created. If that is not possible, consider using the row buffer
/// pattern: allocate one row, pack into it, and then call [`Row::clone`] to
/// receive a copy of that row, leaving behind the original allocation to pack
/// future rows.
///
/// Creating a row via [`Row::pack_slice`]:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// assert_eq!(row.unpack(), vec![Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)])
/// ```
///
/// `Row`s can be unpacked by iterating over them:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// assert_eq!(row.iter().nth(1).unwrap(), Datum::Int32(1));
/// ```
///
/// If you want random access to the `Datum`s in a `Row`, use `Row::unpack` to create a `Vec<Datum>`
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// let datums = row.unpack();
/// assert_eq!(datums[1], Datum::Int32(1));
/// ```
///
/// # Performance
///
/// Rows are dynamically sized, but up to a fixed size their data is stored in-line.
/// It is best to re-use a `Row` across multiple `Row` creation calls, as this
/// avoids the allocations involved in `Row::new()`.
#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
    // Encoded datum bytes; stored inline up to `CompactBytes::MAX_INLINE`
    // bytes and spilled to the heap beyond that.
    data: CompactBytes,
}
114
115impl Row {
116    const SIZE: usize = CompactBytes::MAX_INLINE;
117
118    /// A variant of `Row::from_proto` that allows for reuse of internal allocs
119    /// and validates the decoding against a provided [`RelationDesc`].
120    pub fn decode_from_proto(
121        &mut self,
122        proto: &ProtoRow,
123        desc: &RelationDesc,
124    ) -> Result<(), String> {
125        let mut packer = self.packer();
126        for (col_idx, _, _) in desc.iter_all() {
127            let d = match proto.datums.get(col_idx.to_raw()) {
128                Some(x) => x,
129                None => {
130                    packer.push(Datum::Null);
131                    continue;
132                }
133            };
134            packer.try_push_proto(d)?;
135        }
136
137        Ok(())
138    }
139
140    /// Allocate an empty `Row` with a pre-allocated capacity.
141    #[inline]
142    pub fn with_capacity(cap: usize) -> Self {
143        Self {
144            data: CompactBytes::with_capacity(cap),
145        }
146    }
147
148    /// Create an empty `Row`.
149    #[inline]
150    pub const fn empty() -> Self {
151        Self {
152            data: CompactBytes::empty(),
153        }
154    }
155
156    /// Creates a new row from supplied bytes.
157    ///
158    /// # Safety
159    ///
160    /// This method relies on `data` being an appropriate row encoding, and can
161    /// result in unsafety if this is not the case.
162    pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
163        Row {
164            data: CompactBytes::new(data),
165        }
166    }
167
168    /// Constructs a [`RowPacker`] that will pack datums into this row's
169    /// allocation.
170    ///
171    /// This method clears the existing contents of the row, but retains the
172    /// allocation.
173    pub fn packer(&mut self) -> RowPacker<'_> {
174        self.clear();
175        RowPacker { row: self }
176    }
177
178    /// Take some `Datum`s and pack them into a `Row`.
179    ///
180    /// This method builds a `Row` by repeatedly increasing the backing
181    /// allocation. If the contents of the iterator are known ahead of
182    /// time, consider [`Row::with_capacity`] to right-size the allocation
183    /// first, and then [`RowPacker::extend`] to populate it with `Datum`s.
184    /// This avoids the repeated allocation resizing and copying.
185    pub fn pack<'a, I, D>(iter: I) -> Row
186    where
187        I: IntoIterator<Item = D>,
188        D: Borrow<Datum<'a>>,
189    {
190        let mut row = Row::default();
191        row.packer().extend(iter);
192        row
193    }
194
195    /// Use `self` to pack `iter`, and then clone the result.
196    ///
197    /// This is a convenience method meant to reduce boilerplate around row
198    /// formation.
199    pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
200    where
201        I: IntoIterator<Item = D>,
202        D: Borrow<Datum<'a>>,
203    {
204        self.packer().extend(iter);
205        self.clone()
206    }
207
208    /// Like [`Row::pack`], but the provided iterator is allowed to produce an
209    /// error, in which case the packing operation is aborted and the error
210    /// returned.
211    pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
212    where
213        I: IntoIterator<Item = Result<D, E>>,
214        D: Borrow<Datum<'a>>,
215    {
216        let mut row = Row::default();
217        row.packer().try_extend(iter)?;
218        Ok(row)
219    }
220
221    /// Pack a slice of `Datum`s into a `Row`.
222    ///
223    /// This method has the advantage over `pack` that it can determine the required
224    /// allocation before packing the elements, ensuring only one allocation and no
225    /// redundant copies required.
226    pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
227        // Pre-allocate the needed number of bytes.
228        let mut row = Row::with_capacity(datums_size(slice.iter()));
229        row.packer().extend(slice.iter());
230        row
231    }
232
233    /// Returns the total amount of bytes used by this row.
234    pub fn byte_len(&self) -> usize {
235        let heap_size = if self.data.spilled() {
236            self.data.len()
237        } else {
238            0
239        };
240        let inline_size = std::mem::size_of::<Self>();
241        inline_size.saturating_add(heap_size)
242    }
243
244    /// The length of the encoded row in bytes. Does not include the size of the `Row` struct itself.
245    pub fn data_len(&self) -> usize {
246        self.data.len()
247    }
248
249    /// Returns the total capacity in bytes used by this row.
250    pub fn byte_capacity(&self) -> usize {
251        self.data.capacity()
252    }
253
254    /// Extracts a Row slice containing the entire [`Row`].
255    #[inline]
256    pub fn as_row_ref(&self) -> &RowRef {
257        // SAFETY: `Row` contains valid row data, by construction.
258        unsafe { RowRef::from_slice(self.data.as_slice()) }
259    }
260
261    /// Clear the contents of the [`Row`], leaving any allocation in place.
262    #[inline]
263    fn clear(&mut self) {
264        self.data.clear();
265    }
266}
267
// `Borrow` lets a `Row` stand in for a `RowRef` in borrow-keyed containers
// (e.g. map lookups by `&RowRef` against keys of type `Row`).
impl Borrow<RowRef> for Row {
    #[inline]
    fn borrow(&self) -> &RowRef {
        self.as_row_ref()
    }
}

impl AsRef<RowRef> for Row {
    #[inline]
    fn as_ref(&self) -> &RowRef {
        self.as_row_ref()
    }
}

// `Deref` makes every `RowRef` method directly callable on a `Row`.
impl Deref for Row {
    type Target = RowRef;

    #[inline]
    fn deref(&self) -> &Self::Target {
        self.as_row_ref()
    }
}

// Nothing depends on Row being exactly 24, we just want to add visibility to the size.
static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
293
// Implemented manually (rather than derived) so that `clone_from` can reuse
// the destination row's existing allocation.
impl Clone for Row {
    fn clone(&self) -> Self {
        Row {
            data: self.data.clone(),
        }
    }

    fn clone_from(&mut self, source: &Self) {
        self.data.clone_from(&source.data);
    }
}

// Row's `Hash` implementation defers to `RowRef` to ensure they hash equivalently.
impl std::hash::Hash for Row {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.as_row_ref().hash(state)
    }
}
312
313impl Arbitrary for Row {
314    type Parameters = prop::collection::SizeRange;
315    type Strategy = BoxedStrategy<Row>;
316
317    fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
318        prop::collection::vec(arb_datum(true), size)
319            .prop_map(|items| {
320                let mut row = Row::default();
321                let mut packer = row.packer();
322                for item in items.iter() {
323                    let datum: Datum<'_> = item.into();
324                    packer.push(datum);
325                }
326                row
327            })
328            .boxed()
329    }
330}
331
332impl PartialOrd for Row {
333    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
334        Some(self.cmp(other))
335    }
336}
337
338impl Ord for Row {
339    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
340        self.as_ref().cmp(other.as_ref())
341    }
342}
343
344#[allow(missing_debug_implementations)]
345mod columnation {
346    use columnation::{Columnation, Region};
347    use mz_ore::region::LgAllocRegion;
348
349    use crate::Row;
350
351    /// Region allocation for `Row` data.
352    ///
353    /// Content bytes are stored in stable contiguous memory locations,
354    /// and then a `Row` referencing them is falsified.
355    pub struct RowStack {
356        region: LgAllocRegion<u8>,
357    }
358
359    impl RowStack {
360        const LIMIT: usize = 2 << 20;
361    }
362
363    // Implement `Default` manually to specify a region allocation limit.
364    impl Default for RowStack {
365        fn default() -> Self {
366            Self {
367                // Limit the region size to 2MiB.
368                region: LgAllocRegion::with_limit(Self::LIMIT),
369            }
370        }
371    }
372
373    impl Columnation for Row {
374        type InnerRegion = RowStack;
375    }
376
377    impl Region for RowStack {
378        type Item = Row;
379        #[inline]
380        fn clear(&mut self) {
381            self.region.clear();
382        }
383        #[inline(always)]
384        unsafe fn copy(&mut self, item: &Row) -> Row {
385            if item.data.spilled() {
386                let bytes = self.region.copy_slice(&item.data[..]);
387                Row {
388                    data: compact_bytes::CompactBytes::from_raw_parts(
389                        bytes.as_mut_ptr(),
390                        item.data.len(),
391                        item.data.capacity(),
392                    ),
393                }
394            } else {
395                item.clone()
396            }
397        }
398
399        fn reserve_items<'a, I>(&mut self, items: I)
400        where
401            Self: 'a,
402            I: Iterator<Item = &'a Self::Item> + Clone,
403        {
404            let size = items
405                .filter(|row| row.data.spilled())
406                .map(|row| row.data.len())
407                .sum();
408            let size = std::cmp::min(size, Self::LIMIT);
409            self.region.reserve(size);
410        }
411
412        fn reserve_regions<'a, I>(&mut self, regions: I)
413        where
414            Self: 'a,
415            I: Iterator<Item = &'a Self> + Clone,
416        {
417            let size = regions.map(|r| r.region.len()).sum();
418            let size = std::cmp::min(size, Self::LIMIT);
419            self.region.reserve(size);
420        }
421
422        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
423            self.region.heap_size(callback)
424        }
425    }
426}
427
/// `columnar` support for rows: a struct-of-arrays container that stores all
/// row bytes concatenated in `values`, with each row's exclusive end offset
/// recorded in `bounds`.
mod columnar {
    use columnar::common::PushIndexAs;
    use columnar::{
        AsBytes, Borrow, Clear, Columnar, Container, FromBytes, Index, IndexAs, Len, Push,
    };
    use mz_ore::cast::CastFrom;
    use std::ops::Range;

    use crate::{Row, RowRef};

    #[derive(
        Copy,
        Clone,
        Debug,
        Default,
        PartialEq,
        serde::Serialize,
        serde::Deserialize
    )]
    pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
        /// Bounds container; provides indexed access to offsets.
        bounds: BC,
        /// Values container; provides slice access to bytes.
        values: VC,
    }

    impl Columnar for Row {
        #[inline(always)]
        fn copy_from(&mut self, other: columnar::Ref<'_, Self>) {
            // Reuse `self`'s allocation rather than building a fresh row.
            self.clear();
            self.data.extend_from_slice(other.data());
        }
        #[inline(always)]
        fn into_owned(other: columnar::Ref<'_, Self>) -> Self {
            other.to_owned()
        }
        type Container = Rows;
        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
        where
            Self: 'a,
        {
            thing
        }
    }

    impl<BC: PushIndexAs<u64>> Borrow for Rows<BC, Vec<u8>> {
        type Ref<'a> = &'a RowRef;
        type Borrowed<'a>
            = Rows<BC::Borrowed<'a>, &'a [u8]>
        where
            Self: 'a;
        #[inline(always)]
        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
            Rows {
                bounds: self.bounds.borrow(),
                values: self.values.borrow(),
            }
        }
        #[inline(always)]
        fn reborrow<'c, 'a: 'c>(item: Self::Borrowed<'a>) -> Self::Borrowed<'c>
        where
            Self: 'a,
        {
            Rows {
                bounds: BC::reborrow(item.bounds),
                values: item.values,
            }
        }

        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
        where
            Self: 'a,
        {
            item
        }
    }

    impl<BC: PushIndexAs<u64>> Container for Rows<BC, Vec<u8>> {
        /// Append rows `range` from `other`, rebasing their bounds onto the
        /// current end of `self.values`.
        fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
            if !range.is_empty() {
                // Imported bounds will be relative to this starting offset.
                let values_len: u64 = self.values.len().try_into().expect("must fit");

                // Push all bytes that we can, all at once.
                // `other_lower` is the byte offset where row `range.start`
                // begins in `other.values` (bounds store end offsets, so the
                // start of row `i` is the end of row `i - 1`).
                let other_lower = if range.start == 0 {
                    0
                } else {
                    other.bounds.index_as(range.start - 1)
                };
                let other_upper = other.bounds.index_as(range.end - 1);
                self.values.extend_from_self(
                    other.values,
                    usize::try_from(other_lower).expect("must fit")
                        ..usize::try_from(other_upper).expect("must fit"),
                );

                // Each bound needs to be shifted by `values_len - other_lower`.
                if values_len == other_lower {
                    // Zero shift: bounds can be copied verbatim.
                    self.bounds.extend_from_self(other.bounds, range);
                } else {
                    for index in range {
                        let shifted = other.bounds.index_as(index) - other_lower + values_len;
                        self.bounds.push(&shifted)
                    }
                }
            }
        }
        fn reserve_for<'a, I>(&mut self, selves: I)
        where
            Self: 'a,
            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
        {
            self.bounds.reserve_for(selves.clone().map(|r| r.bounds));
            self.values.reserve_for(selves.map(|r| r.values));
        }
    }

    // Serialization as raw byte slices: bounds first, then values.
    impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
        #[inline(always)]
        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
            columnar::chain(self.bounds.as_bytes(), self.values.as_bytes())
        }
    }
    impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
        const SLICE_COUNT: usize = BC::SLICE_COUNT + VC::SLICE_COUNT;
        #[inline(always)]
        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
            // Must consume slices in the same order `as_bytes` produced them.
            Self {
                bounds: FromBytes::from_bytes(bytes),
                values: FromBytes::from_bytes(bytes),
            }
        }
    }

    impl<BC: Len, VC> Len for Rows<BC, VC> {
        #[inline(always)]
        fn len(&self) -> usize {
            // One bound per row, so the bounds count is the row count.
            self.bounds.len()
        }
    }

    impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
        type Ref = &'a RowRef;
        #[inline(always)]
        fn get(&self, index: usize) -> Self::Ref {
            // Row `index` spans from the previous row's end offset (or zero)
            // to its own end offset.
            let lower = if index == 0 {
                0
            } else {
                self.bounds.index_as(index - 1)
            };
            let upper = self.bounds.index_as(index);
            let lower = usize::cast_from(lower);
            let upper = usize::cast_from(upper);
            // SAFETY: self.values contains only valid row data, and self.metadata delimits only ranges
            // that correspond to the original rows.
            unsafe { RowRef::from_slice(&self.values[lower..upper]) }
        }
    }
    impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
        type Ref = &'a RowRef;
        #[inline(always)]
        fn get(&self, index: usize) -> Self::Ref {
            let lower = if index == 0 {
                0
            } else {
                self.bounds.index_as(index - 1)
            };
            let upper = self.bounds.index_as(index);
            let lower = usize::cast_from(lower);
            let upper = usize::cast_from(upper);
            // SAFETY: self.values contains only valid row data, and self.metadata delimits only ranges
            // that correspond to the original rows.
            unsafe { RowRef::from_slice(&self.values[lower..upper]) }
        }
    }

    // Pushing appends the row's bytes and records the new end offset.
    impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
        #[inline(always)]
        fn push(&mut self, item: &Row) {
            self.values.extend_from_slice(item.data.as_slice());
            self.bounds.push(u64::cast_from(self.values.len()));
        }
    }
    impl<BC: for<'a> Push<&'a u64>> Push<&RowRef> for Rows<BC> {
        #[inline(always)]
        fn push(&mut self, item: &RowRef) {
            self.values.extend_from_slice(item.data());
            self.bounds.push(&u64::cast_from(self.values.len()));
        }
    }
    impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
        #[inline(always)]
        fn clear(&mut self) {
            self.bounds.clear();
            self.values.clear();
        }
    }
}
627
/// A contiguous slice of bytes that are row data.
///
/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
///
/// Invariant: the wrapped bytes are always a valid `Row` encoding (upheld by
/// the unsafe constructor, [`RowRef::from_slice`]).
#[derive(PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct RowRef([u8]);
634
635impl RowRef {
636    /// Create a [`RowRef`] from a slice of data.
637    ///
638    /// # Safety
639    ///
640    /// We do not check that the provided slice is valid [`Row`] data; the caller is required to
641    /// ensure this.
642    pub unsafe fn from_slice(row: &[u8]) -> &RowRef {
643        #[allow(clippy::as_conversions)]
644        let ptr = row as *const [u8] as *const RowRef;
645        // SAFETY: We know `ptr` is non-null and aligned because it came from a &[u8].
646        unsafe { &*ptr }
647    }
648
649    /// Unpack `self` into a `Vec<Datum>` for efficient random access.
650    pub fn unpack(&self) -> Vec<Datum<'_>> {
651        // It's usually cheaper to unpack twice to figure out the right length than it is to grow the vec as we go
652        let len = self.iter().count();
653        let mut vec = Vec::with_capacity(len);
654        vec.extend(self.iter());
655        vec
656    }
657
658    /// Return the first [`Datum`] in `self`
659    ///
660    /// Panics if the [`RowRef`] is empty.
661    pub fn unpack_first(&self) -> Datum<'_> {
662        self.iter().next().unwrap()
663    }
664
665    /// Iterate the [`Datum`] elements of the [`RowRef`].
666    pub fn iter(&self) -> DatumListIter<'_> {
667        DatumListIter { data: &self.0 }
668    }
669
670    /// Return the byte length of this [`RowRef`].
671    pub fn byte_len(&self) -> usize {
672        self.0.len()
673    }
674
675    /// For debugging only.
676    pub fn data(&self) -> &[u8] {
677        &self.0
678    }
679
680    /// True iff there is no data in this [`RowRef`].
681    pub fn is_empty(&self) -> bool {
682        self.0.is_empty()
683    }
684}
685
// `ToOwned` pairs with `Borrow<RowRef> for Row`, enabling `Cow<RowRef>` and
// `RowRef::to_owned() -> Row`.
impl ToOwned for RowRef {
    type Owned = Row;

    fn to_owned(&self) -> Self::Owned {
        // SAFETY: RowRef has the invariant that the wrapped data must be a valid Row encoding.
        unsafe { Row::from_bytes_unchecked(&self.0) }
    }
}

// Borrowed iteration: `&RowRef` yields its `Datum`s without copying row data.
impl<'a> IntoIterator for &'a RowRef {
    type Item = Datum<'a>;
    type IntoIter = DatumListIter<'a>;

    fn into_iter(self) -> DatumListIter<'a> {
        DatumListIter { data: &self.0 }
    }
}
703
704/// These implementations order first by length, and then by slice contents.
705/// This allows many comparisons to complete without dereferencing memory.
706/// Warning: These order by the u8 array representation, and NOT by Datum::cmp.
707impl PartialOrd for RowRef {
708    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
709        Some(self.cmp(other))
710    }
711}
712
713impl Ord for RowRef {
714    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
715        match self.0.len().cmp(&other.0.len()) {
716            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
717            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
718            std::cmp::Ordering::Equal => self.0.cmp(&other.0),
719        }
720    }
721}
722
impl fmt::Debug for RowRef {
    /// Debug representation using the internal datums
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Renders as `RowRef{[...datums...]}`.
        f.write_str("RowRef{")?;
        f.debug_list().entries(self.into_iter()).finish()?;
        f.write_str("}")
    }
}
731
/// Packs datums into a [`Row`].
///
/// Creating a `RowPacker` via [`Row::packer`] starts a packing operation on the
/// row. A packing operation always starts from scratch: the existing contents
/// of the underlying row are cleared.
///
/// To complete a packing operation, drop the `RowPacker`.
#[derive(Debug)]
pub struct RowPacker<'a> {
    // The row being appended to; held mutably for the duration of the
    // packing operation.
    row: &'a mut Row,
}

/// Infallible conversion from a [`Datum`] to a typed value.
///
/// Used by [`DatumList::typed_iter`] to yield elements as `T` rather than
/// raw `Datum`s. At runtime, `T` is always `Datum<'a>`, so the conversion
/// is identity.
///
/// See `doc/developer/design/20260311_sqlfunc_generic.md` for the design
/// behind the generic type parameter and type erasure.
///
/// This trait is sealed and cannot be implemented outside of this crate.
pub trait FromDatum<'a>:
    Sized + PartialEq + std::borrow::Borrow<Datum<'a>> + sealed::Sealed
{
    /// Convert `datum` into a `Self`.
    fn from_datum(datum: Datum<'a>) -> Self;
}

// Seals `FromDatum`, so `Datum` remains its only implementor.
mod sealed {
    use crate::Datum;

    pub trait Sealed {}
    impl<'a> Sealed for Datum<'a> {}
}

impl<'a> FromDatum<'a> for Datum<'a> {
    // Identity conversion: `Datum` is already the runtime representation.
    #[inline]
    fn from_datum(datum: Datum<'a>) -> Self {
        datum
    }
}
773
/// Iterator over the `Datum`s encoded in a byte slice of row data.
#[derive(Debug, Clone)]
pub struct DatumListIter<'a> {
    // Remaining, not-yet-decoded datum bytes.
    data: &'a [u8],
}

/// Like [`DatumListIter`], but yields elements converted to `T` via
/// [`FromDatum`].
#[derive(Debug, Clone)]
pub struct DatumListTypedIter<'a, T> {
    inner: DatumListIter<'a>,
    // `T` is compile-time-only; no values of `T` are stored.
    _phantom: PhantomData<fn() -> T>,
}

/// Iterator over the key/value entries encoded in a `DatumMap`'s bytes.
#[derive(Debug, Clone)]
pub struct DatumDictIter<'a> {
    // Remaining, not-yet-decoded entry bytes.
    data: &'a [u8],
    // Most recently yielded key; presumably used to check that keys appear
    // in ascending order — confirm in the `Iterator` impl.
    prev_key: Option<&'a str>,
}

/// Like [`DatumDictIter`], but yields values converted to `T` via
/// [`FromDatum`].
#[derive(Debug, Clone)]
pub struct DatumDictTypedIter<'a, T> {
    inner: DatumDictIter<'a>,
    // `T` is compile-time-only; no values of `T` are stored.
    _phantom: PhantomData<fn() -> T>,
}

/// `RowArena` is used to hold on to temporary `Row`s for functions like `eval` that need to create complex `Datum`s but don't have a `Row` to put them in yet.
#[derive(Debug)]
pub struct RowArena {
    // Semantically, this field would be better represented by a `Vec<Box<[u8]>>`,
    // as once the arena takes ownership of a byte vector the vector is never
    // modified. But `RowArena::push_bytes` takes ownership of a `Vec<u8>`, so
    // storing that `Vec<u8>` directly avoids an allocation. The cost is
    // additional memory use, as the vector may have spare capacity, but row
    // arenas are short lived so this is the better tradeoff.
    inner: RefCell<Vec<Vec<u8>>>,
}
808
// DatumList and DatumDict defined here rather than near Datum because we need private access to the unsafe data field

/// A sequence of Datums
///
/// The type parameter `T` represents the element type of the list. It is a
/// phantom parameter that carries no runtime data — the actual elements are
/// stored as serialized bytes and `T` is not enforced at runtime. It is up
/// to the caller to ensure `T` matches the actual element type. The default
/// `T = Datum<'a>` means existing code that writes `DatumList<'a>` continues
/// to work unchanged.
///
/// See `doc/developer/design/20260311_sqlfunc_generic.md` for the design
/// behind the generic type parameter.
pub struct DatumList<'a, T = Datum<'a>> {
    /// Points at the serialized datums
    data: &'a [u8],
    // Compile-time-only element type; see the type-level docs.
    _phantom: PhantomData<fn() -> T>,
}

impl<'a, T> DatumList<'a, T> {
    /// Crate-private constructor. All `DatumList` values should be created
    /// through this function to keep the `PhantomData` bookkeeping in one
    /// place.
    pub(crate) fn new(data: &'a [u8]) -> Self {
        DatumList {
            data,
            _phantom: PhantomData,
        }
    }
}

// `Clone`/`Copy` are written by hand because deriving them would add a
// spurious `T: Clone`/`T: Copy` bound on the phantom parameter.
impl<'a, T> Clone for DatumList<'a, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'a, T> Copy for DatumList<'a, T> {}

impl<'a, T> Debug for DatumList<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_list().entries(self.iter()).finish()
    }
}

// Equality, hashing, and ordering all delegate to element-wise iteration.
impl<'a, T> PartialEq for DatumList<'a, T> {
    #[inline(always)]
    fn eq(&self, other: &DatumList<'a, T>) -> bool {
        self.iter().eq(other.iter())
    }
}

impl<'a, T> Eq for DatumList<'a, T> {}

impl<'a, T> Hash for DatumList<'a, T> {
    #[inline(always)]
    fn hash<H: Hasher>(&self, state: &mut H) {
        for d in self.iter() {
            d.hash(state);
        }
    }
}

impl<T> Ord for DatumList<'_, T> {
    #[inline(always)]
    fn cmp(&self, other: &DatumList<'_, T>) -> Ordering {
        self.iter().cmp(other.iter())
    }
}

impl<T> PartialOrd for DatumList<'_, T> {
    #[inline(always)]
    fn partial_cmp(&self, other: &DatumList<'_, T>) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
884
885/// A mapping from string keys to Datums
886///
887/// The type parameter `T` represents the value type of the map. It is a
888/// phantom parameter — the actual values are stored as serialized bytes and
889/// `T` is not enforced at runtime. It is up to the caller to ensure `T`
890/// matches the actual value type. The default `T = Datum<'a>` means existing
891/// code that writes `DatumMap<'a>` continues to work unchanged.
892///
893/// See `doc/developer/design/20260311_sqlfunc_generic.md` for the design
894/// behind the generic type parameter.
895pub struct DatumMap<'a, T = Datum<'a>> {
896    /// Points at the serialized datums, which should be sorted in key order
897    data: &'a [u8],
898    _phantom: PhantomData<fn() -> T>,
899}
900
901impl<'a, T> DatumMap<'a, T> {
902    /// Private constructor. All `DatumMap` values should be created through
903    /// this function to keep the `PhantomData` bookkeeping in one place.
904    pub(crate) fn new(data: &'a [u8]) -> Self {
905        DatumMap {
906            data,
907            _phantom: PhantomData,
908        }
909    }
910}
911
912impl<'a, T> Clone for DatumMap<'a, T> {
913    fn clone(&self) -> Self {
914        *self
915    }
916}
917
918impl<'a, T> Copy for DatumMap<'a, T> {}
919
920impl<'a, T> PartialEq for DatumMap<'a, T> {
921    #[inline(always)]
922    fn eq(&self, other: &DatumMap<'a, T>) -> bool {
923        self.iter().eq(other.iter())
924    }
925}
926
927impl<'a, T> Eq for DatumMap<'a, T> {}
928
929impl<'a, T> Hash for DatumMap<'a, T> {
930    #[inline(always)]
931    fn hash<H: Hasher>(&self, state: &mut H) {
932        for (k, v) in self.iter() {
933            k.hash(state);
934            v.hash(state);
935        }
936    }
937}
938
impl<'a, T> Ord for DatumMap<'a, T> {
    // Lexicographic comparison of the decoded (key, value) entries.
    #[inline(always)]
    fn cmp(&self, other: &DatumMap<'a, T>) -> Ordering {
        self.iter().cmp(other.iter())
    }
}
945
impl<'a, T> PartialOrd for DatumMap<'a, T> {
    // Defers to the total order defined by `Ord`.
    #[inline(always)]
    fn partial_cmp(&self, other: &DatumMap<'a, T>) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
952
953impl<'a> crate::scalar::SqlContainerType for DatumList<'a, Datum<'a>> {
954    fn unwrap_element_type(container: &SqlScalarType) -> &SqlScalarType {
955        container.unwrap_list_element_type()
956    }
957    fn wrap_element_type(element: SqlScalarType) -> SqlScalarType {
958        SqlScalarType::List {
959            element_type: Box::new(element),
960            custom_id: None,
961        }
962    }
963}
964
965impl<'a> crate::scalar::SqlContainerType for DatumMap<'a, Datum<'a>> {
966    fn unwrap_element_type(container: &SqlScalarType) -> &SqlScalarType {
967        container.unwrap_map_value_type()
968    }
969    fn wrap_element_type(element: SqlScalarType) -> SqlScalarType {
970        SqlScalarType::Map {
971            value_type: Box::new(element),
972            custom_id: None,
973        }
974    }
975}
976
/// Represents a single `Datum`, appropriate to be nested inside other
/// `Datum`s.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumNested<'a> {
    /// Exactly the bytes of one encoded datum, including its leading tag
    /// (see `DatumNested::extract`).
    val: &'a [u8],
}
983
984impl<'a> std::fmt::Display for DatumNested<'a> {
985    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
986        std::fmt::Display::fmt(&self.datum(), f)
987    }
988}
989
impl<'a> std::fmt::Debug for DatumNested<'a> {
    // Debug-prints the decoded datum rather than the raw `val` bytes, which
    // would be unreadable.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DatumNested")
            .field("val", &self.datum())
            .finish()
    }
}
997
998impl<'a> DatumNested<'a> {
999    // Figure out which bytes `read_datum` returns (e.g. including the tag),
1000    // and then store a reference to those bytes, so we can "replay" this same
1001    // call later on without storing the datum itself.
1002    pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
1003        let prev = *data;
1004        let _ = unsafe { read_datum(data) };
1005        DatumNested {
1006            val: &prev[..(prev.len() - data.len())],
1007        }
1008    }
1009
1010    /// Returns the datum `self` contains.
1011    pub fn datum(&self) -> Datum<'a> {
1012        let mut temp = self.val;
1013        unsafe { read_datum(&mut temp) }
1014    }
1015}
1016
impl<'a> Ord for DatumNested<'a> {
    // Compares the decoded datums, not the raw encoded bytes.
    fn cmp(&self, other: &Self) -> Ordering {
        self.datum().cmp(&other.datum())
    }
}
1022
impl<'a> PartialOrd for DatumNested<'a> {
    // Defers to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
1028
// Prefer adding new tags to the end of the enum. Certain behavior, like row ordering and EXPLAIN
// PHYSICAL PLAN, rely on the ordering of this enum. Neither of these are breaking changes, but
// it's annoying when they change.
//
// The discriminant is the first byte of every encoded datum; `read_datum`
// dispatches on it to decide how to interpret the bytes that follow.
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
enum Tag {
    Null,
    False,
    True,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt32,
    Float32,
    Float64,
    Date,
    Time,
    Timestamp,
    TimestampTz,
    Interval,
    // The {Bytes,String,List}{Tiny,Short,Long,Huge} tags indicate a length
    // prefix of 1, 2, 4, or 8 bytes respectively; see `read_lengthed_datum`
    // and `push_lengthed_bytes`.
    BytesTiny,
    BytesShort,
    BytesLong,
    BytesHuge,
    StringTiny,
    StringShort,
    StringLong,
    StringHuge,
    Uuid,
    Array,
    ListTiny,
    ListShort,
    ListLong,
    ListHuge,
    Dict,
    JsonNull,
    Dummy,
    Numeric,
    UInt16,
    UInt64,
    MzTimestamp,
    Range,
    MzAclItem,
    AclItem,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestamp,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestampTz,
    // The next several tags are for variable-length signed integer encoding.
    // The basic idea is that `NonNegativeIntN_K` is used to encode a datum of type
    // IntN whose actual value is positive or zero and fits in K bits, and similarly for
    // NegativeIntN_K with negative values.
    //
    // The order of these tags matters, because we want to be able to choose the
    // tag for a given datum quickly, with arithmetic, rather than slowly, with a
    // stack of `if` statements.
    //
    // Separate tags for non-negative and negative numbers are used to avoid having to
    // waste one bit in the actual data space to encode the sign.
    NonNegativeInt16_0, // i.e., 0
    NonNegativeInt16_8,
    NonNegativeInt16_16,

    NonNegativeInt32_0,
    NonNegativeInt32_8,
    NonNegativeInt32_16,
    NonNegativeInt32_24,
    NonNegativeInt32_32,

    NonNegativeInt64_0,
    NonNegativeInt64_8,
    NonNegativeInt64_16,
    NonNegativeInt64_24,
    NonNegativeInt64_32,
    NonNegativeInt64_40,
    NonNegativeInt64_48,
    NonNegativeInt64_56,
    NonNegativeInt64_64,

    NegativeInt16_0, // i.e., -1
    NegativeInt16_8,
    NegativeInt16_16,

    NegativeInt32_0,
    NegativeInt32_8,
    NegativeInt32_16,
    NegativeInt32_24,
    NegativeInt32_32,

    NegativeInt64_0,
    NegativeInt64_8,
    NegativeInt64_16,
    NegativeInt64_24,
    NegativeInt64_32,
    NegativeInt64_40,
    NegativeInt64_48,
    NegativeInt64_56,
    NegativeInt64_64,

    // These are like the ones above, but for unsigned types. The
    // situation is slightly simpler as we don't have negatives.
    UInt8_0, // i.e., 0
    UInt8_8,

    UInt16_0,
    UInt16_8,
    UInt16_16,

    UInt32_0,
    UInt32_8,
    UInt32_16,
    UInt32_24,
    UInt32_32,

    UInt64_0,
    UInt64_8,
    UInt64_16,
    UInt64_24,
    UInt64_32,
    UInt64_40,
    UInt64_48,
    UInt64_56,
    UInt64_64,
}
1158
1159impl Tag {
1160    fn actual_int_length(self) -> Option<usize> {
1161        use Tag::*;
1162        let val = match self {
1163            NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
1164            | UInt32_0 | UInt64_0 => 0,
1165            NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
1166            | UInt32_8 | UInt64_8 => 1,
1167            NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
1168            | UInt32_16 | UInt64_16 => 2,
1169            NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
1170            NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
1171            NonNegativeInt64_40 | UInt64_40 => 5,
1172            NonNegativeInt64_48 | UInt64_48 => 6,
1173            NonNegativeInt64_56 | UInt64_56 => 7,
1174            NonNegativeInt64_64 | UInt64_64 => 8,
1175            NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
1176            NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
1177            NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
1178            NegativeInt32_24 | NegativeInt64_24 => 3,
1179            NegativeInt32_32 | NegativeInt64_32 => 4,
1180            NegativeInt64_40 => 5,
1181            NegativeInt64_48 => 6,
1182            NegativeInt64_56 => 7,
1183            NegativeInt64_64 => 8,
1184
1185            _ => return None,
1186        };
1187        Some(val)
1188    }
1189}
1190
1191// --------------------------------------------------------------------------------
1192// reading data
1193
/// Reads a length-prefixed byte slice from the front of `data`.
///
/// The length is encoded as a little-endian `u64` immediately preceding the
/// bytes. Advances `data` to point to the first byte after the end of the
/// read region.
fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
    let len = u64::from_le_bytes(read_byte_array(data));
    let len = usize::cast_from(len);
    let (bytes, next) = data.split_at(len);
    *data = next;
    bytes
}
1204
/// Reads a datum whose length is encoded in the row before its contents.
///
/// The width of the length prefix (1, 2, 4, or 8 bytes) is implied by `tag`.
/// Advances `data` to point to the first byte after the end of the read
/// region.
///
/// # Safety
///
/// This function is safe if the datum's length and contents were previously written by `push_lengthed_bytes`,
/// and it was only written with a `String` tag if it was indeed UTF-8.
unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
    // Decode the length prefix; its width is determined by the tag.
    let len = match tag {
        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
            usize::from(u16::from_le_bytes(read_byte_array(data)))
        }
        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
            usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
        }
        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
            usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
        }
        _ => unreachable!(),
    };
    let (bytes, next) = data.split_at(len);
    *data = next;
    // Wrap the payload bytes in the datum variant the tag calls for.
    match tag {
        Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
        Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
            Datum::String(str::from_utf8_unchecked(bytes))
        }
        Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
            Datum::List(DatumList::new(bytes))
        }
        _ => unreachable!(),
    }
}
1240
/// Reads a single byte from the front of `data`, advancing it past that byte.
/// Panics if `data` is empty.
fn read_byte(data: &mut &[u8]) -> u8 {
    let (head, tail) = data.split_at(1);
    *data = tail;
    head[0]
}
1246
/// Reads `length` bytes from the front of `data`, advancing it past them, and
/// widens the result to an array of `N` bytes by placing `FILL` in the
/// `N - length` most significant bytes.
///
/// Panics unless:
///   * length <= N
///   * length <= data.len()
fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    let (head, tail) = data.split_at(length);
    let mut out = [FILL; N];
    out[..length].copy_from_slice(head);
    *data = tail;
    out
}
/// Reads `length` bytes from the front of `data`, advancing it, and extends
/// the result to a negative `N`-byte twos complement integer by filling the
/// remaining (most significant) bits with 1.
///
/// Panics unless:
///   * length <= N
///   * length <= data.len()
fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
    read_byte_array_sign_extending::<N, 255>(data, length)
}
1274
/// Reads `length` bytes from the front of `data`, advancing it, and extends
/// the result to a positive or zero `N`-byte twos complement integer by
/// filling the remaining (most significant) bits with 0.
///
/// Panics unless:
///   * length <= N
///   * length <= data.len()
fn read_byte_array_extending_nonnegative<const N: usize>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    read_byte_array_sign_extending::<N, 0>(data, length)
}
1288
1289pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1290    let (prev, next) = data.split_first_chunk().unwrap();
1291    *data = next;
1292    *prev
1293}
1294
/// Reads a `Date` encoded as a little-endian `i32` count of days since the
/// PostgreSQL epoch. Panics if the day count is out of range for `Date`.
pub(super) fn read_date(data: &mut &[u8]) -> Date {
    let days = i32::from_le_bytes(read_byte_array(data));
    Date::from_pg_epoch(days).expect("unexpected date")
}
1299
/// Reads a `NaiveDate` encoded as a little-endian `i32` year followed by a
/// little-endian `u32` day-of-year ordinal. Panics if the pair does not name
/// a valid date.
pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
    let year = i32::from_le_bytes(read_byte_array(data));
    let ordinal = u32::from_le_bytes(read_byte_array(data));
    NaiveDate::from_yo_opt(year, ordinal).unwrap()
}
1305
/// Reads a `NaiveTime` encoded as a little-endian `u32` count of seconds from
/// midnight followed by a little-endian `u32` count of subsecond nanoseconds.
/// Panics if the pair does not name a valid time.
pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
    let secs = u32::from_le_bytes(read_byte_array(data));
    let nanos = u32::from_le_bytes(read_byte_array(data));
    NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
}
1311
/// Reads a datum from the front of `data`.
///
/// Advances `data` to point to the first byte after the end of the encoded
/// datum.
///
/// # Safety
///
/// This function is safe if a `Datum` was previously written at this position by `push_datum`.
/// Otherwise it could return invalid values, which is Undefined Behavior.
pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
    // Every encoded datum begins with a one-byte tag that determines how the
    // bytes that follow are interpreted.
    let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
    match tag {
        Tag::Null => Datum::Null,
        Tag::False => Datum::False,
        Tag::True => Datum::True,
        Tag::UInt8_0 | Tag::UInt8_8 => {
            let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt8(i)
        }
        Tag::Int16 => {
            let i = i16::from_le_bytes(read_byte_array(data));
            Datum::Int16(i)
        }
        Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
            // and `data` is big enough because it was encoded validly. These assumptions
            // are checked in debug asserts.
            let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int16(i)
        }
        Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
            let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt16(i)
        }
        Tag::Int32 => {
            let i = i32::from_le_bytes(read_byte_array(data));
            Datum::Int32(i)
        }
        Tag::NonNegativeInt32_0
        | Tag::NonNegativeInt32_32
        | Tag::NonNegativeInt32_8
        | Tag::NonNegativeInt32_16
        | Tag::NonNegativeInt32_24 => {
            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
            // and `data` is big enough because it was encoded validly. These assumptions
            // are checked in debug asserts.
            let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int32(i)
        }
        Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
            let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt32(i)
        }
        Tag::Int64 => {
            let i = i64::from_le_bytes(read_byte_array(data));
            Datum::Int64(i)
        }
        Tag::NonNegativeInt64_0
        | Tag::NonNegativeInt64_64
        | Tag::NonNegativeInt64_8
        | Tag::NonNegativeInt64_16
        | Tag::NonNegativeInt64_24
        | Tag::NonNegativeInt64_32
        | Tag::NonNegativeInt64_40
        | Tag::NonNegativeInt64_48
        | Tag::NonNegativeInt64_56 => {
            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
            // and `data` is big enough because it was encoded validly. These assumptions
            // are checked in debug asserts.

            let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int64(i)
        }
        Tag::UInt64_0
        | Tag::UInt64_8
        | Tag::UInt64_16
        | Tag::UInt64_24
        | Tag::UInt64_32
        | Tag::UInt64_40
        | Tag::UInt64_48
        | Tag::UInt64_56
        | Tag::UInt64_64 => {
            let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt64(i)
        }
        Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
            // and `data` is big enough because it was encoded validly. These assumptions
            // are checked in debug asserts.
            let i = i16::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int16(i)
        }
        Tag::NegativeInt32_0
        | Tag::NegativeInt32_32
        | Tag::NegativeInt32_8
        | Tag::NegativeInt32_16
        | Tag::NegativeInt32_24 => {
            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
            // and `data` is big enough because it was encoded validly. These assumptions
            // are checked in debug asserts.
            let i = i32::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int32(i)
        }
        Tag::NegativeInt64_0
        | Tag::NegativeInt64_64
        | Tag::NegativeInt64_8
        | Tag::NegativeInt64_16
        | Tag::NegativeInt64_24
        | Tag::NegativeInt64_32
        | Tag::NegativeInt64_40
        | Tag::NegativeInt64_48
        | Tag::NegativeInt64_56 => {
            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
            // and `data` is big enough because the row was encoded validly. These assumptions
            // are checked in debug asserts.
            let i = i64::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int64(i)
        }

        // Fixed-width integer encodings.
        Tag::UInt8 => {
            let i = u8::from_le_bytes(read_byte_array(data));
            Datum::UInt8(i)
        }
        Tag::UInt16 => {
            let i = u16::from_le_bytes(read_byte_array(data));
            Datum::UInt16(i)
        }
        Tag::UInt32 => {
            let i = u32::from_le_bytes(read_byte_array(data));
            Datum::UInt32(i)
        }
        Tag::UInt64 => {
            let i = u64::from_le_bytes(read_byte_array(data));
            Datum::UInt64(i)
        }
        Tag::Float32 => {
            let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
            Datum::Float32(OrderedFloat::from(f))
        }
        Tag::Float64 => {
            let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
            Datum::Float64(OrderedFloat::from(f))
        }
        Tag::Date => Datum::Date(read_date(data)),
        Tag::Time => Datum::Time(read_time(data)),
        Tag::CheapTimestamp => {
            // A single i64 of nanoseconds since the Unix epoch.
            let ts = i64::from_le_bytes(read_byte_array(data));
            let secs = ts.div_euclid(1_000_000_000);
            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
            let ndt = DateTime::from_timestamp(secs, nsecs)
                .expect("We only write round-trippable timestamps")
                .naive_utc();
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
            )
        }
        Tag::CheapTimestampTz => {
            // A single i64 of nanoseconds since the Unix epoch.
            let ts = i64::from_le_bytes(read_byte_array(data));
            let secs = ts.div_euclid(1_000_000_000);
            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
            let dt = DateTime::from_timestamp(secs, nsecs)
                .expect("We only write round-trippable timestamps");
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
            )
        }
        Tag::Timestamp => {
            let date = read_naive_date(data);
            let time = read_time(data);
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(date.and_time(time))
                    .expect("unexpected timestamp"),
            )
        }
        Tag::TimestampTz => {
            let date = read_naive_date(data);
            let time = read_time(data);
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
                    date.and_time(time),
                    Utc,
                ))
                .expect("unexpected timestamptz"),
            )
        }
        Tag::Interval => {
            let months = i32::from_le_bytes(read_byte_array(data));
            let days = i32::from_le_bytes(read_byte_array(data));
            let micros = i64::from_le_bytes(read_byte_array(data));
            Datum::Interval(Interval {
                months,
                days,
                micros,
            })
        }
        Tag::BytesTiny
        | Tag::BytesShort
        | Tag::BytesLong
        | Tag::BytesHuge
        | Tag::StringTiny
        | Tag::StringShort
        | Tag::StringLong
        | Tag::StringHuge
        | Tag::ListTiny
        | Tag::ListShort
        | Tag::ListLong
        | Tag::ListHuge => read_lengthed_datum(data, tag),
        Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
        Tag::Array => {
            // See the comment in `Row::push_array` for details on the encoding
            // of arrays.
            let ndims = read_byte(data);
            let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
            let (dims, next) = data.split_at(dims_size);
            *data = next;
            let bytes = read_untagged_bytes(data);
            Datum::Array(Array {
                dims: ArrayDimensions { data: dims },
                elements: DatumList::new(bytes),
            })
        }
        Tag::Dict => {
            let bytes = read_untagged_bytes(data);
            Datum::Map(DatumMap::new(bytes))
        }
        Tag::JsonNull => Datum::JsonNull,
        Tag::Dummy => Datum::Dummy,
        Tag::Numeric => {
            let digits = read_byte(data).into();
            let exponent = i8::reinterpret_cast(read_byte(data));
            let bits = read_byte(data);

            let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
            let lsu_u8_len = lsu_u16_len * 2;
            let (lsu_u8, next) = data.split_at(lsu_u8_len);
            *data = next;

            // TODO: if we refactor the decimal library to accept the owned
            // array as a parameter to `from_raw_parts` below, we could likely
            // avoid a copy because it is exactly the value we want
            let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
            for (i, c) in lsu_u8.chunks(2).enumerate() {
                lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
            }

            let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
            Datum::from(d)
        }
        Tag::MzTimestamp => {
            let t = Timestamp::decode(read_byte_array(data));
            Datum::MzTimestamp(t)
        }
        Tag::Range => {
            // See notes on `push_range_with` for details about encoding.
            let flag_byte = read_byte(data);
            let flags = range::InternalFlags::from_bits(flag_byte)
                .expect("range flags must be encoded validly");

            if flags.contains(range::InternalFlags::EMPTY) {
                assert!(
                    flags == range::InternalFlags::EMPTY,
                    "empty ranges contain only RANGE_EMPTY flag"
                );

                return Datum::Range(Range { inner: None });
            }

            let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
                None
            } else {
                Some(DatumNested::extract(data))
            };

            let lower = RangeBound {
                inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
                bound: lower_bound,
            };

            let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
                None
            } else {
                Some(DatumNested::extract(data))
            };

            let upper = RangeBound {
                inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
                bound: upper_bound,
            };

            Datum::Range(Range {
                inner: Some(RangeInner { lower, upper }),
            })
        }
        Tag::MzAclItem => {
            const N: usize = MzAclItem::binary_size();
            let mz_acl_item =
                MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
            Datum::MzAclItem(mz_acl_item)
        }
        Tag::AclItem => {
            const N: usize = AclItem::binary_size();
            let acl_item =
                AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
            Datum::AclItem(acl_item)
        }
    }
}
1658
1659// --------------------------------------------------------------------------------
1660// writing data
1661
1662fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1663where
1664    D: Vector<u8>,
1665{
1666    let len = u64::cast_from(bytes.len());
1667    data.extend_from_slice(&len.to_le_bytes());
1668    data.extend_from_slice(bytes);
1669}
1670
1671fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1672where
1673    D: Vector<u8>,
1674{
1675    match tag {
1676        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1677            let len = bytes.len().to_le_bytes();
1678            data.push(len[0]);
1679        }
1680        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1681            let len = bytes.len().to_le_bytes();
1682            data.extend_from_slice(&len[0..2]);
1683        }
1684        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1685            let len = bytes.len().to_le_bytes();
1686            data.extend_from_slice(&len[0..4]);
1687        }
1688        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1689            let len = bytes.len().to_le_bytes();
1690            data.extend_from_slice(&len);
1691        }
1692        _ => unreachable!(),
1693    }
1694    data.extend_from_slice(bytes);
1695}
1696
/// Encodes a `Date` as the little-endian bytes of its day count since the
/// PostgreSQL epoch. Inverse of `read_date`.
pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
    i32::to_le_bytes(date.pg_epoch_days())
}
1700
/// Appends the encoding of `date` (see `date_to_array`) to `data`.
fn push_date<D>(data: &mut D, date: Date)
where
    D: Vector<u8>,
{
    data.extend_from_slice(&date_to_array(date));
}
1707
/// Encodes a `NaiveDate` as two little-endian byte arrays: the `i32` year and
/// the `u32` day-of-year ordinal. Inverse of `read_naive_date`.
pub(super) fn naive_date_to_arrays(
    date: NaiveDate,
) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
    (
        i32::to_le_bytes(date.year()),
        u32::to_le_bytes(date.ordinal()),
    )
}
1716
/// Appends the encoding of `date` (see `naive_date_to_arrays`) to `data`.
fn push_naive_date<D>(data: &mut D, date: NaiveDate)
where
    D: Vector<u8>,
{
    let (ds1, ds2) = naive_date_to_arrays(date);
    data.extend_from_slice(&ds1);
    data.extend_from_slice(&ds2);
}
1725
/// Encodes a `NaiveTime` as two little-endian `u32` byte arrays: seconds from
/// midnight and subsecond nanoseconds. Inverse of `read_time`.
pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
    (
        u32::to_le_bytes(time.num_seconds_from_midnight()),
        u32::to_le_bytes(time.nanosecond()),
    )
}
1732
/// Appends the encoding of `time` (see `time_to_arrays`) to `data`.
fn push_time<D>(data: &mut D, time: NaiveTime)
where
    D: Vector<u8>,
{
    let (ts1, ts2) = time_to_arrays(time);
    data.extend_from_slice(&ts1);
    data.extend_from_slice(&ts2);
}
1741
1742/// Returns an i64 representing a `NaiveDateTime`, if
1743/// said i64 can be round-tripped back to a `NaiveDateTime`.
1744///
1745/// The only exotic NDTs for which this can't happen are those that
1746/// are hundreds of years in the future or past, or those that
1747/// represent a leap second. (Note that Materialize does not support
1748/// leap seconds, but this module does).
1749// This function is inspired by `NaiveDateTime::timestamp_nanos`,
1750// with extra checking.
1751fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1752    let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1753    if subsec_nanos >= 1_000_000_000 {
1754        return None;
1755    }
1756    let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1757    as_ns.checked_add(i64::from(subsec_nanos))
1758}
1759
// This function is extremely hot, so
// we just use `as` to avoid the overhead of
// `try_into` followed by `unwrap`.
// `leading_ones` and `leading_zeros`
// can never return values greater than 64, so the conversion is safe.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_signed<T>(i: T) -> u8
where
    T: Into<i64>,
{
    let value: i64 = i.into();

    // The sign is recorded separately (in the tag), so only the bits after
    // the run of leading sign bits need to be stored.
    let sign_bits = match value.is_negative() {
        true => value.leading_ones() as u8,
        false => value.leading_zeros() as u8,
    };

    // Round the remaining bit count up to whole bytes.
    (64 - sign_bits + 7) / 8
}
1784
// In principle we could just use `min_bytes_signed`, rather than
// having a separate function here, as long as we made that one take
// `T: Into<i128>` instead of 64. But LLVM doesn't seem smart enough
// to realize that that function is the same as the current version,
// and generates worse code.
//
// Justification for `as` is the same as in `min_bytes_signed`.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_unsigned<T>(i: T) -> u8
where
    T: Into<u64>,
{
    let value: u64 = i.into();

    // Leading zeros carry no information; store only the significant bits,
    // rounded up to whole bytes.
    let significant_bits = 64 - value.leading_zeros() as u8;
    (significant_bits + 7) / 8
}
1804
/// Exclusive upper bound on lengths that fit a 1-byte length prefix.
const TINY: usize = 1 << 8;
/// Exclusive upper bound on lengths that fit a 2-byte length prefix.
const SHORT: usize = 1 << 16;
/// Exclusive upper bound on lengths that fit a 4-byte length prefix.
// NOTE(review): `1 << 32` assumes a 64-bit `usize`; this would overflow on
// 32-bit targets — confirm the crate only targets 64-bit platforms.
const LONG: usize = 1 << 32;
1808
/// Encodes a single `Datum` onto the end of `data` in the packed row
/// representation: a one-byte `Tag` followed by a tag-dependent payload.
///
/// `datum_size` estimates the number of bytes this function writes; the two
/// must be kept consistent when the encoding changes.
fn push_datum<D>(data: &mut D, datum: Datum)
where
    D: Vector<u8>,
{
    match datum {
        Datum::Null => data.push(Tag::Null.into()),
        Datum::False => data.push(Tag::False.into()),
        Datum::True => data.push(Tag::True.into()),
        // Integers use a variable-length encoding: the tag records the sign
        // and the count of significant bytes, and only those low-order
        // little-endian bytes are written.
        Datum::Int16(i) => {
            let mbs = min_bytes_signed(i);
            let tag = u8::from(if i.is_negative() {
                Tag::NegativeInt16_0
            } else {
                Tag::NonNegativeInt16_0
            }) + mbs;

            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
        }
        Datum::Int32(i) => {
            let mbs = min_bytes_signed(i);
            let tag = u8::from(if i.is_negative() {
                Tag::NegativeInt32_0
            } else {
                Tag::NonNegativeInt32_0
            }) + mbs;

            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
        }
        Datum::Int64(i) => {
            let mbs = min_bytes_signed(i);
            let tag = u8::from(if i.is_negative() {
                Tag::NegativeInt64_0
            } else {
                Tag::NonNegativeInt64_0
            }) + mbs;

            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
        }
        Datum::UInt8(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt8_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        Datum::UInt16(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt16_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        Datum::UInt32(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt32_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        Datum::UInt64(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt64_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        // Floats are stored as the little-endian bytes of their bit pattern.
        Datum::Float32(f) => {
            data.push(Tag::Float32.into());
            data.extend_from_slice(&f.to_bits().to_le_bytes());
        }
        Datum::Float64(f) => {
            data.push(Tag::Float64.into());
            data.extend_from_slice(&f.to_bits().to_le_bytes());
        }
        Datum::Date(d) => {
            data.push(Tag::Date.into());
            push_date(data, d);
        }
        Datum::Time(t) => {
            data.push(Tag::Time.into());
            push_time(data, t);
        }
        // Timestamps that round-trip through i64 nanoseconds use a compact
        // fixed 8-byte encoding; others fall back to date + time parts.
        Datum::Timestamp(t) => {
            let datetime = t.to_naive();
            if let Some(nanos) = checked_timestamp_nanos(datetime) {
                data.push(Tag::CheapTimestamp.into());
                data.extend_from_slice(&nanos.to_le_bytes());
            } else {
                data.push(Tag::Timestamp.into());
                push_naive_date(data, datetime.date());
                push_time(data, datetime.time());
            }
        }
        Datum::TimestampTz(t) => {
            let datetime = t.to_naive();
            if let Some(nanos) = checked_timestamp_nanos(datetime) {
                data.push(Tag::CheapTimestampTz.into());
                data.extend_from_slice(&nanos.to_le_bytes());
            } else {
                data.push(Tag::TimestampTz.into());
                push_naive_date(data, datetime.date());
                push_time(data, datetime.time());
            }
        }
        Datum::Interval(i) => {
            data.push(Tag::Interval.into());
            data.extend_from_slice(&i.months.to_le_bytes());
            data.extend_from_slice(&i.days.to_le_bytes());
            data.extend_from_slice(&i.micros.to_le_bytes());
        }
        // Variable-length data picks the smallest tag whose length prefix
        // (1, 2, 4, or 8 bytes) can represent the payload size.
        Datum::Bytes(bytes) => {
            let tag = match bytes.len() {
                0..TINY => Tag::BytesTiny,
                TINY..SHORT => Tag::BytesShort,
                SHORT..LONG => Tag::BytesLong,
                _ => Tag::BytesHuge,
            };
            data.push(tag.into());
            push_lengthed_bytes(data, bytes, tag);
        }
        Datum::String(string) => {
            let tag = match string.len() {
                0..TINY => Tag::StringTiny,
                TINY..SHORT => Tag::StringShort,
                SHORT..LONG => Tag::StringLong,
                _ => Tag::StringHuge,
            };
            data.push(tag.into());
            push_lengthed_bytes(data, string.as_bytes(), tag);
        }
        Datum::List(list) => {
            let tag = match list.data.len() {
                0..TINY => Tag::ListTiny,
                TINY..SHORT => Tag::ListShort,
                SHORT..LONG => Tag::ListLong,
                _ => Tag::ListHuge,
            };
            data.push(tag.into());
            push_lengthed_bytes(data, list.data, tag);
        }
        Datum::Uuid(u) => {
            data.push(Tag::Uuid.into());
            data.extend_from_slice(u.as_bytes());
        }
        Datum::Array(array) => {
            // See the comment in `Row::push_array` for details on the encoding
            // of arrays.
            data.push(Tag::Array.into());
            data.push(array.dims.ndims());
            data.extend_from_slice(array.dims.data);
            push_untagged_bytes(data, array.elements.data);
        }
        Datum::Map(dict) => {
            data.push(Tag::Dict.into());
            push_untagged_bytes(data, dict.data);
        }
        Datum::JsonNull => data.push(Tag::JsonNull.into()),
        Datum::MzTimestamp(t) => {
            data.push(Tag::MzTimestamp.into());
            data.extend_from_slice(&t.encode());
        }
        Datum::Dummy => data.push(Tag::Dummy.into()),
        Datum::Numeric(mut n) => {
            // Pseudo-canonical representation of decimal values with
            // insignificant zeroes trimmed. This compresses the number further
            // than `Numeric::trim` by removing all zeroes, and not only those in
            // the fractional component.
            numeric::cx_datum().reduce(&mut n.0);
            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
            data.push(Tag::Numeric.into());
            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
            data.push(
                i8::try_from(exponent)
                    .expect("exponent to fit within i8; should not exceed +/- 39")
                    .to_le_bytes()[0],
            );
            data.push(bits);

            // Only the units actually needed for `digits` are stored.
            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];

            // Little endian machines can take the lsu directly from u16 to u8.
            if cfg!(target_endian = "little") {
                // SAFETY: `lsu` (returned by `coefficient_units()`) is a `&[u16]`, so
                // each element can safely be transmuted into two `u8`s.
                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
                // The `u8` aligned version of the `lsu` should have twice as many
                // elements as we expect for the `u16` version.
                soft_assert_no_log!(
                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
                    Numeric::digits_to_lsu_elements_len(digits) * 2,
                    lsu_bytes.len()
                );
                // There should be no unaligned elements in the prefix or suffix.
                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
                data.extend_from_slice(lsu_bytes);
            } else {
                // Big-endian fallback: serialize each unit explicitly.
                for u in lsu {
                    data.extend_from_slice(&u.to_le_bytes());
                }
            }
        }
        Datum::Range(range) => {
            // See notes on `push_range_with` for details about encoding.
            data.push(Tag::Range.into());
            data.push(range.internal_flag_bits());

            if let Some(RangeInner { lower, upper }) = range.inner {
                // Only finite bounds carry a datum; the flag bits encode
                // which bounds are present.
                for bound in [lower.bound, upper.bound] {
                    if let Some(bound) = bound {
                        match bound.datum() {
                            Datum::Null => panic!("cannot push Datum::Null into range"),
                            d => push_datum::<D>(data, d),
                        }
                    }
                }
            }
        }
        Datum::MzAclItem(mz_acl_item) => {
            data.push(Tag::MzAclItem.into());
            data.extend_from_slice(&mz_acl_item.encode_binary());
        }
        Datum::AclItem(acl_item) => {
            data.push(Tag::AclItem.into());
            data.extend_from_slice(&acl_item.encode_binary());
        }
    }
}
2036
2037/// Return the number of bytes these Datums would use if packed as a Row.
2038pub fn row_size<'a, I>(a: I) -> usize
2039where
2040    I: IntoIterator<Item = Datum<'a>>,
2041{
2042    // Using datums_size instead of a.data().len() here is safer because it will
2043    // return the size of the datums if they were packed into a Row. Although
2044    // a.data().len() happens to give the correct answer (and is faster), data()
2045    // is documented as for debugging only.
2046    let sz = datums_size::<_, _>(a);
2047    let size_of_row = std::mem::size_of::<Row>();
2048    // The Row struct attempts to inline data until it can't fit in the
2049    // preallocated size. Otherwise it spills to heap, and uses the Row to point
2050    // to that.
2051    if sz > Row::SIZE {
2052        sz + size_of_row
2053    } else {
2054        size_of_row
2055    }
2056}
2057
/// Number of bytes required by the datum.
/// This is used to optimistically pre-allocate buffers for packing rows.
///
/// Mirrors the encoding performed by `push_datum`.
// NOTE(review): the list/map arms assume a fixed 8-byte length prefix, while
// `push_datum` uses a variable-length prefix — confirm whether this is an
// intentional upper-bound estimate.
pub fn datum_size(datum: &Datum) -> usize {
    match datum {
        Datum::Null => 1,
        Datum::False => 1,
        Datum::True => 1,
        // Tag byte plus the variable-length integer payload.
        Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
        Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
        Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
        Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
        Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
        Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
        Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
        Datum::Float32(_) => 1 + size_of::<f32>(),
        Datum::Float64(_) => 1 + size_of::<f64>(),
        Datum::Date(_) => 1 + size_of::<i32>(),
        // Tag plus two u32s (seconds since midnight, nanoseconds).
        Datum::Time(_) => 1 + 8,
        // 8 bytes for the "cheap" i64-nanosecond encoding, 16 for the
        // date + time fallback (see `push_datum`).
        Datum::Timestamp(t) => {
            1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
                8
            } else {
                16
            }
        }
        Datum::TimestampTz(t) => {
            1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
                8
            } else {
                16
            }
        }
        Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
        Datum::Bytes(bytes) => {
            // We use a variable length representation of slice length.
            let bytes_for_length = match bytes.len() {
                0..TINY => 1,
                TINY..SHORT => 2,
                SHORT..LONG => 4,
                _ => 8,
            };
            1 + bytes_for_length + bytes.len()
        }
        Datum::String(string) => {
            // We use a variable length representation of slice length.
            let bytes_for_length = match string.len() {
                0..TINY => 1,
                TINY..SHORT => 2,
                SHORT..LONG => 4,
                _ => 8,
            };
            1 + bytes_for_length + string.len()
        }
        Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
        // Tag + ndims byte + dimension info + element length prefix + elements.
        Datum::Array(array) => {
            1 + size_of::<u8>()
                + array.dims.data.len()
                + size_of::<u64>()
                + array.elements.data.len()
        }
        Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
        Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
        Datum::JsonNull => 1,
        Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
        Datum::Dummy => 1,
        Datum::Numeric(d) => {
            let mut d = d.0.clone();
            // Values must be reduced to determine appropriate number of
            // coefficient units.
            numeric::cx_datum().reduce(&mut d);
            // 4 = 1 byte each for tag, digits, exponent, bits
            4 + (d.coefficient_units().len() * 2)
        }
        Datum::Range(Range { inner }) => {
            // Tag + flags
            2 + match inner {
                None => 0,
                // Each present, finite bound contributes its packed datum.
                Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
                    .iter()
                    .map(|bound| match bound {
                        None => 0,
                        Some(bound) => bound.val.len(),
                    })
                    .sum(),
            }
        }
        Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
        Datum::AclItem(_) => 1 + AclItem::binary_size(),
    }
}
2148
2149/// Number of bytes required by a sequence of datums.
2150///
2151/// This method can be used to right-size the allocation for a `Row`
2152/// before calling [`RowPacker::extend`].
2153pub fn datums_size<'a, I, D>(iter: I) -> usize
2154where
2155    I: IntoIterator<Item = D>,
2156    D: Borrow<Datum<'a>>,
2157{
2158    iter.into_iter().map(|d| datum_size(d.borrow())).sum()
2159}
2160
2161/// Number of bytes required by a list of datums. This computes the size that would be required if
2162/// the given datums were packed into a list.
2163///
2164/// This is used to optimistically pre-allocate buffers for packing rows.
2165pub fn datum_list_size<'a, I, D>(iter: I) -> usize
2166where
2167    I: IntoIterator<Item = D>,
2168    D: Borrow<Datum<'a>>,
2169{
2170    1 + size_of::<u64>() + datums_size(iter)
2171}
2172
2173impl RowPacker<'_> {
    /// Constructs a row packer that will pack additional datums into the
    /// provided row.
    ///
    /// The packer appends to whatever data `row` already contains; it does
    /// not clear it.
    ///
    /// This function is intentionally somewhat inconvenient to call. You
    /// usually want to call [`Row::packer`] instead to start packing from
    /// scratch.
    pub fn for_existing_row(row: &mut Row) -> RowPacker<'_> {
        RowPacker { row }
    }
2183
2184    /// Extend an existing `Row` with a `Datum`.
2185    #[inline]
2186    pub fn push<'a, D>(&mut self, datum: D)
2187    where
2188        D: Borrow<Datum<'a>>,
2189    {
2190        push_datum(&mut self.row.data, *datum.borrow());
2191    }
2192
2193    /// Extend an existing `Row` with additional `Datum`s.
2194    #[inline]
2195    pub fn extend<'a, I, D>(&mut self, iter: I)
2196    where
2197        I: IntoIterator<Item = D>,
2198        D: Borrow<Datum<'a>>,
2199    {
2200        for datum in iter {
2201            push_datum(&mut self.row.data, *datum.borrow())
2202        }
2203    }
2204
2205    /// Extend an existing `Row` with additional `Datum`s.
2206    ///
2207    /// In the case the iterator produces an error, the pushing of
2208    /// datums in terminated and the error returned. The `Row` will
2209    /// be incomplete, but it will be safe to read datums from it.
2210    #[inline]
2211    pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
2212    where
2213        I: IntoIterator<Item = Result<D, E>>,
2214        D: Borrow<Datum<'a>>,
2215    {
2216        for datum in iter {
2217            push_datum(&mut self.row.data, *datum?.borrow());
2218        }
2219        Ok(())
2220    }
2221
2222    /// Appends the datums of an entire `Row`.
2223    pub fn extend_by_row(&mut self, row: &Row) {
2224        self.row.data.extend_from_slice(row.data.as_slice());
2225    }
2226
2227    /// Appends the datums of an entire `Row`.
2228    pub fn extend_by_row_ref(&mut self, row: &RowRef) {
2229        self.row.data.extend_from_slice(row.data());
2230    }
2231
    /// Appends the slice of data representing an entire `Row`. The data is not validated.
    /// No tag or length prefix is added; `data` is copied verbatim.
    ///
    /// # Safety
    ///
    /// The requirements from [`Row::from_bytes_unchecked`] apply here, too:
    /// This method relies on `data` being an appropriate row encoding, and can
    /// result in unsafety if this is not the case.
    #[inline]
    pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
        self.row.data.extend_from_slice(data)
    }
2243
    /// Pushes a [`DatumList`] that is built from a closure.
    ///
    /// The supplied closure will be invoked once with a [`RowPacker`] that can
    /// be used to populate the list. It is valid to call any method on the
    /// [`RowPacker`] except for [`RowPacker::clear`], [`RowPacker::truncate`],
    /// or [`RowPacker::truncate_datums`].
    ///
    /// Returns the value returned by the closure, if any.
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::default();
    /// row.packer().push_list_with(|row| {
    ///     row.push(Datum::String("age"));
    ///     row.push(Datum::Int64(42));
    /// });
    /// assert_eq!(
    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
    ///     vec![Datum::String("age"), Datum::Int64(42)],
    /// );
    /// ```
    #[inline]
    pub fn push_list_with<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&mut RowPacker) -> R,
    {
        // First, assume that the list will fit in 255 bytes, and thus the length will fit in
        // 1 byte. If not, we'll fix it up later.
        let start = self.row.data.len();
        self.row.data.push(Tag::ListTiny.into());
        // Write a dummy len, will fix it up later.
        self.row.data.push(0);

        let out = f(self);

        // The `- 1 - 1` is for the tag and the len.
        let len = self.row.data.len() - start - 1 - 1;
        // We now know the real len.
        if len < TINY {
            // If the len fits in 1 byte, we just need to fix up the len.
            self.row.data[start + 1] = len.to_le_bytes()[0];
        } else {
            // Note: We move this code path into its own function, so that the common case can be
            // inlined.
            long_list(&mut self.row.data, start, len);
        }

        /// Slow path: the payload turned out to be at least 256 bytes, so a
        /// wider length prefix is needed.
        /// 1. Fix up the tag.
        /// 2. Move the actual data a bit (for which we also need to make room at the end).
        /// 3. Fix up the len.
        ///
        /// `data`: The row's backing data.
        /// `start`: where `push_list_with` started writing in `data`.
        /// `len`: the length of the data, excluding the tag and the length.
        #[cold]
        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
            // `len_len`: the length of the length. (Possible values are: 2, 4, 8. 1 is handled
            // elsewhere.) The other parameters are the same as for `long_list`.
            let long_list_inner = |data: &mut CompactBytes, len_len| {
                // We'll need memory for the new, bigger length, so make the `CompactBytes` bigger.
                // The `- 1` is because the old length was 1 byte.
                const ZEROS: [u8; 8] = [0; 8];
                data.extend_from_slice(&ZEROS[0..len_len - 1]);
                // Move the data to the end of the `CompactBytes`, to make space for the new length.
                // Originally, it started after the 1-byte tag and the 1-byte length, now it will
                // start after the 1-byte tag and the len_len-byte length.
                //
                // Note that this is the only operation in `long_list` whose cost is proportional
                // to `len`. Since `len` is at least 256 here, the other operations' cost are
                // negligible. `copy_within` is a memmove, which is probably a fair bit faster per
                // Datum than a Datum encoding in the `f` closure.
                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
                // Write the new length.
                data[start + 1..start + 1 + len_len]
                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
            };
            match len {
                0..TINY => {
                    unreachable!()
                }
                TINY..SHORT => {
                    data[start] = Tag::ListShort.into();
                    long_list_inner(data, 2);
                }
                SHORT..LONG => {
                    data[start] = Tag::ListLong.into();
                    long_list_inner(data, 4);
                }
                _ => {
                    data[start] = Tag::ListHuge.into();
                    long_list_inner(data, 8);
                }
            };
        }

        out
    }
2340
2341    /// Pushes a [`DatumMap`] that is built from a closure.
2342    ///
2343    /// The supplied closure will be invoked once with a `Row` that can be used
2344    /// to populate the dict.
2345    ///
2346    /// The closure **must** alternate pushing string keys and arbitrary values,
2347    /// otherwise reading the dict will cause a panic.
2348    ///
2349    /// The closure **must** push keys in ascending order, otherwise equality
2350    /// checks on the resulting `Row` may be wrong and reading the dict IN DEBUG
2351    /// MODE will cause a panic.
2352    ///
2353    /// The closure **must not** call [`RowPacker::clear`],
2354    /// [`RowPacker::truncate`], or [`RowPacker::truncate_datums`].
2355    ///
2356    /// # Example
2357    ///
2358    /// ```
2359    /// # use mz_repr::{Row, Datum};
2360    /// let mut row = Row::default();
2361    /// row.packer().push_dict_with(|row| {
2362    ///
2363    ///     // key
2364    ///     row.push(Datum::String("age"));
2365    ///     // value
2366    ///     row.push(Datum::Int64(42));
2367    ///
2368    ///     // key
2369    ///     row.push(Datum::String("name"));
2370    ///     // value
2371    ///     row.push(Datum::String("bob"));
2372    /// });
2373    /// assert_eq!(
2374    ///     row.unpack_first().unwrap_map().iter().collect::<Vec<_>>(),
2375    ///     vec![("age", Datum::Int64(42)), ("name", Datum::String("bob"))]
2376    /// );
2377    /// ```
2378    pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2379    where
2380        F: FnOnce(&mut RowPacker) -> R,
2381    {
2382        self.row.data.push(Tag::Dict.into());
2383        let start = self.row.data.len();
2384        // write a dummy len, will fix it up later
2385        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2386
2387        let res = f(self);
2388
2389        let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2390        // fix up the len
2391        self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2392
2393        res
2394    }
2395
    /// Like [`RowPacker::push_dict_with`], but accepts a fallible closure.
    ///
    /// Note that on `Err`, datums pushed by `f` before the failure remain in
    /// the row; only the error is propagated.
    pub fn try_push_dict_with<F, E>(&mut self, f: F) -> Result<(), E>
    where
        F: FnOnce(&mut RowPacker) -> Result<(), E>,
    {
        self.push_dict_with(f)
    }
2403
2404    /// Convenience function to construct an array from an iter of `Datum`s.
2405    ///
2406    /// Returns an error if the number of elements in `iter` does not match
2407    /// the cardinality of the array as described by `dims`, or if the
2408    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2409    /// occurs, the packer's state will be unchanged.
2410    pub fn try_push_array<'a, I, D>(
2411        &mut self,
2412        dims: &[ArrayDimension],
2413        iter: I,
2414    ) -> Result<(), InvalidArrayError>
2415    where
2416        I: IntoIterator<Item = D>,
2417        D: Borrow<Datum<'a>>,
2418    {
2419        // SAFETY: The function returns the exact number of elements pushed into the array.
2420        unsafe {
2421            self.push_array_with_unchecked(dims, |packer| {
2422                let mut nelements = 0;
2423                for datum in iter {
2424                    packer.push(datum);
2425                    nelements += 1;
2426                }
2427                Ok::<_, InvalidArrayError>(nelements)
2428            })
2429        }
2430    }
2431
2432    /// Like [`RowPacker::try_push_array`], but accepts a fallible iterator of
2433    /// elements.
2434    pub fn try_push_array_fallible<'a, I, D, E>(
2435        &mut self,
2436        dims: &[ArrayDimension],
2437        iter: I,
2438    ) -> Result<Result<(), E>, InvalidArrayError>
2439    where
2440        I: IntoIterator<Item = Result<D, E>>,
2441        D: Borrow<Datum<'a>>,
2442    {
2443        enum Error<E> {
2444            Usage(InvalidArrayError),
2445            Inner(E),
2446        }
2447
2448        impl<E> From<InvalidArrayError> for Error<E> {
2449            fn from(e: InvalidArrayError) -> Self {
2450                Self::Usage(e)
2451            }
2452        }
2453
2454        // SAFETY: The function returns the exact number of elements pushed into the array.
2455        let result = unsafe {
2456            self.push_array_with_unchecked(dims, |packer| {
2457                let mut nelements = 0;
2458                for datum in iter {
2459                    packer.push(datum.map_err(Error::Inner)?);
2460                    nelements += 1;
2461                }
2462                Ok(nelements)
2463            })
2464        };
2465        match result {
2466            Ok(()) => Ok(Ok(())),
2467            Err(Error::Usage(e)) => Err(e),
2468            Err(Error::Inner(e)) => Ok(Err(e)),
2469        }
2470    }
2471
    /// Convenience function to construct an array from a function. The function must return the
    /// number of elements it pushed into the array. It is undefined behavior if the function returns
    /// a number different to the number of elements it pushed.
    ///
    /// Returns an error if the number of elements pushed by `f` does not match
    /// the cardinality of the array as described by `dims`, or if the
    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`], or if `f` errors. If an error
    /// occurs, the packer's state will be unchanged.
    ///
    /// # Safety
    ///
    /// `f` must return exactly the number of elements it pushed into the
    /// array; returning any other count is undefined behavior.
    pub unsafe fn push_array_with_unchecked<F, E>(
        &mut self,
        dims: &[ArrayDimension],
        f: F,
    ) -> Result<(), E>
    where
        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
        E: From<InvalidArrayError>,
    {
        // Arrays are encoded as follows.
        //
        // u8    ndims
        // u64   dim_0 lower bound
        // u64   dim_0 length
        // ...
        // u64   dim_n lower bound
        // u64   dim_n length
        // u64   element data size in bytes
        // u8    element data, where elements are encoded in row-major order

        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
        }

        // Remember where we started so any error can roll the row back.
        let start = self.row.data.len();
        self.row.data.push(Tag::Array.into());

        // Write dimension information.
        self.row
            .data
            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
        for dim in dims {
            self.row
                .data
                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
            self.row
                .data
                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
        }

        // Write elements: reserve the 8-byte length slot, run `f`, then
        // backfill the slot with the actual payload size.
        let off = self.row.data.len();
        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
        let nelements = match f(self) {
            Ok(nelements) => nelements,
            Err(e) => {
                self.row.data.truncate(start);
                return Err(e);
            }
        };
        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());

        // Check that the number of elements written matches the dimension
        // information.
        // A zero-dimensional array has zero elements, not the empty
        // product (which would be 1).
        let cardinality = match dims {
            [] => 0,
            dims => dims.iter().map(|d| d.length).product(),
        };
        if nelements != cardinality {
            self.row.data.truncate(start);
            return Err(InvalidArrayError::WrongCardinality {
                actual: nelements,
                expected: cardinality,
            }
            .into());
        }

        Ok(())
    }
2550
2551    /// Pushes an [`Array`] that is built from a closure.
2552    ///
2553    /// __WARNING__: This is fairly "sharp" tool that is easy to get wrong. You
2554    /// should prefer [`RowPacker::try_push_array`] when possible.
2555    ///
2556    /// Returns an error if the number of elements pushed does not match
2557    /// the cardinality of the array as described by `dims`, or if the
2558    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2559    /// occurs, the packer's state will be unchanged.
2560    pub fn push_array_with_row_major<F, I>(
2561        &mut self,
2562        dims: I,
2563        f: F,
2564    ) -> Result<(), InvalidArrayError>
2565    where
2566        I: IntoIterator<Item = ArrayDimension>,
2567        F: FnOnce(&mut RowPacker) -> usize,
2568    {
2569        let start = self.row.data.len();
2570        self.row.data.push(Tag::Array.into());
2571
2572        // Write dummy dimension length for now, we'll fix it up.
2573        let dims_start = self.row.data.len();
2574        self.row.data.push(42);
2575
2576        let mut num_dims: u8 = 0;
2577        let mut cardinality: usize = 1;
2578        for dim in dims {
2579            num_dims += 1;
2580            cardinality *= dim.length;
2581
2582            self.row
2583                .data
2584                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2585            self.row
2586                .data
2587                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2588        }
2589
2590        if num_dims > MAX_ARRAY_DIMENSIONS {
2591            // Reset the packer state so we don't have invalid data.
2592            self.row.data.truncate(start);
2593            return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2594        }
2595        // Fix up our dimension length.
2596        self.row.data[dims_start..dims_start + size_of::<u8>()]
2597            .copy_from_slice(&num_dims.to_le_bytes());
2598
2599        // Write elements.
2600        let off = self.row.data.len();
2601        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2602
2603        let nelements = f(self);
2604
2605        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2606        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2607
2608        // Check that the number of elements written matches the dimension
2609        // information.
2610        let cardinality = match num_dims {
2611            0 => 0,
2612            _ => cardinality,
2613        };
2614        if nelements != cardinality {
2615            self.row.data.truncate(start);
2616            return Err(InvalidArrayError::WrongCardinality {
2617                actual: nelements,
2618                expected: cardinality,
2619            });
2620        }
2621
2622        Ok(())
2623    }
2624
2625    /// Convenience function to push a `DatumList` from an iter of `Datum`s
2626    ///
2627    /// See [`RowPacker::push_dict_with`] if you need to be able to handle errors
2628    pub fn push_list<'a, I, D>(&mut self, iter: I)
2629    where
2630        I: IntoIterator<Item = D>,
2631        D: Borrow<Datum<'a>>,
2632    {
2633        self.push_list_with(|packer| {
2634            for elem in iter {
2635                packer.push(*elem.borrow())
2636            }
2637        });
2638    }
2639
2640    /// Convenience function to push a `DatumMap` from an iter of `(&str, Datum)` pairs
2641    pub fn push_dict<'a, I, D>(&mut self, iter: I)
2642    where
2643        I: IntoIterator<Item = (&'a str, D)>,
2644        D: Borrow<Datum<'a>>,
2645    {
2646        self.push_dict_with(|packer| {
2647            for (k, v) in iter {
2648                packer.push(Datum::String(k));
2649                packer.push(*v.borrow())
2650            }
2651        })
2652    }
2653
    /// Pushes a `Datum::Range` derived from the `Range<Datum<'a>`.
    ///
    /// # Panics
    /// - If lower and upper express finite values and they are datums of
    ///   different types.
    /// - If lower or upper express finite values and are equal to
    ///   `Datum::Null`. To handle `Datum::Null` properly, use
    ///   [`RangeBound::new`].
    ///
    /// # Notes
    /// - This function canonicalizes the range before pushing it to the row.
    /// - Prefer this function over `push_range_with` because of its
    ///   canonicalization.
    /// - Prefer creating [`RangeBound`]s using [`RangeBound::new`], which
    ///   handles `Datum::Null` in a SQL-friendly way.
    pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
        range.canonicalize()?;
        match range.inner {
            // An empty range: encoded as just the tag plus the EMPTY flag
            // byte; no bound datums follow.
            None => {
                self.row.data.push(Tag::Range.into());
                // Untagged bytes only contains the `RANGE_EMPTY` flag value.
                self.row.data.push(range::InternalFlags::EMPTY.bits());
                Ok(())
            }
            // A non-empty range: delegate to `push_range_with`, turning each
            // present (finite) bound value into a closure that packs that
            // datum into the row.
            Some(inner) => self.push_range_with(
                RangeLowerBound {
                    inclusive: inner.lower.inclusive,
                    bound: inner
                        .lower
                        .bound
                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
                },
                RangeUpperBound {
                    inclusive: inner.upper.inclusive,
                    bound: inner
                        .upper
                        .bound
                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
                },
            ),
        }
    }
2696
    /// Pushes a `DatumRange` built from the specified arguments.
    ///
    /// # Warning
    /// Unlike `push_range`, `push_range_with` _does not_ canonicalize its
    /// inputs. Consequentially, this means it's possible to generate ranges
    /// that will not reflect the proper ordering and equality.
    ///
    /// # Panics
    /// - If lower or upper expresses a finite value and does not push exactly
    ///   one value into the `RowPacker`.
    /// - If lower and upper express finite values and they are datums of
    ///   different types.
    /// - If lower or upper express finite values and push `Datum::Null`.
    ///
    /// # Notes
    /// - Prefer [`RowPacker::push_range`] over this function. This function
    ///   should be used only when you are not pushing `Datum`s to the inner
    ///   row.
    /// - Range encoding is `[<flag bytes>,<lower>?,<upper>?]`, where `lower`
    ///   and `upper` are optional, contingent on the flag value expressing an
    ///   empty range (where neither will be present) or infinite bounds (where
    ///   each infinite bound will be absent).
    /// - To push an empty range, use `push_range` using `Range { inner: None }`.
    pub fn push_range_with<L, U, E>(
        &mut self,
        lower: RangeLowerBound<L>,
        upper: RangeUpperBound<U>,
    ) -> Result<(), E>
    where
        L: FnOnce(&mut RowPacker) -> Result<(), E>,
        U: FnOnce(&mut RowPacker) -> Result<(), E>,
        E: From<InvalidRangeError>,
    {
        // Remember where the range begins so the whole thing can be rolled
        // back if validation fails below.
        let start = self.row.data.len();
        self.row.data.push(Tag::Range.into());

        let mut flags = range::InternalFlags::empty();

        flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
        flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
        flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
        flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);

        let mut expected_datums = 0;

        self.row.data.push(flags.bits());

        // Everything appended from here on should be the bound datums; the
        // validation pass below re-reads from this offset.
        let datum_check = self.row.data.len();

        if let Some(value) = lower.bound {
            let start = self.row.data.len();
            value(self)?;
            assert!(
                start < self.row.data.len(),
                "finite values must each push exactly one value; expected 1 but got 0"
            );
            expected_datums += 1;
        }

        if let Some(value) = upper.bound {
            let start = self.row.data.len();
            value(self)?;
            assert!(
                start < self.row.data.len(),
                "finite values must each push exactly one value; expected 1 but got 0"
            );
            expected_datums += 1;
        }

        // Validate the invariants that 0, 1, or 2 elements were pushed, none are Null,
        // and if two are pushed then the second is not less than the first. Panic in
        // some cases and error in others.
        let mut actual_datums = 0;
        let mut seen = None;
        let mut dataz = &self.row.data[datum_check..];
        while !dataz.is_empty() {
            // SAFETY: `datum_check` lies on a datum boundary (it immediately
            // follows the flag byte), and the bound closures are contractually
            // required to have appended complete datums, so decoding from
            // here is sound.
            let d = unsafe { read_datum(&mut dataz) };
            assert!(d != Datum::Null, "cannot push Datum::Null into range");

            match seen {
                None => seen = Some(d),
                Some(seen) => {
                    let seen_kind = DatumKind::from(seen);
                    let d_kind = DatumKind::from(d);
                    assert!(
                        seen_kind == d_kind,
                        "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
                    );

                    // Misordered bounds are an error, not a panic: roll back
                    // everything this call appended.
                    if seen > d {
                        self.row.data.truncate(start);
                        return Err(InvalidRangeError::MisorderedRangeBounds.into());
                    }
                }
            }
            actual_datums += 1;
        }

        assert!(
            actual_datums == expected_datums,
            "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
        );

        Ok(())
    }
2801
    /// Clears the contents of the packer without de-allocating its backing memory.
    pub fn clear(&mut self) {
        self.row.data.clear();
    }
2806
    /// Truncates the underlying storage to the specified byte position.
    ///
    /// # Safety
    ///
    /// `pos` MUST specify a byte offset that lies on a datum boundary.
    /// If `pos` specifies a byte offset that is *within* a datum, the row
    /// packer will produce an invalid row, the unpacking of which may
    /// trigger undefined behavior!
    ///
    /// To find the byte offset of a datum boundary, inspect the packer's
    /// byte length by calling `packer.data().len()` after pushing the desired
    /// number of datums onto the packer.
    pub unsafe fn truncate(&mut self, pos: usize) {
        self.row.data.truncate(pos)
    }
2822
2823    /// Truncates the underlying row to contain at most the first `n` datums.
2824    pub fn truncate_datums(&mut self, n: usize) {
2825        let prev_len = self.row.data.len();
2826        let mut iter = self.row.iter();
2827        for _ in iter.by_ref().take(n) {}
2828        let next_len = iter.data.len();
2829        // SAFETY: iterator offsets always lie on a datum boundary.
2830        unsafe { self.truncate(prev_len - next_len) }
2831    }
2832
    /// Returns the total amount of bytes used by the underlying row.
    pub fn byte_len(&self) -> usize {
        self.row.byte_len()
    }
2837}
2838
impl<'a> IntoIterator for &'a Row {
    type Item = Datum<'a>;
    type IntoIter = DatumListIter<'a>;
    /// Iterates over the datums in the row; delegates to [`Row::iter`].
    fn into_iter(self) -> DatumListIter<'a> {
        self.iter()
    }
}
2846
2847impl fmt::Debug for Row {
2848    /// Debug representation using the internal datums
2849    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2850        f.write_str("Row{")?;
2851        f.debug_list().entries(self.iter()).finish()?;
2852        f.write_str("}")
2853    }
2854}
2855
2856impl fmt::Display for Row {
2857    /// Display representation using the internal datums
2858    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2859        f.write_str("(")?;
2860        for (i, datum) in self.iter().enumerate() {
2861            if i != 0 {
2862                f.write_str(", ")?;
2863            }
2864            write!(f, "{}", datum)?;
2865        }
2866        f.write_str(")")
2867    }
2868}
2869
impl<'a, T> DatumList<'a, T> {
    /// Iterates over the `Datum`s in this list.
    pub fn iter(&self) -> DatumListIter<'a> {
        DatumListIter { data: self.data }
    }

    /// Iterate elements as typed `T` values rather than raw `Datum`s.
    ///
    /// Each datum is decoded and converted via [`FromDatum`]. Since generic
    /// type parameters in `#[sqlfunc]` are erased to `Datum<'a>` before code
    /// generation, this is monomorphized to an identity conversion at runtime.
    pub fn typed_iter(&self) -> DatumListTypedIter<'a, T>
    where
        T: FromDatum<'a>,
    {
        DatumListTypedIter {
            inner: self.iter(),
            _phantom: PhantomData,
        }
    }

    /// For debugging only
    pub fn data(&self) -> &'a [u8] {
        self.data
    }
}
2895
impl<T> DatumList<'static, T> {
    /// Returns an empty `DatumList` backed by a static, zero-length slice.
    pub fn empty() -> Self {
        DatumList::new(&[])
    }
}
2901
impl<'a> IntoIterator for DatumList<'a> {
    type Item = Datum<'a>;
    type IntoIter = DatumListIter<'a>;
    /// Iterates over the datums in the list; delegates to [`DatumList::iter`].
    fn into_iter(self) -> DatumListIter<'a> {
        self.iter()
    }
}
2909
2910impl<'a> Iterator for DatumListIter<'a> {
2911    type Item = Datum<'a>;
2912    fn next(&mut self) -> Option<Self::Item> {
2913        if self.data.is_empty() {
2914            None
2915        } else {
2916            Some(unsafe { read_datum(&mut self.data) })
2917        }
2918    }
2919}
2920
2921impl<'a, T: FromDatum<'a>> Iterator for DatumListTypedIter<'a, T> {
2922    type Item = T;
2923    fn next(&mut self) -> Option<Self::Item> {
2924        self.inner.next().map(T::from_datum)
2925    }
2926}
2927
impl<'a, T> DatumMap<'a, T> {
    /// Iterates over the `(key, value)` entries in this map. Entries are
    /// expected to have unique keys in ascending order (checked in debug
    /// builds by [`DatumDictIter`]).
    pub fn iter(&self) -> DatumDictIter<'a> {
        DatumDictIter {
            data: self.data,
            prev_key: None,
        }
    }

    /// Iterate entries as `(&str, T)` pairs rather than `(&str, Datum)`.
    ///
    /// Each value datum is converted via [`FromDatum`]. Since generic type
    /// parameters in `#[sqlfunc]` are erased to `Datum<'a>` before code
    /// generation, this is monomorphized to an identity conversion at runtime.
    pub fn typed_iter(&self) -> DatumDictTypedIter<'a, T>
    where
        T: FromDatum<'a>,
    {
        DatumDictTypedIter {
            inner: self.iter(),
            _phantom: PhantomData,
        }
    }

    /// For debugging only
    pub fn data(&self) -> &'a [u8] {
        self.data
    }
}
2956
impl<T> DatumMap<'static, T> {
    /// Returns an empty `DatumMap` backed by a static, zero-length slice.
    pub fn empty() -> Self {
        DatumMap::new(&[])
    }
}
2962
impl<'a, T> Debug for DatumMap<'a, T> {
    /// Renders the entries as a standard debug map (`{"k": v, ...}`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_map().entries(self.iter()).finish()
    }
}
2968
impl<'a> IntoIterator for &'a DatumMap<'a> {
    type Item = (&'a str, Datum<'a>);
    type IntoIter = DatumDictIter<'a>;
    /// Iterates over the entries in the map; delegates to [`DatumMap::iter`].
    fn into_iter(self) -> DatumDictIter<'a> {
        self.iter()
    }
}
2976
impl<'a> Iterator for DatumDictIter<'a> {
    type Item = (&'a str, Datum<'a>);
    fn next(&mut self) -> Option<Self::Item> {
        if self.data.is_empty() {
            None
        } else {
            // Keys are stored as string datums: consume the tag byte and
            // verify it is one of the string encodings before decoding.
            let key_tag =
                Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
            assert!(
                key_tag == Tag::StringTiny
                    || key_tag == Tag::StringShort
                    || key_tag == Tag::StringLong
                    || key_tag == Tag::StringHuge,
                "Dict keys must be strings, got {:?}",
                key_tag
            );
            // SAFETY: the tag byte consumed above identifies the key as a
            // string, and the value datum immediately follows it in data
            // assumed to be a well-formed dict encoding.
            let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
            let val = unsafe { read_datum(&mut self.data) };

            // if in debug mode, sanity check keys
            if cfg!(debug_assertions) {
                if let Some(prev_key) = self.prev_key {
                    debug_assert!(
                        prev_key < key,
                        "Dict keys must be unique and given in ascending order: {} came before {}",
                        prev_key,
                        key
                    );
                }
                self.prev_key = Some(key);
            }

            Some((key, val))
        }
    }
}
3013
3014impl<'a, T: FromDatum<'a>> Iterator for DatumDictTypedIter<'a, T> {
3015    type Item = (&'a str, T);
3016    fn next(&mut self) -> Option<Self::Item> {
3017        self.inner.next().map(|(k, v)| (k, T::from_datum(v)))
3018    }
3019}
3020
impl RowArena {
    /// Constructs a new, empty `RowArena`.
    pub fn new() -> Self {
        RowArena {
            inner: RefCell::new(vec![]),
        }
    }

    /// Creates a `RowArena` with a hint of how many rows will be created in the arena, to avoid
    /// reallocations of its internal vector.
    pub fn with_capacity(capacity: usize) -> Self {
        RowArena {
            inner: RefCell::new(Vec::with_capacity(capacity)),
        }
    }

    /// Does a `reserve` on the underlying `Vec`. Call this when you expect `additional` more datums
    /// to be created in this arena.
    pub fn reserve(&self, additional: usize) {
        self.inner.borrow_mut().reserve(additional);
    }

    /// Take ownership of `bytes` for the lifetime of the arena.
    #[allow(clippy::transmute_ptr_to_ptr)]
    pub fn push_bytes<'a>(&'a self, bytes: Vec<u8>) -> &'a [u8] {
        let mut inner = self.inner.borrow_mut();
        inner.push(bytes);
        let owned_bytes = &inner[inner.len() - 1];
        unsafe {
            // This is safe because:
            //   * We only ever append to self.inner, so the byte vector
            //     will live as long as the arena.
            //   * We return a reference to the byte vector's contents, so it's
            //     okay if self.inner reallocates and moves the byte
            //     vector.
            //   * We don't allow access to the byte vector itself, so it will
            //     never reallocate.
            transmute::<&[u8], &'a [u8]>(owned_bytes)
        }
    }

    /// Take ownership of `string` for the lifetime of the arena.
    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
        let owned_bytes = self.push_bytes(string.into_bytes());
        unsafe {
            // This is safe because we know it was a `String` just before.
            std::str::from_utf8_unchecked(owned_bytes)
        }
    }

    /// Take ownership of `row` for the lifetime of the arena, returning a
    /// reference to the first datum in the row.
    ///
    /// If we had an owned datum type, this method would be much clearer, and
    /// would be called `push_owned_datum`.
    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
        let mut inner = self.inner.borrow_mut();
        inner.push(row.data.into_vec());
        unsafe {
            // This is safe because:
            //   * We only ever append to self.inner, so the row data will live
            //     as long as the arena.
            //   * We force the row data into its own heap allocation--
            //     importantly, we do NOT store the SmallVec, which might be
            //     storing data inline--so it's okay if self.inner reallocates
            //     and moves the row.
            //   * We don't allow access to the byte vector itself, so it will
            //     never reallocate.
            let datum = read_datum(&mut &inner[inner.len() - 1][..]);
            transmute::<Datum<'_>, Datum<'a>>(datum)
        }
    }

    /// Equivalent to `push_unary_row` but returns a `DatumNested` rather than a
    /// `Datum`.
    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
        let mut inner = self.inner.borrow_mut();
        inner.push(row.data.into_vec());
        unsafe {
            // This is safe because:
            //   * We only ever append to self.inner, so the row data will live
            //     as long as the arena.
            //   * We force the row data into its own heap allocation--
            //     importantly, we do NOT store the SmallVec, which might be
            //     storing data inline--so it's okay if self.inner reallocates
            //     and moves the row.
            //   * We don't allow access to the byte vector itself, so it will
            //     never reallocate.
            let nested = DatumNested::extract(&mut &inner[inner.len() - 1][..]);
            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
        }
    }

    /// Convenience function to make a new `Row` containing a single datum, and
    /// take ownership of it for the lifetime of the arena
    ///
    /// ```
    /// # use mz_repr::{RowArena, Datum};
    /// let arena = RowArena::new();
    /// let datum = arena.make_datum(|packer| {
    ///   packer.push_list(&[Datum::String("hello"), Datum::String("world")]);
    /// });
    /// assert_eq!(datum.unwrap_list().iter().collect::<Vec<_>>(), vec![Datum::String("hello"), Datum::String("world")]);
    /// ```
    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
    where
        F: FnOnce(&mut RowPacker),
    {
        let mut row = Row::default();
        f(&mut row.packer());
        self.push_unary_row(row)
    }

    /// Convenience function to build a list datum from an iterator of typed
    /// elements and return it as a `DatumList<'a, T>`.
    ///
    /// By accepting an iterator of `T: Borrow<Datum>` instead of a raw
    /// `RowPacker` closure, this guarantees that only elements of type `T`
    /// are pushed.
    pub fn make_datum_list<'a, T: std::borrow::Borrow<Datum<'a>>>(
        &'a self,
        iter: impl IntoIterator<Item = T>,
    ) -> DatumList<'a, T> {
        let datum = self.make_datum(|packer| {
            packer.push_list_with(|packer| {
                for elem in iter {
                    packer.push(*elem.borrow());
                }
            });
        });
        DatumList::new(datum.unwrap_list().data())
    }

    /// Convenience function identical to `make_datum` but instead returns a
    /// `DatumNested`.
    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
    where
        F: FnOnce(&mut RowPacker),
    {
        let mut row = Row::default();
        f(&mut row.packer());
        self.push_unary_row_datum_nested(row)
    }

    /// Like [`RowArena::make_datum`], but the provided closure can return an error.
    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
    where
        F: FnOnce(&mut RowPacker) -> Result<(), E>,
    {
        let mut row = Row::default();
        f(&mut row.packer())?;
        Ok(self.push_unary_row(row))
    }

    /// Clear the contents of the arena.
    pub fn clear(&mut self) {
        self.inner.borrow_mut().clear();
    }
}
3179
impl Default for RowArena {
    /// Equivalent to [`RowArena::new`].
    fn default() -> RowArena {
        RowArena::new()
    }
}
3185
/// A thread-local row, which can be borrowed and returned.
/// # Example
///
/// Use this type instead of creating a new row:
/// ```
/// use mz_repr::SharedRow;
///
/// let mut row_builder = SharedRow::get();
/// ```
///
/// This allows us to reuse an existing row allocation instead of creating a new one or retaining
/// an allocation locally. Additionally, we can observe the size of the local row in a central
/// place and potentially reallocate to reduce memory needs.
///
/// # Panics
///
/// [`SharedRow::get`] panics when trying to obtain multiple references to the shared row.
#[derive(Debug)]
pub struct SharedRow(Row);
3205
impl SharedRow {
    thread_local! {
        /// A thread-local slot containing a shared Row that can be temporarily used by a function.
        /// There can be at most one active user of this Row, which is tracked by the state of the
        /// `Option<_>` wrapper. When it is `Some(..)`, the row is available for using. When it
        /// is `None`, it is not, and the constructor will panic if a thread attempts to use it.
        static SHARED_ROW: Cell<Option<Row>> = const { Cell::new(Some(Row::empty())) }
    }

    /// Get the shared row.
    ///
    /// The row's contents are cleared before returning it.
    ///
    /// # Panics
    ///
    /// Panics when the row is already borrowed elsewhere.
    pub fn get() -> Self {
        // `take` leaves `None` in the slot, marking the row as borrowed until
        // this `SharedRow` is dropped.
        let mut row = Self::SHARED_ROW
            .take()
            .expect("attempted to borrow already borrowed SharedRow");
        // Clear row
        row.packer();
        Self(row)
    }

    /// Gets the shared row and uses it to pack `iter`.
    pub fn pack<'a, I, D>(iter: I) -> Row
    where
        I: IntoIterator<Item = D>,
        D: Borrow<Datum<'a>>,
    {
        let mut row_builder = Self::get();
        let mut row_packer = row_builder.packer();
        row_packer.extend(iter);
        // Clone the packed result out; the shared allocation itself is
        // returned to the thread-local slot when `row_builder` drops.
        row_builder.clone()
    }
}
3243
impl std::ops::Deref for SharedRow {
    type Target = Row;

    /// Exposes the borrowed shared row as a `&Row`.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
3251
impl std::ops::DerefMut for SharedRow {
    /// Exposes the borrowed shared row as a `&mut Row`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
3257
impl Drop for SharedRow {
    fn drop(&mut self) {
        // Take the Row allocation from this instance and put it back in the thread local slot for
        // the next user. The Row in `self` is replaced with an empty Row which does not allocate.
        Self::SHARED_ROW.set(Some(std::mem::take(&mut self.0)))
    }
}
3265
3266#[cfg(test)]
3267mod tests {
3268    use std::cmp::Ordering;
3269    use std::collections::hash_map::DefaultHasher;
3270    use std::hash::{Hash, Hasher};
3271
3272    use chrono::{DateTime, NaiveDate};
3273    use itertools::Itertools;
3274    use mz_ore::{assert_err, assert_none};
3275    use ordered_float::OrderedFloat;
3276
3277    use crate::SqlScalarType;
3278
3279    use super::*;
3280
3281    fn hash<T: Hash>(t: &T) -> u64 {
3282        let mut hasher = DefaultHasher::new();
3283        t.hash(&mut hasher);
3284        hasher.finish()
3285    }
3286
3287    #[mz_ore::test]
3288    fn test_assumptions() {
3289        assert_eq!(size_of::<Tag>(), 1);
3290        #[cfg(target_endian = "big")]
3291        {
3292            // if you want to run this on a big-endian cpu, we'll need big-endian versions of the serialization code
3293            assert!(false);
3294        }
3295    }
3296
3297    #[mz_ore::test]
3298    fn miri_test_arena() {
3299        let arena = RowArena::new();
3300
3301        assert_eq!(arena.push_string("".to_owned()), "");
3302        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");
3303
3304        let empty: &[u8] = &[];
3305        assert_eq!(arena.push_bytes(vec![]), empty);
3306        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);
3307
3308        let mut row = Row::default();
3309        let mut packer = row.packer();
3310        packer.push_dict_with(|row| {
3311            row.push(Datum::String("a"));
3312            row.push_list_with(|row| {
3313                row.push(Datum::String("one"));
3314                row.push(Datum::String("two"));
3315                row.push(Datum::String("three"));
3316            });
3317            row.push(Datum::String("b"));
3318            row.push(Datum::String("c"));
3319        });
3320        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
3321    }
3322
3323    #[mz_ore::test]
3324    fn miri_test_round_trip() {
3325        fn round_trip(datums: Vec<Datum>) {
3326            let row = Row::pack(datums.clone());
3327
3328            // When run under miri this catches undefined bytes written to data
3329            // eg by calling push_copy! on a type which contains undefined padding values
3330            println!("{:?}", row.data());
3331
3332            let datums2 = row.iter().collect::<Vec<_>>();
3333            let datums3 = row.unpack();
3334            assert_eq!(datums, datums2);
3335            assert_eq!(datums, datums3);
3336        }
3337
3338        round_trip(vec![]);
3339        round_trip(
3340            SqlScalarType::enumerate()
3341                .iter()
3342                .flat_map(|r#type| r#type.interesting_datums())
3343                .collect(),
3344        );
3345        round_trip(vec![
3346            Datum::Null,
3347            Datum::Null,
3348            Datum::False,
3349            Datum::True,
3350            Datum::Int16(-21),
3351            Datum::Int32(-42),
3352            Datum::Int64(-2_147_483_648 - 42),
3353            Datum::UInt8(0),
3354            Datum::UInt8(1),
3355            Datum::UInt16(0),
3356            Datum::UInt16(1),
3357            Datum::UInt16(1 << 8),
3358            Datum::UInt32(0),
3359            Datum::UInt32(1),
3360            Datum::UInt32(1 << 8),
3361            Datum::UInt32(1 << 16),
3362            Datum::UInt32(1 << 24),
3363            Datum::UInt64(0),
3364            Datum::UInt64(1),
3365            Datum::UInt64(1 << 8),
3366            Datum::UInt64(1 << 16),
3367            Datum::UInt64(1 << 24),
3368            Datum::UInt64(1 << 32),
3369            Datum::UInt64(1 << 40),
3370            Datum::UInt64(1 << 48),
3371            Datum::UInt64(1 << 56),
3372            Datum::Float32(OrderedFloat::from(-42.12)),
3373            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
3374            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
3375            Datum::Timestamp(
3376                CheckedTimestamp::from_timestamplike(
3377                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
3378                        .unwrap()
3379                        .and_hms_opt(14, 32, 11)
3380                        .unwrap(),
3381                )
3382                .unwrap(),
3383            ),
3384            Datum::TimestampTz(
3385                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
3386                    .unwrap(),
3387            ),
3388            Datum::Interval(Interval {
3389                months: 312,
3390                ..Default::default()
3391            }),
3392            Datum::Interval(Interval::new(0, 0, 1_012_312)),
3393            Datum::Bytes(&[]),
3394            Datum::Bytes(&[0, 2, 1, 255]),
3395            Datum::String(""),
3396            Datum::String("العَرَبِيَّة"),
3397        ]);
3398    }
3399
3400    #[mz_ore::test]
3401    fn test_array() {
3402        // Construct an array using `Row::push_array` and verify that it unpacks
3403        // correctly.
3404        const DIM: ArrayDimension = ArrayDimension {
3405            lower_bound: 2,
3406            length: 2,
3407        };
3408        let mut row = Row::default();
3409        let mut packer = row.packer();
3410        packer
3411            .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3412            .unwrap();
3413        let arr1 = row.unpack_first().unwrap_array();
3414        assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3415        assert_eq!(
3416            arr1.elements().into_iter().collect::<Vec<_>>(),
3417            vec![Datum::Int32(1), Datum::Int32(2)]
3418        );
3419
3420        // Pack a previously-constructed `Datum::Array` and verify that it
3421        // unpacks correctly.
3422        let row = Row::pack_slice(&[Datum::Array(arr1)]);
3423        let arr2 = row.unpack_first().unwrap_array();
3424        assert_eq!(arr1, arr2);
3425    }
3426
3427    #[mz_ore::test]
3428    fn test_multidimensional_array() {
3429        let datums = vec![
3430            Datum::Int32(1),
3431            Datum::Int32(2),
3432            Datum::Int32(3),
3433            Datum::Int32(4),
3434            Datum::Int32(5),
3435            Datum::Int32(6),
3436            Datum::Int32(7),
3437            Datum::Int32(8),
3438        ];
3439
3440        let mut row = Row::default();
3441        let mut packer = row.packer();
3442        packer
3443            .try_push_array(
3444                &[
3445                    ArrayDimension {
3446                        lower_bound: 1,
3447                        length: 1,
3448                    },
3449                    ArrayDimension {
3450                        lower_bound: 1,
3451                        length: 4,
3452                    },
3453                    ArrayDimension {
3454                        lower_bound: 1,
3455                        length: 2,
3456                    },
3457                ],
3458                &datums,
3459            )
3460            .unwrap();
3461        let array = row.unpack_first().unwrap_array();
3462        assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3463    }
3464
3465    #[mz_ore::test]
3466    fn test_array_max_dimensions() {
3467        let mut row = Row::default();
3468        let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3469
3470        // An array with one too many dimensions should be rejected.
3471        let res = row.packer().try_push_array(
3472            &vec![
3473                ArrayDimension {
3474                    lower_bound: 1,
3475                    length: 1
3476                };
3477                max_dims + 1
3478            ],
3479            vec![Datum::Int32(4)],
3480        );
3481        assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3482        assert!(row.data.is_empty());
3483
3484        // An array with exactly the maximum allowable dimensions should be
3485        // accepted.
3486        row.packer()
3487            .try_push_array(
3488                &vec![
3489                    ArrayDimension {
3490                        lower_bound: 1,
3491                        length: 1
3492                    };
3493                    max_dims
3494                ],
3495                vec![Datum::Int32(4)],
3496            )
3497            .unwrap();
3498    }
3499
3500    #[mz_ore::test]
3501    fn test_array_wrong_cardinality() {
3502        let mut row = Row::default();
3503        let res = row.packer().try_push_array(
3504            &[
3505                ArrayDimension {
3506                    lower_bound: 1,
3507                    length: 2,
3508                },
3509                ArrayDimension {
3510                    lower_bound: 1,
3511                    length: 3,
3512                },
3513            ],
3514            vec![Datum::Int32(1), Datum::Int32(2)],
3515        );
3516        assert_eq!(
3517            res,
3518            Err(InvalidArrayError::WrongCardinality {
3519                actual: 2,
3520                expected: 6,
3521            })
3522        );
3523        assert!(row.data.is_empty());
3524    }
3525
3526    #[mz_ore::test]
3527    fn test_nesting() {
3528        let mut row = Row::default();
3529        row.packer().push_dict_with(|row| {
3530            row.push(Datum::String("favourites"));
3531            row.push_list_with(|row| {
3532                row.push(Datum::String("ice cream"));
3533                row.push(Datum::String("oreos"));
3534                row.push(Datum::String("cheesecake"));
3535            });
3536            row.push(Datum::String("name"));
3537            row.push(Datum::String("bob"));
3538        });
3539
3540        let mut iter = row.unpack_first().unwrap_map().iter();
3541
3542        let (k, v) = iter.next().unwrap();
3543        assert_eq!(k, "favourites");
3544        assert_eq!(
3545            v.unwrap_list().iter().collect::<Vec<_>>(),
3546            vec![
3547                Datum::String("ice cream"),
3548                Datum::String("oreos"),
3549                Datum::String("cheesecake"),
3550            ]
3551        );
3552
3553        let (k, v) = iter.next().unwrap();
3554        assert_eq!(k, "name");
3555        assert_eq!(v, Datum::String("bob"));
3556    }
3557
3558    #[mz_ore::test]
3559    fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3560        let pack = |ok| {
3561            let mut row = Row::default();
3562            row.packer().push_dict_with(|row| {
3563                if ok {
3564                    row.push(Datum::String("key"));
3565                    row.push(Datum::Int32(42));
3566                    Ok(7)
3567                } else {
3568                    Err("fail")
3569                }
3570            })?;
3571            Ok(row)
3572        };
3573
3574        assert_eq!(pack(false), Err("fail"));
3575
3576        let row = pack(true)?;
3577        let mut dict = row.unpack_first().unwrap_map().iter();
3578        assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3579        assert_eq!(dict.next(), None);
3580
3581        Ok(())
3582    }
3583
    /// Verifies that `datum_size`'s claimed size for each datum matches the
    /// number of bytes actually produced by packing that datum into a row by
    /// itself.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    fn test_datum_sizes() {
        // Arena to hold the packed range datum built below.
        let arena = RowArena::new();

        // Test the claims about various datum sizes.
        let values_of_interest = vec![
            Datum::Null,
            Datum::False,
            Datum::Int16(0),
            Datum::Int32(0),
            Datum::Int64(0),
            // Unsigned ints at each power-of-256 boundary, to exercise every
            // width the variable-length encoding can choose.
            Datum::UInt8(0),
            Datum::UInt8(1),
            Datum::UInt16(0),
            Datum::UInt16(1),
            Datum::UInt16(1 << 8),
            Datum::UInt32(0),
            Datum::UInt32(1),
            Datum::UInt32(1 << 8),
            Datum::UInt32(1 << 16),
            Datum::UInt32(1 << 24),
            Datum::UInt64(0),
            Datum::UInt64(1),
            Datum::UInt64(1 << 8),
            Datum::UInt64(1 << 16),
            Datum::UInt64(1 << 24),
            Datum::UInt64(1 << 32),
            Datum::UInt64(1 << 40),
            Datum::UInt64(1 << 48),
            Datum::UInt64(1 << 56),
            Datum::Float32(OrderedFloat(0.0)),
            Datum::Float64(OrderedFloat(0.0)),
            // Numerics of varying digit counts (encoding size may vary with
            // the number of digits).
            Datum::from(numeric::Numeric::from(0)),
            Datum::from(numeric::Numeric::from(1000)),
            Datum::from(numeric::Numeric::from(9999)),
            Datum::Date(
                NaiveDate::from_ymd_opt(1, 1, 1)
                    .unwrap()
                    .try_into()
                    .unwrap(),
            ),
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(
                    DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
                )
                .unwrap(),
            ),
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
                    .unwrap(),
            ),
            Datum::Interval(Interval::default()),
            Datum::Bytes(&[]),
            Datum::String(""),
            Datum::JsonNull,
            // An empty range and a finite int4 range with both bounds set.
            Datum::Range(Range { inner: None }),
            arena.make_datum(|packer| {
                packer
                    .push_range(Range::new(Some((
                        RangeLowerBound::new(Datum::Int32(-1), true),
                        RangeUpperBound::new(Datum::Int32(1), true),
                    ))))
                    .unwrap();
            }),
        ];
        // Each claimed size must equal the byte length of a row packed from
        // just that datum.
        for value in values_of_interest {
            if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
                panic!("Disparity in claimed size for {:?}", value);
            }
        }
    }
3656
    /// Exercises failure modes of `push_range_with`: malformed bound contents
    /// panic, while misordered bounds return an error. In every case the row
    /// must be left unmodified.
    #[mz_ore::test]
    fn test_range_errors() {
        // Packs a range whose lower bound is built from `datums[0]` and whose
        // upper bound is built from `datums[1]`, then asserts that the row's
        // byte length is unchanged afterward — i.e. a failed push must roll
        // back any bytes it wrote.
        fn test_range_errors_inner<'a>(
            datums: Vec<Vec<Datum<'a>>>,
        ) -> Result<(), InvalidRangeError> {
            let mut row = Row::default();
            let row_len = row.byte_len();
            let mut packer = row.packer();
            let r = packer.push_range_with(
                RangeLowerBound {
                    inclusive: true,
                    bound: Some(|row: &mut RowPacker| {
                        for d in &datums[0] {
                            row.push(d);
                        }
                        Ok(())
                    }),
                },
                RangeUpperBound {
                    inclusive: true,
                    bound: Some(|row: &mut RowPacker| {
                        for d in &datums[1] {
                            row.push(d);
                        }
                        Ok(())
                    }),
                },
            );

            // Regardless of outcome, the row must not have grown.
            assert_eq!(row_len, row.byte_len());

            r
        }

        // Bound shapes expected to panic: wrong datum counts per bound,
        // mismatched bound types, and explicit `Datum::Null` bounds.
        for panicking_case in [
            vec![vec![Datum::Int32(1)], vec![]],
            vec![
                vec![Datum::Int32(1), Datum::Int32(2)],
                vec![Datum::Int32(3)],
            ],
            vec![
                vec![Datum::Int32(1)],
                vec![Datum::Int32(2), Datum::Int32(3)],
            ],
            vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
            vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
            vec![vec![Datum::Null], vec![Datum::Int32(2)]],
            vec![vec![Datum::Int32(1)], vec![Datum::Null]],
        ] {
            #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
            let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
            assert_err!(result);
        }

        // A lower bound greater than the upper bound is a (non-panicking)
        // error.
        let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
        assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
    }
3714
3715    /// Lists have a variable-length encoding for their lengths. We test each case here.
3716    #[mz_ore::test]
3717    #[cfg_attr(miri, ignore)] // slow
3718    fn test_list_encoding() {
3719        fn test_list_encoding_inner(len: usize) {
3720            let list_elem = |i: usize| {
3721                if i % 2 == 0 {
3722                    Datum::False
3723                } else {
3724                    Datum::True
3725                }
3726            };
3727            let mut row = Row::default();
3728            {
3729                // Push some stuff.
3730                let mut packer = row.packer();
3731                packer.push(Datum::String("start"));
3732                packer.push_list_with(|packer| {
3733                    for i in 0..len {
3734                        packer.push(list_elem(i));
3735                    }
3736                });
3737                packer.push(Datum::String("end"));
3738            }
3739            // Check that we read back exactly what we pushed.
3740            let mut row_it = row.iter();
3741            assert_eq!(row_it.next().unwrap(), Datum::String("start"));
3742            match row_it.next().unwrap() {
3743                Datum::List(list) => {
3744                    let mut list_it = list.iter();
3745                    for i in 0..len {
3746                        assert_eq!(list_it.next().unwrap(), list_elem(i));
3747                    }
3748                    assert_none!(list_it.next());
3749                }
3750                _ => panic!("expected Datum::List"),
3751            }
3752            assert_eq!(row_it.next().unwrap(), Datum::String("end"));
3753            assert_none!(row_it.next());
3754        }
3755
3756        test_list_encoding_inner(0);
3757        test_list_encoding_inner(1);
3758        test_list_encoding_inner(10);
3759        test_list_encoding_inner(TINY - 1); // tiny
3760        test_list_encoding_inner(TINY + 1); // short
3761        test_list_encoding_inner(SHORT + 1); // long
3762
3763        // The biggest one takes 40 s on my laptop, probably not worth it.
3764        //test_list_encoding_inner(LONG + 1); // huge
3765    }
3766
    /// Demonstrates that DatumList's `Eq` and `Ord` are consistent.
    ///
    /// A list containing -0.0 and one containing +0.0 have different byte
    /// representations (IEEE 754 distinguishes them), so a purely bytewise
    /// `Eq` used to report them as unequal. Equality now goes through
    /// `Datum::cmp`, so `Eq` reports them as equal — matching what `Ord`
    /// computes via `iter().cmp(other.iter())`.
    #[mz_ore::test]
    fn test_datum_list_eq_ord_consistency() {
        // Build list containing +0.0
        let mut row_pos = Row::default();
        row_pos.packer().push_list_with(|p| {
            p.push(Datum::Float64(OrderedFloat::from(0.0)));
        });
        let list_pos = row_pos.unpack_first().unwrap_list();

        // Build list containing -0.0 (distinct bit pattern from +0.0)
        let mut row_neg = Row::default();
        row_neg.packer().push_list_with(|p| {
            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
        });
        let list_neg = row_neg.unpack_first().unwrap_list();

        // Eq is semantic (datum-by-datum), so the differing encodings still
        // compare equal. A bytewise Eq once got this wrong, hence the test.
        assert_eq!(
            list_pos, list_neg,
            "Eq should see different encodings as equal"
        );

        // Ord is datum-by-datum: -0.0 and +0.0 compare equal as Datums
        assert_eq!(
            list_pos.cmp(&list_neg),
            Ordering::Equal,
            "Ord (datum-by-datum) should see -0.0 and +0.0 as equal"
        );
    }
3802
    /// Demonstrates that DatumMap's `Eq` is semantic: maps with equal keys and
    /// values compare equal even when the values have different byte encodings
    /// (e.g. -0.0 vs +0.0, which IEEE 754 distinguishes).
    #[mz_ore::test]
    fn test_datum_map_eq_bytewise_consistency() {
        // Build map {"k": +0.0}
        let mut row_pos = Row::default();
        row_pos.packer().push_dict_with(|p| {
            p.push(Datum::String("k"));
            p.push(Datum::Float64(OrderedFloat::from(0.0)));
        });
        let map_pos = row_pos.unpack_first().unwrap_map();

        // Build map {"k": -0.0}
        let mut row_neg = Row::default();
        row_neg.packer().push_dict_with(|p| {
            p.push(Datum::String("k"));
            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
        });
        let map_neg = row_neg.unpack_first().unwrap_map();

        // Same keys and semantically equal values: Eq compares datum-by-datum,
        // so the differing encodings still compare equal.
        assert_eq!(
            map_pos, map_neg,
            "DatumMap Eq is semantic; -0.0 and +0.0 have different encodings but are equal"
        );
        // Verify they have the same logical content
        let entries_pos: Vec<_> = map_pos.iter().collect();
        let entries_neg: Vec<_> = map_neg.iter().collect();
        assert_eq!(entries_pos.len(), entries_neg.len());
        for ((k1, v1), (k2, v2)) in entries_pos.iter().zip_eq(entries_neg.iter()) {
            assert_eq!(k1, k2);
            assert_eq!(
                v1, v2,
                "Datum-level comparison treats -0.0 and +0.0 as equal"
            );
        }
    }
3840
3841    /// Hash must agree with Eq: equal lists must have the same hash.
3842    #[mz_ore::test]
3843    fn test_datum_list_hash_consistency() {
3844        // Equal lists (including -0.0 vs +0.0) must hash the same
3845        let mut row_pos = Row::default();
3846        row_pos.packer().push_list_with(|p| {
3847            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3848        });
3849        let list_pos = row_pos.unpack_first().unwrap_list();
3850
3851        let mut row_neg = Row::default();
3852        row_neg.packer().push_list_with(|p| {
3853            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3854        });
3855        let list_neg = row_neg.unpack_first().unwrap_list();
3856
3857        assert_eq!(list_pos, list_neg);
3858        assert_eq!(
3859            hash(&list_pos),
3860            hash(&list_neg),
3861            "equal lists must have same hash"
3862        );
3863
3864        // Unequal lists should have different hashes (with asymptotic probability 1)
3865        let mut row_a = Row::default();
3866        row_a.packer().push_list_with(|p| {
3867            p.push(Datum::Int32(1));
3868            p.push(Datum::Int32(2));
3869        });
3870        let list_a = row_a.unpack_first().unwrap_list();
3871
3872        let mut row_b = Row::default();
3873        row_b.packer().push_list_with(|p| {
3874            p.push(Datum::Int32(1));
3875            p.push(Datum::Int32(3));
3876        });
3877        let list_b = row_b.unpack_first().unwrap_list();
3878
3879        assert_ne!(list_a, list_b);
3880        assert_ne!(
3881            hash(&list_a),
3882            hash(&list_b),
3883            "unequal lists must have different hashes"
3884        );
3885    }
3886
3887    /// Ord/PartialOrd for DatumList: less, equal, greater.
3888    #[mz_ore::test]
3889    fn test_datum_list_ordering() {
3890        let mut row_12 = Row::default();
3891        row_12.packer().push_list_with(|p| {
3892            p.push(Datum::Int32(1));
3893            p.push(Datum::Int32(2));
3894        });
3895        let list_12 = row_12.unpack_first().unwrap_list();
3896
3897        let mut row_13 = Row::default();
3898        row_13.packer().push_list_with(|p| {
3899            p.push(Datum::Int32(1));
3900            p.push(Datum::Int32(3));
3901        });
3902        let list_13 = row_13.unpack_first().unwrap_list();
3903
3904        let mut row_123 = Row::default();
3905        row_123.packer().push_list_with(|p| {
3906            p.push(Datum::Int32(1));
3907            p.push(Datum::Int32(2));
3908            p.push(Datum::Int32(3));
3909        });
3910        let list_123 = row_123.unpack_first().unwrap_list();
3911
3912        // [1, 2] < [1, 3] due to the second element being different
3913        assert_eq!(list_12.cmp(&list_13), Ordering::Less);
3914        assert_eq!(list_13.cmp(&list_12), Ordering::Greater);
3915        assert_eq!(list_12.cmp(&list_12), Ordering::Equal);
3916        // shorter prefix compares less
3917        assert_eq!(list_12.cmp(&list_123), Ordering::Less);
3918    }
3919
3920    /// Hash must agree with Eq: equal maps must have the same hash.
3921    #[mz_ore::test]
3922    fn test_datum_map_hash_consistency() {
3923        let mut row_pos = Row::default();
3924        row_pos.packer().push_dict_with(|p| {
3925            p.push(Datum::String("x"));
3926            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3927        });
3928        let map_pos = row_pos.unpack_first().unwrap_map();
3929
3930        let mut row_neg = Row::default();
3931        row_neg.packer().push_dict_with(|p| {
3932            p.push(Datum::String("x"));
3933            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3934        });
3935        let map_neg = row_neg.unpack_first().unwrap_map();
3936
3937        assert_eq!(map_pos, map_neg);
3938        assert_eq!(
3939            hash(&map_pos),
3940            hash(&map_neg),
3941            "equal maps must have same hash"
3942        );
3943
3944        let mut row_a = Row::default();
3945        row_a.packer().push_dict_with(|p| {
3946            p.push(Datum::String("a"));
3947            p.push(Datum::Int32(1));
3948        });
3949        let map_a = row_a.unpack_first().unwrap_map();
3950
3951        let mut row_b = Row::default();
3952        row_b.packer().push_dict_with(|p| {
3953            p.push(Datum::String("a"));
3954            p.push(Datum::Int32(2));
3955        });
3956        let map_b = row_b.unpack_first().unwrap_map();
3957
3958        assert_ne!(map_a, map_b);
3959        assert_ne!(
3960            hash(&map_a),
3961            hash(&map_b),
3962            "unequal maps must have different hashes"
3963        );
3964    }
3965
3966    /// Ord/PartialOrd for DatumMap: less, equal, greater (by key then value).
3967    #[mz_ore::test]
3968    fn test_datum_map_ordering() {
3969        let mut row_a1 = Row::default();
3970        row_a1.packer().push_dict_with(|p| {
3971            p.push(Datum::String("a"));
3972            p.push(Datum::Int32(1));
3973        });
3974        let map_a1 = row_a1.unpack_first().unwrap_map();
3975
3976        let mut row_a2 = Row::default();
3977        row_a2.packer().push_dict_with(|p| {
3978            p.push(Datum::String("a"));
3979            p.push(Datum::Int32(2));
3980        });
3981        let map_a2 = row_a2.unpack_first().unwrap_map();
3982
3983        let mut row_b1 = Row::default();
3984        row_b1.packer().push_dict_with(|p| {
3985            p.push(Datum::String("b"));
3986            p.push(Datum::Int32(1));
3987        });
3988        let map_b1 = row_b1.unpack_first().unwrap_map();
3989
3990        assert_eq!(map_a1.cmp(&map_a2), Ordering::Less);
3991        assert_eq!(map_a2.cmp(&map_a1), Ordering::Greater);
3992        assert_eq!(map_a1.cmp(&map_a1), Ordering::Equal);
3993        assert_eq!(map_a1.cmp(&map_b1), Ordering::Less); // "a" < "b"
3994    }
3995
3996    /// Datum puts Null last in the enum so that nulls sort last (PostgreSQL default).
3997    /// This ordering is used when comparing DatumList/DatumMap (e.g. jsonb_agg tiebreaker).
3998    #[mz_ore::test]
3999    fn test_datum_list_and_map_null_sorts_last() {
4000        // DatumList: [1] < [null] so non-null sorts before null
4001        let mut row_list_1 = Row::default();
4002        row_list_1
4003            .packer()
4004            .push_list_with(|p| p.push(Datum::Int32(1)));
4005        let list_1 = row_list_1.unpack_first().unwrap_list();
4006
4007        let mut row_list_null = Row::default();
4008        row_list_null
4009            .packer()
4010            .push_list_with(|p| p.push(Datum::Null));
4011        let list_null = row_list_null.unpack_first().unwrap_list();
4012
4013        assert_eq!(list_1.cmp(&list_null), Ordering::Less);
4014        assert_eq!(list_null.cmp(&list_1), Ordering::Greater);
4015
4016        // DatumMap: {"k": 1} < {"k": null} so non-null sorts before null (same as jsonb_agg)
4017        let mut row_map_1 = Row::default();
4018        row_map_1.packer().push_dict_with(|p| {
4019            p.push(Datum::String("k"));
4020            p.push(Datum::Int32(1));
4021        });
4022        let map_1 = row_map_1.unpack_first().unwrap_map();
4023
4024        let mut row_map_null = Row::default();
4025        row_map_null.packer().push_dict_with(|p| {
4026            p.push(Datum::String("k"));
4027            p.push(Datum::Null);
4028        });
4029        let map_null = row_map_null.unpack_first().unwrap_map();
4030
4031        assert_eq!(map_1.cmp(&map_null), Ordering::Less);
4032        assert_eq!(map_null.cmp(&map_1), Ordering::Greater);
4033    }
4034}