Skip to main content

mz_repr/
row.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Borrow;
11use std::cell::{Cell, RefCell};
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::hash::{Hash, Hasher};
16use std::marker::PhantomData;
17use std::mem::{size_of, transmute};
18use std::ops::Deref;
19use std::str;
20
21use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
22use compact_bytes::CompactBytes;
23use mz_ore::cast::{CastFrom, ReinterpretCast};
24use mz_ore::soft_assert_no_log;
25use mz_ore::vec::Vector;
26use mz_persist_types::Codec64;
27use num_enum::{IntoPrimitive, TryFromPrimitive};
28use ordered_float::OrderedFloat;
29#[cfg(any(test, feature = "proptest"))]
30use proptest::prelude::*;
31#[cfg(any(test, feature = "proptest"))]
32use proptest::strategy::{BoxedStrategy, Strategy};
33use serde::{Deserialize, Serialize};
34use uuid::Uuid;
35
36use crate::adt::array::{
37    Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
38};
39use crate::adt::date::Date;
40use crate::adt::interval::Interval;
41use crate::adt::mz_acl_item::{AclItem, MzAclItem};
42use crate::adt::numeric;
43use crate::adt::numeric::Numeric;
44use crate::adt::range::{
45    self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
46};
47use crate::adt::timestamp::CheckedTimestamp;
48#[cfg(any(test, feature = "proptest"))]
49use crate::scalar::arb_datum;
50use crate::scalar::{DatumKind, SqlScalarType};
51use crate::{Datum, RelationDesc, Timestamp};
52
53pub(crate) mod encode;
54pub mod iter;
55
56include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
57
58/// A packed representation for `Datum`s.
59///
60/// `Datum` is easy to work with but very space inefficient. A `Datum::Int32(42)`
61/// is laid out in memory like this:
62///
63///   tag: 3
64///   padding: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
65///   data: 0 0 0 42
66///   padding: 0 0 0 0 0 0 0 0 0 0 0 0
67///
68/// For a total of 32 bytes! The second set of padding is needed in case we were
69/// to write a 16-byte datum into this location. The first set of padding is
70/// needed to align that hypothetical decimal to a 16 bytes boundary.
71///
72/// A `Row` stores zero or more `Datum`s without any padding. We avoid the need
73/// for the first set of padding by only providing access to the `Datum`s via
74/// calls to `ptr::read_unaligned`, which on modern x86 is barely penalized. We
75/// avoid the need for the second set of padding by not providing mutable access
76/// to the `Datum`. Instead, `Row` is append-only.
77///
78/// A `Row` can be built from a collection of `Datum`s using `Row::pack`, but it
79/// is more efficient to use `Row::pack_slice` so that a right-sized allocation
80/// can be created. If that is not possible, consider using the row buffer
81/// pattern: allocate one row, pack into it, and then call [`Row::clone`] to
82/// receive a copy of that row, leaving behind the original allocation to pack
83/// future rows.
84///
85/// Creating a row via [`Row::pack_slice`]:
86///
87/// ```
88/// # use mz_repr::{Row, Datum};
89/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
90/// assert_eq!(row.unpack(), vec![Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)])
91/// ```
92///
93/// `Row`s can be unpacked by iterating over them:
94///
95/// ```
96/// # use mz_repr::{Row, Datum};
97/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
98/// assert_eq!(row.iter().nth(1).unwrap(), Datum::Int32(1));
99/// ```
100///
101/// If you want random access to the `Datum`s in a `Row`, use `Row::unpack` to create a `Vec<Datum>`
102/// ```
103/// # use mz_repr::{Row, Datum};
104/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
105/// let datums = row.unpack();
106/// assert_eq!(datums[1], Datum::Int32(1));
107/// ```
108///
109/// # Performance
110///
111/// Rows are dynamically sized, but up to a fixed size their data is stored in-line.
112/// It is best to re-use a `Row` across multiple `Row` creation calls, as this
113/// avoids the allocations involved in `Row::new()`.
114#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
115pub struct Row {
116    data: CompactBytes,
117}
118
119impl Row {
120    const SIZE: usize = CompactBytes::MAX_INLINE;
121
122    /// A variant of `Row::from_proto` that allows for reuse of internal allocs
123    /// and validates the decoding against a provided [`RelationDesc`].
124    pub fn decode_from_proto(
125        &mut self,
126        proto: &ProtoRow,
127        desc: &RelationDesc,
128    ) -> Result<(), String> {
129        let mut packer = self.packer();
130        for (col_idx, _, _) in desc.iter_all() {
131            let d = match proto.datums.get(col_idx.to_raw()) {
132                Some(x) => x,
133                None => {
134                    packer.push(Datum::Null);
135                    continue;
136                }
137            };
138            packer.try_push_proto(d)?;
139        }
140
141        Ok(())
142    }
143
144    /// Allocate an empty `Row` with a pre-allocated capacity.
145    #[inline]
146    pub fn with_capacity(cap: usize) -> Self {
147        Self {
148            data: CompactBytes::with_capacity(cap),
149        }
150    }
151
152    /// Create an empty `Row`.
153    #[inline]
154    pub const fn empty() -> Self {
155        Self {
156            data: CompactBytes::empty(),
157        }
158    }
159
160    /// Creates a new row from supplied bytes.
161    ///
162    /// # Safety
163    ///
164    /// This method relies on `data` being an appropriate row encoding, and can
165    /// result in unsafety if this is not the case.
166    pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
167        Row {
168            data: CompactBytes::new(data),
169        }
170    }
171
172    /// Constructs a [`RowPacker`] that will pack datums into this row's
173    /// allocation.
174    ///
175    /// This method clears the existing contents of the row, but retains the
176    /// allocation.
177    pub fn packer(&mut self) -> RowPacker<'_> {
178        self.clear();
179        RowPacker { row: self }
180    }
181
182    /// Take some `Datum`s and pack them into a `Row`.
183    ///
184    /// This method builds a `Row` by repeatedly increasing the backing
185    /// allocation. If the contents of the iterator are known ahead of
186    /// time, consider [`Row::with_capacity`] to right-size the allocation
187    /// first, and then [`RowPacker::extend`] to populate it with `Datum`s.
188    /// This avoids the repeated allocation resizing and copying.
189    pub fn pack<'a, I, D>(iter: I) -> Row
190    where
191        I: IntoIterator<Item = D>,
192        D: Borrow<Datum<'a>>,
193    {
194        let mut row = Row::default();
195        row.packer().extend(iter);
196        row
197    }
198
199    /// Use `self` to pack `iter`, and then clone the result.
200    ///
201    /// This is a convenience method meant to reduce boilerplate around row
202    /// formation.
203    pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
204    where
205        I: IntoIterator<Item = D>,
206        D: Borrow<Datum<'a>>,
207    {
208        self.packer().extend(iter);
209        self.clone()
210    }
211
212    /// Like [`Row::pack`], but the provided iterator is allowed to produce an
213    /// error, in which case the packing operation is aborted and the error
214    /// returned.
215    pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
216    where
217        I: IntoIterator<Item = Result<D, E>>,
218        D: Borrow<Datum<'a>>,
219    {
220        let mut row = Row::default();
221        row.packer().try_extend(iter)?;
222        Ok(row)
223    }
224
225    /// Pack a slice of `Datum`s into a `Row`.
226    ///
227    /// This method has the advantage over `pack` that it can determine the required
228    /// allocation before packing the elements, ensuring only one allocation and no
229    /// redundant copies required.
230    pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
231        // Pre-allocate the needed number of bytes.
232        let mut row = Row::with_capacity(datums_size(slice.iter()));
233        row.packer().extend(slice.iter());
234        row
235    }
236
237    /// Returns the total amount of bytes used by this row.
238    pub fn byte_len(&self) -> usize {
239        let heap_size = if self.data.spilled() {
240            self.data.len()
241        } else {
242            0
243        };
244        let inline_size = std::mem::size_of::<Self>();
245        inline_size.saturating_add(heap_size)
246    }
247
248    /// The length of the encoded row in bytes. Does not include the size of the `Row` struct itself.
249    pub fn data_len(&self) -> usize {
250        self.data.len()
251    }
252
253    /// Returns the total capacity in bytes used by this row.
254    pub fn byte_capacity(&self) -> usize {
255        self.data.capacity()
256    }
257
258    /// Extracts a Row slice containing the entire [`Row`].
259    #[inline]
260    pub fn as_row_ref(&self) -> &RowRef {
261        // SAFETY: `Row` contains valid row data, by construction.
262        unsafe { RowRef::from_slice(self.data.as_slice()) }
263    }
264
265    /// Clear the contents of the [`Row`], leaving any allocation in place.
266    #[inline]
267    fn clear(&mut self) {
268        self.data.clear();
269    }
270}
271
272impl Borrow<RowRef> for Row {
273    #[inline]
274    fn borrow(&self) -> &RowRef {
275        self.as_row_ref()
276    }
277}
278
279impl AsRef<RowRef> for Row {
280    #[inline]
281    fn as_ref(&self) -> &RowRef {
282        self.as_row_ref()
283    }
284}
285
286impl Deref for Row {
287    type Target = RowRef;
288
289    #[inline]
290    fn deref(&self) -> &Self::Target {
291        self.as_row_ref()
292    }
293}
294
295// Nothing depends on Row being exactly 24, we just want to add visibility to the size.
296static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
297
298impl Clone for Row {
299    fn clone(&self) -> Self {
300        Row {
301            data: self.data.clone(),
302        }
303    }
304
305    fn clone_from(&mut self, source: &Self) {
306        self.data.clone_from(&source.data);
307    }
308}
309
310// Row's `Hash` implementation defers to `RowRef` to ensure they hash equivalently.
311impl std::hash::Hash for Row {
312    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
313        self.as_row_ref().hash(state)
314    }
315}
316
317#[cfg(any(test, feature = "proptest"))]
318impl Arbitrary for Row {
319    type Parameters = prop::collection::SizeRange;
320    type Strategy = BoxedStrategy<Row>;
321
322    fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
323        prop::collection::vec(arb_datum(true), size)
324            .prop_map(|items| {
325                let mut row = Row::default();
326                let mut packer = row.packer();
327                for item in items.iter() {
328                    let datum: Datum<'_> = item.into();
329                    packer.push(datum);
330                }
331                row
332            })
333            .boxed()
334    }
335}
336
337impl PartialOrd for Row {
338    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
339        Some(self.cmp(other))
340    }
341}
342
343impl Ord for Row {
344    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
345        self.as_ref().cmp(other.as_ref())
346    }
347}
348
349#[allow(missing_debug_implementations)]
350mod columnation {
351    use columnation::{Columnation, Region};
352    use mz_ore::region::LgAllocRegion;
353
354    use crate::Row;
355
356    /// Region allocation for `Row` data.
357    ///
358    /// Content bytes are stored in stable contiguous memory locations,
359    /// and then a `Row` referencing them is falsified.
360    pub struct RowStack {
361        region: LgAllocRegion<u8>,
362    }
363
364    impl RowStack {
365        const LIMIT: usize = 2 << 20;
366    }
367
368    // Implement `Default` manually to specify a region allocation limit.
369    impl Default for RowStack {
370        fn default() -> Self {
371            Self {
372                // Limit the region size to 2MiB.
373                region: LgAllocRegion::with_limit(Self::LIMIT),
374            }
375        }
376    }
377
378    impl Columnation for Row {
379        type InnerRegion = RowStack;
380    }
381
382    impl Region for RowStack {
383        type Item = Row;
384        #[inline]
385        fn clear(&mut self) {
386            self.region.clear();
387        }
388        #[inline(always)]
389        unsafe fn copy(&mut self, item: &Row) -> Row {
390            if item.data.spilled() {
391                let bytes = self.region.copy_slice(&item.data[..]);
392                Row {
393                    data: compact_bytes::CompactBytes::from_raw_parts(
394                        bytes.as_mut_ptr(),
395                        item.data.len(),
396                        item.data.capacity(),
397                    ),
398                }
399            } else {
400                item.clone()
401            }
402        }
403
404        fn reserve_items<'a, I>(&mut self, items: I)
405        where
406            Self: 'a,
407            I: Iterator<Item = &'a Self::Item> + Clone,
408        {
409            let size = items
410                .filter(|row| row.data.spilled())
411                .map(|row| row.data.len())
412                .sum();
413            let size = std::cmp::min(size, Self::LIMIT);
414            self.region.reserve(size);
415        }
416
417        fn reserve_regions<'a, I>(&mut self, regions: I)
418        where
419            Self: 'a,
420            I: Iterator<Item = &'a Self> + Clone,
421        {
422            let size = regions.map(|r| r.region.len()).sum();
423            let size = std::cmp::min(size, Self::LIMIT);
424            self.region.reserve(size);
425        }
426
427        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
428            self.region.heap_size(callback)
429        }
430    }
431}
432
433mod columnar {
434    use columnar::common::PushIndexAs;
435    use columnar::{
436        AsBytes, Borrow, Clear, Columnar, Container, FromBytes, Index, IndexAs, Len, Push,
437    };
438    use mz_ore::cast::CastFrom;
439    use std::ops::Range;
440
441    use crate::{Row, RowRef};
442
443    #[derive(
444        Copy,
445        Clone,
446        Debug,
447        Default,
448        PartialEq,
449        serde::Serialize,
450        serde::Deserialize
451    )]
452    pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
453        /// Bounds container; provides indexed access to offsets.
454        bounds: BC,
455        /// Values container; provides slice access to bytes.
456        values: VC,
457    }
458
459    impl Columnar for Row {
460        #[inline(always)]
461        fn copy_from(&mut self, other: columnar::Ref<'_, Self>) {
462            self.clear();
463            self.data.extend_from_slice(other.data());
464        }
465        #[inline(always)]
466        fn into_owned(other: columnar::Ref<'_, Self>) -> Self {
467            other.to_owned()
468        }
469        type Container = Rows;
470        #[inline(always)]
471        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
472        where
473            Self: 'a,
474        {
475            thing
476        }
477    }
478
479    impl<BC: PushIndexAs<u64>> Borrow for Rows<BC, Vec<u8>> {
480        type Ref<'a> = &'a RowRef;
481        type Borrowed<'a>
482            = Rows<BC::Borrowed<'a>, &'a [u8]>
483        where
484            Self: 'a;
485        #[inline(always)]
486        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
487            Rows {
488                bounds: self.bounds.borrow(),
489                values: self.values.borrow(),
490            }
491        }
492        #[inline(always)]
493        fn reborrow<'c, 'a: 'c>(item: Self::Borrowed<'a>) -> Self::Borrowed<'c>
494        where
495            Self: 'a,
496        {
497            Rows {
498                bounds: BC::reborrow(item.bounds),
499                values: item.values,
500            }
501        }
502
503        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
504        where
505            Self: 'a,
506        {
507            item
508        }
509    }
510
511    impl<BC: PushIndexAs<u64>> Container for Rows<BC, Vec<u8>> {
512        fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
513            if !range.is_empty() {
514                // Imported bounds will be relative to this starting offset.
515                let values_len: u64 = self.values.len().try_into().expect("must fit");
516
517                // Push all bytes that we can, all at once.
518                let other_lower = if range.start == 0 {
519                    0
520                } else {
521                    other.bounds.index_as(range.start - 1)
522                };
523                let other_upper = other.bounds.index_as(range.end - 1);
524                self.values.extend_from_self(
525                    other.values,
526                    usize::try_from(other_lower).expect("must fit")
527                        ..usize::try_from(other_upper).expect("must fit"),
528                );
529
530                // Each bound needs to be shifted by `values_len - other_lower`.
531                if values_len == other_lower {
532                    self.bounds.extend_from_self(other.bounds, range);
533                } else {
534                    for index in range {
535                        let shifted = other.bounds.index_as(index) - other_lower + values_len;
536                        self.bounds.push(&shifted)
537                    }
538                }
539            }
540        }
541        fn reserve_for<'a, I>(&mut self, selves: I)
542        where
543            Self: 'a,
544            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
545        {
546            self.bounds.reserve_for(selves.clone().map(|r| r.bounds));
547            self.values.reserve_for(selves.map(|r| r.values));
548        }
549    }
550
551    impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
552        const SLICE_COUNT: usize = BC::SLICE_COUNT + VC::SLICE_COUNT;
553        #[inline(always)]
554        fn get_byte_slice(&self, index: usize) -> (u64, &'a [u8]) {
555            debug_assert!(index < Self::SLICE_COUNT);
556            if index < BC::SLICE_COUNT {
557                self.bounds.get_byte_slice(index)
558            } else {
559                self.values.get_byte_slice(index - BC::SLICE_COUNT)
560            }
561        }
562    }
563    impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
564        const SLICE_COUNT: usize = BC::SLICE_COUNT + VC::SLICE_COUNT;
565        #[inline(always)]
566        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
567            Self {
568                bounds: FromBytes::from_bytes(bytes),
569                values: FromBytes::from_bytes(bytes),
570            }
571        }
572    }
573
574    impl<BC: Len, VC> Len for Rows<BC, VC> {
575        #[inline(always)]
576        fn len(&self) -> usize {
577            self.bounds.len()
578        }
579    }
580
581    impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
582        type Ref = &'a RowRef;
583        #[inline(always)]
584        fn get(&self, index: usize) -> Self::Ref {
585            let lower = if index == 0 {
586                0
587            } else {
588                self.bounds.index_as(index - 1)
589            };
590            let upper = self.bounds.index_as(index);
591            let lower = usize::cast_from(lower);
592            let upper = usize::cast_from(upper);
593            // SAFETY: self.values contains only valid row data, and self.metadata delimits only ranges
594            // that correspond to the original rows.
595            unsafe { RowRef::from_slice(&self.values[lower..upper]) }
596        }
597    }
598    impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
599        type Ref = &'a RowRef;
600        #[inline(always)]
601        fn get(&self, index: usize) -> Self::Ref {
602            let lower = if index == 0 {
603                0
604            } else {
605                self.bounds.index_as(index - 1)
606            };
607            let upper = self.bounds.index_as(index);
608            let lower = usize::cast_from(lower);
609            let upper = usize::cast_from(upper);
610            // SAFETY: self.values contains only valid row data, and self.metadata delimits only ranges
611            // that correspond to the original rows.
612            unsafe { RowRef::from_slice(&self.values[lower..upper]) }
613        }
614    }
615
616    impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
617        #[inline(always)]
618        fn push(&mut self, item: &Row) {
619            self.values.extend_from_slice(item.data.as_slice());
620            self.bounds.push(u64::cast_from(self.values.len()));
621        }
622    }
623    impl<BC: for<'a> Push<&'a u64>> Push<&RowRef> for Rows<BC> {
624        #[inline(always)]
625        fn push(&mut self, item: &RowRef) {
626            self.values.extend_from_slice(item.data());
627            self.bounds.push(&u64::cast_from(self.values.len()));
628        }
629    }
630    impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
631        #[inline(always)]
632        fn clear(&mut self) {
633            self.bounds.clear();
634            self.values.clear();
635        }
636    }
637}
638
639/// A contiguous slice of bytes that are row data.
640///
641/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
642#[derive(PartialEq, Eq, Hash)]
643#[repr(transparent)]
644pub struct RowRef([u8]);
645
646impl RowRef {
647    /// Create a [`RowRef`] from a slice of data.
648    ///
649    /// # Safety
650    ///
651    /// We do not check that the provided slice is valid [`Row`] data; the caller is required to
652    /// ensure this.
653    pub unsafe fn from_slice(row: &[u8]) -> &RowRef {
654        #[allow(clippy::as_conversions)]
655        let ptr = row as *const [u8] as *const RowRef;
656        // SAFETY: We know `ptr` is non-null and aligned because it came from a &[u8].
657        unsafe { &*ptr }
658    }
659
660    /// Unpack `self` into a `Vec<Datum>` for efficient random access.
661    pub fn unpack(&self) -> Vec<Datum<'_>> {
662        // It's usually cheaper to unpack twice to figure out the right length than it is to grow the vec as we go
663        let len = self.iter().count();
664        let mut vec = Vec::with_capacity(len);
665        vec.extend(self.iter());
666        vec
667    }
668
669    /// Return the first [`Datum`] in `self`
670    ///
671    /// Panics if the [`RowRef`] is empty.
672    pub fn unpack_first(&self) -> Datum<'_> {
673        self.iter().next().unwrap()
674    }
675
676    /// Iterate the [`Datum`] elements of the [`RowRef`].
677    pub fn iter(&self) -> DatumListIter<'_> {
678        DatumListIter { data: &self.0 }
679    }
680
681    /// Return the byte length of this [`RowRef`].
682    pub fn byte_len(&self) -> usize {
683        self.0.len()
684    }
685
686    /// For debugging only.
687    pub fn data(&self) -> &[u8] {
688        &self.0
689    }
690
691    /// True iff there is no data in this [`RowRef`].
692    pub fn is_empty(&self) -> bool {
693        self.0.is_empty()
694    }
695}
696
697impl ToOwned for RowRef {
698    type Owned = Row;
699
700    fn to_owned(&self) -> Self::Owned {
701        // SAFETY: RowRef has the invariant that the wrapped data must be a valid Row encoding.
702        unsafe { Row::from_bytes_unchecked(&self.0) }
703    }
704}
705
706impl<'a> IntoIterator for &'a RowRef {
707    type Item = Datum<'a>;
708    type IntoIter = DatumListIter<'a>;
709
710    fn into_iter(self) -> DatumListIter<'a> {
711        DatumListIter { data: &self.0 }
712    }
713}
714
715/// These implementations order first by length, and then by slice contents.
716/// This allows many comparisons to complete without dereferencing memory.
717/// Warning: These order by the u8 array representation, and NOT by Datum::cmp.
718impl PartialOrd for RowRef {
719    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
720        Some(self.cmp(other))
721    }
722}
723
724impl Ord for RowRef {
725    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
726        match self.0.len().cmp(&other.0.len()) {
727            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
728            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
729            std::cmp::Ordering::Equal => self.0.cmp(&other.0),
730        }
731    }
732}
733
734impl fmt::Debug for RowRef {
735    /// Debug representation using the internal datums
736    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
737        f.write_str("RowRef{")?;
738        f.debug_list().entries(&*self).finish()?;
739        f.write_str("}")
740    }
741}
742
743/// Packs datums into a [`Row`].
744///
745/// Creating a `RowPacker` via [`Row::packer`] starts a packing operation on the
746/// row. A packing operation always starts from scratch: the existing contents
747/// of the underlying row are cleared.
748///
749/// To complete a packing operation, drop the `RowPacker`.
750#[derive(Debug)]
751pub struct RowPacker<'a> {
752    row: &'a mut Row,
753}
754
755/// Infallible conversion from a [`Datum`] to a typed value.
756///
757/// Used by [`DatumList::typed_iter`] to yield elements as `T` rather than
758/// raw `Datum`s. At runtime, `T` is always `Datum<'a>`, so the conversion
759/// is identity.
760///
761/// See `doc/developer/design/20260311_sqlfunc_generic.md` for the design
762/// behind the generic type parameter and type erasure.
763///
764/// This trait is sealed and cannot be implemented outside of this crate.
765pub trait FromDatum<'a>:
766    Sized + PartialEq + std::borrow::Borrow<Datum<'a>> + sealed::Sealed
767{
768    fn from_datum(datum: Datum<'a>) -> Self;
769}
770
771mod sealed {
772    use crate::Datum;
773
774    pub trait Sealed {}
775    impl<'a> Sealed for Datum<'a> {}
776}
777
778impl<'a> FromDatum<'a> for Datum<'a> {
779    #[inline]
780    fn from_datum(datum: Datum<'a>) -> Self {
781        datum
782    }
783}
784
785#[derive(Debug, Clone)]
786pub struct DatumListIter<'a> {
787    data: &'a [u8],
788}
789
790#[derive(Debug, Clone)]
791pub struct DatumListTypedIter<'a, T> {
792    inner: DatumListIter<'a>,
793    _phantom: PhantomData<fn() -> T>,
794}
795
796#[derive(Debug, Clone)]
797pub struct DatumDictIter<'a> {
798    data: &'a [u8],
799    prev_key: Option<&'a str>,
800}
801
802#[derive(Debug, Clone)]
803pub struct DatumDictTypedIter<'a, T> {
804    inner: DatumDictIter<'a>,
805    _phantom: PhantomData<fn() -> T>,
806}
807
808/// `RowArena` is used to hold on to temporary `Row`s for functions like `eval` that need to create complex `Datum`s but don't have a `Row` to put them in yet.
809#[derive(Debug)]
810pub struct RowArena {
811    // Semantically, this field would be better represented by a `Vec<Box<[u8]>>`,
812    // as once the arena takes ownership of a byte vector the vector is never
813    // modified. But `RowArena::push_bytes` takes ownership of a `Vec<u8>`, so
814    // storing that `Vec<u8>` directly avoids an allocation. The cost is
815    // additional memory use, as the vector may have spare capacity, but row
816    // arenas are short lived so this is the better tradeoff.
817    inner: RefCell<Vec<Vec<u8>>>,
818}
819
820// DatumList and DatumDict defined here rather than near Datum because we need private access to the unsafe data field
821
822/// A sequence of Datums
823///
824/// The type parameter `T` represents the element type of the list. It is a
825/// phantom parameter that carries no runtime data — the actual elements are
826/// stored as serialized bytes and `T` is not enforced at runtime. It is up
827/// to the caller to ensure `T` matches the actual element type. The default
828/// `T = Datum<'a>` means existing code that writes `DatumList<'a>` continues
829/// to work unchanged.
830///
831/// See `doc/developer/design/20260311_sqlfunc_generic.md` for the design
832/// behind the generic type parameter.
833pub struct DatumList<'a, T = Datum<'a>> {
834    /// Points at the serialized datums
835    data: &'a [u8],
836    _phantom: PhantomData<fn() -> T>,
837}
838
839impl<'a, T> DatumList<'a, T> {
840    /// Private constructor. All `DatumList` values should be created through
841    /// this function to keep the `PhantomData` bookkeeping in one place.
842    pub(crate) fn new(data: &'a [u8]) -> Self {
843        DatumList {
844            data,
845            _phantom: PhantomData,
846        }
847    }
848}
849
850impl<'a, T> Clone for DatumList<'a, T> {
851    fn clone(&self) -> Self {
852        *self
853    }
854}
855
856impl<'a, T> Copy for DatumList<'a, T> {}
857
858impl<'a, T> Debug for DatumList<'a, T> {
859    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
860        f.debug_list().entries(self.iter()).finish()
861    }
862}
863
864impl<'a, T> PartialEq for DatumList<'a, T> {
865    #[inline(always)]
866    fn eq(&self, other: &DatumList<'a, T>) -> bool {
867        self.iter().eq(other.iter())
868    }
869}
870
871impl<'a, T> Eq for DatumList<'a, T> {}
872
873impl<'a, T> Hash for DatumList<'a, T> {
874    #[inline(always)]
875    fn hash<H: Hasher>(&self, state: &mut H) {
876        for d in self.iter() {
877            d.hash(state);
878        }
879    }
880}
881
882impl<T> Ord for DatumList<'_, T> {
883    #[inline(always)]
884    fn cmp(&self, other: &DatumList<'_, T>) -> Ordering {
885        self.iter().cmp(other.iter())
886    }
887}
888
889impl<T> PartialOrd for DatumList<'_, T> {
890    #[inline(always)]
891    fn partial_cmp(&self, other: &DatumList<'_, T>) -> Option<Ordering> {
892        Some(self.cmp(other))
893    }
894}
895
896/// A mapping from string keys to Datums
897///
898/// The type parameter `T` represents the value type of the map. It is a
899/// phantom parameter — the actual values are stored as serialized bytes and
900/// `T` is not enforced at runtime. It is up to the caller to ensure `T`
901/// matches the actual value type. The default `T = Datum<'a>` means existing
902/// code that writes `DatumMap<'a>` continues to work unchanged.
903///
904/// See `doc/developer/design/20260311_sqlfunc_generic.md` for the design
905/// behind the generic type parameter.
906pub struct DatumMap<'a, T = Datum<'a>> {
907    /// Points at the serialized datums, which should be sorted in key order
908    data: &'a [u8],
909    _phantom: PhantomData<fn() -> T>,
910}
911
912impl<'a, T> DatumMap<'a, T> {
913    /// Private constructor. All `DatumMap` values should be created through
914    /// this function to keep the `PhantomData` bookkeeping in one place.
915    pub(crate) fn new(data: &'a [u8]) -> Self {
916        DatumMap {
917            data,
918            _phantom: PhantomData,
919        }
920    }
921}
922
923impl<'a, T> Clone for DatumMap<'a, T> {
924    fn clone(&self) -> Self {
925        *self
926    }
927}
928
929impl<'a, T> Copy for DatumMap<'a, T> {}
930
931impl<'a, T> PartialEq for DatumMap<'a, T> {
932    #[inline(always)]
933    fn eq(&self, other: &DatumMap<'a, T>) -> bool {
934        self.iter().eq(other.iter())
935    }
936}
937
938impl<'a, T> Eq for DatumMap<'a, T> {}
939
940impl<'a, T> Hash for DatumMap<'a, T> {
941    #[inline(always)]
942    fn hash<H: Hasher>(&self, state: &mut H) {
943        for (k, v) in self.iter() {
944            k.hash(state);
945            v.hash(state);
946        }
947    }
948}
949
950impl<'a, T> Ord for DatumMap<'a, T> {
951    #[inline(always)]
952    fn cmp(&self, other: &DatumMap<'a, T>) -> Ordering {
953        self.iter().cmp(other.iter())
954    }
955}
956
957impl<'a, T> PartialOrd for DatumMap<'a, T> {
958    #[inline(always)]
959    fn partial_cmp(&self, other: &DatumMap<'a, T>) -> Option<Ordering> {
960        Some(self.cmp(other))
961    }
962}
963
964impl<'a> crate::scalar::SqlContainerType for DatumList<'a, Datum<'a>> {
965    fn unwrap_element_type(container: &SqlScalarType) -> &SqlScalarType {
966        container.unwrap_list_element_type()
967    }
968    fn wrap_element_type(element: SqlScalarType) -> SqlScalarType {
969        SqlScalarType::List {
970            element_type: Box::new(element),
971            custom_id: None,
972        }
973    }
974}
975
976impl<'a> crate::scalar::SqlContainerType for DatumMap<'a, Datum<'a>> {
977    fn unwrap_element_type(container: &SqlScalarType) -> &SqlScalarType {
978        container.unwrap_map_value_type()
979    }
980    fn wrap_element_type(element: SqlScalarType) -> SqlScalarType {
981        SqlScalarType::Map {
982            value_type: Box::new(element),
983            custom_id: None,
984        }
985    }
986}
987
988/// Represents a single `Datum`, appropriate to be nested inside other
989/// `Datum`s.
990#[derive(Clone, Copy, Eq, PartialEq, Hash)]
991pub struct DatumNested<'a> {
992    val: &'a [u8],
993}
994
995impl<'a> std::fmt::Display for DatumNested<'a> {
996    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
997        std::fmt::Display::fmt(&self.datum(), f)
998    }
999}
1000
1001impl<'a> std::fmt::Debug for DatumNested<'a> {
1002    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1003        f.debug_struct("DatumNested")
1004            .field("val", &self.datum())
1005            .finish()
1006    }
1007}
1008
1009impl<'a> DatumNested<'a> {
1010    // Figure out which bytes `read_datum` returns (e.g. including the tag),
1011    // and then store a reference to those bytes, so we can "replay" this same
1012    // call later on without storing the datum itself.
1013    pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
1014        let prev = *data;
1015        let _ = unsafe { read_datum(data) };
1016        DatumNested {
1017            val: &prev[..(prev.len() - data.len())],
1018        }
1019    }
1020
1021    /// Returns the datum `self` contains.
1022    pub fn datum(&self) -> Datum<'a> {
1023        let mut temp = self.val;
1024        unsafe { read_datum(&mut temp) }
1025    }
1026}
1027
1028impl<'a> Ord for DatumNested<'a> {
1029    fn cmp(&self, other: &Self) -> Ordering {
1030        self.datum().cmp(&other.datum())
1031    }
1032}
1033
1034impl<'a> PartialOrd for DatumNested<'a> {
1035    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1036        Some(self.cmp(other))
1037    }
1038}
1039
1040// Prefer adding new tags to the end of the enum. Certain behavior, like row ordering and EXPLAIN
1041// PHYSICAL PLAN, rely on the ordering of this enum. Neither of these are breaking changes, but
1042// it's annoying when they change.
1043#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
1044#[repr(u8)]
1045enum Tag {
1046    Null,
1047    False,
1048    True,
1049    Int16,
1050    Int32,
1051    Int64,
1052    UInt8,
1053    UInt32,
1054    Float32,
1055    Float64,
1056    Date,
1057    Time,
1058    Timestamp,
1059    TimestampTz,
1060    Interval,
1061    BytesTiny,
1062    BytesShort,
1063    BytesLong,
1064    BytesHuge,
1065    StringTiny,
1066    StringShort,
1067    StringLong,
1068    StringHuge,
1069    Uuid,
1070    Array,
1071    ListTiny,
1072    ListShort,
1073    ListLong,
1074    ListHuge,
1075    Dict,
1076    JsonNull,
1077    Dummy,
1078    Numeric,
1079    UInt16,
1080    UInt64,
1081    MzTimestamp,
1082    Range,
1083    MzAclItem,
1084    AclItem,
1085    // Everything except leap seconds and times beyond the range of
1086    // i64 nanoseconds. (Note that Materialize does not support leap
1087    // seconds, but this module does).
1088    CheapTimestamp,
1089    // Everything except leap seconds and times beyond the range of
1090    // i64 nanoseconds. (Note that Materialize does not support leap
1091    // seconds, but this module does).
1092    CheapTimestampTz,
1093    // The next several tags are for variable-length signed integer encoding.
1094    // The basic idea is that `NonNegativeIntN_K` is used to encode a datum of type
1095    // IntN whose actual value is positive or zero and fits in K bits, and similarly for
1096    // NegativeIntN_K with negative values.
1097    //
1098    // The order of these tags matters, because we want to be able to choose the
1099    // tag for a given datum quickly, with arithmetic, rather than slowly, with a
1100    // stack of `if` statements.
1101    //
1102    // Separate tags for non-negative and negative numbers are used to avoid having to
1103    // waste one bit in the actual data space to encode the sign.
1104    NonNegativeInt16_0, // i.e., 0
1105    NonNegativeInt16_8,
1106    NonNegativeInt16_16,
1107
1108    NonNegativeInt32_0,
1109    NonNegativeInt32_8,
1110    NonNegativeInt32_16,
1111    NonNegativeInt32_24,
1112    NonNegativeInt32_32,
1113
1114    NonNegativeInt64_0,
1115    NonNegativeInt64_8,
1116    NonNegativeInt64_16,
1117    NonNegativeInt64_24,
1118    NonNegativeInt64_32,
1119    NonNegativeInt64_40,
1120    NonNegativeInt64_48,
1121    NonNegativeInt64_56,
1122    NonNegativeInt64_64,
1123
1124    NegativeInt16_0, // i.e., -1
1125    NegativeInt16_8,
1126    NegativeInt16_16,
1127
1128    NegativeInt32_0,
1129    NegativeInt32_8,
1130    NegativeInt32_16,
1131    NegativeInt32_24,
1132    NegativeInt32_32,
1133
1134    NegativeInt64_0,
1135    NegativeInt64_8,
1136    NegativeInt64_16,
1137    NegativeInt64_24,
1138    NegativeInt64_32,
1139    NegativeInt64_40,
1140    NegativeInt64_48,
1141    NegativeInt64_56,
1142    NegativeInt64_64,
1143
1144    // These are like the ones above, but for unsigned types. The
1145    // situation is slightly simpler as we don't have negatives.
1146    UInt8_0, // i.e., 0
1147    UInt8_8,
1148
1149    UInt16_0,
1150    UInt16_8,
1151    UInt16_16,
1152
1153    UInt32_0,
1154    UInt32_8,
1155    UInt32_16,
1156    UInt32_24,
1157    UInt32_32,
1158
1159    UInt64_0,
1160    UInt64_8,
1161    UInt64_16,
1162    UInt64_24,
1163    UInt64_32,
1164    UInt64_40,
1165    UInt64_48,
1166    UInt64_56,
1167    UInt64_64,
1168}
1169
1170impl Tag {
1171    fn actual_int_length(self) -> Option<usize> {
1172        use Tag::*;
1173        let val = match self {
1174            NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
1175            | UInt32_0 | UInt64_0 => 0,
1176            NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
1177            | UInt32_8 | UInt64_8 => 1,
1178            NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
1179            | UInt32_16 | UInt64_16 => 2,
1180            NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
1181            NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
1182            NonNegativeInt64_40 | UInt64_40 => 5,
1183            NonNegativeInt64_48 | UInt64_48 => 6,
1184            NonNegativeInt64_56 | UInt64_56 => 7,
1185            NonNegativeInt64_64 | UInt64_64 => 8,
1186            NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
1187            NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
1188            NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
1189            NegativeInt32_24 | NegativeInt64_24 => 3,
1190            NegativeInt32_32 | NegativeInt64_32 => 4,
1191            NegativeInt64_40 => 5,
1192            NegativeInt64_48 => 6,
1193            NegativeInt64_56 => 7,
1194            NegativeInt64_64 => 8,
1195
1196            _ => return None,
1197        };
1198        Some(val)
1199    }
1200}
1201
1202// --------------------------------------------------------------------------------
1203// reading data
1204
1205/// Read a byte slice starting at byte `offset`.
1206///
1207/// Updates `offset` to point to the first byte after the end of the read region.
1208fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
1209    let len = u64::from_le_bytes(read_byte_array(data));
1210    let len = usize::cast_from(len);
1211    let (bytes, next) = data.split_at(len);
1212    *data = next;
1213    bytes
1214}
1215
1216/// Read a data whose length is encoded in the row before its contents.
1217///
1218/// Updates `offset` to point to the first byte after the end of the read region.
1219///
1220/// # Safety
1221///
1222/// This function is safe if the datum's length and contents were previously written by `push_lengthed_bytes`,
1223/// and it was only written with a `String` tag if it was indeed UTF-8.
1224unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
1225    let len = match tag {
1226        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
1227        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1228            usize::from(u16::from_le_bytes(read_byte_array(data)))
1229        }
1230        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1231            usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
1232        }
1233        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1234            usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
1235        }
1236        _ => unreachable!(),
1237    };
1238    let (bytes, next) = data.split_at(len);
1239    *data = next;
1240    match tag {
1241        Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
1242        Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
1243            Datum::String(str::from_utf8_unchecked(bytes))
1244        }
1245        Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
1246            Datum::List(DatumList::new(bytes))
1247        }
1248        _ => unreachable!(),
1249    }
1250}
1251
1252fn read_byte(data: &mut &[u8]) -> u8 {
1253    let byte = data[0];
1254    *data = &data[1..];
1255    byte
1256}
1257
1258/// Read `length` bytes from `data` at `offset`, updating the
1259/// latter. Extend the resulting buffer to an array of `N` bytes by
1260/// inserting `FILL` in the k most significant bytes, where k = N - length.
1261///
1262/// SAFETY:
1263///   * length <= N
1264///   * offset + length <= data.len()
1265fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
1266    data: &mut &[u8],
1267    length: usize,
1268) -> [u8; N] {
1269    let mut raw = [FILL; N];
1270    let (prev, next) = data.split_at(length);
1271    (raw[..prev.len()]).copy_from_slice(prev);
1272    *data = next;
1273    raw
1274}
1275/// Read `length` bytes from `data` at `offset`, updating the
1276/// latter. Extend the resulting buffer to a negative `N`-byte
1277/// twos complement integer by filling the remaining bits with 1.
1278///
1279/// SAFETY:
1280///   * length <= N
1281///   * offset + length <= data.len()
1282fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
1283    read_byte_array_sign_extending::<N, 255>(data, length)
1284}
1285
1286/// Read `length` bytes from `data` at `offset`, updating the
1287/// latter. Extend the resulting buffer to a positive or zero `N`-byte
1288/// twos complement integer by filling the remaining bits with 0.
1289///
1290/// SAFETY:
1291///   * length <= N
1292///   * offset + length <= data.len()
1293fn read_byte_array_extending_nonnegative<const N: usize>(
1294    data: &mut &[u8],
1295    length: usize,
1296) -> [u8; N] {
1297    read_byte_array_sign_extending::<N, 0>(data, length)
1298}
1299
1300pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1301    let (prev, next) = data.split_first_chunk().unwrap();
1302    *data = next;
1303    *prev
1304}
1305
1306pub(super) fn read_date(data: &mut &[u8]) -> Date {
1307    let days = i32::from_le_bytes(read_byte_array(data));
1308    Date::from_pg_epoch(days).expect("unexpected date")
1309}
1310
1311pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1312    let year = i32::from_le_bytes(read_byte_array(data));
1313    let ordinal = u32::from_le_bytes(read_byte_array(data));
1314    NaiveDate::from_yo_opt(year, ordinal).unwrap()
1315}
1316
1317pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1318    let secs = u32::from_le_bytes(read_byte_array(data));
1319    let nanos = u32::from_le_bytes(read_byte_array(data));
1320    NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1321}
1322
1323/// Read a datum starting at byte `offset`.
1324///
1325/// Updates `offset` to point to the first byte after the end of the read region.
1326///
1327/// # Safety
1328///
1329/// This function is safe if a `Datum` was previously written at this offset by `push_datum`.
1330/// Otherwise it could return invalid values, which is Undefined Behavior.
1331pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
1332    let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
1333    match tag {
1334        Tag::Null => Datum::Null,
1335        Tag::False => Datum::False,
1336        Tag::True => Datum::True,
1337        Tag::UInt8_0 | Tag::UInt8_8 => {
1338            let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
1339                data,
1340                tag.actual_int_length()
1341                    .expect("returns a value for variable-length-encoded integer tags"),
1342            ));
1343            Datum::UInt8(i)
1344        }
1345        Tag::Int16 => {
1346            let i = i16::from_le_bytes(read_byte_array(data));
1347            Datum::Int16(i)
1348        }
1349        Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
1350            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
1351            // and `data` is big enough because it was encoded validly. These assumptions
1352            // are checked in debug asserts.
1353            let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
1354                data,
1355                tag.actual_int_length()
1356                    .expect("returns a value for variable-length-encoded integer tags"),
1357            ));
1358            Datum::Int16(i)
1359        }
1360        Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
1361            let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
1362                data,
1363                tag.actual_int_length()
1364                    .expect("returns a value for variable-length-encoded integer tags"),
1365            ));
1366            Datum::UInt16(i)
1367        }
1368        Tag::Int32 => {
1369            let i = i32::from_le_bytes(read_byte_array(data));
1370            Datum::Int32(i)
1371        }
1372        Tag::NonNegativeInt32_0
1373        | Tag::NonNegativeInt32_32
1374        | Tag::NonNegativeInt32_8
1375        | Tag::NonNegativeInt32_16
1376        | Tag::NonNegativeInt32_24 => {
1377            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
1378            // and `data` is big enough because it was encoded validly. These assumptions
1379            // are checked in debug asserts.
1380            let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
1381                data,
1382                tag.actual_int_length()
1383                    .expect("returns a value for variable-length-encoded integer tags"),
1384            ));
1385            Datum::Int32(i)
1386        }
1387        Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
1388            let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
1389                data,
1390                tag.actual_int_length()
1391                    .expect("returns a value for variable-length-encoded integer tags"),
1392            ));
1393            Datum::UInt32(i)
1394        }
1395        Tag::Int64 => {
1396            let i = i64::from_le_bytes(read_byte_array(data));
1397            Datum::Int64(i)
1398        }
1399        Tag::NonNegativeInt64_0
1400        | Tag::NonNegativeInt64_64
1401        | Tag::NonNegativeInt64_8
1402        | Tag::NonNegativeInt64_16
1403        | Tag::NonNegativeInt64_24
1404        | Tag::NonNegativeInt64_32
1405        | Tag::NonNegativeInt64_40
1406        | Tag::NonNegativeInt64_48
1407        | Tag::NonNegativeInt64_56 => {
1408            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
1409            // and `data` is big enough because it was encoded validly. These assumptions
1410            // are checked in debug asserts.
1411
1412            let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
1413                data,
1414                tag.actual_int_length()
1415                    .expect("returns a value for variable-length-encoded integer tags"),
1416            ));
1417            Datum::Int64(i)
1418        }
1419        Tag::UInt64_0
1420        | Tag::UInt64_8
1421        | Tag::UInt64_16
1422        | Tag::UInt64_24
1423        | Tag::UInt64_32
1424        | Tag::UInt64_40
1425        | Tag::UInt64_48
1426        | Tag::UInt64_56
1427        | Tag::UInt64_64 => {
1428            let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
1429                data,
1430                tag.actual_int_length()
1431                    .expect("returns a value for variable-length-encoded integer tags"),
1432            ));
1433            Datum::UInt64(i)
1434        }
1435        Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
1436            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
1437            // and `data` is big enough because it was encoded validly. These assumptions
1438            // are checked in debug asserts.
1439            let i = i16::from_le_bytes(read_byte_array_extending_negative(
1440                data,
1441                tag.actual_int_length()
1442                    .expect("returns a value for variable-length-encoded integer tags"),
1443            ));
1444            Datum::Int16(i)
1445        }
1446        Tag::NegativeInt32_0
1447        | Tag::NegativeInt32_32
1448        | Tag::NegativeInt32_8
1449        | Tag::NegativeInt32_16
1450        | Tag::NegativeInt32_24 => {
1451            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
1452            // and `data` is big enough because it was encoded validly. These assumptions
1453            // are checked in debug asserts.
1454            let i = i32::from_le_bytes(read_byte_array_extending_negative(
1455                data,
1456                tag.actual_int_length()
1457                    .expect("returns a value for variable-length-encoded integer tags"),
1458            ));
1459            Datum::Int32(i)
1460        }
1461        Tag::NegativeInt64_0
1462        | Tag::NegativeInt64_64
1463        | Tag::NegativeInt64_8
1464        | Tag::NegativeInt64_16
1465        | Tag::NegativeInt64_24
1466        | Tag::NegativeInt64_32
1467        | Tag::NegativeInt64_40
1468        | Tag::NegativeInt64_48
1469        | Tag::NegativeInt64_56 => {
1470            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
1471            // and `data` is big enough because the row was encoded validly. These assumptions
1472            // are checked in debug asserts.
1473            let i = i64::from_le_bytes(read_byte_array_extending_negative(
1474                data,
1475                tag.actual_int_length()
1476                    .expect("returns a value for variable-length-encoded integer tags"),
1477            ));
1478            Datum::Int64(i)
1479        }
1480
1481        Tag::UInt8 => {
1482            let i = u8::from_le_bytes(read_byte_array(data));
1483            Datum::UInt8(i)
1484        }
1485        Tag::UInt16 => {
1486            let i = u16::from_le_bytes(read_byte_array(data));
1487            Datum::UInt16(i)
1488        }
1489        Tag::UInt32 => {
1490            let i = u32::from_le_bytes(read_byte_array(data));
1491            Datum::UInt32(i)
1492        }
1493        Tag::UInt64 => {
1494            let i = u64::from_le_bytes(read_byte_array(data));
1495            Datum::UInt64(i)
1496        }
1497        Tag::Float32 => {
1498            let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
1499            Datum::Float32(OrderedFloat::from(f))
1500        }
1501        Tag::Float64 => {
1502            let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
1503            Datum::Float64(OrderedFloat::from(f))
1504        }
1505        Tag::Date => Datum::Date(read_date(data)),
1506        Tag::Time => Datum::Time(read_time(data)),
1507        Tag::CheapTimestamp => {
1508            let ts = i64::from_le_bytes(read_byte_array(data));
1509            let secs = ts.div_euclid(1_000_000_000);
1510            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1511            let ndt = DateTime::from_timestamp(secs, nsecs)
1512                .expect("We only write round-trippable timestamps")
1513                .naive_utc();
1514            Datum::Timestamp(
1515                CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
1516            )
1517        }
1518        Tag::CheapTimestampTz => {
1519            let ts = i64::from_le_bytes(read_byte_array(data));
1520            let secs = ts.div_euclid(1_000_000_000);
1521            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1522            let dt = DateTime::from_timestamp(secs, nsecs)
1523                .expect("We only write round-trippable timestamps");
1524            Datum::TimestampTz(
1525                CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
1526            )
1527        }
1528        Tag::Timestamp => {
1529            let date = read_naive_date(data);
1530            let time = read_time(data);
1531            Datum::Timestamp(
1532                CheckedTimestamp::from_timestamplike(date.and_time(time))
1533                    .expect("unexpected timestamp"),
1534            )
1535        }
1536        Tag::TimestampTz => {
1537            let date = read_naive_date(data);
1538            let time = read_time(data);
1539            Datum::TimestampTz(
1540                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
1541                    date.and_time(time),
1542                    Utc,
1543                ))
1544                .expect("unexpected timestamptz"),
1545            )
1546        }
1547        Tag::Interval => {
1548            let months = i32::from_le_bytes(read_byte_array(data));
1549            let days = i32::from_le_bytes(read_byte_array(data));
1550            let micros = i64::from_le_bytes(read_byte_array(data));
1551            Datum::Interval(Interval {
1552                months,
1553                days,
1554                micros,
1555            })
1556        }
1557        Tag::BytesTiny
1558        | Tag::BytesShort
1559        | Tag::BytesLong
1560        | Tag::BytesHuge
1561        | Tag::StringTiny
1562        | Tag::StringShort
1563        | Tag::StringLong
1564        | Tag::StringHuge
1565        | Tag::ListTiny
1566        | Tag::ListShort
1567        | Tag::ListLong
1568        | Tag::ListHuge => read_lengthed_datum(data, tag),
1569        Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
1570        Tag::Array => {
1571            // See the comment in `Row::push_array` for details on the encoding
1572            // of arrays.
1573            let ndims = read_byte(data);
1574            let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
1575            let (dims, next) = data.split_at(dims_size);
1576            *data = next;
1577            let bytes = read_untagged_bytes(data);
1578            Datum::Array(Array {
1579                dims: ArrayDimensions { data: dims },
1580                elements: DatumList::new(bytes),
1581            })
1582        }
1583        Tag::Dict => {
1584            let bytes = read_untagged_bytes(data);
1585            Datum::Map(DatumMap::new(bytes))
1586        }
1587        Tag::JsonNull => Datum::JsonNull,
1588        Tag::Dummy => Datum::Dummy,
1589        Tag::Numeric => {
1590            let digits = read_byte(data).into();
1591            let exponent = i8::reinterpret_cast(read_byte(data));
1592            let bits = read_byte(data);
1593
1594            let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
1595            let lsu_u8_len = lsu_u16_len * 2;
1596            let (lsu_u8, next) = data.split_at(lsu_u8_len);
1597            *data = next;
1598
1599            // TODO: if we refactor the decimal library to accept the owned
1600            // array as a parameter to `from_raw_parts` below, we could likely
1601            // avoid a copy because it is exactly the value we want
1602            let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
1603            for (i, c) in lsu_u8.chunks(2).enumerate() {
1604                lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
1605            }
1606
1607            let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
1608            Datum::from(d)
1609        }
1610        Tag::MzTimestamp => {
1611            let t = Timestamp::decode(read_byte_array(data));
1612            Datum::MzTimestamp(t)
1613        }
1614        Tag::Range => {
1615            // See notes on `push_range_with` for details about encoding.
1616            let flag_byte = read_byte(data);
1617            let flags = range::InternalFlags::from_bits(flag_byte)
1618                .expect("range flags must be encoded validly");
1619
1620            if flags.contains(range::InternalFlags::EMPTY) {
1621                assert!(
1622                    flags == range::InternalFlags::EMPTY,
1623                    "empty ranges contain only RANGE_EMPTY flag"
1624                );
1625
1626                return Datum::Range(Range { inner: None });
1627            }
1628
1629            let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
1630                None
1631            } else {
1632                Some(DatumNested::extract(data))
1633            };
1634
1635            let lower = RangeBound {
1636                inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
1637                bound: lower_bound,
1638            };
1639
1640            let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
1641                None
1642            } else {
1643                Some(DatumNested::extract(data))
1644            };
1645
1646            let upper = RangeBound {
1647                inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
1648                bound: upper_bound,
1649            };
1650
1651            Datum::Range(Range {
1652                inner: Some(RangeInner { lower, upper }),
1653            })
1654        }
1655        Tag::MzAclItem => {
1656            const N: usize = MzAclItem::binary_size();
1657            let mz_acl_item =
1658                MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
1659            Datum::MzAclItem(mz_acl_item)
1660        }
1661        Tag::AclItem => {
1662            const N: usize = AclItem::binary_size();
1663            let acl_item =
1664                AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
1665            Datum::AclItem(acl_item)
1666        }
1667    }
1668}
1669
1670// --------------------------------------------------------------------------------
1671// writing data
1672
1673fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1674where
1675    D: Vector<u8>,
1676{
1677    let len = u64::cast_from(bytes.len());
1678    data.extend_from_slice(&len.to_le_bytes());
1679    data.extend_from_slice(bytes);
1680}
1681
1682fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1683where
1684    D: Vector<u8>,
1685{
1686    match tag {
1687        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1688            let len = bytes.len().to_le_bytes();
1689            data.push(len[0]);
1690        }
1691        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1692            let len = bytes.len().to_le_bytes();
1693            data.extend_from_slice(&len[0..2]);
1694        }
1695        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1696            let len = bytes.len().to_le_bytes();
1697            data.extend_from_slice(&len[0..4]);
1698        }
1699        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1700            let len = bytes.len().to_le_bytes();
1701            data.extend_from_slice(&len);
1702        }
1703        _ => unreachable!(),
1704    }
1705    data.extend_from_slice(bytes);
1706}
1707
1708pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1709    i32::to_le_bytes(date.pg_epoch_days())
1710}
1711
1712fn push_date<D>(data: &mut D, date: Date)
1713where
1714    D: Vector<u8>,
1715{
1716    data.extend_from_slice(&date_to_array(date));
1717}
1718
1719pub(super) fn naive_date_to_arrays(
1720    date: NaiveDate,
1721) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1722    (
1723        i32::to_le_bytes(date.year()),
1724        u32::to_le_bytes(date.ordinal()),
1725    )
1726}
1727
1728fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1729where
1730    D: Vector<u8>,
1731{
1732    let (ds1, ds2) = naive_date_to_arrays(date);
1733    data.extend_from_slice(&ds1);
1734    data.extend_from_slice(&ds2);
1735}
1736
1737pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1738    (
1739        u32::to_le_bytes(time.num_seconds_from_midnight()),
1740        u32::to_le_bytes(time.nanosecond()),
1741    )
1742}
1743
1744fn push_time<D>(data: &mut D, time: NaiveTime)
1745where
1746    D: Vector<u8>,
1747{
1748    let (ts1, ts2) = time_to_arrays(time);
1749    data.extend_from_slice(&ts1);
1750    data.extend_from_slice(&ts2);
1751}
1752
1753/// Returns an i64 representing a `NaiveDateTime`, if
1754/// said i64 can be round-tripped back to a `NaiveDateTime`.
1755///
1756/// The only exotic NDTs for which this can't happen are those that
1757/// are hundreds of years in the future or past, or those that
1758/// represent a leap second. (Note that Materialize does not support
1759/// leap seconds, but this module does).
1760// This function is inspired by `NaiveDateTime::timestamp_nanos`,
1761// with extra checking.
1762fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1763    let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1764    if subsec_nanos >= 1_000_000_000 {
1765        return None;
1766    }
1767    let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1768    as_ns.checked_add(i64::from(subsec_nanos))
1769}
1770
1771// This function is extremely hot, so
1772// we just use `as` to avoid the overhead of
1773// `try_into` followed by `unwrap`.
1774// `leading_ones` and `leading_zeros`
1775// can never return values greater than 64, so the conversion is safe.
1776#[inline(always)]
1777#[allow(clippy::as_conversions)]
1778fn min_bytes_signed<T>(i: T) -> u8
1779where
1780    T: Into<i64>,
1781{
1782    let i: i64 = i.into();
1783
1784    // To fit in n bytes, we require that
1785    // everything but the leading sign bits fits in n*8
1786    // bits.
1787    let n_sign_bits = if i.is_negative() {
1788        i.leading_ones() as u8
1789    } else {
1790        i.leading_zeros() as u8
1791    };
1792
1793    (64 - n_sign_bits + 7) / 8
1794}
1795
1796// In principle we could just use `min_bytes_signed`, rather than
1797// having a separate function here, as long as we made that one take
1798// `T: Into<i128>` instead of 64. But LLVM doesn't seem smart enough
1799// to realize that that function is the same as the current version,
1800// and generates worse code.
1801//
1802// Justification for `as` is the same as in `min_bytes_signed`.
1803#[inline(always)]
1804#[allow(clippy::as_conversions)]
1805fn min_bytes_unsigned<T>(i: T) -> u8
1806where
1807    T: Into<u64>,
1808{
1809    let i: u64 = i.into();
1810
1811    let n_sign_bits = i.leading_zeros() as u8;
1812
1813    (64 - n_sign_bits + 7) / 8
1814}
1815
1816const TINY: usize = 1 << 8;
1817const SHORT: usize = 1 << 16;
1818const LONG: usize = 1 << 32;
1819
1820fn push_datum<D>(data: &mut D, datum: Datum)
1821where
1822    D: Vector<u8>,
1823{
1824    match datum {
1825        Datum::Null => data.push(Tag::Null.into()),
1826        Datum::False => data.push(Tag::False.into()),
1827        Datum::True => data.push(Tag::True.into()),
1828        Datum::Int16(i) => {
1829            let mbs = min_bytes_signed(i);
1830            let tag = u8::from(if i.is_negative() {
1831                Tag::NegativeInt16_0
1832            } else {
1833                Tag::NonNegativeInt16_0
1834            }) + mbs;
1835
1836            data.push(tag);
1837            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1838        }
1839        Datum::Int32(i) => {
1840            let mbs = min_bytes_signed(i);
1841            let tag = u8::from(if i.is_negative() {
1842                Tag::NegativeInt32_0
1843            } else {
1844                Tag::NonNegativeInt32_0
1845            }) + mbs;
1846
1847            data.push(tag);
1848            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1849        }
1850        Datum::Int64(i) => {
1851            let mbs = min_bytes_signed(i);
1852            let tag = u8::from(if i.is_negative() {
1853                Tag::NegativeInt64_0
1854            } else {
1855                Tag::NonNegativeInt64_0
1856            }) + mbs;
1857
1858            data.push(tag);
1859            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1860        }
1861        Datum::UInt8(i) => {
1862            let mbu = min_bytes_unsigned(i);
1863            let tag = u8::from(Tag::UInt8_0) + mbu;
1864            data.push(tag);
1865            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1866        }
1867        Datum::UInt16(i) => {
1868            let mbu = min_bytes_unsigned(i);
1869            let tag = u8::from(Tag::UInt16_0) + mbu;
1870            data.push(tag);
1871            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1872        }
1873        Datum::UInt32(i) => {
1874            let mbu = min_bytes_unsigned(i);
1875            let tag = u8::from(Tag::UInt32_0) + mbu;
1876            data.push(tag);
1877            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1878        }
1879        Datum::UInt64(i) => {
1880            let mbu = min_bytes_unsigned(i);
1881            let tag = u8::from(Tag::UInt64_0) + mbu;
1882            data.push(tag);
1883            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1884        }
1885        Datum::Float32(f) => {
1886            data.push(Tag::Float32.into());
1887            data.extend_from_slice(&f.to_bits().to_le_bytes());
1888        }
1889        Datum::Float64(f) => {
1890            data.push(Tag::Float64.into());
1891            data.extend_from_slice(&f.to_bits().to_le_bytes());
1892        }
1893        Datum::Date(d) => {
1894            data.push(Tag::Date.into());
1895            push_date(data, d);
1896        }
1897        Datum::Time(t) => {
1898            data.push(Tag::Time.into());
1899            push_time(data, t);
1900        }
1901        Datum::Timestamp(t) => {
1902            let datetime = t.to_naive();
1903            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1904                data.push(Tag::CheapTimestamp.into());
1905                data.extend_from_slice(&nanos.to_le_bytes());
1906            } else {
1907                data.push(Tag::Timestamp.into());
1908                push_naive_date(data, datetime.date());
1909                push_time(data, datetime.time());
1910            }
1911        }
1912        Datum::TimestampTz(t) => {
1913            let datetime = t.to_naive();
1914            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1915                data.push(Tag::CheapTimestampTz.into());
1916                data.extend_from_slice(&nanos.to_le_bytes());
1917            } else {
1918                data.push(Tag::TimestampTz.into());
1919                push_naive_date(data, datetime.date());
1920                push_time(data, datetime.time());
1921            }
1922        }
1923        Datum::Interval(i) => {
1924            data.push(Tag::Interval.into());
1925            data.extend_from_slice(&i.months.to_le_bytes());
1926            data.extend_from_slice(&i.days.to_le_bytes());
1927            data.extend_from_slice(&i.micros.to_le_bytes());
1928        }
1929        Datum::Bytes(bytes) => {
1930            let tag = match bytes.len() {
1931                0..TINY => Tag::BytesTiny,
1932                TINY..SHORT => Tag::BytesShort,
1933                SHORT..LONG => Tag::BytesLong,
1934                _ => Tag::BytesHuge,
1935            };
1936            data.push(tag.into());
1937            push_lengthed_bytes(data, bytes, tag);
1938        }
1939        Datum::String(string) => {
1940            let tag = match string.len() {
1941                0..TINY => Tag::StringTiny,
1942                TINY..SHORT => Tag::StringShort,
1943                SHORT..LONG => Tag::StringLong,
1944                _ => Tag::StringHuge,
1945            };
1946            data.push(tag.into());
1947            push_lengthed_bytes(data, string.as_bytes(), tag);
1948        }
1949        Datum::List(list) => {
1950            let tag = match list.data.len() {
1951                0..TINY => Tag::ListTiny,
1952                TINY..SHORT => Tag::ListShort,
1953                SHORT..LONG => Tag::ListLong,
1954                _ => Tag::ListHuge,
1955            };
1956            data.push(tag.into());
1957            push_lengthed_bytes(data, list.data, tag);
1958        }
1959        Datum::Uuid(u) => {
1960            data.push(Tag::Uuid.into());
1961            data.extend_from_slice(u.as_bytes());
1962        }
1963        Datum::Array(array) => {
1964            // See the comment in `Row::push_array` for details on the encoding
1965            // of arrays.
1966            data.push(Tag::Array.into());
1967            data.push(array.dims.ndims());
1968            data.extend_from_slice(array.dims.data);
1969            push_untagged_bytes(data, array.elements.data);
1970        }
1971        Datum::Map(dict) => {
1972            data.push(Tag::Dict.into());
1973            push_untagged_bytes(data, dict.data);
1974        }
1975        Datum::JsonNull => data.push(Tag::JsonNull.into()),
1976        Datum::MzTimestamp(t) => {
1977            data.push(Tag::MzTimestamp.into());
1978            data.extend_from_slice(&t.encode());
1979        }
1980        Datum::Dummy => data.push(Tag::Dummy.into()),
1981        Datum::Numeric(mut n) => {
1982            // Pseudo-canonical representation of decimal values with
1983            // insignificant zeroes trimmed. This compresses the number further
1984            // than `Numeric::trim` by removing all zeroes, and not only those in
1985            // the fractional component.
1986            numeric::cx_datum().reduce(&mut n.0);
1987            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
1988            data.push(Tag::Numeric.into());
1989            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
1990            data.push(
1991                i8::try_from(exponent)
1992                    .expect("exponent to fit within i8; should not exceed +/- 39")
1993                    .to_le_bytes()[0],
1994            );
1995            data.push(bits);
1996
1997            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];
1998
1999            // Little endian machines can take the lsu directly from u16 to u8.
2000            if cfg!(target_endian = "little") {
2001                // SAFETY: `lsu` (returned by `coefficient_units()`) is a `&[u16]`, so
2002                // each element can safely be transmuted into two `u8`s.
2003                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
2004                // The `u8` aligned version of the `lsu` should have twice as many
2005                // elements as we expect for the `u16` version.
2006                soft_assert_no_log!(
2007                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
2008                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
2009                    Numeric::digits_to_lsu_elements_len(digits) * 2,
2010                    lsu_bytes.len()
2011                );
2012                // There should be no unaligned elements in the prefix or suffix.
2013                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
2014                data.extend_from_slice(lsu_bytes);
2015            } else {
2016                for u in lsu {
2017                    data.extend_from_slice(&u.to_le_bytes());
2018                }
2019            }
2020        }
2021        Datum::Range(range) => {
2022            // See notes on `push_range_with` for details about encoding.
2023            data.push(Tag::Range.into());
2024            data.push(range.internal_flag_bits());
2025
2026            if let Some(RangeInner { lower, upper }) = range.inner {
2027                for bound in [lower.bound, upper.bound] {
2028                    if let Some(bound) = bound {
2029                        match bound.datum() {
2030                            Datum::Null => panic!("cannot push Datum::Null into range"),
2031                            d => push_datum::<D>(data, d),
2032                        }
2033                    }
2034                }
2035            }
2036        }
2037        Datum::MzAclItem(mz_acl_item) => {
2038            data.push(Tag::MzAclItem.into());
2039            data.extend_from_slice(&mz_acl_item.encode_binary());
2040        }
2041        Datum::AclItem(acl_item) => {
2042            data.push(Tag::AclItem.into());
2043            data.extend_from_slice(&acl_item.encode_binary());
2044        }
2045    }
2046}
2047
2048/// Return the number of bytes these Datums would use if packed as a Row.
2049pub fn row_size<'a, I>(a: I) -> usize
2050where
2051    I: IntoIterator<Item = Datum<'a>>,
2052{
2053    // Using datums_size instead of a.data().len() here is safer because it will
2054    // return the size of the datums if they were packed into a Row. Although
2055    // a.data().len() happens to give the correct answer (and is faster), data()
2056    // is documented as for debugging only.
2057    let sz = datums_size::<_, _>(a);
2058    let size_of_row = std::mem::size_of::<Row>();
2059    // The Row struct attempts to inline data until it can't fit in the
2060    // preallocated size. Otherwise it spills to heap, and uses the Row to point
2061    // to that.
2062    if sz > Row::SIZE {
2063        sz + size_of_row
2064    } else {
2065        size_of_row
2066    }
2067}
2068
2069/// Number of bytes required by the datum.
2070/// This is used to optimistically pre-allocate buffers for packing rows.
2071pub fn datum_size(datum: &Datum) -> usize {
2072    match datum {
2073        Datum::Null => 1,
2074        Datum::False => 1,
2075        Datum::True => 1,
2076        Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
2077        Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
2078        Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
2079        Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
2080        Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
2081        Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
2082        Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
2083        Datum::Float32(_) => 1 + size_of::<f32>(),
2084        Datum::Float64(_) => 1 + size_of::<f64>(),
2085        Datum::Date(_) => 1 + size_of::<i32>(),
2086        Datum::Time(_) => 1 + 8,
2087        Datum::Timestamp(t) => {
2088            1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
2089                8
2090            } else {
2091                16
2092            }
2093        }
2094        Datum::TimestampTz(t) => {
2095            1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
2096                8
2097            } else {
2098                16
2099            }
2100        }
2101        Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
2102        Datum::Bytes(bytes) => {
2103            // We use a variable length representation of slice length.
2104            let bytes_for_length = match bytes.len() {
2105                0..TINY => 1,
2106                TINY..SHORT => 2,
2107                SHORT..LONG => 4,
2108                _ => 8,
2109            };
2110            1 + bytes_for_length + bytes.len()
2111        }
2112        Datum::String(string) => {
2113            // We use a variable length representation of slice length.
2114            let bytes_for_length = match string.len() {
2115                0..TINY => 1,
2116                TINY..SHORT => 2,
2117                SHORT..LONG => 4,
2118                _ => 8,
2119            };
2120            1 + bytes_for_length + string.len()
2121        }
2122        Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
2123        Datum::Array(array) => {
2124            1 + size_of::<u8>()
2125                + array.dims.data.len()
2126                + size_of::<u64>()
2127                + array.elements.data.len()
2128        }
2129        Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
2130        Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
2131        Datum::JsonNull => 1,
2132        Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
2133        Datum::Dummy => 1,
2134        Datum::Numeric(d) => {
2135            let mut d = d.0.clone();
2136            // Values must be reduced to determine appropriate number of
2137            // coefficient units.
2138            numeric::cx_datum().reduce(&mut d);
2139            // 4 = 1 bit each for tag, digits, exponent, bits
2140            4 + (d.coefficient_units().len() * 2)
2141        }
2142        Datum::Range(Range { inner }) => {
2143            // Tag + flags
2144            2 + match inner {
2145                None => 0,
2146                Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
2147                    .iter()
2148                    .map(|bound| match bound {
2149                        None => 0,
2150                        Some(bound) => bound.val.len(),
2151                    })
2152                    .sum(),
2153            }
2154        }
2155        Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
2156        Datum::AclItem(_) => 1 + AclItem::binary_size(),
2157    }
2158}
2159
2160/// Number of bytes required by a sequence of datums.
2161///
2162/// This method can be used to right-size the allocation for a `Row`
2163/// before calling [`RowPacker::extend`].
2164pub fn datums_size<'a, I, D>(iter: I) -> usize
2165where
2166    I: IntoIterator<Item = D>,
2167    D: Borrow<Datum<'a>>,
2168{
2169    iter.into_iter().map(|d| datum_size(d.borrow())).sum()
2170}
2171
2172/// Number of bytes required by a list of datums. This computes the size that would be required if
2173/// the given datums were packed into a list.
2174///
2175/// This is used to optimistically pre-allocate buffers for packing rows.
2176pub fn datum_list_size<'a, I, D>(iter: I) -> usize
2177where
2178    I: IntoIterator<Item = D>,
2179    D: Borrow<Datum<'a>>,
2180{
2181    1 + size_of::<u64>() + datums_size(iter)
2182}
2183
2184impl RowPacker<'_> {
2185    /// Constructs a row packer that will pack additional datums into the
2186    /// provided row.
2187    ///
2188    /// This function is intentionally somewhat inconvenient to call. You
2189    /// usually want to call [`Row::packer`] instead to start packing from
2190    /// scratch.
2191    pub fn for_existing_row(row: &mut Row) -> RowPacker<'_> {
2192        RowPacker { row }
2193    }
2194
2195    /// Extend an existing `Row` with a `Datum`.
2196    #[inline]
2197    pub fn push<'a, D>(&mut self, datum: D)
2198    where
2199        D: Borrow<Datum<'a>>,
2200    {
2201        push_datum(&mut self.row.data, *datum.borrow());
2202    }
2203
2204    /// Extend an existing `Row` with additional `Datum`s.
2205    #[inline]
2206    pub fn extend<'a, I, D>(&mut self, iter: I)
2207    where
2208        I: IntoIterator<Item = D>,
2209        D: Borrow<Datum<'a>>,
2210    {
2211        for datum in iter {
2212            push_datum(&mut self.row.data, *datum.borrow())
2213        }
2214    }
2215
2216    /// Extend an existing `Row` with additional `Datum`s.
2217    ///
2218    /// In the case the iterator produces an error, the pushing of
2219    /// datums in terminated and the error returned. The `Row` will
2220    /// be incomplete, but it will be safe to read datums from it.
2221    #[inline]
2222    pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
2223    where
2224        I: IntoIterator<Item = Result<D, E>>,
2225        D: Borrow<Datum<'a>>,
2226    {
2227        for datum in iter {
2228            push_datum(&mut self.row.data, *datum?.borrow());
2229        }
2230        Ok(())
2231    }
2232
2233    /// Appends the datums of an entire `Row`.
2234    pub fn extend_by_row(&mut self, row: &Row) {
2235        self.row.data.extend_from_slice(row.data.as_slice());
2236    }
2237
2238    /// Appends the datums of an entire `Row`.
2239    pub fn extend_by_row_ref(&mut self, row: &RowRef) {
2240        self.row.data.extend_from_slice(row.data());
2241    }
2242
2243    /// Appends the slice of data representing an entire `Row`. The data is not validated.
2244    ///
2245    /// # Safety
2246    ///
2247    /// The requirements from [`Row::from_bytes_unchecked`] apply here, too:
2248    /// This method relies on `data` being an appropriate row encoding, and can
2249    /// result in unsafety if this is not the case.
2250    #[inline]
2251    pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
2252        self.row.data.extend_from_slice(data)
2253    }
2254
2255    /// Pushes a [`DatumList`] that is built from a closure.
2256    ///
2257    /// The supplied closure will be invoked once with a `Row` that can be used
2258    /// to populate the list. It is valid to call any method on the
2259    /// [`RowPacker`] except for [`RowPacker::clear`], [`RowPacker::truncate`],
2260    /// or [`RowPacker::truncate_datums`].
2261    ///
2262    /// Returns the value returned by the closure, if any.
2263    ///
2264    /// ```
2265    /// # use mz_repr::{Row, Datum};
2266    /// let mut row = Row::default();
2267    /// row.packer().push_list_with(|row| {
2268    ///     row.push(Datum::String("age"));
2269    ///     row.push(Datum::Int64(42));
2270    /// });
2271    /// assert_eq!(
2272    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
2273    ///     vec![Datum::String("age"), Datum::Int64(42)],
2274    /// );
2275    /// ```
2276    #[inline]
2277    pub fn push_list_with<F, R>(&mut self, f: F) -> R
2278    where
2279        F: FnOnce(&mut RowPacker) -> R,
2280    {
2281        // First, assume that the list will fit in 255 bytes, and thus the length will fit in
2282        // 1 byte. If not, we'll fix it up later.
2283        let start = self.row.data.len();
2284        self.row.data.push(Tag::ListTiny.into());
2285        // Write a dummy len, will fix it up later.
2286        self.row.data.push(0);
2287
2288        let out = f(self);
2289
2290        // The `- 1 - 1` is for the tag and the len.
2291        let len = self.row.data.len() - start - 1 - 1;
2292        // We now know the real len.
2293        if len < TINY {
2294            // If the len fits in 1 byte, we just need to fix up the len.
2295            self.row.data[start + 1] = len.to_le_bytes()[0];
2296        } else {
2297            // Note: We move this code path into its own function, so that the common case can be
2298            // inlined.
2299            long_list(&mut self.row.data, start, len);
2300        }
2301
2302        /// 1. Fix up the tag.
2303        /// 2. Move the actual data a bit (for which we also need to make room at the end).
2304        /// 3. Fix up the len.
2305        /// `data`: The row's backing data.
2306        /// `start`: where `push_list_with` started writing in `data`.
2307        /// `len`: the length of the data, excluding the tag and the length.
2308        #[cold]
2309        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
2310            // `len_len`: the length of the length. (Possible values are: 2, 4, 8. 1 is handled
2311            // elsewhere.) The other parameters are the same as for `long_list`.
2312            let long_list_inner = |data: &mut CompactBytes, len_len| {
2313                // We'll need memory for the new, bigger length, so make the `CompactBytes` bigger.
2314                // The `- 1` is because the old length was 1 byte.
2315                const ZEROS: [u8; 8] = [0; 8];
2316                data.extend_from_slice(&ZEROS[0..len_len - 1]);
2317                // Move the data to the end of the `CompactBytes`, to make space for the new length.
2318                // Originally, it started after the 1-byte tag and the 1-byte length, now it will
2319                // start after the 1-byte tag and the len_len-byte length.
2320                //
2321                // Note that this is the only operation in `long_list` whose cost is proportional
2322                // to `len`. Since `len` is at least 256 here, the other operations' cost are
2323                // negligible. `copy_within` is a memmove, which is probably a fair bit faster per
2324                // Datum than a Datum encoding in the `f` closure.
2325                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
2326                // Write the new length.
2327                data[start + 1..start + 1 + len_len]
2328                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
2329            };
2330            match len {
2331                0..TINY => {
2332                    unreachable!()
2333                }
2334                TINY..SHORT => {
2335                    data[start] = Tag::ListShort.into();
2336                    long_list_inner(data, 2);
2337                }
2338                SHORT..LONG => {
2339                    data[start] = Tag::ListLong.into();
2340                    long_list_inner(data, 4);
2341                }
2342                _ => {
2343                    data[start] = Tag::ListHuge.into();
2344                    long_list_inner(data, 8);
2345                }
2346            };
2347        }
2348
2349        out
2350    }
2351
2352    /// Pushes a [`DatumMap`] that is built from a closure.
2353    ///
2354    /// The supplied closure will be invoked once with a `Row` that can be used
2355    /// to populate the dict.
2356    ///
2357    /// The closure **must** alternate pushing string keys and arbitrary values,
2358    /// otherwise reading the dict will cause a panic.
2359    ///
2360    /// The closure **must** push keys in ascending order, otherwise equality
2361    /// checks on the resulting `Row` may be wrong and reading the dict IN DEBUG
2362    /// MODE will cause a panic.
2363    ///
2364    /// The closure **must not** call [`RowPacker::clear`],
2365    /// [`RowPacker::truncate`], or [`RowPacker::truncate_datums`].
2366    ///
2367    /// # Example
2368    ///
2369    /// ```
2370    /// # use mz_repr::{Row, Datum};
2371    /// let mut row = Row::default();
2372    /// row.packer().push_dict_with(|row| {
2373    ///
2374    ///     // key
2375    ///     row.push(Datum::String("age"));
2376    ///     // value
2377    ///     row.push(Datum::Int64(42));
2378    ///
2379    ///     // key
2380    ///     row.push(Datum::String("name"));
2381    ///     // value
2382    ///     row.push(Datum::String("bob"));
2383    /// });
2384    /// assert_eq!(
2385    ///     row.unpack_first().unwrap_map().iter().collect::<Vec<_>>(),
2386    ///     vec![("age", Datum::Int64(42)), ("name", Datum::String("bob"))]
2387    /// );
2388    /// ```
2389    pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2390    where
2391        F: FnOnce(&mut RowPacker) -> R,
2392    {
2393        self.row.data.push(Tag::Dict.into());
2394        let start = self.row.data.len();
2395        // write a dummy len, will fix it up later
2396        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2397
2398        let res = f(self);
2399
2400        let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2401        // fix up the len
2402        self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2403
2404        res
2405    }
2406
2407    /// Like [`RowPacker::push_dict_with`], but accepts a fallible closure.
2408    pub fn try_push_dict_with<F, E>(&mut self, f: F) -> Result<(), E>
2409    where
2410        F: FnOnce(&mut RowPacker) -> Result<(), E>,
2411    {
2412        self.push_dict_with(f)
2413    }
2414
2415    /// Convenience function to construct an array from an iter of `Datum`s.
2416    ///
2417    /// Returns an error if the number of elements in `iter` does not match
2418    /// the cardinality of the array as described by `dims`, or if the
2419    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2420    /// occurs, the packer's state will be unchanged.
2421    pub fn try_push_array<'a, I, D>(
2422        &mut self,
2423        dims: &[ArrayDimension],
2424        iter: I,
2425    ) -> Result<(), InvalidArrayError>
2426    where
2427        I: IntoIterator<Item = D>,
2428        D: Borrow<Datum<'a>>,
2429    {
2430        // SAFETY: The function returns the exact number of elements pushed into the array.
2431        unsafe {
2432            self.push_array_with_unchecked(dims, |packer| {
2433                let mut nelements = 0;
2434                for datum in iter {
2435                    packer.push(datum);
2436                    nelements += 1;
2437                }
2438                Ok::<_, InvalidArrayError>(nelements)
2439            })
2440        }
2441    }
2442
2443    /// Like [`RowPacker::try_push_array`], but accepts a fallible iterator of
2444    /// elements.
2445    pub fn try_push_array_fallible<'a, I, D, E>(
2446        &mut self,
2447        dims: &[ArrayDimension],
2448        iter: I,
2449    ) -> Result<Result<(), E>, InvalidArrayError>
2450    where
2451        I: IntoIterator<Item = Result<D, E>>,
2452        D: Borrow<Datum<'a>>,
2453    {
2454        enum Error<E> {
2455            Usage(InvalidArrayError),
2456            Inner(E),
2457        }
2458
2459        impl<E> From<InvalidArrayError> for Error<E> {
2460            fn from(e: InvalidArrayError) -> Self {
2461                Self::Usage(e)
2462            }
2463        }
2464
2465        // SAFETY: The function returns the exact number of elements pushed into the array.
2466        let result = unsafe {
2467            self.push_array_with_unchecked(dims, |packer| {
2468                let mut nelements = 0;
2469                for datum in iter {
2470                    packer.push(datum.map_err(Error::Inner)?);
2471                    nelements += 1;
2472                }
2473                Ok(nelements)
2474            })
2475        };
2476        match result {
2477            Ok(()) => Ok(Ok(())),
2478            Err(Error::Usage(e)) => Err(e),
2479            Err(Error::Inner(e)) => Ok(Err(e)),
2480        }
2481    }
2482
2483    /// Convenience function to construct an array from a function. The function must return the
2484    /// number of elements it pushed into the array. It is undefined behavior if the function returns
2485    /// a number different to the number of elements it pushed.
2486    ///
2487    /// Returns an error if the number of elements pushed by `f` does not match
2488    /// the cardinality of the array as described by `dims`, or if the
2489    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`], or if `f` errors. If an error
2490    /// occurs, the packer's state will be unchanged.
2491    pub unsafe fn push_array_with_unchecked<F, E>(
2492        &mut self,
2493        dims: &[ArrayDimension],
2494        f: F,
2495    ) -> Result<(), E>
2496    where
2497        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
2498        E: From<InvalidArrayError>,
2499    {
2500        // Arrays are encoded as follows.
2501        //
2502        // u8    ndims
2503        // u64   dim_0 lower bound
2504        // u64   dim_0 length
2505        // ...
2506        // u64   dim_n lower bound
2507        // u64   dim_n length
2508        // u64   element data size in bytes
2509        // u8    element data, where elements are encoded in row-major order
2510
2511        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
2512            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
2513        }
2514
2515        let start = self.row.data.len();
2516        self.row.data.push(Tag::Array.into());
2517
2518        // Write dimension information.
2519        self.row
2520            .data
2521            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
2522        for dim in dims {
2523            self.row
2524                .data
2525                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2526            self.row
2527                .data
2528                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2529        }
2530
2531        // Write elements.
2532        let off = self.row.data.len();
2533        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2534        let nelements = match f(self) {
2535            Ok(nelements) => nelements,
2536            Err(e) => {
2537                self.row.data.truncate(start);
2538                return Err(e);
2539            }
2540        };
2541        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2542        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2543
2544        // Check that the number of elements written matches the dimension
2545        // information.
2546        let cardinality = match dims {
2547            [] => 0,
2548            dims => dims.iter().map(|d| d.length).product(),
2549        };
2550        if nelements != cardinality {
2551            self.row.data.truncate(start);
2552            return Err(InvalidArrayError::WrongCardinality {
2553                actual: nelements,
2554                expected: cardinality,
2555            }
2556            .into());
2557        }
2558
2559        Ok(())
2560    }
2561
2562    /// Pushes an [`Array`] that is built from a closure.
2563    ///
2564    /// __WARNING__: This is fairly "sharp" tool that is easy to get wrong. You
2565    /// should prefer [`RowPacker::try_push_array`] when possible.
2566    ///
2567    /// Returns an error if the number of elements pushed does not match
2568    /// the cardinality of the array as described by `dims`, or if the
2569    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2570    /// occurs, the packer's state will be unchanged.
2571    pub fn push_array_with_row_major<F, I>(
2572        &mut self,
2573        dims: I,
2574        f: F,
2575    ) -> Result<(), InvalidArrayError>
2576    where
2577        I: IntoIterator<Item = ArrayDimension>,
2578        F: FnOnce(&mut RowPacker) -> usize,
2579    {
2580        let start = self.row.data.len();
2581        self.row.data.push(Tag::Array.into());
2582
2583        // Write dummy dimension length for now, we'll fix it up.
2584        let dims_start = self.row.data.len();
2585        self.row.data.push(42);
2586
2587        let mut num_dims: u8 = 0;
2588        let mut cardinality: usize = 1;
2589        for dim in dims {
2590            num_dims += 1;
2591            cardinality *= dim.length;
2592
2593            self.row
2594                .data
2595                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2596            self.row
2597                .data
2598                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2599        }
2600
2601        if num_dims > MAX_ARRAY_DIMENSIONS {
2602            // Reset the packer state so we don't have invalid data.
2603            self.row.data.truncate(start);
2604            return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2605        }
2606        // Fix up our dimension length.
2607        self.row.data[dims_start..dims_start + size_of::<u8>()]
2608            .copy_from_slice(&num_dims.to_le_bytes());
2609
2610        // Write elements.
2611        let off = self.row.data.len();
2612        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2613
2614        let nelements = f(self);
2615
2616        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2617        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2618
2619        // Check that the number of elements written matches the dimension
2620        // information.
2621        let cardinality = match num_dims {
2622            0 => 0,
2623            _ => cardinality,
2624        };
2625        if nelements != cardinality {
2626            self.row.data.truncate(start);
2627            return Err(InvalidArrayError::WrongCardinality {
2628                actual: nelements,
2629                expected: cardinality,
2630            });
2631        }
2632
2633        Ok(())
2634    }
2635
2636    /// Convenience function to push a `DatumList` from an iter of `Datum`s
2637    ///
2638    /// See [`RowPacker::push_dict_with`] if you need to be able to handle errors
2639    pub fn push_list<'a, I, D>(&mut self, iter: I)
2640    where
2641        I: IntoIterator<Item = D>,
2642        D: Borrow<Datum<'a>>,
2643    {
2644        self.push_list_with(|packer| {
2645            for elem in iter {
2646                packer.push(*elem.borrow())
2647            }
2648        });
2649    }
2650
2651    /// Convenience function to push a `DatumMap` from an iter of `(&str, Datum)` pairs
2652    pub fn push_dict<'a, I, D>(&mut self, iter: I)
2653    where
2654        I: IntoIterator<Item = (&'a str, D)>,
2655        D: Borrow<Datum<'a>>,
2656    {
2657        self.push_dict_with(|packer| {
2658            for (k, v) in iter {
2659                packer.push(Datum::String(k));
2660                packer.push(*v.borrow())
2661            }
2662        })
2663    }
2664
2665    /// Pushes a `Datum::Range` derived from the `Range<Datum<'a>`.
2666    ///
2667    /// # Panics
2668    /// - If lower and upper express finite values and they are datums of
2669    ///   different types.
2670    /// - If lower or upper express finite values and are equal to
2671    ///   `Datum::Null`. To handle `Datum::Null` properly, use
2672    ///   [`RangeBound::new`].
2673    ///
2674    /// # Notes
2675    /// - This function canonicalizes the range before pushing it to the row.
2676    /// - Prefer this function over `push_range_with` because of its
2677    ///   canonicaliztion.
2678    /// - Prefer creating [`RangeBound`]s using [`RangeBound::new`], which
2679    ///   handles `Datum::Null` in a SQL-friendly way.
2680    pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
2681        range.canonicalize()?;
2682        match range.inner {
2683            None => {
2684                self.row.data.push(Tag::Range.into());
2685                // Untagged bytes only contains the `RANGE_EMPTY` flag value.
2686                self.row.data.push(range::InternalFlags::EMPTY.bits());
2687                Ok(())
2688            }
2689            Some(inner) => self.push_range_with(
2690                RangeLowerBound {
2691                    inclusive: inner.lower.inclusive,
2692                    bound: inner
2693                        .lower
2694                        .bound
2695                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2696                },
2697                RangeUpperBound {
2698                    inclusive: inner.upper.inclusive,
2699                    bound: inner
2700                        .upper
2701                        .bound
2702                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2703                },
2704            ),
2705        }
2706    }
2707
2708    /// Pushes a `DatumRange` built from the specified arguments.
2709    ///
2710    /// # Warning
2711    /// Unlike `push_range`, `push_range_with` _does not_ canonicalize its
2712    /// inputs. Consequentially, this means it's possible to generate ranges
2713    /// that will not reflect the proper ordering and equality.
2714    ///
2715    /// # Panics
2716    /// - If lower or upper expresses a finite value and does not push exactly
2717    ///   one value into the `RowPacker`.
2718    /// - If lower and upper express finite values and they are datums of
2719    ///   different types.
2720    /// - If lower or upper express finite values and push `Datum::Null`.
2721    ///
2722    /// # Notes
2723    /// - Prefer `push_range_with` over this function. This function should be
2724    ///   used only when you are not pushing `Datum`s to the inner row.
2725    /// - Range encoding is `[<flag bytes>,<lower>?,<upper>?]`, where `lower`
2726    ///   and `upper` are optional, contingent on the flag value expressing an
2727    ///   empty range (where neither will be present) or infinite bounds (where
2728    ///   each infinite bound will be absent).
2729    /// - To push an emtpy range, use `push_range` using `Range { inner: None }`.
2730    pub fn push_range_with<L, U, E>(
2731        &mut self,
2732        lower: RangeLowerBound<L>,
2733        upper: RangeUpperBound<U>,
2734    ) -> Result<(), E>
2735    where
2736        L: FnOnce(&mut RowPacker) -> Result<(), E>,
2737        U: FnOnce(&mut RowPacker) -> Result<(), E>,
2738        E: From<InvalidRangeError>,
2739    {
2740        let start = self.row.data.len();
2741        self.row.data.push(Tag::Range.into());
2742
2743        let mut flags = range::InternalFlags::empty();
2744
2745        flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
2746        flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
2747        flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
2748        flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);
2749
2750        let mut expected_datums = 0;
2751
2752        self.row.data.push(flags.bits());
2753
2754        let datum_check = self.row.data.len();
2755
2756        if let Some(value) = lower.bound {
2757            let start = self.row.data.len();
2758            value(self)?;
2759            assert!(
2760                start < self.row.data.len(),
2761                "finite values must each push exactly one value; expected 1 but got 0"
2762            );
2763            expected_datums += 1;
2764        }
2765
2766        if let Some(value) = upper.bound {
2767            let start = self.row.data.len();
2768            value(self)?;
2769            assert!(
2770                start < self.row.data.len(),
2771                "finite values must each push exactly one value; expected 1 but got 0"
2772            );
2773            expected_datums += 1;
2774        }
2775
2776        // Validate the invariants that 0, 1, or 2 elements were pushed, none are Null,
2777        // and if two are pushed then the second is not less than the first. Panic in
2778        // some cases and error in others.
2779        let mut actual_datums = 0;
2780        let mut seen = None;
2781        let mut dataz = &self.row.data[datum_check..];
2782        while !dataz.is_empty() {
2783            let d = unsafe { read_datum(&mut dataz) };
2784            assert!(d != Datum::Null, "cannot push Datum::Null into range");
2785
2786            match seen {
2787                None => seen = Some(d),
2788                Some(seen) => {
2789                    let seen_kind = DatumKind::from(seen);
2790                    let d_kind = DatumKind::from(d);
2791                    assert!(
2792                        seen_kind == d_kind,
2793                        "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
2794                    );
2795
2796                    if seen > d {
2797                        self.row.data.truncate(start);
2798                        return Err(InvalidRangeError::MisorderedRangeBounds.into());
2799                    }
2800                }
2801            }
2802            actual_datums += 1;
2803        }
2804
2805        assert!(
2806            actual_datums == expected_datums,
2807            "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
2808        );
2809
2810        Ok(())
2811    }
2812
2813    /// Clears the contents of the packer without de-allocating its backing memory.
2814    pub fn clear(&mut self) {
2815        self.row.data.clear();
2816    }
2817
2818    /// Truncates the underlying storage to the specified byte position.
2819    ///
2820    /// # Safety
2821    ///
2822    /// `pos` MUST specify a byte offset that lies on a datum boundary.
2823    /// If `pos` specifies a byte offset that is *within* a datum, the row
2824    /// packer will produce an invalid row, the unpacking of which may
2825    /// trigger undefined behavior!
2826    ///
2827    /// To find the byte offset of a datum boundary, inspect the packer's
2828    /// byte length by calling `packer.data().len()` after pushing the desired
2829    /// number of datums onto the packer.
2830    pub unsafe fn truncate(&mut self, pos: usize) {
2831        self.row.data.truncate(pos)
2832    }
2833
2834    /// Truncates the underlying row to contain at most the first `n` datums.
2835    pub fn truncate_datums(&mut self, n: usize) {
2836        let prev_len = self.row.data.len();
2837        let mut iter = self.row.iter();
2838        for _ in iter.by_ref().take(n) {}
2839        let next_len = iter.data.len();
2840        // SAFETY: iterator offsets always lie on a datum boundary.
2841        unsafe { self.truncate(prev_len - next_len) }
2842    }
2843
2844    /// Returns the total amount of bytes used by the underlying row.
2845    pub fn byte_len(&self) -> usize {
2846        self.row.byte_len()
2847    }
2848}
2849
2850impl<'a> IntoIterator for &'a Row {
2851    type Item = Datum<'a>;
2852    type IntoIter = DatumListIter<'a>;
2853    fn into_iter(self) -> DatumListIter<'a> {
2854        self.iter()
2855    }
2856}
2857
2858impl fmt::Debug for Row {
2859    /// Debug representation using the internal datums
2860    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2861        f.write_str("Row{")?;
2862        f.debug_list().entries(self.iter()).finish()?;
2863        f.write_str("}")
2864    }
2865}
2866
2867impl fmt::Display for Row {
2868    /// Display representation using the internal datums
2869    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2870        f.write_str("(")?;
2871        for (i, datum) in self.iter().enumerate() {
2872            if i != 0 {
2873                f.write_str(", ")?;
2874            }
2875            write!(f, "{}", datum)?;
2876        }
2877        f.write_str(")")
2878    }
2879}
2880
2881impl<'a, T> DatumList<'a, T> {
2882    pub fn iter(&self) -> DatumListIter<'a> {
2883        DatumListIter { data: self.data }
2884    }
2885
2886    /// Iterate elements as typed `T` values rather than raw `Datum`s.
2887    ///
2888    /// Each datum is decoded and converted via [`FromDatum`]. Since generic
2889    /// type parameters in `#[sqlfunc]` are erased to `Datum<'a>` before code
2890    /// generation, this is monomorphized to an identity conversion at runtime.
2891    pub fn typed_iter(&self) -> DatumListTypedIter<'a, T>
2892    where
2893        T: FromDatum<'a>,
2894    {
2895        DatumListTypedIter {
2896            inner: self.iter(),
2897            _phantom: PhantomData,
2898        }
2899    }
2900
2901    /// For debugging only
2902    pub fn data(&self) -> &'a [u8] {
2903        self.data
2904    }
2905}
2906
2907impl<T> DatumList<'static, T> {
2908    pub fn empty() -> Self {
2909        DatumList::new(&[])
2910    }
2911}
2912
2913impl<'a> IntoIterator for DatumList<'a> {
2914    type Item = Datum<'a>;
2915    type IntoIter = DatumListIter<'a>;
2916    fn into_iter(self) -> DatumListIter<'a> {
2917        self.iter()
2918    }
2919}
2920
2921impl<'a> Iterator for DatumListIter<'a> {
2922    type Item = Datum<'a>;
2923    fn next(&mut self) -> Option<Self::Item> {
2924        if self.data.is_empty() {
2925            None
2926        } else {
2927            Some(unsafe { read_datum(&mut self.data) })
2928        }
2929    }
2930}
2931
2932impl<'a, T: FromDatum<'a>> Iterator for DatumListTypedIter<'a, T> {
2933    type Item = T;
2934    fn next(&mut self) -> Option<Self::Item> {
2935        self.inner.next().map(T::from_datum)
2936    }
2937}
2938
2939impl<'a, T> DatumMap<'a, T> {
2940    pub fn iter(&self) -> DatumDictIter<'a> {
2941        DatumDictIter {
2942            data: self.data,
2943            prev_key: None,
2944        }
2945    }
2946
2947    /// Iterate entries as `(&str, T)` pairs rather than `(&str, Datum)`.
2948    ///
2949    /// Each value datum is converted via [`FromDatum`]. Since generic type
2950    /// parameters in `#[sqlfunc]` are erased to `Datum<'a>` before code
2951    /// generation, this is monomorphized to an identity conversion at runtime.
2952    pub fn typed_iter(&self) -> DatumDictTypedIter<'a, T>
2953    where
2954        T: FromDatum<'a>,
2955    {
2956        DatumDictTypedIter {
2957            inner: self.iter(),
2958            _phantom: PhantomData,
2959        }
2960    }
2961
2962    /// For debugging only
2963    pub fn data(&self) -> &'a [u8] {
2964        self.data
2965    }
2966}
2967
2968impl<T> DatumMap<'static, T> {
2969    pub fn empty() -> Self {
2970        DatumMap::new(&[])
2971    }
2972}
2973
2974impl<'a, T> Debug for DatumMap<'a, T> {
2975    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2976        f.debug_map().entries(self.iter()).finish()
2977    }
2978}
2979
2980impl<'a> IntoIterator for &'a DatumMap<'a> {
2981    type Item = (&'a str, Datum<'a>);
2982    type IntoIter = DatumDictIter<'a>;
2983    fn into_iter(self) -> DatumDictIter<'a> {
2984        self.iter()
2985    }
2986}
2987
2988impl<'a> Iterator for DatumDictIter<'a> {
2989    type Item = (&'a str, Datum<'a>);
2990    fn next(&mut self) -> Option<Self::Item> {
2991        if self.data.is_empty() {
2992            None
2993        } else {
2994            let key_tag =
2995                Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
2996            assert!(
2997                key_tag == Tag::StringTiny
2998                    || key_tag == Tag::StringShort
2999                    || key_tag == Tag::StringLong
3000                    || key_tag == Tag::StringHuge,
3001                "Dict keys must be strings, got {:?}",
3002                key_tag
3003            );
3004            let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
3005            let val = unsafe { read_datum(&mut self.data) };
3006
3007            // if in debug mode, sanity check keys
3008            if cfg!(debug_assertions) {
3009                if let Some(prev_key) = self.prev_key {
3010                    debug_assert!(
3011                        prev_key < key,
3012                        "Dict keys must be unique and given in ascending order: {} came before {}",
3013                        prev_key,
3014                        key
3015                    );
3016                }
3017                self.prev_key = Some(key);
3018            }
3019
3020            Some((key, val))
3021        }
3022    }
3023}
3024
3025impl<'a, T: FromDatum<'a>> Iterator for DatumDictTypedIter<'a, T> {
3026    type Item = (&'a str, T);
3027    fn next(&mut self) -> Option<Self::Item> {
3028        self.inner.next().map(|(k, v)| (k, T::from_datum(v)))
3029    }
3030}
3031
3032impl RowArena {
3033    pub fn new() -> Self {
3034        RowArena {
3035            inner: RefCell::new(vec![]),
3036        }
3037    }
3038
3039    /// Creates a `RowArena` with a hint of how many rows will be created in the arena, to avoid
3040    /// reallocations of its internal vector.
3041    pub fn with_capacity(capacity: usize) -> Self {
3042        RowArena {
3043            inner: RefCell::new(Vec::with_capacity(capacity)),
3044        }
3045    }
3046
3047    /// Does a `reserve` on the underlying `Vec`. Call this when you expect `additional` more datums
3048    /// to be created in this arena.
3049    pub fn reserve(&self, additional: usize) {
3050        self.inner.borrow_mut().reserve(additional);
3051    }
3052
3053    /// Take ownership of `bytes` for the lifetime of the arena.
3054    #[allow(clippy::transmute_ptr_to_ptr)]
3055    pub fn push_bytes<'a>(&'a self, bytes: Vec<u8>) -> &'a [u8] {
3056        let mut inner = self.inner.borrow_mut();
3057        inner.push(bytes);
3058        let owned_bytes = &inner[inner.len() - 1];
3059        unsafe {
3060            // This is safe because:
3061            //   * We only ever append to self.inner, so the byte vector
3062            //     will live as long as the arena.
3063            //   * We return a reference to the byte vector's contents, so it's
3064            //     okay if self.inner reallocates and moves the byte
3065            //     vector.
3066            //   * We don't allow access to the byte vector itself, so it will
3067            //     never reallocate.
3068            transmute::<&[u8], &'a [u8]>(owned_bytes)
3069        }
3070    }
3071
3072    /// Take ownership of `string` for the lifetime of the arena.
3073    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
3074        let owned_bytes = self.push_bytes(string.into_bytes());
3075        unsafe {
3076            // This is safe because we know it was a `String` just before.
3077            std::str::from_utf8_unchecked(owned_bytes)
3078        }
3079    }
3080
3081    /// Take ownership of `row` for the lifetime of the arena, returning a
3082    /// reference to the first datum in the row.
3083    ///
3084    /// If we had an owned datum type, this method would be much clearer, and
3085    /// would be called `push_owned_datum`.
3086    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
3087        let mut inner = self.inner.borrow_mut();
3088        inner.push(row.data.into_vec());
3089        unsafe {
3090            // This is safe because:
3091            //   * We only ever append to self.inner, so the row data will live
3092            //     as long as the arena.
3093            //   * We force the row data into its own heap allocation--
3094            //     importantly, we do NOT store the SmallVec, which might be
3095            //     storing data inline--so it's okay if self.inner reallocates
3096            //     and moves the row.
3097            //   * We don't allow access to the byte vector itself, so it will
3098            //     never reallocate.
3099            let datum = read_datum(&mut &inner[inner.len() - 1][..]);
3100            transmute::<Datum<'_>, Datum<'a>>(datum)
3101        }
3102    }
3103
3104    /// Equivalent to `push_unary_row` but returns a `DatumNested` rather than a
3105    /// `Datum`.
3106    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
3107        let mut inner = self.inner.borrow_mut();
3108        inner.push(row.data.into_vec());
3109        unsafe {
3110            // This is safe because:
3111            //   * We only ever append to self.inner, so the row data will live
3112            //     as long as the arena.
3113            //   * We force the row data into its own heap allocation--
3114            //     importantly, we do NOT store the SmallVec, which might be
3115            //     storing data inline--so it's okay if self.inner reallocates
3116            //     and moves the row.
3117            //   * We don't allow access to the byte vector itself, so it will
3118            //     never reallocate.
3119            let nested = DatumNested::extract(&mut &inner[inner.len() - 1][..]);
3120            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
3121        }
3122    }
3123
3124    /// Convenience function to make a new `Row` containing a single datum, and
3125    /// take ownership of it for the lifetime of the arena
3126    ///
3127    /// ```
3128    /// # use mz_repr::{RowArena, Datum};
3129    /// let arena = RowArena::new();
3130    /// let datum = arena.make_datum(|packer| {
3131    ///   packer.push_list(&[Datum::String("hello"), Datum::String("world")]);
3132    /// });
3133    /// assert_eq!(datum.unwrap_list().iter().collect::<Vec<_>>(), vec![Datum::String("hello"), Datum::String("world")]);
3134    /// ```
3135    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
3136    where
3137        F: FnOnce(&mut RowPacker),
3138    {
3139        let mut row = Row::default();
3140        f(&mut row.packer());
3141        self.push_unary_row(row)
3142    }
3143
3144    /// Convenience function to build a list datum from an iterator of typed
3145    /// elements and return it as a `DatumList<'a, T>`.
3146    ///
3147    /// By accepting an iterator of `T: Borrow<Datum>` instead of a raw
3148    /// `RowPacker` closure, this guarantees that only elements of type `T`
3149    /// are pushed.
3150    pub fn make_datum_list<'a, T: std::borrow::Borrow<Datum<'a>>>(
3151        &'a self,
3152        iter: impl IntoIterator<Item = T>,
3153    ) -> DatumList<'a, T> {
3154        let datum = self.make_datum(|packer| {
3155            packer.push_list_with(|packer| {
3156                for elem in iter {
3157                    packer.push(*elem.borrow());
3158                }
3159            });
3160        });
3161        DatumList::new(datum.unwrap_list().data())
3162    }
3163
3164    /// Convenience function identical to `make_datum` but instead returns a
3165    /// `DatumNested`.
3166    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
3167    where
3168        F: FnOnce(&mut RowPacker),
3169    {
3170        let mut row = Row::default();
3171        f(&mut row.packer());
3172        self.push_unary_row_datum_nested(row)
3173    }
3174
3175    /// Like [`RowArena::make_datum`], but the provided closure can return an error.
3176    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
3177    where
3178        F: FnOnce(&mut RowPacker) -> Result<(), E>,
3179    {
3180        let mut row = Row::default();
3181        f(&mut row.packer())?;
3182        Ok(self.push_unary_row(row))
3183    }
3184
3185    /// Clear the contents of the arena.
3186    pub fn clear(&mut self) {
3187        self.inner.borrow_mut().clear();
3188    }
3189}
3190
3191impl Default for RowArena {
3192    fn default() -> RowArena {
3193        RowArena::new()
3194    }
3195}
3196
3197/// A thread-local row, which can be borrowed and returned.
3198/// # Example
3199///
3200/// Use this type instead of creating a new row:
3201/// ```
3202/// use mz_repr::SharedRow;
3203///
3204/// let mut row_builder = SharedRow::get();
3205/// ```
3206///
3207/// This allows us to reuse an existing row allocation instead of creating a new one or retaining
3208/// an allocation locally. Additionally, we can observe the size of the local row in a central
3209/// place and potentially reallocate to reduce memory needs.
3210///
3211/// # Panic
3212///
3213/// [`SharedRow::get`] panics when trying to obtain multiple references to the shared row.
3214#[derive(Debug)]
3215pub struct SharedRow(Row);
3216
3217impl SharedRow {
3218    thread_local! {
3219        /// A thread-local slot containing a shared Row that can be temporarily used by a function.
3220        /// There can be at most one active user of this Row, which is tracked by the state of the
3221        /// `Option<_>` wrapper. When it is `Some(..)`, the row is available for using. When it
3222        /// is `None`, it is not, and the constructor will panic if a thread attempts to use it.
3223        static SHARED_ROW: Cell<Option<Row>> = const { Cell::new(Some(Row::empty())) }
3224    }
3225
3226    /// Get the shared row.
3227    ///
3228    /// The row's contents are cleared before returning it.
3229    ///
3230    /// # Panic
3231    ///
3232    /// Panics when the row is already borrowed elsewhere.
3233    pub fn get() -> Self {
3234        let mut row = Self::SHARED_ROW
3235            .take()
3236            .expect("attempted to borrow already borrowed SharedRow");
3237        // Clear row
3238        row.packer();
3239        Self(row)
3240    }
3241
3242    /// Gets the shared row and uses it to pack `iter`.
3243    pub fn pack<'a, I, D>(iter: I) -> Row
3244    where
3245        I: IntoIterator<Item = D>,
3246        D: Borrow<Datum<'a>>,
3247    {
3248        let mut row_builder = Self::get();
3249        let mut row_packer = row_builder.packer();
3250        row_packer.extend(iter);
3251        row_builder.clone()
3252    }
3253}
3254
3255impl std::ops::Deref for SharedRow {
3256    type Target = Row;
3257
3258    fn deref(&self) -> &Self::Target {
3259        &self.0
3260    }
3261}
3262
3263impl std::ops::DerefMut for SharedRow {
3264    fn deref_mut(&mut self) -> &mut Self::Target {
3265        &mut self.0
3266    }
3267}
3268
3269impl Drop for SharedRow {
3270    fn drop(&mut self) {
3271        // Take the Row allocation from this instance and put it back in the thread local slot for
3272        // the next user. The Row in `self` is replaced with an empty Row which does not allocate.
3273        Self::SHARED_ROW.set(Some(std::mem::take(&mut self.0)))
3274    }
3275}
3276
3277#[cfg(test)]
3278mod tests {
3279    use std::cmp::Ordering;
3280    use std::collections::hash_map::DefaultHasher;
3281    use std::hash::{Hash, Hasher};
3282
3283    use chrono::{DateTime, NaiveDate};
3284    use itertools::Itertools;
3285    use mz_ore::{assert_err, assert_none};
3286    use ordered_float::OrderedFloat;
3287
3288    use crate::SqlScalarType;
3289
3290    use super::*;
3291
3292    fn hash<T: Hash>(t: &T) -> u64 {
3293        let mut hasher = DefaultHasher::new();
3294        t.hash(&mut hasher);
3295        hasher.finish()
3296    }
3297
3298    #[mz_ore::test]
3299    fn test_assumptions() {
3300        assert_eq!(size_of::<Tag>(), 1);
3301        #[cfg(target_endian = "big")]
3302        {
3303            // if you want to run this on a big-endian cpu, we'll need big-endian versions of the serialization code
3304            assert!(false);
3305        }
3306    }
3307
3308    #[mz_ore::test]
3309    fn miri_test_arena() {
3310        let arena = RowArena::new();
3311
3312        assert_eq!(arena.push_string("".to_owned()), "");
3313        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");
3314
3315        let empty: &[u8] = &[];
3316        assert_eq!(arena.push_bytes(vec![]), empty);
3317        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);
3318
3319        let mut row = Row::default();
3320        let mut packer = row.packer();
3321        packer.push_dict_with(|row| {
3322            row.push(Datum::String("a"));
3323            row.push_list_with(|row| {
3324                row.push(Datum::String("one"));
3325                row.push(Datum::String("two"));
3326                row.push(Datum::String("three"));
3327            });
3328            row.push(Datum::String("b"));
3329            row.push(Datum::String("c"));
3330        });
3331        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
3332    }
3333
3334    #[mz_ore::test]
3335    fn miri_test_round_trip() {
3336        fn round_trip(datums: Vec<Datum>) {
3337            let row = Row::pack(datums.clone());
3338
3339            // When run under miri this catches undefined bytes written to data
3340            // eg by calling push_copy! on a type which contains undefined padding values
3341            println!("{:?}", row.data());
3342
3343            let datums2 = row.iter().collect::<Vec<_>>();
3344            let datums3 = row.unpack();
3345            assert_eq!(datums, datums2);
3346            assert_eq!(datums, datums3);
3347        }
3348
3349        round_trip(vec![]);
3350        round_trip(
3351            SqlScalarType::enumerate()
3352                .iter()
3353                .flat_map(|r#type| r#type.interesting_datums())
3354                .collect(),
3355        );
3356        round_trip(vec![
3357            Datum::Null,
3358            Datum::Null,
3359            Datum::False,
3360            Datum::True,
3361            Datum::Int16(-21),
3362            Datum::Int32(-42),
3363            Datum::Int64(-2_147_483_648 - 42),
3364            Datum::UInt8(0),
3365            Datum::UInt8(1),
3366            Datum::UInt16(0),
3367            Datum::UInt16(1),
3368            Datum::UInt16(1 << 8),
3369            Datum::UInt32(0),
3370            Datum::UInt32(1),
3371            Datum::UInt32(1 << 8),
3372            Datum::UInt32(1 << 16),
3373            Datum::UInt32(1 << 24),
3374            Datum::UInt64(0),
3375            Datum::UInt64(1),
3376            Datum::UInt64(1 << 8),
3377            Datum::UInt64(1 << 16),
3378            Datum::UInt64(1 << 24),
3379            Datum::UInt64(1 << 32),
3380            Datum::UInt64(1 << 40),
3381            Datum::UInt64(1 << 48),
3382            Datum::UInt64(1 << 56),
3383            Datum::Float32(OrderedFloat::from(-42.12)),
3384            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
3385            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
3386            Datum::Timestamp(
3387                CheckedTimestamp::from_timestamplike(
3388                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
3389                        .unwrap()
3390                        .and_hms_opt(14, 32, 11)
3391                        .unwrap(),
3392                )
3393                .unwrap(),
3394            ),
3395            Datum::TimestampTz(
3396                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
3397                    .unwrap(),
3398            ),
3399            Datum::Interval(Interval {
3400                months: 312,
3401                ..Default::default()
3402            }),
3403            Datum::Interval(Interval::new(0, 0, 1_012_312)),
3404            Datum::Bytes(&[]),
3405            Datum::Bytes(&[0, 2, 1, 255]),
3406            Datum::String(""),
3407            Datum::String("العَرَبِيَّة"),
3408        ]);
3409    }
3410
3411    #[mz_ore::test]
3412    fn test_array() {
3413        // Construct an array using `Row::push_array` and verify that it unpacks
3414        // correctly.
3415        const DIM: ArrayDimension = ArrayDimension {
3416            lower_bound: 2,
3417            length: 2,
3418        };
3419        let mut row = Row::default();
3420        let mut packer = row.packer();
3421        packer
3422            .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3423            .unwrap();
3424        let arr1 = row.unpack_first().unwrap_array();
3425        assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3426        assert_eq!(
3427            arr1.elements().into_iter().collect::<Vec<_>>(),
3428            vec![Datum::Int32(1), Datum::Int32(2)]
3429        );
3430
3431        // Pack a previously-constructed `Datum::Array` and verify that it
3432        // unpacks correctly.
3433        let row = Row::pack_slice(&[Datum::Array(arr1)]);
3434        let arr2 = row.unpack_first().unwrap_array();
3435        assert_eq!(arr1, arr2);
3436    }
3437
3438    #[mz_ore::test]
3439    fn test_multidimensional_array() {
3440        let datums = vec![
3441            Datum::Int32(1),
3442            Datum::Int32(2),
3443            Datum::Int32(3),
3444            Datum::Int32(4),
3445            Datum::Int32(5),
3446            Datum::Int32(6),
3447            Datum::Int32(7),
3448            Datum::Int32(8),
3449        ];
3450
3451        let mut row = Row::default();
3452        let mut packer = row.packer();
3453        packer
3454            .try_push_array(
3455                &[
3456                    ArrayDimension {
3457                        lower_bound: 1,
3458                        length: 1,
3459                    },
3460                    ArrayDimension {
3461                        lower_bound: 1,
3462                        length: 4,
3463                    },
3464                    ArrayDimension {
3465                        lower_bound: 1,
3466                        length: 2,
3467                    },
3468                ],
3469                &datums,
3470            )
3471            .unwrap();
3472        let array = row.unpack_first().unwrap_array();
3473        assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3474    }
3475
3476    #[mz_ore::test]
3477    fn test_array_max_dimensions() {
3478        let mut row = Row::default();
3479        let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3480
3481        // An array with one too many dimensions should be rejected.
3482        let res = row.packer().try_push_array(
3483            &vec![
3484                ArrayDimension {
3485                    lower_bound: 1,
3486                    length: 1
3487                };
3488                max_dims + 1
3489            ],
3490            vec![Datum::Int32(4)],
3491        );
3492        assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3493        assert!(row.data.is_empty());
3494
3495        // An array with exactly the maximum allowable dimensions should be
3496        // accepted.
3497        row.packer()
3498            .try_push_array(
3499                &vec![
3500                    ArrayDimension {
3501                        lower_bound: 1,
3502                        length: 1
3503                    };
3504                    max_dims
3505                ],
3506                vec![Datum::Int32(4)],
3507            )
3508            .unwrap();
3509    }
3510
3511    #[mz_ore::test]
3512    fn test_array_wrong_cardinality() {
3513        let mut row = Row::default();
3514        let res = row.packer().try_push_array(
3515            &[
3516                ArrayDimension {
3517                    lower_bound: 1,
3518                    length: 2,
3519                },
3520                ArrayDimension {
3521                    lower_bound: 1,
3522                    length: 3,
3523                },
3524            ],
3525            vec![Datum::Int32(1), Datum::Int32(2)],
3526        );
3527        assert_eq!(
3528            res,
3529            Err(InvalidArrayError::WrongCardinality {
3530                actual: 2,
3531                expected: 6,
3532            })
3533        );
3534        assert!(row.data.is_empty());
3535    }
3536
3537    #[mz_ore::test]
3538    fn test_nesting() {
3539        let mut row = Row::default();
3540        row.packer().push_dict_with(|row| {
3541            row.push(Datum::String("favourites"));
3542            row.push_list_with(|row| {
3543                row.push(Datum::String("ice cream"));
3544                row.push(Datum::String("oreos"));
3545                row.push(Datum::String("cheesecake"));
3546            });
3547            row.push(Datum::String("name"));
3548            row.push(Datum::String("bob"));
3549        });
3550
3551        let mut iter = row.unpack_first().unwrap_map().iter();
3552
3553        let (k, v) = iter.next().unwrap();
3554        assert_eq!(k, "favourites");
3555        assert_eq!(
3556            v.unwrap_list().iter().collect::<Vec<_>>(),
3557            vec![
3558                Datum::String("ice cream"),
3559                Datum::String("oreos"),
3560                Datum::String("cheesecake"),
3561            ]
3562        );
3563
3564        let (k, v) = iter.next().unwrap();
3565        assert_eq!(k, "name");
3566        assert_eq!(v, Datum::String("bob"));
3567    }
3568
3569    #[mz_ore::test]
3570    fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3571        let pack = |ok| {
3572            let mut row = Row::default();
3573            row.packer().push_dict_with(|row| {
3574                if ok {
3575                    row.push(Datum::String("key"));
3576                    row.push(Datum::Int32(42));
3577                    Ok(7)
3578                } else {
3579                    Err("fail")
3580                }
3581            })?;
3582            Ok(row)
3583        };
3584
3585        assert_eq!(pack(false), Err("fail"));
3586
3587        let row = pack(true)?;
3588        let mut dict = row.unpack_first().unwrap_map().iter();
3589        assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3590        assert_eq!(dict.next(), None);
3591
3592        Ok(())
3593    }
3594
3595    #[mz_ore::test]
3596    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
3597    fn test_datum_sizes() {
3598        let arena = RowArena::new();
3599
3600        // Test the claims about various datum sizes.
3601        let values_of_interest = vec![
3602            Datum::Null,
3603            Datum::False,
3604            Datum::Int16(0),
3605            Datum::Int32(0),
3606            Datum::Int64(0),
3607            Datum::UInt8(0),
3608            Datum::UInt8(1),
3609            Datum::UInt16(0),
3610            Datum::UInt16(1),
3611            Datum::UInt16(1 << 8),
3612            Datum::UInt32(0),
3613            Datum::UInt32(1),
3614            Datum::UInt32(1 << 8),
3615            Datum::UInt32(1 << 16),
3616            Datum::UInt32(1 << 24),
3617            Datum::UInt64(0),
3618            Datum::UInt64(1),
3619            Datum::UInt64(1 << 8),
3620            Datum::UInt64(1 << 16),
3621            Datum::UInt64(1 << 24),
3622            Datum::UInt64(1 << 32),
3623            Datum::UInt64(1 << 40),
3624            Datum::UInt64(1 << 48),
3625            Datum::UInt64(1 << 56),
3626            Datum::Float32(OrderedFloat(0.0)),
3627            Datum::Float64(OrderedFloat(0.0)),
3628            Datum::from(numeric::Numeric::from(0)),
3629            Datum::from(numeric::Numeric::from(1000)),
3630            Datum::from(numeric::Numeric::from(9999)),
3631            Datum::Date(
3632                NaiveDate::from_ymd_opt(1, 1, 1)
3633                    .unwrap()
3634                    .try_into()
3635                    .unwrap(),
3636            ),
3637            Datum::Timestamp(
3638                CheckedTimestamp::from_timestamplike(
3639                    DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
3640                )
3641                .unwrap(),
3642            ),
3643            Datum::TimestampTz(
3644                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
3645                    .unwrap(),
3646            ),
3647            Datum::Interval(Interval::default()),
3648            Datum::Bytes(&[]),
3649            Datum::String(""),
3650            Datum::JsonNull,
3651            Datum::Range(Range { inner: None }),
3652            arena.make_datum(|packer| {
3653                packer
3654                    .push_range(Range::new(Some((
3655                        RangeLowerBound::new(Datum::Int32(-1), true),
3656                        RangeUpperBound::new(Datum::Int32(1), true),
3657                    ))))
3658                    .unwrap();
3659            }),
3660        ];
3661        for value in values_of_interest {
3662            if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
3663                panic!("Disparity in claimed size for {:?}", value);
3664            }
3665        }
3666    }
3667
3668    #[mz_ore::test]
3669    fn test_range_errors() {
3670        fn test_range_errors_inner<'a>(
3671            datums: Vec<Vec<Datum<'a>>>,
3672        ) -> Result<(), InvalidRangeError> {
3673            let mut row = Row::default();
3674            let row_len = row.byte_len();
3675            let mut packer = row.packer();
3676            let r = packer.push_range_with(
3677                RangeLowerBound {
3678                    inclusive: true,
3679                    bound: Some(|row: &mut RowPacker| {
3680                        for d in &datums[0] {
3681                            row.push(d);
3682                        }
3683                        Ok(())
3684                    }),
3685                },
3686                RangeUpperBound {
3687                    inclusive: true,
3688                    bound: Some(|row: &mut RowPacker| {
3689                        for d in &datums[1] {
3690                            row.push(d);
3691                        }
3692                        Ok(())
3693                    }),
3694                },
3695            );
3696
3697            assert_eq!(row_len, row.byte_len());
3698
3699            r
3700        }
3701
3702        for panicking_case in [
3703            vec![vec![Datum::Int32(1)], vec![]],
3704            vec![
3705                vec![Datum::Int32(1), Datum::Int32(2)],
3706                vec![Datum::Int32(3)],
3707            ],
3708            vec![
3709                vec![Datum::Int32(1)],
3710                vec![Datum::Int32(2), Datum::Int32(3)],
3711            ],
3712            vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
3713            vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
3714            vec![vec![Datum::Null], vec![Datum::Int32(2)]],
3715            vec![vec![Datum::Int32(1)], vec![Datum::Null]],
3716        ] {
3717            #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
3718            let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
3719            assert_err!(result);
3720        }
3721
3722        let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
3723        assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
3724    }
3725
3726    /// Lists have a variable-length encoding for their lengths. We test each case here.
3727    #[mz_ore::test]
3728    #[cfg_attr(miri, ignore)] // slow
3729    fn test_list_encoding() {
3730        fn test_list_encoding_inner(len: usize) {
3731            let list_elem = |i: usize| {
3732                if i % 2 == 0 {
3733                    Datum::False
3734                } else {
3735                    Datum::True
3736                }
3737            };
3738            let mut row = Row::default();
3739            {
3740                // Push some stuff.
3741                let mut packer = row.packer();
3742                packer.push(Datum::String("start"));
3743                packer.push_list_with(|packer| {
3744                    for i in 0..len {
3745                        packer.push(list_elem(i));
3746                    }
3747                });
3748                packer.push(Datum::String("end"));
3749            }
3750            // Check that we read back exactly what we pushed.
3751            let mut row_it = row.iter();
3752            assert_eq!(row_it.next().unwrap(), Datum::String("start"));
3753            match row_it.next().unwrap() {
3754                Datum::List(list) => {
3755                    let mut list_it = list.iter();
3756                    for i in 0..len {
3757                        assert_eq!(list_it.next().unwrap(), list_elem(i));
3758                    }
3759                    assert_none!(list_it.next());
3760                }
3761                _ => panic!("expected Datum::List"),
3762            }
3763            assert_eq!(row_it.next().unwrap(), Datum::String("end"));
3764            assert_none!(row_it.next());
3765        }
3766
3767        test_list_encoding_inner(0);
3768        test_list_encoding_inner(1);
3769        test_list_encoding_inner(10);
3770        test_list_encoding_inner(TINY - 1); // tiny
3771        test_list_encoding_inner(TINY + 1); // short
3772        test_list_encoding_inner(SHORT + 1); // long
3773
3774        // The biggest one takes 40 s on my laptop, probably not worth it.
3775        //test_list_encoding_inner(LONG + 1); // huge
3776    }
3777
3778    /// Demonstrates that DatumList's Eq (bytewise) and Ord (datum-by-datum) are now consistent.
3779    /// A list containing -0.0 and one containing +0.0 have different byte representations
3780    /// (IEEE 754 distinguishes them), originally Eq says they are not equal. But after
3781    /// using the new Datum::cmp, Eq says they are equal, which matches what Ord
3782    /// compares via iter().cmp(other.iter()), and them as equal.
3783    #[mz_ore::test]
3784    fn test_datum_list_eq_ord_consistency() {
3785        // Build list containing +0.0
3786        let mut row_pos = Row::default();
3787        row_pos.packer().push_list_with(|p| {
3788            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3789        });
3790        let list_pos = row_pos.unpack_first().unwrap_list();
3791
3792        // Build list containing -0.0 (distinct bit pattern from +0.0)
3793        let mut row_neg = Row::default();
3794        row_neg.packer().push_list_with(|p| {
3795            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3796        });
3797        let list_neg = row_neg.unpack_first().unwrap_list();
3798
3799        // Eq is bytewise: different encodings => not equal
3800        // This was a bug in the past, so we test it.
3801        assert_eq!(
3802            list_pos, list_neg,
3803            "Eq should see different encodings as equal"
3804        );
3805
3806        // Ord is datum-by-datum: -0.0 and +0.0 compare equal as Datums
3807        assert_eq!(
3808            list_pos.cmp(&list_neg),
3809            Ordering::Equal,
3810            "Ord (datum-by-datum) should see -0.0 and +0.0 as equal"
3811        );
3812    }
3813
3814    /// Demonstrates that DatumMap's derived Eq (bytewise) can make maps with equal keys and
3815    /// values compare equal when values have different encodings (e.g. -0.0 vs +0.0).
3816    #[mz_ore::test]
3817    fn test_datum_map_eq_bytewise_consistency() {
3818        // Build map {"k": +0.0}
3819        let mut row_pos = Row::default();
3820        row_pos.packer().push_dict_with(|p| {
3821            p.push(Datum::String("k"));
3822            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3823        });
3824        let map_pos = row_pos.unpack_first().unwrap_map();
3825
3826        // Build map {"k": -0.0}
3827        let mut row_neg = Row::default();
3828        row_neg.packer().push_dict_with(|p| {
3829            p.push(Datum::String("k"));
3830            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3831        });
3832        let map_neg = row_neg.unpack_first().unwrap_map();
3833
3834        // Same keys and semantically equal values, but Eq (bytewise) says not equal
3835        assert_eq!(
3836            map_pos, map_neg,
3837            "DatumMap Eq is semantic; -0.0 and +0.0 have different encodings but are equal"
3838        );
3839        // Verify they have the same logical content
3840        let entries_pos: Vec<_> = map_pos.iter().collect();
3841        let entries_neg: Vec<_> = map_neg.iter().collect();
3842        assert_eq!(entries_pos.len(), entries_neg.len());
3843        for ((k1, v1), (k2, v2)) in entries_pos.iter().zip_eq(entries_neg.iter()) {
3844            assert_eq!(k1, k2);
3845            assert_eq!(
3846                v1, v2,
3847                "Datum-level comparison treats -0.0 and +0.0 as equal"
3848            );
3849        }
3850    }
3851
3852    /// Hash must agree with Eq: equal lists must have the same hash.
3853    #[mz_ore::test]
3854    fn test_datum_list_hash_consistency() {
3855        // Equal lists (including -0.0 vs +0.0) must hash the same
3856        let mut row_pos = Row::default();
3857        row_pos.packer().push_list_with(|p| {
3858            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3859        });
3860        let list_pos = row_pos.unpack_first().unwrap_list();
3861
3862        let mut row_neg = Row::default();
3863        row_neg.packer().push_list_with(|p| {
3864            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3865        });
3866        let list_neg = row_neg.unpack_first().unwrap_list();
3867
3868        assert_eq!(list_pos, list_neg);
3869        assert_eq!(
3870            hash(&list_pos),
3871            hash(&list_neg),
3872            "equal lists must have same hash"
3873        );
3874
3875        // Unequal lists should have different hashes (with asymptotic probability 1)
3876        let mut row_a = Row::default();
3877        row_a.packer().push_list_with(|p| {
3878            p.push(Datum::Int32(1));
3879            p.push(Datum::Int32(2));
3880        });
3881        let list_a = row_a.unpack_first().unwrap_list();
3882
3883        let mut row_b = Row::default();
3884        row_b.packer().push_list_with(|p| {
3885            p.push(Datum::Int32(1));
3886            p.push(Datum::Int32(3));
3887        });
3888        let list_b = row_b.unpack_first().unwrap_list();
3889
3890        assert_ne!(list_a, list_b);
3891        assert_ne!(
3892            hash(&list_a),
3893            hash(&list_b),
3894            "unequal lists must have different hashes"
3895        );
3896    }
3897
3898    /// Ord/PartialOrd for DatumList: less, equal, greater.
3899    #[mz_ore::test]
3900    fn test_datum_list_ordering() {
3901        let mut row_12 = Row::default();
3902        row_12.packer().push_list_with(|p| {
3903            p.push(Datum::Int32(1));
3904            p.push(Datum::Int32(2));
3905        });
3906        let list_12 = row_12.unpack_first().unwrap_list();
3907
3908        let mut row_13 = Row::default();
3909        row_13.packer().push_list_with(|p| {
3910            p.push(Datum::Int32(1));
3911            p.push(Datum::Int32(3));
3912        });
3913        let list_13 = row_13.unpack_first().unwrap_list();
3914
3915        let mut row_123 = Row::default();
3916        row_123.packer().push_list_with(|p| {
3917            p.push(Datum::Int32(1));
3918            p.push(Datum::Int32(2));
3919            p.push(Datum::Int32(3));
3920        });
3921        let list_123 = row_123.unpack_first().unwrap_list();
3922
3923        // [1, 2] < [1, 3] due to the second element being different
3924        assert_eq!(list_12.cmp(&list_13), Ordering::Less);
3925        assert_eq!(list_13.cmp(&list_12), Ordering::Greater);
3926        assert_eq!(list_12.cmp(&list_12), Ordering::Equal);
3927        // shorter prefix compares less
3928        assert_eq!(list_12.cmp(&list_123), Ordering::Less);
3929    }
3930
3931    /// Hash must agree with Eq: equal maps must have the same hash.
3932    #[mz_ore::test]
3933    fn test_datum_map_hash_consistency() {
3934        let mut row_pos = Row::default();
3935        row_pos.packer().push_dict_with(|p| {
3936            p.push(Datum::String("x"));
3937            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3938        });
3939        let map_pos = row_pos.unpack_first().unwrap_map();
3940
3941        let mut row_neg = Row::default();
3942        row_neg.packer().push_dict_with(|p| {
3943            p.push(Datum::String("x"));
3944            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3945        });
3946        let map_neg = row_neg.unpack_first().unwrap_map();
3947
3948        assert_eq!(map_pos, map_neg);
3949        assert_eq!(
3950            hash(&map_pos),
3951            hash(&map_neg),
3952            "equal maps must have same hash"
3953        );
3954
3955        let mut row_a = Row::default();
3956        row_a.packer().push_dict_with(|p| {
3957            p.push(Datum::String("a"));
3958            p.push(Datum::Int32(1));
3959        });
3960        let map_a = row_a.unpack_first().unwrap_map();
3961
3962        let mut row_b = Row::default();
3963        row_b.packer().push_dict_with(|p| {
3964            p.push(Datum::String("a"));
3965            p.push(Datum::Int32(2));
3966        });
3967        let map_b = row_b.unpack_first().unwrap_map();
3968
3969        assert_ne!(map_a, map_b);
3970        assert_ne!(
3971            hash(&map_a),
3972            hash(&map_b),
3973            "unequal maps must have different hashes"
3974        );
3975    }
3976
3977    /// Ord/PartialOrd for DatumMap: less, equal, greater (by key then value).
3978    #[mz_ore::test]
3979    fn test_datum_map_ordering() {
3980        let mut row_a1 = Row::default();
3981        row_a1.packer().push_dict_with(|p| {
3982            p.push(Datum::String("a"));
3983            p.push(Datum::Int32(1));
3984        });
3985        let map_a1 = row_a1.unpack_first().unwrap_map();
3986
3987        let mut row_a2 = Row::default();
3988        row_a2.packer().push_dict_with(|p| {
3989            p.push(Datum::String("a"));
3990            p.push(Datum::Int32(2));
3991        });
3992        let map_a2 = row_a2.unpack_first().unwrap_map();
3993
3994        let mut row_b1 = Row::default();
3995        row_b1.packer().push_dict_with(|p| {
3996            p.push(Datum::String("b"));
3997            p.push(Datum::Int32(1));
3998        });
3999        let map_b1 = row_b1.unpack_first().unwrap_map();
4000
4001        assert_eq!(map_a1.cmp(&map_a2), Ordering::Less);
4002        assert_eq!(map_a2.cmp(&map_a1), Ordering::Greater);
4003        assert_eq!(map_a1.cmp(&map_a1), Ordering::Equal);
4004        assert_eq!(map_a1.cmp(&map_b1), Ordering::Less); // "a" < "b"
4005    }
4006
4007    /// Datum puts Null last in the enum so that nulls sort last (PostgreSQL default).
4008    /// This ordering is used when comparing DatumList/DatumMap (e.g. jsonb_agg tiebreaker).
4009    #[mz_ore::test]
4010    fn test_datum_list_and_map_null_sorts_last() {
4011        // DatumList: [1] < [null] so non-null sorts before null
4012        let mut row_list_1 = Row::default();
4013        row_list_1
4014            .packer()
4015            .push_list_with(|p| p.push(Datum::Int32(1)));
4016        let list_1 = row_list_1.unpack_first().unwrap_list();
4017
4018        let mut row_list_null = Row::default();
4019        row_list_null
4020            .packer()
4021            .push_list_with(|p| p.push(Datum::Null));
4022        let list_null = row_list_null.unpack_first().unwrap_list();
4023
4024        assert_eq!(list_1.cmp(&list_null), Ordering::Less);
4025        assert_eq!(list_null.cmp(&list_1), Ordering::Greater);
4026
4027        // DatumMap: {"k": 1} < {"k": null} so non-null sorts before null (same as jsonb_agg)
4028        let mut row_map_1 = Row::default();
4029        row_map_1.packer().push_dict_with(|p| {
4030            p.push(Datum::String("k"));
4031            p.push(Datum::Int32(1));
4032        });
4033        let map_1 = row_map_1.unpack_first().unwrap_map();
4034
4035        let mut row_map_null = Row::default();
4036        row_map_null.packer().push_dict_with(|p| {
4037            p.push(Datum::String("k"));
4038            p.push(Datum::Null);
4039        });
4040        let map_null = row_map_null.unpack_first().unwrap_map();
4041
4042        assert_eq!(map_1.cmp(&map_null), Ordering::Less);
4043        assert_eq!(map_null.cmp(&map_1), Ordering::Greater);
4044    }
4045}