mz_repr/row.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Borrow;
11use std::cell::{Cell, RefCell};
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::hash::{Hash, Hasher};
16use std::mem::{size_of, transmute};
17use std::ops::Deref;
18use std::str;
19
20use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
21use compact_bytes::CompactBytes;
22use mz_ore::cast::{CastFrom, ReinterpretCast};
23use mz_ore::soft_assert_no_log;
24use mz_ore::vec::Vector;
25use mz_persist_types::Codec64;
26use num_enum::{IntoPrimitive, TryFromPrimitive};
27use ordered_float::OrderedFloat;
28use proptest::prelude::*;
29use proptest::strategy::{BoxedStrategy, Strategy};
30use serde::{Deserialize, Serialize};
31use uuid::Uuid;
32
33use crate::adt::array::{
34    Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
35};
36use crate::adt::date::Date;
37use crate::adt::interval::Interval;
38use crate::adt::mz_acl_item::{AclItem, MzAclItem};
39use crate::adt::numeric;
40use crate::adt::numeric::Numeric;
41use crate::adt::range::{
42    self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
43};
44use crate::adt::timestamp::CheckedTimestamp;
45use crate::scalar::{DatumKind, arb_datum};
46use crate::{Datum, RelationDesc, Timestamp};
47
48pub(crate) mod encode;
49pub mod iter;
50
51include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
52
53/// A packed representation for `Datum`s.
54///
55/// `Datum` is easy to work with but very space inefficient. A `Datum::Int32(42)`
56/// is laid out in memory like this:
57///
58///   tag: 3
59///   padding: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
60///   data: 0 0 0 42
61///   padding: 0 0 0 0 0 0 0 0 0 0 0 0
62///
63/// For a total of 32 bytes! The second set of padding is needed in case we were
64/// to write a 16-byte datum into this location. The first set of padding is
65/// needed to align that hypothetical 16-byte datum to a 16-byte boundary.
66///
67/// A `Row` stores zero or more `Datum`s without any padding. We avoid the need
68/// for the first set of padding by only providing access to the `Datum`s via
69/// calls to `ptr::read_unaligned`, which on modern x86 is barely penalized. We
70/// avoid the need for the second set of padding by not providing mutable access
71/// to the `Datum`. Instead, `Row` is append-only.
72///
73/// A `Row` can be built from a collection of `Datum`s using `Row::pack`, but it
74/// is more efficient to use `Row::pack_slice` so that a right-sized allocation
75/// can be created. If that is not possible, consider using the row buffer
76/// pattern: allocate one row, pack into it, and then call [`Row::clone`] to
77/// receive a copy of that row, leaving behind the original allocation to pack
78/// future rows.
79///
80/// Creating a row via [`Row::pack_slice`]:
81///
82/// ```
83/// # use mz_repr::{Row, Datum};
84/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
85/// assert_eq!(row.unpack(), vec![Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)])
86/// ```
87///
88/// `Row`s can be unpacked by iterating over them:
89///
90/// ```
91/// # use mz_repr::{Row, Datum};
92/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
93/// assert_eq!(row.iter().nth(1).unwrap(), Datum::Int32(1));
94/// ```
95///
96/// If you want random access to the `Datum`s in a `Row`, use `Row::unpack` to create a `Vec<Datum>`
97/// ```
98/// # use mz_repr::{Row, Datum};
99/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
100/// let datums = row.unpack();
101/// assert_eq!(datums[1], Datum::Int32(1));
102/// ```
103///
104/// # Performance
105///
106/// Rows are dynamically sized, but up to a fixed size their data is stored in-line.
107/// It is best to re-use a `Row` across multiple `Row` creation calls, as this
108/// avoids the allocations involved in creating a new `Row`.
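///
/// A minimal sketch of the row buffer pattern described above:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let mut buffer = Row::default();
/// let mut rows = Vec::new();
/// for i in 0..3 {
///     buffer.packer().push(Datum::Int32(i));
///     rows.push(buffer.clone());
/// }
/// assert_eq!(rows[2].unpack_first(), Datum::Int32(2));
/// ```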
109#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
110pub struct Row {
111    data: CompactBytes,
112}
113
114impl Row {
115    const SIZE: usize = CompactBytes::MAX_INLINE;
116
117    /// A variant of `Row::from_proto` that allows for reuse of internal allocs
118    /// and validates the decoding against a provided [`RelationDesc`].
119    pub fn decode_from_proto(
120        &mut self,
121        proto: &ProtoRow,
122        desc: &RelationDesc,
123    ) -> Result<(), String> {
124        let mut packer = self.packer();
125        for (col_idx, _, _) in desc.iter_all() {
126            let d = match proto.datums.get(col_idx.to_raw()) {
127                Some(x) => x,
128                None => {
129                    packer.push(Datum::Null);
130                    continue;
131                }
132            };
133            packer.try_push_proto(d)?;
134        }
135
136        Ok(())
137    }
138
139    /// Allocate an empty `Row` with a pre-allocated capacity.
140    #[inline]
141    pub fn with_capacity(cap: usize) -> Self {
142        Self {
143            data: CompactBytes::with_capacity(cap),
144        }
145    }
146
147    /// Create an empty `Row`.
148    #[inline]
149    pub const fn empty() -> Self {
150        Self {
151            data: CompactBytes::empty(),
152        }
153    }
154
155    /// Creates a new row from supplied bytes.
156    ///
157    /// # Safety
158    ///
159    /// This method relies on `data` being an appropriate row encoding, and can
160    /// result in undefined behavior if this is not the case.
161    pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
162        Row {
163            data: CompactBytes::new(data),
164        }
165    }
166
167    /// Constructs a [`RowPacker`] that will pack datums into this row's
168    /// allocation.
169    ///
170    /// This method clears the existing contents of the row, but retains the
171    /// allocation.
172    pub fn packer(&mut self) -> RowPacker<'_> {
173        self.clear();
174        RowPacker { row: self }
175    }
176
177    /// Take some `Datum`s and pack them into a `Row`.
178    ///
179    /// This method builds a `Row` by repeatedly increasing the backing
180    /// allocation. If the contents of the iterator are known ahead of
181    /// time, consider [`Row::with_capacity`] to right-size the allocation
182    /// first, and then [`RowPacker::extend`] to populate it with `Datum`s.
183    /// This avoids the repeated allocation resizing and copying.
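    ///
    /// A minimal sketch of that approach (the capacity below is a hypothetical estimate):
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::with_capacity(24); // hypothetical size estimate, in bytes
    /// row.packer().extend([Datum::Int32(7), Datum::True].iter());
    /// assert_eq!(row.unpack(), vec![Datum::Int32(7), Datum::True]);
    /// ```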
184    pub fn pack<'a, I, D>(iter: I) -> Row
185    where
186        I: IntoIterator<Item = D>,
187        D: Borrow<Datum<'a>>,
188    {
189        let mut row = Row::default();
190        row.packer().extend(iter);
191        row
192    }
193
194    /// Use `self` to pack `iter`, and then clone the result.
195    ///
196    /// This is a convenience method meant to reduce boilerplate around row
197    /// formation.
198    pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
199    where
200        I: IntoIterator<Item = D>,
201        D: Borrow<Datum<'a>>,
202    {
203        self.packer().extend(iter);
204        self.clone()
205    }
206
207    /// Like [`Row::pack`], but the provided iterator is allowed to produce an
208    /// error, in which case the packing operation is aborted and the error
209    /// returned.
210    pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
211    where
212        I: IntoIterator<Item = Result<D, E>>,
213        D: Borrow<Datum<'a>>,
214    {
215        let mut row = Row::default();
216        row.packer().try_extend(iter)?;
217        Ok(row)
218    }
219
220    /// Pack a slice of `Datum`s into a `Row`.
221    ///
222    /// This method has the advantage over `pack` that it can determine the required
223    /// allocation before packing the elements, ensuring only one allocation and no
224    /// redundant copies.
225    pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
226        // Pre-allocate the needed number of bytes.
227        let mut row = Row::with_capacity(datums_size(slice.iter()));
228        row.packer().extend(slice.iter());
229        row
230    }
231
232    /// Returns the total number of bytes used by this row, including the size of the `Row` struct itself.
233    pub fn byte_len(&self) -> usize {
234        let heap_size = if self.data.spilled() {
235            self.data.len()
236        } else {
237            0
238        };
239        let inline_size = std::mem::size_of::<Self>();
240        inline_size.saturating_add(heap_size)
241    }
242
243    /// The length of the encoded row in bytes. Does not include the size of the `Row` struct itself.
244    pub fn data_len(&self) -> usize {
245        self.data.len()
246    }
247
248    /// Returns the total capacity in bytes used by this row.
249    pub fn byte_capacity(&self) -> usize {
250        self.data.capacity()
251    }
252
253    /// Extracts a Row slice containing the entire [`Row`].
254    #[inline]
255    pub fn as_row_ref(&self) -> &RowRef {
256        // SAFETY: `Row` contains valid row data, by construction.
257        unsafe { RowRef::from_slice(self.data.as_slice()) }
258    }
259
260    /// Clear the contents of the [`Row`], leaving any allocation in place.
261    #[inline]
262    fn clear(&mut self) {
263        self.data.clear();
264    }
265}
266
267impl Borrow<RowRef> for Row {
268    #[inline]
269    fn borrow(&self) -> &RowRef {
270        self.as_row_ref()
271    }
272}
273
274impl AsRef<RowRef> for Row {
275    #[inline]
276    fn as_ref(&self) -> &RowRef {
277        self.as_row_ref()
278    }
279}
280
281impl Deref for Row {
282    type Target = RowRef;
283
284    #[inline]
285    fn deref(&self) -> &Self::Target {
286        self.as_row_ref()
287    }
288}
289
290// Nothing depends on `Row` being exactly 24 bytes; we just want visibility into its size.
291static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
292
293impl Clone for Row {
294    fn clone(&self) -> Self {
295        Row {
296            data: self.data.clone(),
297        }
298    }
299
300    fn clone_from(&mut self, source: &Self) {
301        self.data.clone_from(&source.data);
302    }
303}
304
305// Row's `Hash` implementation defers to `RowRef` to ensure they hash equivalently.
306impl std::hash::Hash for Row {
307    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
308        self.as_row_ref().hash(state)
309    }
310}
311
312impl Arbitrary for Row {
313    type Parameters = prop::collection::SizeRange;
314    type Strategy = BoxedStrategy<Row>;
315
316    fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
317        prop::collection::vec(arb_datum(true), size)
318            .prop_map(|items| {
319                let mut row = Row::default();
320                let mut packer = row.packer();
321                for item in items.iter() {
322                    let datum: Datum<'_> = item.into();
323                    packer.push(datum);
324                }
325                row
326            })
327            .boxed()
328    }
329}
330
331impl PartialOrd for Row {
332    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
333        Some(self.cmp(other))
334    }
335}
336
337impl Ord for Row {
338    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
339        self.as_ref().cmp(other.as_ref())
340    }
341}
342
343#[allow(missing_debug_implementations)]
344mod columnation {
345    use columnation::{Columnation, Region};
346    use mz_ore::region::LgAllocRegion;
347
348    use crate::Row;
349
350    /// Region allocation for `Row` data.
351    ///
352    /// Content bytes are stored in stable contiguous memory locations,
353    /// and then a `Row` referencing them is fabricated (it does not own those bytes).
354    pub struct RowStack {
355        region: LgAllocRegion<u8>,
356    }
357
358    impl RowStack {
359        const LIMIT: usize = 2 << 20;
360    }
361
362    // Implement `Default` manually to specify a region allocation limit.
363    impl Default for RowStack {
364        fn default() -> Self {
365            Self {
366                // Limit the region size to 2MiB.
367                region: LgAllocRegion::with_limit(Self::LIMIT),
368            }
369        }
370    }
371
372    impl Columnation for Row {
373        type InnerRegion = RowStack;
374    }
375
376    impl Region for RowStack {
377        type Item = Row;
378        #[inline]
379        fn clear(&mut self) {
380            self.region.clear();
381        }
382        #[inline(always)]
383        unsafe fn copy(&mut self, item: &Row) -> Row {
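            // Rows that spilled to the heap have their bytes copied into the region and a
            // `Row` is rebuilt over that region memory; inline rows are simply cloned.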
384            if item.data.spilled() {
385                let bytes = self.region.copy_slice(&item.data[..]);
386                Row {
387                    data: compact_bytes::CompactBytes::from_raw_parts(
388                        bytes.as_mut_ptr(),
389                        item.data.len(),
390                        item.data.capacity(),
391                    ),
392                }
393            } else {
394                item.clone()
395            }
396        }
397
398        fn reserve_items<'a, I>(&mut self, items: I)
399        where
400            Self: 'a,
401            I: Iterator<Item = &'a Self::Item> + Clone,
402        {
403            let size = items
404                .filter(|row| row.data.spilled())
405                .map(|row| row.data.len())
406                .sum();
407            let size = std::cmp::min(size, Self::LIMIT);
408            self.region.reserve(size);
409        }
410
411        fn reserve_regions<'a, I>(&mut self, regions: I)
412        where
413            Self: 'a,
414            I: Iterator<Item = &'a Self> + Clone,
415        {
416            let size = regions.map(|r| r.region.len()).sum();
417            let size = std::cmp::min(size, Self::LIMIT);
418            self.region.reserve(size);
419        }
420
421        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
422            self.region.heap_size(callback)
423        }
424    }
425}
426
427mod columnar {
428    use columnar::common::PushIndexAs;
429    use columnar::{
430        AsBytes, Borrow, Clear, Columnar, Container, FromBytes, HeapSize, Index, IndexAs, Len, Push,
431    };
432    use mz_ore::cast::CastFrom;
433    use std::ops::Range;
434
435    use crate::{Row, RowRef};
436
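    /// A columnar container of `Row` data.
    ///
    /// `bounds` stores the cumulative end offset of each row's bytes within `values`.
    /// For example (illustrative), three rows of 3, 0, and 5 bytes would be stored as
    /// `bounds = [3, 3, 8]` over an 8-byte `values` buffer.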
437    #[derive(
438        Copy,
439        Clone,
440        Debug,
441        Default,
442        PartialEq,
443        serde::Serialize,
444        serde::Deserialize
445    )]
446    pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
447        /// Bounds container; provides indexed access to offsets.
448        bounds: BC,
449        /// Values container; provides slice access to bytes.
450        values: VC,
451    }
452
453    impl Columnar for Row {
454        #[inline(always)]
455        fn copy_from(&mut self, other: columnar::Ref<'_, Self>) {
456            self.clear();
457            self.data.extend_from_slice(other.data());
458        }
459        #[inline(always)]
460        fn into_owned(other: columnar::Ref<'_, Self>) -> Self {
461            other.to_owned()
462        }
463        type Container = Rows;
464        #[inline(always)]
465        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
466        where
467            Self: 'a,
468        {
469            thing
470        }
471    }
472
473    impl<BC: PushIndexAs<u64>> Borrow for Rows<BC, Vec<u8>> {
474        type Ref<'a> = &'a RowRef;
475        type Borrowed<'a>
476            = Rows<BC::Borrowed<'a>, &'a [u8]>
477        where
478            Self: 'a;
479        #[inline(always)]
480        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
481            Rows {
482                bounds: self.bounds.borrow(),
483                values: self.values.borrow(),
484            }
485        }
486        #[inline(always)]
487        fn reborrow<'c, 'a: 'c>(item: Self::Borrowed<'a>) -> Self::Borrowed<'c>
488        where
489            Self: 'a,
490        {
491            Rows {
492                bounds: BC::reborrow(item.bounds),
493                values: item.values,
494            }
495        }
496
497        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
498        where
499            Self: 'a,
500        {
501            item
502        }
503    }
504
505    impl<BC: PushIndexAs<u64>> Container for Rows<BC, Vec<u8>> {
506        fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
507            if !range.is_empty() {
508                // Imported bounds will be relative to this starting offset.
509                let values_len: u64 = self.values.len().try_into().expect("must fit");
510
511                // Push all bytes that we can, all at once.
512                let other_lower = if range.start == 0 {
513                    0
514                } else {
515                    other.bounds.index_as(range.start - 1)
516                };
517                let other_upper = other.bounds.index_as(range.end - 1);
518                self.values.extend_from_self(
519                    other.values,
520                    usize::try_from(other_lower).expect("must fit")
521                        ..usize::try_from(other_upper).expect("must fit"),
522                );
523
524                // Each bound needs to be shifted by `values_len - other_lower`.
525                if values_len == other_lower {
526                    self.bounds.extend_from_self(other.bounds, range);
527                } else {
528                    for index in range {
529                        let shifted = other.bounds.index_as(index) - other_lower + values_len;
530                        self.bounds.push(&shifted)
531                    }
532                }
533            }
534        }
535        fn reserve_for<'a, I>(&mut self, selves: I)
536        where
537            Self: 'a,
538            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
539        {
540            self.bounds.reserve_for(selves.clone().map(|r| r.bounds));
541            self.values.reserve_for(selves.map(|r| r.values));
542        }
543    }
544
545    impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
546        #[inline(always)]
547        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
548            columnar::chain(self.bounds.as_bytes(), self.values.as_bytes())
549        }
550    }
551    impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
552        #[inline(always)]
553        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
554            Self {
555                bounds: FromBytes::from_bytes(bytes),
556                values: FromBytes::from_bytes(bytes),
557            }
558        }
559    }
560
561    impl<BC: Len, VC> Len for Rows<BC, VC> {
562        #[inline(always)]
563        fn len(&self) -> usize {
564            self.bounds.len()
565        }
566    }
567
568    impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
569        type Ref = &'a RowRef;
570        #[inline(always)]
571        fn get(&self, index: usize) -> Self::Ref {
572            let lower = if index == 0 {
573                0
574            } else {
575                self.bounds.index_as(index - 1)
576            };
577            let upper = self.bounds.index_as(index);
578            let lower = usize::cast_from(lower);
579            let upper = usize::cast_from(upper);
580            // SAFETY: self.values contains only valid row data, and self.bounds delimits only ranges
581            // that correspond to the original rows.
582            unsafe { RowRef::from_slice(&self.values[lower..upper]) }
583        }
584    }
585    impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
586        type Ref = &'a RowRef;
587        #[inline(always)]
588        fn get(&self, index: usize) -> Self::Ref {
589            let lower = if index == 0 {
590                0
591            } else {
592                self.bounds.index_as(index - 1)
593            };
594            let upper = self.bounds.index_as(index);
595            let lower = usize::cast_from(lower);
596            let upper = usize::cast_from(upper);
597            // SAFETY: self.values contains only valid row data, and self.bounds delimits only ranges
598            // that correspond to the original rows.
599            unsafe { RowRef::from_slice(&self.values[lower..upper]) }
600        }
601    }
602
603    impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
604        #[inline(always)]
605        fn push(&mut self, item: &Row) {
606            self.values.extend_from_slice(item.data.as_slice());
607            self.bounds.push(u64::cast_from(self.values.len()));
608        }
609    }
610    impl<BC: for<'a> Push<&'a u64>> Push<&RowRef> for Rows<BC> {
611        #[inline(always)]
612        fn push(&mut self, item: &RowRef) {
613            self.values.extend_from_slice(item.data());
614            self.bounds.push(&u64::cast_from(self.values.len()));
615        }
616    }
617    impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
618        #[inline(always)]
619        fn clear(&mut self) {
620            self.bounds.clear();
621            self.values.clear();
622        }
623    }
624    impl<BC: HeapSize, VC: HeapSize> HeapSize for Rows<BC, VC> {
625        #[inline(always)]
626        fn heap_size(&self) -> (usize, usize) {
627            let (l0, c0) = self.bounds.heap_size();
628            let (l1, c1) = self.values.heap_size();
629            (l0 + l1, c0 + c1)
630        }
631    }
632}
633
634/// A contiguous slice of bytes that are row data.
635///
636/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
637#[derive(PartialEq, Eq, Hash)]
638#[repr(transparent)]
639pub struct RowRef([u8]);
640
641impl RowRef {
642    /// Create a [`RowRef`] from a slice of data.
643    ///
644    /// # Safety
645    ///
646    /// We do not check that the provided slice is valid [`Row`] data; the caller is required to
647    /// ensure this.
648    pub unsafe fn from_slice(row: &[u8]) -> &RowRef {
649        #[allow(clippy::as_conversions)]
650        let ptr = row as *const [u8] as *const RowRef;
651        // SAFETY: We know `ptr` is non-null and aligned because it came from a &[u8].
652        unsafe { &*ptr }
653    }
654
655    /// Unpack `self` into a `Vec<Datum>` for efficient random access.
656    pub fn unpack(&self) -> Vec<Datum<'_>> {
657        // It's usually cheaper to unpack twice to figure out the right length than it is to grow the vec as we go
658        let len = self.iter().count();
659        let mut vec = Vec::with_capacity(len);
660        vec.extend(self.iter());
661        vec
662    }
663
664    /// Return the first [`Datum`] in `self`
665    ///
666    /// Panics if the [`RowRef`] is empty.
667    pub fn unpack_first(&self) -> Datum<'_> {
668        self.iter().next().unwrap()
669    }
670
671    /// Iterate the [`Datum`] elements of the [`RowRef`].
672    pub fn iter(&self) -> DatumListIter<'_> {
673        DatumListIter { data: &self.0 }
674    }
675
676    /// Return the byte length of this [`RowRef`].
677    pub fn byte_len(&self) -> usize {
678        self.0.len()
679    }
680
681    /// For debugging only.
682    pub fn data(&self) -> &[u8] {
683        &self.0
684    }
685
686    /// True iff there is no data in this [`RowRef`].
687    pub fn is_empty(&self) -> bool {
688        self.0.is_empty()
689    }
690}
691
692impl ToOwned for RowRef {
693    type Owned = Row;
694
695    fn to_owned(&self) -> Self::Owned {
696        // SAFETY: RowRef has the invariant that the wrapped data must be a valid Row encoding.
697        unsafe { Row::from_bytes_unchecked(&self.0) }
698    }
699}
700
701impl<'a> IntoIterator for &'a RowRef {
702    type Item = Datum<'a>;
703    type IntoIter = DatumListIter<'a>;
704
705    fn into_iter(self) -> DatumListIter<'a> {
706        DatumListIter { data: &self.0 }
707    }
708}
709
710/// These implementations order first by length, and then by slice contents.
711/// This allows many comparisons to complete without dereferencing memory.
712/// Warning: These order by the u8 array representation, and NOT by Datum::cmp.
713impl PartialOrd for RowRef {
714    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
715        Some(self.cmp(other))
716    }
717}
718
719impl Ord for RowRef {
720    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
721        match self.0.len().cmp(&other.0.len()) {
722            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
723            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
724            std::cmp::Ordering::Equal => self.0.cmp(&other.0),
725        }
726    }
727}
728
729impl fmt::Debug for RowRef {
730    /// Debug representation using the internal datums
731    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
732        f.write_str("RowRef{")?;
733        f.debug_list().entries(self.into_iter()).finish()?;
734        f.write_str("}")
735    }
736}
737
738/// Packs datums into a [`Row`].
739///
740/// Creating a `RowPacker` via [`Row::packer`] starts a packing operation on the
741/// row. A packing operation always starts from scratch: the existing contents
742/// of the underlying row are cleared.
743///
744/// To complete a packing operation, drop the `RowPacker`.
745#[derive(Debug)]
746pub struct RowPacker<'a> {
747    row: &'a mut Row,
748}
749
750#[derive(Debug, Clone)]
751pub struct DatumListIter<'a> {
752    data: &'a [u8],
753}
754
755#[derive(Debug, Clone)]
756pub struct DatumDictIter<'a> {
757    data: &'a [u8],
758    prev_key: Option<&'a str>,
759}
760
761/// `RowArena` is used to hold on to temporary `Row`s for functions like `eval` that need to create complex `Datum`s but don't have a `Row` to put them in yet.
762#[derive(Debug)]
763pub struct RowArena {
764    // Semantically, this field would be better represented by a `Vec<Box<[u8]>>`,
765    // as once the arena takes ownership of a byte vector the vector is never
766    // modified. But `RowArena::push_bytes` takes ownership of a `Vec<u8>`, so
767    // storing that `Vec<u8>` directly avoids an allocation. The cost is
768    // additional memory use, as the vector may have spare capacity, but row
769    // arenas are short lived so this is the better tradeoff.
770    inner: RefCell<Vec<Vec<u8>>>,
771}
772
773// DatumList and DatumMap are defined here rather than near Datum because we need private access to the unsafe data field
774
775/// A sequence of Datums
776#[derive(Clone, Copy)]
777pub struct DatumList<'a> {
778    /// Points at the serialized datums
779    data: &'a [u8],
780}
781
782impl<'a> Debug for DatumList<'a> {
783    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
784        f.debug_list().entries(self.iter()).finish()
785    }
786}
787
788impl<'a> PartialEq for DatumList<'a> {
789    #[inline(always)]
790    fn eq(&self, other: &DatumList<'a>) -> bool {
791        self.iter().eq(other.iter())
792    }
793}
794
795impl<'a> Eq for DatumList<'a> {}
796
797impl<'a> Hash for DatumList<'a> {
798    #[inline(always)]
799    fn hash<H: Hasher>(&self, state: &mut H) {
800        for d in self.iter() {
801            d.hash(state);
802        }
803    }
804}
805
806impl Ord for DatumList<'_> {
807    #[inline(always)]
808    fn cmp(&self, other: &DatumList) -> Ordering {
809        self.iter().cmp(other.iter())
810    }
811}
812
813impl PartialOrd for DatumList<'_> {
814    #[inline(always)]
815    fn partial_cmp(&self, other: &DatumList) -> Option<Ordering> {
816        Some(self.cmp(other))
817    }
818}
819
820/// A mapping from string keys to Datums
821#[derive(Clone, Copy)]
822pub struct DatumMap<'a> {
823    /// Points at the serialized datums, which should be sorted in key order
824    data: &'a [u8],
825}
826
827impl<'a> PartialEq for DatumMap<'a> {
828    #[inline(always)]
829    fn eq(&self, other: &DatumMap<'a>) -> bool {
830        self.iter().eq(other.iter())
831    }
832}
833
834impl<'a> Eq for DatumMap<'a> {}
835
836impl<'a> Hash for DatumMap<'a> {
837    #[inline(always)]
838    fn hash<H: Hasher>(&self, state: &mut H) {
839        for (k, v) in self.iter() {
840            k.hash(state);
841            v.hash(state);
842        }
843    }
844}
845
846impl<'a> Ord for DatumMap<'a> {
847    #[inline(always)]
848    fn cmp(&self, other: &DatumMap<'a>) -> Ordering {
849        self.iter().cmp(other.iter())
850    }
851}
852
853impl<'a> PartialOrd for DatumMap<'a> {
854    #[inline(always)]
855    fn partial_cmp(&self, other: &DatumMap<'a>) -> Option<Ordering> {
856        Some(self.cmp(other))
857    }
858}
859
860/// Represents a single `Datum`, appropriate to be nested inside other
861/// `Datum`s.
862#[derive(Clone, Copy, Eq, PartialEq, Hash)]
863pub struct DatumNested<'a> {
864    val: &'a [u8],
865}
866
867impl<'a> std::fmt::Display for DatumNested<'a> {
868    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
869        std::fmt::Display::fmt(&self.datum(), f)
870    }
871}
872
873impl<'a> std::fmt::Debug for DatumNested<'a> {
874    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
875        f.debug_struct("DatumNested")
876            .field("val", &self.datum())
877            .finish()
878    }
879}
880
881impl<'a> DatumNested<'a> {
882    // Figure out which bytes `read_datum` consumes (e.g. including the tag),
883    // and then store a reference to those bytes, so we can "replay" this same
884    // call later on without storing the datum itself.
885    pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
886        let prev = *data;
887        let _ = unsafe { read_datum(data) };
888        DatumNested {
889            val: &prev[..(prev.len() - data.len())],
890        }
891    }
892
893    /// Returns the datum `self` contains.
894    pub fn datum(&self) -> Datum<'a> {
895        let mut temp = self.val;
896        unsafe { read_datum(&mut temp) }
897    }
898}
899
900impl<'a> Ord for DatumNested<'a> {
901    fn cmp(&self, other: &Self) -> Ordering {
902        self.datum().cmp(&other.datum())
903    }
904}
905
906impl<'a> PartialOrd for DatumNested<'a> {
907    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
908        Some(self.cmp(other))
909    }
910}
911
912// Prefer adding new tags to the end of the enum. Certain behaviors, like row ordering and EXPLAIN
913// PHYSICAL PLAN output, rely on the ordering of this enum. Neither of these is a breaking change, but
914// it's annoying when they change.
915#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
916#[repr(u8)]
917enum Tag {
918    Null,
919    False,
920    True,
921    Int16,
922    Int32,
923    Int64,
924    UInt8,
925    UInt32,
926    Float32,
927    Float64,
928    Date,
929    Time,
930    Timestamp,
931    TimestampTz,
932    Interval,
933    BytesTiny,
934    BytesShort,
935    BytesLong,
936    BytesHuge,
937    StringTiny,
938    StringShort,
939    StringLong,
940    StringHuge,
941    Uuid,
942    Array,
943    ListTiny,
944    ListShort,
945    ListLong,
946    ListHuge,
947    Dict,
948    JsonNull,
949    Dummy,
950    Numeric,
951    UInt16,
952    UInt64,
953    MzTimestamp,
954    Range,
955    MzAclItem,
956    AclItem,
957    // Everything except leap seconds and times beyond the range of
958    // i64 nanoseconds. (Note that Materialize does not support leap
959    // seconds, but this module does).
960    CheapTimestamp,
961    // Everything except leap seconds and times beyond the range of
962    // i64 nanoseconds. (Note that Materialize does not support leap
963    // seconds, but this module does).
964    CheapTimestampTz,
965    // The next several tags are for variable-length signed integer encoding.
966    // The basic idea is that `NonNegativeIntN_K` is used to encode a datum of type
967    // IntN whose actual value is positive or zero and fits in K bits, and similarly for
968    // NegativeIntN_K with negative values.
969    //
970    // The order of these tags matters, because we want to be able to choose the
971    // tag for a given datum quickly, with arithmetic, rather than slowly, with a
972    // stack of `if` statements.
973    //
974    // Separate tags for non-negative and negative numbers are used to avoid having to
975    // waste one bit in the actual data space to encode the sign.
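    //
    // For example, following this scheme, `Datum::Int64(300)` needs 9 value bits and so
    // is written as the tag `NonNegativeInt64_16` followed by two little-endian bytes,
    // while `Datum::Int64(0)` is written as the bare tag `NonNegativeInt64_0`.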
976    NonNegativeInt16_0, // i.e., 0
977    NonNegativeInt16_8,
978    NonNegativeInt16_16,
979
980    NonNegativeInt32_0,
981    NonNegativeInt32_8,
982    NonNegativeInt32_16,
983    NonNegativeInt32_24,
984    NonNegativeInt32_32,
985
986    NonNegativeInt64_0,
987    NonNegativeInt64_8,
988    NonNegativeInt64_16,
989    NonNegativeInt64_24,
990    NonNegativeInt64_32,
991    NonNegativeInt64_40,
992    NonNegativeInt64_48,
993    NonNegativeInt64_56,
994    NonNegativeInt64_64,
995
996    NegativeInt16_0, // i.e., -1
997    NegativeInt16_8,
998    NegativeInt16_16,
999
1000    NegativeInt32_0,
1001    NegativeInt32_8,
1002    NegativeInt32_16,
1003    NegativeInt32_24,
1004    NegativeInt32_32,
1005
1006    NegativeInt64_0,
1007    NegativeInt64_8,
1008    NegativeInt64_16,
1009    NegativeInt64_24,
1010    NegativeInt64_32,
1011    NegativeInt64_40,
1012    NegativeInt64_48,
1013    NegativeInt64_56,
1014    NegativeInt64_64,
1015
1016    // These are like the ones above, but for unsigned types. The
1017    // situation is slightly simpler as we don't have negatives.
1018    UInt8_0, // i.e., 0
1019    UInt8_8,
1020
1021    UInt16_0,
1022    UInt16_8,
1023    UInt16_16,
1024
1025    UInt32_0,
1026    UInt32_8,
1027    UInt32_16,
1028    UInt32_24,
1029    UInt32_32,
1030
1031    UInt64_0,
1032    UInt64_8,
1033    UInt64_16,
1034    UInt64_24,
1035    UInt64_32,
1036    UInt64_40,
1037    UInt64_48,
1038    UInt64_56,
1039    UInt64_64,
1040}
1041
1042impl Tag {
1043    fn actual_int_length(self) -> Option<usize> {
1044        use Tag::*;
1045        let val = match self {
1046            NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
1047            | UInt32_0 | UInt64_0 => 0,
1048            NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
1049            | UInt32_8 | UInt64_8 => 1,
1050            NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
1051            | UInt32_16 | UInt64_16 => 2,
1052            NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
1053            NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
1054            NonNegativeInt64_40 | UInt64_40 => 5,
1055            NonNegativeInt64_48 | UInt64_48 => 6,
1056            NonNegativeInt64_56 | UInt64_56 => 7,
1057            NonNegativeInt64_64 | UInt64_64 => 8,
1058            NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
1059            NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
1060            NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
1061            NegativeInt32_24 | NegativeInt64_24 => 3,
1062            NegativeInt32_32 | NegativeInt64_32 => 4,
1063            NegativeInt64_40 => 5,
1064            NegativeInt64_48 => 6,
1065            NegativeInt64_56 => 7,
1066            NegativeInt64_64 => 8,
1067
1068            _ => return None,
1069        };
1070        Some(val)
1071    }
1072}
1073
1074// --------------------------------------------------------------------------------
1075// reading data
1076
1077/// Read a length-prefixed byte slice from the front of `data`.
1078///
1079/// Advances `data` to point to the first byte after the end of the read region.
1080fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
1081    let len = u64::from_le_bytes(read_byte_array(data));
1082    let len = usize::cast_from(len);
1083    let (bytes, next) = data.split_at(len);
1084    *data = next;
1085    bytes
1086}
1087
1088/// Read a datum whose length is encoded in the row before its contents.
1089///
1090/// Advances `data` to point to the first byte after the end of the read region.
1091///
1092/// # Safety
1093///
1094/// This function is safe if the datum's length and contents were previously written by `push_lengthed_bytes`,
1095/// and it was only written with a `String` tag if it was indeed UTF-8.
1096unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
1097    let len = match tag {
1098        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
1099        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1100            usize::from(u16::from_le_bytes(read_byte_array(data)))
1101        }
1102        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1103            usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
1104        }
1105        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1106            usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
1107        }
1108        _ => unreachable!(),
1109    };
1110    let (bytes, next) = data.split_at(len);
1111    *data = next;
1112    match tag {
1113        Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
1114        Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
1115            Datum::String(str::from_utf8_unchecked(bytes))
1116        }
1117        Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
1118            Datum::List(DatumList { data: bytes })
1119        }
1120        _ => unreachable!(),
1121    }
1122}
1123
1124fn read_byte(data: &mut &[u8]) -> u8 {
1125    let byte = data[0];
1126    *data = &data[1..];
1127    byte
1128}
1129
1130/// Read `length` bytes from the front of `data`, advancing it past them. Extend
1131/// the resulting buffer to an array of `N` bytes by inserting `FILL` in the
1132/// k most significant bytes, where k = N - length.
1133///
1134/// SAFETY:
1135///   * length <= N
1136///   * length <= data.len()
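///
/// For example (illustrative): reading 2 bytes `[0x2C, 0x01]` with `N = 8` and `FILL = 0`
/// yields `[0x2C, 0x01, 0, 0, 0, 0, 0, 0]`, i.e. 300 as a little-endian integer.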
1137fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
1138    data: &mut &[u8],
1139    length: usize,
1140) -> [u8; N] {
1141    let mut raw = [FILL; N];
1142    let (prev, next) = data.split_at(length);
1143    (raw[..prev.len()]).copy_from_slice(prev);
1144    *data = next;
1145    raw
1146}
1147/// Read `length` bytes from the front of `data`, advancing it past them. Extend
1148/// the resulting buffer to a negative `N`-byte two's complement integer by
1149/// filling the remaining bits with 1.
1150///
1151/// SAFETY:
1152///   * length <= N
1153///   * length <= data.len()
1154fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
1155    read_byte_array_sign_extending::<N, 255>(data, length)
1156}
1157
1158/// Read `length` bytes from the front of `data`, advancing it past them. Extend
1159/// the resulting buffer to a non-negative `N`-byte two's complement integer by
1160/// filling the remaining bits with 0.
1161///
1162/// SAFETY:
1163///   * length <= N
1164///   * length <= data.len()
1165fn read_byte_array_extending_nonnegative<const N: usize>(
1166    data: &mut &[u8],
1167    length: usize,
1168) -> [u8; N] {
1169    read_byte_array_sign_extending::<N, 0>(data, length)
1170}
1171
1172pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1173    let (prev, next) = data.split_first_chunk().unwrap();
1174    *data = next;
1175    *prev
1176}
1177
1178pub(super) fn read_date(data: &mut &[u8]) -> Date {
1179    let days = i32::from_le_bytes(read_byte_array(data));
1180    Date::from_pg_epoch(days).expect("unexpected date")
1181}
1182
1183pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1184    let year = i32::from_le_bytes(read_byte_array(data));
1185    let ordinal = u32::from_le_bytes(read_byte_array(data));
1186    NaiveDate::from_yo_opt(year, ordinal).unwrap()
1187}
1188
1189pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1190    let secs = u32::from_le_bytes(read_byte_array(data));
1191    let nanos = u32::from_le_bytes(read_byte_array(data));
1192    NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1193}
1194
1195/// Read a datum from the front of `data`.
1196///
1197/// Advances `data` to point to the first byte after the end of the read region.
1198///
1199/// # Safety
1200///
1201/// This function is safe if a `Datum` was previously written here by `push_datum`.
1202/// Otherwise it could return invalid values, which is undefined behavior.
1203pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
1204    let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
1205    match tag {
1206        Tag::Null => Datum::Null,
1207        Tag::False => Datum::False,
1208        Tag::True => Datum::True,
1209        Tag::UInt8_0 | Tag::UInt8_8 => {
1210            let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
1211                data,
1212                tag.actual_int_length()
1213                    .expect("returns a value for variable-length-encoded integer tags"),
1214            ));
1215            Datum::UInt8(i)
1216        }
1217        Tag::Int16 => {
1218            let i = i16::from_le_bytes(read_byte_array(data));
1219            Datum::Int16(i)
1220        }
1221        Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
1222            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
1223            // and `data` is big enough because it was encoded validly. These assumptions
1224            // are checked in debug asserts.
1225            let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
1226                data,
1227                tag.actual_int_length()
1228                    .expect("returns a value for variable-length-encoded integer tags"),
1229            ));
1230            Datum::Int16(i)
1231        }
1232        Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
1233            let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
1234                data,
1235                tag.actual_int_length()
1236                    .expect("returns a value for variable-length-encoded integer tags"),
1237            ));
1238            Datum::UInt16(i)
1239        }
1240        Tag::Int32 => {
1241            let i = i32::from_le_bytes(read_byte_array(data));
1242            Datum::Int32(i)
1243        }
1244        Tag::NonNegativeInt32_0
1245        | Tag::NonNegativeInt32_32
1246        | Tag::NonNegativeInt32_8
1247        | Tag::NonNegativeInt32_16
1248        | Tag::NonNegativeInt32_24 => {
1249            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
1250            // and `data` is big enough because it was encoded validly. These assumptions
1251            // are checked in debug asserts.
1252            let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
1253                data,
1254                tag.actual_int_length()
1255                    .expect("returns a value for variable-length-encoded integer tags"),
1256            ));
1257            Datum::Int32(i)
1258        }
1259        Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
1260            let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
1261                data,
1262                tag.actual_int_length()
1263                    .expect("returns a value for variable-length-encoded integer tags"),
1264            ));
1265            Datum::UInt32(i)
1266        }
1267        Tag::Int64 => {
1268            let i = i64::from_le_bytes(read_byte_array(data));
1269            Datum::Int64(i)
1270        }
1271        Tag::NonNegativeInt64_0
1272        | Tag::NonNegativeInt64_64
1273        | Tag::NonNegativeInt64_8
1274        | Tag::NonNegativeInt64_16
1275        | Tag::NonNegativeInt64_24
1276        | Tag::NonNegativeInt64_32
1277        | Tag::NonNegativeInt64_40
1278        | Tag::NonNegativeInt64_48
1279        | Tag::NonNegativeInt64_56 => {
1280            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
1281            // and `data` is big enough because it was encoded validly. These assumptions
1282            // are checked in debug asserts.
1283
1284            let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
1285                data,
1286                tag.actual_int_length()
1287                    .expect("returns a value for variable-length-encoded integer tags"),
1288            ));
1289            Datum::Int64(i)
1290        }
1291        Tag::UInt64_0
1292        | Tag::UInt64_8
1293        | Tag::UInt64_16
1294        | Tag::UInt64_24
1295        | Tag::UInt64_32
1296        | Tag::UInt64_40
1297        | Tag::UInt64_48
1298        | Tag::UInt64_56
1299        | Tag::UInt64_64 => {
1300            let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
1301                data,
1302                tag.actual_int_length()
1303                    .expect("returns a value for variable-length-encoded integer tags"),
1304            ));
1305            Datum::UInt64(i)
1306        }
1307        Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
1308            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
1309            // and `data` is big enough because it was encoded validly. These assumptions
1310            // are checked in debug asserts.
1311            let i = i16::from_le_bytes(read_byte_array_extending_negative(
1312                data,
1313                tag.actual_int_length()
1314                    .expect("returns a value for variable-length-encoded integer tags"),
1315            ));
1316            Datum::Int16(i)
1317        }
1318        Tag::NegativeInt32_0
1319        | Tag::NegativeInt32_32
1320        | Tag::NegativeInt32_8
1321        | Tag::NegativeInt32_16
1322        | Tag::NegativeInt32_24 => {
1323            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
1324            // and `data` is big enough because it was encoded validly. These assumptions
1325            // are checked in debug asserts.
1326            let i = i32::from_le_bytes(read_byte_array_extending_negative(
1327                data,
1328                tag.actual_int_length()
1329                    .expect("returns a value for variable-length-encoded integer tags"),
1330            ));
1331            Datum::Int32(i)
1332        }
1333        Tag::NegativeInt64_0
1334        | Tag::NegativeInt64_64
1335        | Tag::NegativeInt64_8
1336        | Tag::NegativeInt64_16
1337        | Tag::NegativeInt64_24
1338        | Tag::NegativeInt64_32
1339        | Tag::NegativeInt64_40
1340        | Tag::NegativeInt64_48
1341        | Tag::NegativeInt64_56 => {
1342            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
1343            // and `data` is big enough because the row was encoded validly. These assumptions
1344            // are checked in debug asserts.
1345            let i = i64::from_le_bytes(read_byte_array_extending_negative(
1346                data,
1347                tag.actual_int_length()
1348                    .expect("returns a value for variable-length-encoded integer tags"),
1349            ));
1350            Datum::Int64(i)
1351        }
1352
1353        Tag::UInt8 => {
1354            let i = u8::from_le_bytes(read_byte_array(data));
1355            Datum::UInt8(i)
1356        }
1357        Tag::UInt16 => {
1358            let i = u16::from_le_bytes(read_byte_array(data));
1359            Datum::UInt16(i)
1360        }
1361        Tag::UInt32 => {
1362            let i = u32::from_le_bytes(read_byte_array(data));
1363            Datum::UInt32(i)
1364        }
1365        Tag::UInt64 => {
1366            let i = u64::from_le_bytes(read_byte_array(data));
1367            Datum::UInt64(i)
1368        }
1369        Tag::Float32 => {
1370            let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
1371            Datum::Float32(OrderedFloat::from(f))
1372        }
1373        Tag::Float64 => {
1374            let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
1375            Datum::Float64(OrderedFloat::from(f))
1376        }
1377        Tag::Date => Datum::Date(read_date(data)),
1378        Tag::Time => Datum::Time(read_time(data)),
1379        Tag::CheapTimestamp => {
1380            let ts = i64::from_le_bytes(read_byte_array(data));
1381            let secs = ts.div_euclid(1_000_000_000);
1382            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1383            let ndt = DateTime::from_timestamp(secs, nsecs)
1384                .expect("We only write round-trippable timestamps")
1385                .naive_utc();
1386            Datum::Timestamp(
1387                CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
1388            )
1389        }
1390        Tag::CheapTimestampTz => {
1391            let ts = i64::from_le_bytes(read_byte_array(data));
1392            let secs = ts.div_euclid(1_000_000_000);
1393            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1394            let dt = DateTime::from_timestamp(secs, nsecs)
1395                .expect("We only write round-trippable timestamps");
1396            Datum::TimestampTz(
1397                CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
1398            )
1399        }
1400        Tag::Timestamp => {
1401            let date = read_naive_date(data);
1402            let time = read_time(data);
1403            Datum::Timestamp(
1404                CheckedTimestamp::from_timestamplike(date.and_time(time))
1405                    .expect("unexpected timestamp"),
1406            )
1407        }
1408        Tag::TimestampTz => {
1409            let date = read_naive_date(data);
1410            let time = read_time(data);
1411            Datum::TimestampTz(
1412                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
1413                    date.and_time(time),
1414                    Utc,
1415                ))
1416                .expect("unexpected timestamptz"),
1417            )
1418        }
1419        Tag::Interval => {
1420            let months = i32::from_le_bytes(read_byte_array(data));
1421            let days = i32::from_le_bytes(read_byte_array(data));
1422            let micros = i64::from_le_bytes(read_byte_array(data));
1423            Datum::Interval(Interval {
1424                months,
1425                days,
1426                micros,
1427            })
1428        }
1429        Tag::BytesTiny
1430        | Tag::BytesShort
1431        | Tag::BytesLong
1432        | Tag::BytesHuge
1433        | Tag::StringTiny
1434        | Tag::StringShort
1435        | Tag::StringLong
1436        | Tag::StringHuge
1437        | Tag::ListTiny
1438        | Tag::ListShort
1439        | Tag::ListLong
1440        | Tag::ListHuge => read_lengthed_datum(data, tag),
1441        Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
1442        Tag::Array => {
1443            // See the comment in `Row::push_array` for details on the encoding
1444            // of arrays.
1445            let ndims = read_byte(data);
1446            let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
1447            let (dims, next) = data.split_at(dims_size);
1448            *data = next;
1449            let bytes = read_untagged_bytes(data);
1450            Datum::Array(Array {
1451                dims: ArrayDimensions { data: dims },
1452                elements: DatumList { data: bytes },
1453            })
1454        }
1455        Tag::Dict => {
1456            let bytes = read_untagged_bytes(data);
1457            Datum::Map(DatumMap { data: bytes })
1458        }
1459        Tag::JsonNull => Datum::JsonNull,
1460        Tag::Dummy => Datum::Dummy,
1461        Tag::Numeric => {
1462            let digits = read_byte(data).into();
1463            let exponent = i8::reinterpret_cast(read_byte(data));
1464            let bits = read_byte(data);
1465
1466            let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
1467            let lsu_u8_len = lsu_u16_len * 2;
1468            let (lsu_u8, next) = data.split_at(lsu_u8_len);
1469            *data = next;
1470
1471            // TODO: if we refactor the decimal library to accept the owned
1472            // array as a parameter to `from_raw_parts` below, we could likely
1473            // avoid a copy because it is exactly the value we want
1474            let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
1475            for (i, c) in lsu_u8.chunks(2).enumerate() {
1476                lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
1477            }
1478
1479            let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
1480            Datum::from(d)
1481        }
1482        Tag::MzTimestamp => {
1483            let t = Timestamp::decode(read_byte_array(data));
1484            Datum::MzTimestamp(t)
1485        }
1486        Tag::Range => {
1487            // See notes on `push_range_with` for details about encoding.
1488            let flag_byte = read_byte(data);
1489            let flags = range::InternalFlags::from_bits(flag_byte)
1490                .expect("range flags must be encoded validly");
1491
1492            if flags.contains(range::InternalFlags::EMPTY) {
1493                assert!(
1494                    flags == range::InternalFlags::EMPTY,
1495                    "empty ranges contain only RANGE_EMPTY flag"
1496                );
1497
1498                return Datum::Range(Range { inner: None });
1499            }
1500
1501            let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
1502                None
1503            } else {
1504                Some(DatumNested::extract(data))
1505            };
1506
1507            let lower = RangeBound {
1508                inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
1509                bound: lower_bound,
1510            };
1511
1512            let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
1513                None
1514            } else {
1515                Some(DatumNested::extract(data))
1516            };
1517
1518            let upper = RangeBound {
1519                inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
1520                bound: upper_bound,
1521            };
1522
1523            Datum::Range(Range {
1524                inner: Some(RangeInner { lower, upper }),
1525            })
1526        }
1527        Tag::MzAclItem => {
1528            const N: usize = MzAclItem::binary_size();
1529            let mz_acl_item =
1530                MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
1531            Datum::MzAclItem(mz_acl_item)
1532        }
1533        Tag::AclItem => {
1534            const N: usize = AclItem::binary_size();
1535            let acl_item =
1536                AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
1537            Datum::AclItem(acl_item)
1538        }
1539    }
1540}
1541
1542// --------------------------------------------------------------------------------
1543// writing data
1544
1545fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1546where
1547    D: Vector<u8>,
1548{
1549    let len = u64::cast_from(bytes.len());
1550    data.extend_from_slice(&len.to_le_bytes());
1551    data.extend_from_slice(bytes);
1552}
1553
1554fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1555where
1556    D: Vector<u8>,
1557{
1558    match tag {
1559        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1560            let len = bytes.len().to_le_bytes();
1561            data.push(len[0]);
1562        }
1563        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1564            let len = bytes.len().to_le_bytes();
1565            data.extend_from_slice(&len[0..2]);
1566        }
1567        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1568            let len = bytes.len().to_le_bytes();
1569            data.extend_from_slice(&len[0..4]);
1570        }
1571        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1572            let len = bytes.len().to_le_bytes();
1573            data.extend_from_slice(&len);
1574        }
1575        _ => unreachable!(),
1576    }
1577    data.extend_from_slice(bytes);
1578}
1579
1580pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1581    i32::to_le_bytes(date.pg_epoch_days())
1582}
1583
1584fn push_date<D>(data: &mut D, date: Date)
1585where
1586    D: Vector<u8>,
1587{
1588    data.extend_from_slice(&date_to_array(date));
1589}
1590
1591pub(super) fn naive_date_to_arrays(
1592    date: NaiveDate,
1593) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1594    (
1595        i32::to_le_bytes(date.year()),
1596        u32::to_le_bytes(date.ordinal()),
1597    )
1598}
1599
1600fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1601where
1602    D: Vector<u8>,
1603{
1604    let (ds1, ds2) = naive_date_to_arrays(date);
1605    data.extend_from_slice(&ds1);
1606    data.extend_from_slice(&ds2);
1607}
1608
1609pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1610    (
1611        u32::to_le_bytes(time.num_seconds_from_midnight()),
1612        u32::to_le_bytes(time.nanosecond()),
1613    )
1614}
1615
1616fn push_time<D>(data: &mut D, time: NaiveTime)
1617where
1618    D: Vector<u8>,
1619{
1620    let (ts1, ts2) = time_to_arrays(time);
1621    data.extend_from_slice(&ts1);
1622    data.extend_from_slice(&ts2);
1623}
1624
1625/// Returns an i64 representing a `NaiveDateTime`, if
1626/// said i64 can be round-tripped back to a `NaiveDateTime`.
1627///
1628/// The only exotic NDTs for which this can't happen are those that
1629/// are hundreds of years in the future or past, or those that
1630/// represent a leap second. (Note that Materialize does not support
1631/// leap seconds, but this module does).
1632// This function is inspired by `NaiveDateTime::timestamp_nanos`,
1633// with extra checking.
1634fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1635    let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1636    if subsec_nanos >= 1_000_000_000 {
1637        return None;
1638    }
1639    let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1640    as_ns.checked_add(i64::from(subsec_nanos))
1641}
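// An informal sketch of the cutoff (the dates follow from the range of i64
// nanoseconds, not from anything in this file): i64 nanoseconds since the Unix
// epoch cover roughly 1677-09-21 through 2262-04-11, so for example
//
//   checked_timestamp_nanos("2000-01-01T00:00:00".parse().unwrap())
//       == Some(946_684_800_000_000_000)
//   checked_timestamp_nanos("3000-01-01T00:00:00".parse().unwrap())
//       == None  // would overflow i64 nanoseconds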
1642
1643// This function is extremely hot, so
1644// we just use `as` to avoid the overhead of
1645// `try_into` followed by `unwrap`.
1646// `leading_ones` and `leading_zeros`
1647// can never return values greater than 64, so the conversion is safe.
1648#[inline(always)]
1649#[allow(clippy::as_conversions)]
1650fn min_bytes_signed<T>(i: T) -> u8
1651where
1652    T: Into<i64>,
1653{
1654    let i: i64 = i.into();
1655
1656    // To fit in n bytes, we require that
1657    // everything but the leading sign bits fits in n*8
1658    // bits.
1659    let n_sign_bits = if i.is_negative() {
1660        i.leading_ones() as u8
1661    } else {
1662        i.leading_zeros() as u8
1663    };
1664
1665    (64 - n_sign_bits + 7) / 8
1666}
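// A worked sketch of the arithmetic above (values computed by hand from the
// formula, since `min_bytes_signed` is private):
//
//   min_bytes_signed(0i32)    == 0  // 64 leading zeros -> (64 - 64 + 7) / 8 = 0
//   min_bytes_signed(42i32)   == 1  // 58 leading zeros -> (64 - 58 + 7) / 8 = 1
//   min_bytes_signed(-1i32)   == 0  // 64 leading ones  -> (64 - 64 + 7) / 8 = 0
//   min_bytes_signed(-300i32) == 2  // 55 leading ones  -> (64 - 55 + 7) / 8 = 2
//
// The sign itself is carried by the tag (`NegativeInt*` vs `NonNegativeInt*`),
// which is why zero data bytes suffice for 0 and -1.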
1667
1668// In principle we could just use `min_bytes_signed`, rather than
1669// having a separate function here, as long as we made that one take
1670// `T: Into<i128>` instead of `i64`. But LLVM doesn't seem smart enough

1671// to realize that that function is the same as the current version,
1672// and generates worse code.
1673//
1674// Justification for `as` is the same as in `min_bytes_signed`.
1675#[inline(always)]
1676#[allow(clippy::as_conversions)]
1677fn min_bytes_unsigned<T>(i: T) -> u8
1678where
1679    T: Into<u64>,
1680{
1681    let i: u64 = i.into();
1682
1683    let n_sign_bits = i.leading_zeros() as u8;
1684
1685    (64 - n_sign_bits + 7) / 8
1686}
1687
1688const TINY: usize = 1 << 8;
1689const SHORT: usize = 1 << 16;
1690const LONG: usize = 1 << 32;
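// A quick sketch of how these thresholds map onto the variable-length encoding
// in `push_lengthed_bytes` (total sizes as computed by `datum_size` below):
//
//   len in 0..TINY      -> *Tiny tag,  1-byte length  (a 10-byte string costs 1 + 1 + 10 bytes)
//   len in TINY..SHORT  -> *Short tag, 2-byte length  (a 300-byte string costs 1 + 2 + 300 bytes)
//   len in SHORT..LONG  -> *Long tag,  4-byte length
//   len >= LONG         -> *Huge tag,  8-byte length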
1691
1692fn push_datum<D>(data: &mut D, datum: Datum)
1693where
1694    D: Vector<u8>,
1695{
1696    match datum {
1697        Datum::Null => data.push(Tag::Null.into()),
1698        Datum::False => data.push(Tag::False.into()),
1699        Datum::True => data.push(Tag::True.into()),
1700        Datum::Int16(i) => {
1701            let mbs = min_bytes_signed(i);
1702            let tag = u8::from(if i.is_negative() {
1703                Tag::NegativeInt16_0
1704            } else {
1705                Tag::NonNegativeInt16_0
1706            }) + mbs;
1707
1708            data.push(tag);
1709            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1710        }
1711        Datum::Int32(i) => {
1712            let mbs = min_bytes_signed(i);
1713            let tag = u8::from(if i.is_negative() {
1714                Tag::NegativeInt32_0
1715            } else {
1716                Tag::NonNegativeInt32_0
1717            }) + mbs;
1718
1719            data.push(tag);
1720            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1721        }
1722        Datum::Int64(i) => {
1723            let mbs = min_bytes_signed(i);
1724            let tag = u8::from(if i.is_negative() {
1725                Tag::NegativeInt64_0
1726            } else {
1727                Tag::NonNegativeInt64_0
1728            }) + mbs;
1729
1730            data.push(tag);
1731            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1732        }
1733        Datum::UInt8(i) => {
1734            let mbu = min_bytes_unsigned(i);
1735            let tag = u8::from(Tag::UInt8_0) + mbu;
1736            data.push(tag);
1737            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1738        }
1739        Datum::UInt16(i) => {
1740            let mbu = min_bytes_unsigned(i);
1741            let tag = u8::from(Tag::UInt16_0) + mbu;
1742            data.push(tag);
1743            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1744        }
1745        Datum::UInt32(i) => {
1746            let mbu = min_bytes_unsigned(i);
1747            let tag = u8::from(Tag::UInt32_0) + mbu;
1748            data.push(tag);
1749            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1750        }
1751        Datum::UInt64(i) => {
1752            let mbu = min_bytes_unsigned(i);
1753            let tag = u8::from(Tag::UInt64_0) + mbu;
1754            data.push(tag);
1755            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1756        }
1757        Datum::Float32(f) => {
1758            data.push(Tag::Float32.into());
1759            data.extend_from_slice(&f.to_bits().to_le_bytes());
1760        }
1761        Datum::Float64(f) => {
1762            data.push(Tag::Float64.into());
1763            data.extend_from_slice(&f.to_bits().to_le_bytes());
1764        }
1765        Datum::Date(d) => {
1766            data.push(Tag::Date.into());
1767            push_date(data, d);
1768        }
1769        Datum::Time(t) => {
1770            data.push(Tag::Time.into());
1771            push_time(data, t);
1772        }
1773        Datum::Timestamp(t) => {
1774            let datetime = t.to_naive();
1775            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1776                data.push(Tag::CheapTimestamp.into());
1777                data.extend_from_slice(&nanos.to_le_bytes());
1778            } else {
1779                data.push(Tag::Timestamp.into());
1780                push_naive_date(data, datetime.date());
1781                push_time(data, datetime.time());
1782            }
1783        }
1784        Datum::TimestampTz(t) => {
1785            let datetime = t.to_naive();
1786            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1787                data.push(Tag::CheapTimestampTz.into());
1788                data.extend_from_slice(&nanos.to_le_bytes());
1789            } else {
1790                data.push(Tag::TimestampTz.into());
1791                push_naive_date(data, datetime.date());
1792                push_time(data, datetime.time());
1793            }
1794        }
1795        Datum::Interval(i) => {
1796            data.push(Tag::Interval.into());
1797            data.extend_from_slice(&i.months.to_le_bytes());
1798            data.extend_from_slice(&i.days.to_le_bytes());
1799            data.extend_from_slice(&i.micros.to_le_bytes());
1800        }
1801        Datum::Bytes(bytes) => {
1802            let tag = match bytes.len() {
1803                0..TINY => Tag::BytesTiny,
1804                TINY..SHORT => Tag::BytesShort,
1805                SHORT..LONG => Tag::BytesLong,
1806                _ => Tag::BytesHuge,
1807            };
1808            data.push(tag.into());
1809            push_lengthed_bytes(data, bytes, tag);
1810        }
1811        Datum::String(string) => {
1812            let tag = match string.len() {
1813                0..TINY => Tag::StringTiny,
1814                TINY..SHORT => Tag::StringShort,
1815                SHORT..LONG => Tag::StringLong,
1816                _ => Tag::StringHuge,
1817            };
1818            data.push(tag.into());
1819            push_lengthed_bytes(data, string.as_bytes(), tag);
1820        }
1821        Datum::List(list) => {
1822            let tag = match list.data.len() {
1823                0..TINY => Tag::ListTiny,
1824                TINY..SHORT => Tag::ListShort,
1825                SHORT..LONG => Tag::ListLong,
1826                _ => Tag::ListHuge,
1827            };
1828            data.push(tag.into());
1829            push_lengthed_bytes(data, list.data, tag);
1830        }
1831        Datum::Uuid(u) => {
1832            data.push(Tag::Uuid.into());
1833            data.extend_from_slice(u.as_bytes());
1834        }
1835        Datum::Array(array) => {
1836            // See the comment in `Row::push_array` for details on the encoding
1837            // of arrays.
1838            data.push(Tag::Array.into());
1839            data.push(array.dims.ndims());
1840            data.extend_from_slice(array.dims.data);
1841            push_untagged_bytes(data, array.elements.data);
1842        }
1843        Datum::Map(dict) => {
1844            data.push(Tag::Dict.into());
1845            push_untagged_bytes(data, dict.data);
1846        }
1847        Datum::JsonNull => data.push(Tag::JsonNull.into()),
1848        Datum::MzTimestamp(t) => {
1849            data.push(Tag::MzTimestamp.into());
1850            data.extend_from_slice(&t.encode());
1851        }
1852        Datum::Dummy => data.push(Tag::Dummy.into()),
1853        Datum::Numeric(mut n) => {
1854            // Pseudo-canonical representation of decimal values with
1855            // insignificant zeroes trimmed. This compresses the number further
1856            // than `Numeric::trim` by removing all zeroes, and not only those in
1857            // the fractional component.
1858            numeric::cx_datum().reduce(&mut n.0);
1859            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
1860            data.push(Tag::Numeric.into());
1861            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
1862            data.push(
1863                i8::try_from(exponent)
1864                    .expect("exponent to fit within i8; should not exceed +/- 39")
1865                    .to_le_bytes()[0],
1866            );
1867            data.push(bits);
1868
1869            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];
1870
1871            // Little endian machines can take the lsu directly from u16 to u8.
1872            if cfg!(target_endian = "little") {
1873                // SAFETY: `lsu` (returned by `coefficient_units()`) is a `&[u16]`, so
1874                // each element can safely be transmuted into two `u8`s.
1875                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
1876                // The `u8` aligned version of the `lsu` should have twice as many
1877                // elements as we expect for the `u16` version.
1878                soft_assert_no_log!(
1879                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
1880                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
1881                    Numeric::digits_to_lsu_elements_len(digits) * 2,
1882                    lsu_bytes.len()
1883                );
1884                // There should be no unaligned elements in the prefix or suffix.
1885                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
1886                data.extend_from_slice(lsu_bytes);
1887            } else {
1888                for u in lsu {
1889                    data.extend_from_slice(&u.to_le_bytes());
1890                }
1891            }
1892        }
1893        Datum::Range(range) => {
1894            // See notes on `push_range_with` for details about encoding.
1895            data.push(Tag::Range.into());
1896            data.push(range.internal_flag_bits());
1897
1898            if let Some(RangeInner { lower, upper }) = range.inner {
1899                for bound in [lower.bound, upper.bound] {
1900                    if let Some(bound) = bound {
1901                        match bound.datum() {
1902                            Datum::Null => panic!("cannot push Datum::Null into range"),
1903                            d => push_datum::<D>(data, d),
1904                        }
1905                    }
1906                }
1907            }
1908        }
1909        Datum::MzAclItem(mz_acl_item) => {
1910            data.push(Tag::MzAclItem.into());
1911            data.extend_from_slice(&mz_acl_item.encode_binary());
1912        }
1913        Datum::AclItem(acl_item) => {
1914            data.push(Tag::AclItem.into());
1915            data.extend_from_slice(&acl_item.encode_binary());
1916        }
1917    }
1918}
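// A worked example of the encoding above: `Datum::Int32(7)` needs
// `min_bytes_signed(7) == 1` data byte, so it is packed as the single tag byte
// `NonNegativeInt32_0 + 1` followed by the byte `0x07`, i.e. two bytes total.
// `Datum::Int32(-1)` needs zero data bytes (every bit is a leading sign bit),
// so it is packed as just the tag byte `NegativeInt32_0`.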
1919
1920/// Return the number of bytes these Datums would use if packed as a Row.
1921pub fn row_size<'a, I>(a: I) -> usize
1922where
1923    I: IntoIterator<Item = Datum<'a>>,
1924{
1925    // Using datums_size instead of a.data().len() here is safer because it will
1926    // return the size of the datums if they were packed into a Row. Although
1927    // a.data().len() happens to give the correct answer (and is faster), data()
1928    // is documented as for debugging only.
1929    let sz = datums_size::<_, _>(a);
1930    let size_of_row = std::mem::size_of::<Row>();
1931    // The Row struct attempts to inline data until it can't fit in the
1932    // preallocated size. Otherwise it spills to heap, and uses the Row to point
1933    // to that.
1934    if sz > Row::SIZE {
1935        sz + size_of_row
1936    } else {
1937        size_of_row
1938    }
1939}
1940
1941/// Number of bytes required by the datum.
1942/// This is used to optimistically pre-allocate buffers for packing rows.
1943pub fn datum_size(datum: &Datum) -> usize {
1944    match datum {
1945        Datum::Null => 1,
1946        Datum::False => 1,
1947        Datum::True => 1,
1948        Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
1949        Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
1950        Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
1951        Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1952        Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1953        Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1954        Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1955        Datum::Float32(_) => 1 + size_of::<f32>(),
1956        Datum::Float64(_) => 1 + size_of::<f64>(),
1957        Datum::Date(_) => 1 + size_of::<i32>(),
1958        Datum::Time(_) => 1 + 8,
1959        Datum::Timestamp(t) => {
1960            1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
1961                8
1962            } else {
1963                16
1964            }
1965        }
1966        Datum::TimestampTz(t) => {
1967            1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
1968                8
1969            } else {
1970                16
1971            }
1972        }
1973        Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
1974        Datum::Bytes(bytes) => {
1975            // We use a variable length representation of slice length.
1976            let bytes_for_length = match bytes.len() {
1977                0..TINY => 1,
1978                TINY..SHORT => 2,
1979                SHORT..LONG => 4,
1980                _ => 8,
1981            };
1982            1 + bytes_for_length + bytes.len()
1983        }
1984        Datum::String(string) => {
1985            // We use a variable length representation of slice length.
1986            let bytes_for_length = match string.len() {
1987                0..TINY => 1,
1988                TINY..SHORT => 2,
1989                SHORT..LONG => 4,
1990                _ => 8,
1991            };
1992            1 + bytes_for_length + string.len()
1993        }
1994        Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
1995        Datum::Array(array) => {
1996            1 + size_of::<u8>()
1997                + array.dims.data.len()
1998                + size_of::<u64>()
1999                + array.elements.data.len()
2000        }
2001        Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
2002        Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
2003        Datum::JsonNull => 1,
2004        Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
2005        Datum::Dummy => 1,
2006        Datum::Numeric(d) => {
2007            let mut d = d.0.clone();
2008            // Values must be reduced to determine appropriate number of
2009            // coefficient units.
2010            numeric::cx_datum().reduce(&mut d);
2011            // 4 = 1 byte each for tag, digits, exponent, bits
2012            4 + (d.coefficient_units().len() * 2)
2013        }
2014        Datum::Range(Range { inner }) => {
2015            // Tag + flags
2016            2 + match inner {
2017                None => 0,
2018                Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
2019                    .iter()
2020                    .map(|bound| match bound {
2021                        None => 0,
2022                        Some(bound) => bound.val.len(),
2023                    })
2024                    .sum(),
2025            }
2026        }
2027        Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
2028        Datum::AclItem(_) => 1 + AclItem::binary_size(),
2029    }
2030}
2031
2032/// Number of bytes required by a sequence of datums.
2033///
2034/// This method can be used to right-size the allocation for a `Row`
2035/// before calling [`RowPacker::extend`].
2036pub fn datums_size<'a, I, D>(iter: I) -> usize
2037where
2038    I: IntoIterator<Item = D>,
2039    D: Borrow<Datum<'a>>,
2040{
2041    iter.into_iter().map(|d| datum_size(d.borrow())).sum()
2042}
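// A minimal sketch of the intended right-sizing pattern. It assumes a
// capacity-aware constructor such as `Row::with_capacity` (not shown in this
// excerpt); without one, `Row::default()` works the same way minus the
// pre-allocation:
//
//   let datums = vec![Datum::Int64(1), Datum::Int64(2)];
//   let mut row = Row::with_capacity(datums_size(datums.iter()));
//   row.packer().extend(datums.iter());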
2043
2044/// Number of bytes required by a list of datums. This computes the size that would be required if
2045/// the given datums were packed into a list.
2046///
2047/// This is used to optimistically pre-allocate buffers for packing rows.
2048pub fn datum_list_size<'a, I, D>(iter: I) -> usize
2049where
2050    I: IntoIterator<Item = D>,
2051    D: Borrow<Datum<'a>>,
2052{
2053    1 + size_of::<u64>() + datums_size(iter)
2054}
2055
2056impl RowPacker<'_> {
2057    /// Constructs a row packer that will pack additional datums into the
2058    /// provided row.
2059    ///
2060    /// This function is intentionally somewhat inconvenient to call. You
2061    /// usually want to call [`Row::packer`] instead to start packing from
2062    /// scratch.
2063    pub fn for_existing_row(row: &mut Row) -> RowPacker<'_> {
2064        RowPacker { row }
2065    }
2066
2067    /// Extend an existing `Row` with a `Datum`.
2068    #[inline]
2069    pub fn push<'a, D>(&mut self, datum: D)
2070    where
2071        D: Borrow<Datum<'a>>,
2072    {
2073        push_datum(&mut self.row.data, *datum.borrow());
2074    }
2075
2076    /// Extend an existing `Row` with additional `Datum`s.
2077    #[inline]
2078    pub fn extend<'a, I, D>(&mut self, iter: I)
2079    where
2080        I: IntoIterator<Item = D>,
2081        D: Borrow<Datum<'a>>,
2082    {
2083        for datum in iter {
2084            push_datum(&mut self.row.data, *datum.borrow())
2085        }
2086    }
2087
2088    /// Extend an existing `Row` with additional `Datum`s.
2089    ///
2090    /// If the iterator produces an error, the pushing of datums is
2091    /// terminated and the error is returned. The `Row` will be
2092    /// incomplete, but it will be safe to read datums from it.
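    ///
    /// A small sketch (the `String` error type here is purely illustrative):
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::default();
    /// let datums = vec![Ok::<_, String>(Datum::Int64(1)), Ok(Datum::Int64(2))];
    /// row.packer().try_extend(datums).unwrap();
    /// assert_eq!(row.unpack(), vec![Datum::Int64(1), Datum::Int64(2)]);
    /// ```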
2093    #[inline]
2094    pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
2095    where
2096        I: IntoIterator<Item = Result<D, E>>,
2097        D: Borrow<Datum<'a>>,
2098    {
2099        for datum in iter {
2100            push_datum(&mut self.row.data, *datum?.borrow());
2101        }
2102        Ok(())
2103    }
2104
2105    /// Appends the datums of an entire `Row`.
2106    pub fn extend_by_row(&mut self, row: &Row) {
2107        self.row.data.extend_from_slice(row.data.as_slice());
2108    }
2109
2110    /// Appends the slice of data representing an entire `Row`. The data is not validated.
2111    ///
2112    /// # Safety
2113    ///
2114    /// The requirements from [`Row::from_bytes_unchecked`] apply here, too:
2115    /// This method relies on `data` being an appropriate row encoding, and can
2116    /// result in unsafety if this is not the case.
2117    #[inline]
2118    pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
2119        self.row.data.extend_from_slice(data)
2120    }
2121
2122    /// Pushes a [`DatumList`] that is built from a closure.
2123    ///
2124    /// The supplied closure will be invoked once with a `Row` that can be used
2125    /// to populate the list. It is valid to call any method on the
2126    /// [`RowPacker`] except for [`RowPacker::clear`], [`RowPacker::truncate`],
2127    /// or [`RowPacker::truncate_datums`].
2128    ///
2129    /// Returns the value returned by the closure, if any.
2130    ///
2131    /// ```
2132    /// # use mz_repr::{Row, Datum};
2133    /// let mut row = Row::default();
2134    /// row.packer().push_list_with(|row| {
2135    ///     row.push(Datum::String("age"));
2136    ///     row.push(Datum::Int64(42));
2137    /// });
2138    /// assert_eq!(
2139    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
2140    ///     vec![Datum::String("age"), Datum::Int64(42)],
2141    /// );
2142    /// ```
2143    #[inline]
2144    pub fn push_list_with<F, R>(&mut self, f: F) -> R
2145    where
2146        F: FnOnce(&mut RowPacker) -> R,
2147    {
2148        // First, assume that the list will fit in 255 bytes, and thus the length will fit in
2149        // 1 byte. If not, we'll fix it up later.
2150        let start = self.row.data.len();
2151        self.row.data.push(Tag::ListTiny.into());
2152        // Write a dummy len, will fix it up later.
2153        self.row.data.push(0);
2154
2155        let out = f(self);
2156
2157        // The `- 1 - 1` is for the tag and the len.
2158        let len = self.row.data.len() - start - 1 - 1;
2159        // We now know the real len.
2160        if len < TINY {
2161            // If the len fits in 1 byte, we just need to fix up the len.
2162            self.row.data[start + 1] = len.to_le_bytes()[0];
2163        } else {
2164            // Note: We move this code path into its own function, so that the common case can be
2165            // inlined.
2166            long_list(&mut self.row.data, start, len);
2167        }
2168
2169        /// 1. Fix up the tag.
2170        /// 2. Move the actual data a bit (for which we also need to make room at the end).
2171        /// 3. Fix up the len.
2172        /// `data`: The row's backing data.
2173        /// `start`: where `push_list_with` started writing in `data`.
2174        /// `len`: the length of the data, excluding the tag and the length.
2175        #[cold]
2176        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
2177            // `len_len`: the length of the length. (Possible values are: 2, 4, 8. 1 is handled
2178            // elsewhere.) The other parameters are the same as for `long_list`.
2179            let long_list_inner = |data: &mut CompactBytes, len_len| {
2180                // We'll need memory for the new, bigger length, so make the `CompactBytes` bigger.
2181                // The `- 1` is because the old length was 1 byte.
2182                const ZEROS: [u8; 8] = [0; 8];
2183                data.extend_from_slice(&ZEROS[0..len_len - 1]);
2184                // Move the data to the end of the `CompactBytes`, to make space for the new length.
2185                // Originally, it started after the 1-byte tag and the 1-byte length, now it will
2186                // start after the 1-byte tag and the len_len-byte length.
2187                //
2188                // Note that this is the only operation in `long_list` whose cost is proportional
2189                // to `len`. Since `len` is at least 256 here, the other operations' costs are
2190                // negligible. `copy_within` is a memmove, which is probably a fair bit faster per
2191                // Datum than a Datum encoding in the `f` closure.
2192                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
2193                // Write the new length.
2194                data[start + 1..start + 1 + len_len]
2195                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
2196            };
2197            match len {
2198                0..TINY => {
2199                    unreachable!()
2200                }
2201                TINY..SHORT => {
2202                    data[start] = Tag::ListShort.into();
2203                    long_list_inner(data, 2);
2204                }
2205                SHORT..LONG => {
2206                    data[start] = Tag::ListLong.into();
2207                    long_list_inner(data, 4);
2208                }
2209                _ => {
2210                    data[start] = Tag::ListHuge.into();
2211                    long_list_inner(data, 8);
2212                }
2213            };
2214        }
2215
2216        out
2217    }
2218
2219    /// Pushes a [`DatumMap`] that is built from a closure.
2220    ///
2221    /// The supplied closure will be invoked once with a `Row` that can be used
2222    /// to populate the dict.
2223    ///
2224    /// The closure **must** alternate pushing string keys and arbitrary values,
2225    /// otherwise reading the dict will cause a panic.
2226    ///
2227    /// The closure **must** push keys in ascending order, otherwise equality
2228    /// checks on the resulting `Row` may be wrong and reading the dict IN DEBUG
2229    /// MODE will cause a panic.
2230    ///
2231    /// The closure **must not** call [`RowPacker::clear`],
2232    /// [`RowPacker::truncate`], or [`RowPacker::truncate_datums`].
2233    ///
2234    /// # Example
2235    ///
2236    /// ```
2237    /// # use mz_repr::{Row, Datum};
2238    /// let mut row = Row::default();
2239    /// row.packer().push_dict_with(|row| {
2240    ///
2241    ///     // key
2242    ///     row.push(Datum::String("age"));
2243    ///     // value
2244    ///     row.push(Datum::Int64(42));
2245    ///
2246    ///     // key
2247    ///     row.push(Datum::String("name"));
2248    ///     // value
2249    ///     row.push(Datum::String("bob"));
2250    /// });
2251    /// assert_eq!(
2252    ///     row.unpack_first().unwrap_map().iter().collect::<Vec<_>>(),
2253    ///     vec![("age", Datum::Int64(42)), ("name", Datum::String("bob"))]
2254    /// );
2255    /// ```
2256    pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2257    where
2258        F: FnOnce(&mut RowPacker) -> R,
2259    {
2260        self.row.data.push(Tag::Dict.into());
2261        let start = self.row.data.len();
2262        // write a dummy len, will fix it up later
2263        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2264
2265        let res = f(self);
2266
2267        let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2268        // fix up the len
2269        self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2270
2271        res
2272    }
2273
2274    /// Convenience function to construct an array from an iter of `Datum`s.
2275    ///
2276    /// Returns an error if the number of elements in `iter` does not match
2277    /// the cardinality of the array as described by `dims`, or if the
2278    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2279    /// occurs, the packer's state will be unchanged.
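    ///
    /// A small sketch packing a one-dimensional array of three integers (the
    /// `mz_repr::adt::array::ArrayDimension` import path is assumed to be
    /// usable from outside this module, as it is elsewhere in the crate):
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// # use mz_repr::adt::array::ArrayDimension;
    /// let mut row = Row::default();
    /// row.packer()
    ///     .try_push_array(
    ///         &[ArrayDimension { lower_bound: 1, length: 3 }],
    ///         [Datum::Int32(1), Datum::Int32(2), Datum::Int32(3)],
    ///     )
    ///     .unwrap();
    /// assert_eq!(
    ///     row.unpack_first().unwrap_array().elements().iter().collect::<Vec<_>>(),
    ///     vec![Datum::Int32(1), Datum::Int32(2), Datum::Int32(3)],
    /// );
    /// ```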
2280    pub fn try_push_array<'a, I, D>(
2281        &mut self,
2282        dims: &[ArrayDimension],
2283        iter: I,
2284    ) -> Result<(), InvalidArrayError>
2285    where
2286        I: IntoIterator<Item = D>,
2287        D: Borrow<Datum<'a>>,
2288    {
2289        // SAFETY: The function returns the exact number of elements pushed into the array.
2290        unsafe {
2291            self.push_array_with_unchecked(dims, |packer| {
2292                let mut nelements = 0;
2293                for datum in iter {
2294                    packer.push(datum);
2295                    nelements += 1;
2296                }
2297                Ok::<_, InvalidArrayError>(nelements)
2298            })
2299        }
2300    }
2301
2302    /// Convenience function to construct an array from a function. The function must return the
2303    /// number of elements it pushed into the array. It is undefined behavior if the function returns
2304    /// a number different from the number of elements it pushed.
2305    ///
2306    /// Returns an error if the number of elements pushed by `f` does not match
2307    /// the cardinality of the array as described by `dims`, or if the
2308    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`], or if `f` errors. If an error
2309    /// occurs, the packer's state will be unchanged.
2310    pub unsafe fn push_array_with_unchecked<F, E>(
2311        &mut self,
2312        dims: &[ArrayDimension],
2313        f: F,
2314    ) -> Result<(), E>
2315    where
2316        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
2317        E: From<InvalidArrayError>,
2318    {
2319        // Arrays are encoded as follows.
2320        //
2321        // u8    ndims
2322        // u64   dim_0 lower bound
2323        // u64   dim_0 length
2324        // ...
2325        // u64   dim_n lower bound
2326        // u64   dim_n length
2327        // u64   element data size in bytes
2328        // u8    element data, where elements are encoded in row-major order
2329
2330        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
2331            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
2332        }
2333
2334        let start = self.row.data.len();
2335        self.row.data.push(Tag::Array.into());
2336
2337        // Write dimension information.
2338        self.row
2339            .data
2340            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
2341        for dim in dims {
2342            self.row
2343                .data
2344                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2345            self.row
2346                .data
2347                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2348        }
2349
2350        // Write elements.
2351        let off = self.row.data.len();
2352        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2353        let nelements = match f(self) {
2354            Ok(nelements) => nelements,
2355            Err(e) => {
2356                self.row.data.truncate(start);
2357                return Err(e);
2358            }
2359        };
2360        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2361        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2362
2363        // Check that the number of elements written matches the dimension
2364        // information.
2365        let cardinality = match dims {
2366            [] => 0,
2367            dims => dims.iter().map(|d| d.length).product(),
2368        };
2369        if nelements != cardinality {
2370            self.row.data.truncate(start);
2371            return Err(InvalidArrayError::WrongCardinality {
2372                actual: nelements,
2373                expected: cardinality,
2374            }
2375            .into());
2376        }
2377
2378        Ok(())
2379    }
2380
2381    /// Pushes an [`Array`] that is built from a closure.
2382    ///
2383    /// __WARNING__: This is a fairly "sharp" tool that is easy to get wrong. You
2384    /// should prefer [`RowPacker::try_push_array`] when possible.
2385    ///
2386    /// Returns an error if the number of elements pushed does not match
2387    /// the cardinality of the array as described by `dims`, or if the
2388    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2389    /// occurs, the packer's state will be unchanged.
2390    pub fn push_array_with_row_major<F, I>(
2391        &mut self,
2392        dims: I,
2393        f: F,
2394    ) -> Result<(), InvalidArrayError>
2395    where
2396        I: IntoIterator<Item = ArrayDimension>,
2397        F: FnOnce(&mut RowPacker) -> usize,
2398    {
2399        let start = self.row.data.len();
2400        self.row.data.push(Tag::Array.into());
2401
2402        // Write dummy dimension length for now, we'll fix it up.
2403        let dims_start = self.row.data.len();
2404        self.row.data.push(42);
2405
2406        let mut num_dims: u8 = 0;
2407        let mut cardinality: usize = 1;
2408        for dim in dims {
2409            num_dims += 1;
2410            cardinality *= dim.length;
2411
2412            self.row
2413                .data
2414                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2415            self.row
2416                .data
2417                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2418        }
2419
2420        if num_dims > MAX_ARRAY_DIMENSIONS {
2421            // Reset the packer state so we don't have invalid data.
2422            self.row.data.truncate(start);
2423            return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2424        }
2425        // Fix up our dimension length.
2426        self.row.data[dims_start..dims_start + size_of::<u8>()]
2427            .copy_from_slice(&num_dims.to_le_bytes());
2428
2429        // Write elements.
2430        let off = self.row.data.len();
2431        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2432
2433        let nelements = f(self);
2434
2435        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2436        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2437
2438        // Check that the number of elements written matches the dimension
2439        // information.
2440        let cardinality = match num_dims {
2441            0 => 0,
2442            _ => cardinality,
2443        };
2444        if nelements != cardinality {
2445            self.row.data.truncate(start);
2446            return Err(InvalidArrayError::WrongCardinality {
2447                actual: nelements,
2448                expected: cardinality,
2449            });
2450        }
2451
2452        Ok(())
2453    }
2454
2455    /// Convenience function to push a `DatumList` from an iter of `Datum`s
2456    ///
2457    /// See [`RowPacker::push_list_with`] if you need to be able to handle errors.
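    ///
    /// A small sketch:
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::default();
    /// row.packer().push_list(&[Datum::Int64(1), Datum::Int64(2)]);
    /// assert_eq!(
    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
    ///     vec![Datum::Int64(1), Datum::Int64(2)],
    /// );
    /// ```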
2458    pub fn push_list<'a, I, D>(&mut self, iter: I)
2459    where
2460        I: IntoIterator<Item = D>,
2461        D: Borrow<Datum<'a>>,
2462    {
2463        self.push_list_with(|packer| {
2464            for elem in iter {
2465                packer.push(*elem.borrow())
2466            }
2467        });
2468    }
2469
2470    /// Convenience function to push a `DatumMap` from an iter of `(&str, Datum)` pairs
2471    pub fn push_dict<'a, I, D>(&mut self, iter: I)
2472    where
2473        I: IntoIterator<Item = (&'a str, D)>,
2474        D: Borrow<Datum<'a>>,
2475    {
2476        self.push_dict_with(|packer| {
2477            for (k, v) in iter {
2478                packer.push(Datum::String(k));
2479                packer.push(*v.borrow())
2480            }
2481        })
2482    }
2483
2484    /// Pushes a `Datum::Range` derived from the `Range<Datum<'a>>`.
2485    ///
2486    /// # Panics
2487    /// - If lower and upper express finite values and they are datums of
2488    ///   different types.
2489    /// - If lower or upper express finite values and are equal to
2490    ///   `Datum::Null`. To handle `Datum::Null` properly, use
2491    ///   [`RangeBound::new`].
2492    ///
2493    /// # Notes
2494    /// - This function canonicalizes the range before pushing it to the row.
2495    /// - Prefer this function over `push_range_with` because of its
2496    ///   canonicalization.
2497    /// - Prefer creating [`RangeBound`]s using [`RangeBound::new`], which
2498    ///   handles `Datum::Null` in a SQL-friendly way.
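    ///
    /// A minimal sketch pushing the empty range, which carries no bound datums
    /// (the `mz_repr::adt::range::Range` import path is assumed to be usable
    /// from outside this module):
    ///
    /// ```
    /// # use mz_repr::{Datum, Row};
    /// # use mz_repr::adt::range::Range;
    /// let mut row = Row::default();
    /// row.packer().push_range(Range { inner: None }).unwrap();
    /// assert_eq!(row.unpack_first(), Datum::Range(Range { inner: None }));
    /// ```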
2499    pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
2500        range.canonicalize()?;
2501        match range.inner {
2502            None => {
2503                self.row.data.push(Tag::Range.into());
2504                // The untagged bytes contain only the `RANGE_EMPTY` flag value.
2505                self.row.data.push(range::InternalFlags::EMPTY.bits());
2506                Ok(())
2507            }
2508            Some(inner) => self.push_range_with(
2509                RangeLowerBound {
2510                    inclusive: inner.lower.inclusive,
2511                    bound: inner
2512                        .lower
2513                        .bound
2514                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2515                },
2516                RangeUpperBound {
2517                    inclusive: inner.upper.inclusive,
2518                    bound: inner
2519                        .upper
2520                        .bound
2521                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2522                },
2523            ),
2524        }
2525    }
2526
2527    /// Pushes a `DatumRange` built from the specified arguments.
2528    ///
2529    /// # Warning
2530    /// Unlike `push_range`, `push_range_with` _does not_ canonicalize its
2531    /// inputs. Consequently, it is possible to generate ranges
2532    /// that will not reflect the proper ordering and equality.
2533    ///
2534    /// # Panics
2535    /// - If lower or upper expresses a finite value and does not push exactly
2536    ///   one value into the `RowPacker`.
2537    /// - If lower and upper express finite values and they are datums of
2538    ///   different types.
2539    /// - If lower or upper express finite values and push `Datum::Null`.
2540    ///
2541    /// # Notes
2542    /// - Prefer `push_range` over this function. This function should be
2543    ///   used only when you are not pushing `Datum`s to the inner row.
2544    /// - Range encoding is `[<flag bytes>,<lower>?,<upper>?]`, where `lower`
2545    ///   and `upper` are optional, contingent on the flag value expressing an
2546    ///   empty range (where neither will be present) or infinite bounds (where
2547    ///   each infinite bound will be absent).
2548    /// - To push an empty range, use `push_range` with `Range { inner: None }`.
2549    pub fn push_range_with<L, U, E>(
2550        &mut self,
2551        lower: RangeLowerBound<L>,
2552        upper: RangeUpperBound<U>,
2553    ) -> Result<(), E>
2554    where
2555        L: FnOnce(&mut RowPacker) -> Result<(), E>,
2556        U: FnOnce(&mut RowPacker) -> Result<(), E>,
2557        E: From<InvalidRangeError>,
2558    {
2559        let start = self.row.data.len();
2560        self.row.data.push(Tag::Range.into());
2561
2562        let mut flags = range::InternalFlags::empty();
2563
2564        flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
2565        flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
2566        flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
2567        flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);
2568
2569        let mut expected_datums = 0;
2570
2571        self.row.data.push(flags.bits());
2572
2573        let datum_check = self.row.data.len();
2574
2575        if let Some(value) = lower.bound {
2576            let start = self.row.data.len();
2577            value(self)?;
2578            assert!(
2579                start < self.row.data.len(),
2580                "finite values must each push exactly one value; expected 1 but got 0"
2581            );
2582            expected_datums += 1;
2583        }
2584
2585        if let Some(value) = upper.bound {
2586            let start = self.row.data.len();
2587            value(self)?;
2588            assert!(
2589                start < self.row.data.len(),
2590                "finite values must each push exactly one value; expected 1 but got 0"
2591            );
2592            expected_datums += 1;
2593        }
2594
2595        // Validate the invariants that 0, 1, or 2 elements were pushed, none are Null,
2596        // and if two are pushed then the second is not less than the first. Panic in
2597        // some cases and error in others.
2598        let mut actual_datums = 0;
2599        let mut seen = None;
2600        let mut dataz = &self.row.data[datum_check..];
2601        while !dataz.is_empty() {
2602            let d = unsafe { read_datum(&mut dataz) };
2603            assert!(d != Datum::Null, "cannot push Datum::Null into range");
2604
2605            match seen {
2606                None => seen = Some(d),
2607                Some(seen) => {
2608                    let seen_kind = DatumKind::from(seen);
2609                    let d_kind = DatumKind::from(d);
2610                    assert!(
2611                        seen_kind == d_kind,
2612                        "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
2613                    );
2614
2615                    if seen > d {
2616                        self.row.data.truncate(start);
2617                        return Err(InvalidRangeError::MisorderedRangeBounds.into());
2618                    }
2619                }
2620            }
2621            actual_datums += 1;
2622        }
2623
2624        assert!(
2625            actual_datums == expected_datums,
2626            "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
2627        );
2628
2629        Ok(())
2630    }
2631
2632    /// Clears the contents of the packer without de-allocating its backing memory.
2633    pub fn clear(&mut self) {
2634        self.row.data.clear();
2635    }
2636
2637    /// Truncates the underlying storage to the specified byte position.
2638    ///
2639    /// # Safety
2640    ///
2641    /// `pos` MUST specify a byte offset that lies on a datum boundary.
2642    /// If `pos` specifies a byte offset that is *within* a datum, the row
2643    /// packer will produce an invalid row, the unpacking of which may
2644    /// trigger undefined behavior!
2645    ///
2646    /// To find the byte offset of a datum boundary, inspect the packer's
2647    /// byte length by calling `packer.data().len()` after pushing the desired
2648    /// number of datums onto the packer.
2649    pub unsafe fn truncate(&mut self, pos: usize) {
2650        self.row.data.truncate(pos)
2651    }
2652
2653    /// Truncates the underlying row to contain at most the first `n` datums.
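    ///
    /// A small sketch:
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::default();
    /// let mut packer = row.packer();
    /// packer.extend([Datum::Int32(1), Datum::Int32(2), Datum::Int32(3)]);
    /// packer.truncate_datums(2);
    /// assert_eq!(row.unpack(), vec![Datum::Int32(1), Datum::Int32(2)]);
    /// ```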
2654    pub fn truncate_datums(&mut self, n: usize) {
2655        let prev_len = self.row.data.len();
2656        let mut iter = self.row.iter();
2657        for _ in iter.by_ref().take(n) {}
2658        let next_len = iter.data.len();
2659        // SAFETY: iterator offsets always lie on a datum boundary.
2660        unsafe { self.truncate(prev_len - next_len) }
2661    }
2662
2663    /// Returns the total amount of bytes used by the underlying row.
2664    pub fn byte_len(&self) -> usize {
2665        self.row.byte_len()
2666    }
2667}
2668
2669impl<'a> IntoIterator for &'a Row {
2670    type Item = Datum<'a>;
2671    type IntoIter = DatumListIter<'a>;
2672    fn into_iter(self) -> DatumListIter<'a> {
2673        self.iter()
2674    }
2675}
2676
2677impl fmt::Debug for Row {
2678    /// Debug representation using the internal datums
2679    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2680        f.write_str("Row{")?;
2681        f.debug_list().entries(self.iter()).finish()?;
2682        f.write_str("}")
2683    }
2684}
2685
2686impl fmt::Display for Row {
2687    /// Display representation using the internal datums
2688    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2689        f.write_str("(")?;
2690        for (i, datum) in self.iter().enumerate() {
2691            if i != 0 {
2692                f.write_str(", ")?;
2693            }
2694            write!(f, "{}", datum)?;
2695        }
2696        f.write_str(")")
2697    }
2698}
2699
2700impl<'a> DatumList<'a> {
2701    pub fn empty() -> DatumList<'static> {
2702        DatumList { data: &[] }
2703    }
2704
2705    pub fn iter(&self) -> DatumListIter<'a> {
2706        DatumListIter { data: self.data }
2707    }
2708
2709    /// For debugging only
2710    pub fn data(&self) -> &'a [u8] {
2711        self.data
2712    }
2713}
2714
2715impl<'a> IntoIterator for DatumList<'a> {
2716    type Item = Datum<'a>;
2717    type IntoIter = DatumListIter<'a>;
2718    fn into_iter(self) -> DatumListIter<'a> {
2719        self.iter()
2720    }
2721}
2722
2723impl<'a> Iterator for DatumListIter<'a> {
2724    type Item = Datum<'a>;
2725    fn next(&mut self) -> Option<Self::Item> {
2726        if self.data.is_empty() {
2727            None
2728        } else {
2729            Some(unsafe { read_datum(&mut self.data) })
2730        }
2731    }
2732}
2733
2734impl<'a> DatumMap<'a> {
2735    pub fn empty() -> DatumMap<'static> {
2736        DatumMap { data: &[] }
2737    }
2738
2739    pub fn iter(&self) -> DatumDictIter<'a> {
2740        DatumDictIter {
2741            data: self.data,
2742            prev_key: None,
2743        }
2744    }
2745
2746    /// For debugging only
2747    pub fn data(&self) -> &'a [u8] {
2748        self.data
2749    }
2750}
2751
2752impl<'a> Debug for DatumMap<'a> {
2753    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2754        f.debug_map().entries(self.iter()).finish()
2755    }
2756}
2757
2758impl<'a> IntoIterator for &'a DatumMap<'a> {
2759    type Item = (&'a str, Datum<'a>);
2760    type IntoIter = DatumDictIter<'a>;
2761    fn into_iter(self) -> DatumDictIter<'a> {
2762        self.iter()
2763    }
2764}
2765
2766impl<'a> Iterator for DatumDictIter<'a> {
2767    type Item = (&'a str, Datum<'a>);
2768    fn next(&mut self) -> Option<Self::Item> {
2769        if self.data.is_empty() {
2770            None
2771        } else {
2772            let key_tag =
2773                Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
2774            assert!(
2775                key_tag == Tag::StringTiny
2776                    || key_tag == Tag::StringShort
2777                    || key_tag == Tag::StringLong
2778                    || key_tag == Tag::StringHuge,
2779                "Dict keys must be strings, got {:?}",
2780                key_tag
2781            );
2782            let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
2783            let val = unsafe { read_datum(&mut self.data) };
2784
2785            // if in debug mode, sanity check keys
2786            if cfg!(debug_assertions) {
2787                if let Some(prev_key) = self.prev_key {
2788                    debug_assert!(
2789                        prev_key < key,
2790                        "Dict keys must be unique and given in ascending order: {} came before {}",
2791                        prev_key,
2792                        key
2793                    );
2794                }
2795                self.prev_key = Some(key);
2796            }
2797
2798            Some((key, val))
2799        }
2800    }
2801}
2802
2803impl RowArena {
2804    pub fn new() -> Self {
2805        RowArena {
2806            inner: RefCell::new(vec![]),
2807        }
2808    }
2809
2810    /// Creates a `RowArena` with a hint of how many rows will be created in the arena, to avoid
2811    /// reallocations of its internal vector.
2812    pub fn with_capacity(capacity: usize) -> Self {
2813        RowArena {
2814            inner: RefCell::new(Vec::with_capacity(capacity)),
2815        }
2816    }
2817
2818    /// Does a `reserve` on the underlying `Vec`. Call this when you expect `additional` more datums
2819    /// to be created in this arena.
2820    pub fn reserve(&self, additional: usize) {
2821        self.inner.borrow_mut().reserve(additional);
2822    }
2823
2824    /// Take ownership of `bytes` for the lifetime of the arena.
2825    #[allow(clippy::transmute_ptr_to_ptr)]
2826    pub fn push_bytes<'a>(&'a self, bytes: Vec<u8>) -> &'a [u8] {
2827        let mut inner = self.inner.borrow_mut();
2828        inner.push(bytes);
2829        let owned_bytes = &inner[inner.len() - 1];
2830        unsafe {
2831            // This is safe because:
2832            //   * We only ever append to self.inner, so the byte vector
2833            //     will live as long as the arena.
2834            //   * We return a reference to the byte vector's contents, so it's
2835            //     okay if self.inner reallocates and moves the byte
2836            //     vector.
2837            //   * We don't allow access to the byte vector itself, so it will
2838            //     never reallocate.
2839            transmute::<&[u8], &'a [u8]>(owned_bytes)
2840        }
2841    }
2842
2843    /// Take ownership of `string` for the lifetime of the arena.
2844    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
2845        let owned_bytes = self.push_bytes(string.into_bytes());
2846        unsafe {
2847            // This is safe because we know it was a `String` just before.
2848            std::str::from_utf8_unchecked(owned_bytes)
2849        }
2850    }
2851
2852    /// Take ownership of `row` for the lifetime of the arena, returning a
2853    /// reference to the first datum in the row.
2854    ///
2855    /// If we had an owned datum type, this method would be much clearer, and
2856    /// would be called `push_owned_datum`.
2857    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
2858        let mut inner = self.inner.borrow_mut();
2859        inner.push(row.data.into_vec());
2860        unsafe {
2861            // This is safe because:
2862            //   * We only ever append to self.inner, so the row data will live
2863            //     as long as the arena.
2864            //   * We force the row data into its own heap allocation--
2865            //     importantly, we do NOT store the CompactBytes, which might be
2866            //     storing data inline--so it's okay if self.inner reallocates
2867            //     and moves the row.
2868            //   * We don't allow access to the byte vector itself, so it will
2869            //     never reallocate.
2870            let datum = read_datum(&mut &inner[inner.len() - 1][..]);
2871            transmute::<Datum<'_>, Datum<'a>>(datum)
2872        }
2873    }
2874
2875    /// Equivalent to `push_unary_row` but returns a `DatumNested` rather than a
2876    /// `Datum`.
2877    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
2878        let mut inner = self.inner.borrow_mut();
2879        inner.push(row.data.into_vec());
2880        unsafe {
2881            // This is safe because:
2882            //   * We only ever append to self.inner, so the row data will live
2883            //     as long as the arena.
2884            //   * We force the row data into its own heap allocation--
2885            //     importantly, we do NOT store the CompactBytes, which might be
2886            //     storing data inline--so it's okay if self.inner reallocates
2887            //     and moves the row.
2888            //   * We don't allow access to the byte vector itself, so it will
2889            //     never reallocate.
2890            let nested = DatumNested::extract(&mut &inner[inner.len() - 1][..]);
2891            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
2892        }
2893    }
2894
2895    /// Convenience function to make a new `Row` containing a single datum, and
2896    /// take ownership of it for the lifetime of the arena
2897    ///
2898    /// ```
2899    /// # use mz_repr::{RowArena, Datum};
2900    /// let arena = RowArena::new();
2901    /// let datum = arena.make_datum(|packer| {
2902    ///   packer.push_list(&[Datum::String("hello"), Datum::String("world")]);
2903    /// });
2904    /// assert_eq!(datum.unwrap_list().iter().collect::<Vec<_>>(), vec![Datum::String("hello"), Datum::String("world")]);
2905    /// ```
2906    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
2907    where
2908        F: FnOnce(&mut RowPacker),
2909    {
2910        let mut row = Row::default();
2911        f(&mut row.packer());
2912        self.push_unary_row(row)
2913    }
2914
2915    /// Convenience function identical to `make_datum` but instead returns a
2916    /// `DatumNested`.
2917    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
2918    where
2919        F: FnOnce(&mut RowPacker),
2920    {
2921        let mut row = Row::default();
2922        f(&mut row.packer());
2923        self.push_unary_row_datum_nested(row)
2924    }
2925
2926    /// Like [`RowArena::make_datum`], but the provided closure can return an error.
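    ///
    /// A small sketch (the `String` error type here is purely illustrative):
    ///
    /// ```
    /// # use mz_repr::{RowArena, Datum};
    /// let arena = RowArena::new();
    /// let datum = arena
    ///     .try_make_datum(|packer| {
    ///         packer.push(Datum::Int64(7));
    ///         Ok::<_, String>(())
    ///     })
    ///     .unwrap();
    /// assert_eq!(datum, Datum::Int64(7));
    /// ```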
2927    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
2928    where
2929        F: FnOnce(&mut RowPacker) -> Result<(), E>,
2930    {
2931        let mut row = Row::default();
2932        f(&mut row.packer())?;
2933        Ok(self.push_unary_row(row))
2934    }
2935
2936    /// Clear the contents of the arena.
2937    pub fn clear(&mut self) {
2938        self.inner.borrow_mut().clear();
2939    }
2940}
2941
2942impl Default for RowArena {
2943    fn default() -> RowArena {
2944        RowArena::new()
2945    }
2946}
2947
2948/// A thread-local row, which can be borrowed and returned.
2949/// # Example
2950///
2951/// Use this type instead of creating a new row:
2952/// ```
2953/// use mz_repr::SharedRow;
2954///
2955/// let mut row_builder = SharedRow::get();
2956/// ```
2957///
2958/// This allows us to reuse an existing row allocation instead of creating a new one or retaining
2959/// an allocation locally. Additionally, we can observe the size of the local row in a central
2960/// place and potentially reallocate to reduce memory needs.
2961///
2962/// # Panic
2963///
2964/// [`SharedRow::get`] panics when trying to obtain multiple references to the shared row.
2965#[derive(Debug)]
2966pub struct SharedRow(Row);
2967
2968impl SharedRow {
2969    thread_local! {
2970        /// A thread-local slot containing a shared Row that can be temporarily used by a function.
2971        /// There can be at most one active user of this Row, which is tracked by the state of the
2972        /// `Option<_>` wrapper. When it is `Some(..)`, the row is available for use. When it
2973        /// is `None`, it is not, and the constructor will panic if a thread attempts to use it.
2974        static SHARED_ROW: Cell<Option<Row>> = const { Cell::new(Some(Row::empty())) }
2975    }
2976
2977    /// Get the shared row.
2978    ///
2979    /// The row's contents are cleared before returning it.
2980    ///
2981    /// # Panic
2982    ///
2983    /// Panics when the row is already borrowed elsewhere.
2984    pub fn get() -> Self {
2985        let mut row = Self::SHARED_ROW
2986            .take()
2987            .expect("attempted to borrow already borrowed SharedRow");
2988        // Clear row
2989        row.packer();
2990        Self(row)
2991    }
2992
2993    /// Gets the shared row and uses it to pack `iter`.
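    ///
    /// A small sketch:
    ///
    /// ```
    /// # use mz_repr::{Datum, SharedRow};
    /// let row = SharedRow::pack([Datum::Int64(1), Datum::String("a")]);
    /// assert_eq!(row.unpack(), vec![Datum::Int64(1), Datum::String("a")]);
    /// ```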
2994    pub fn pack<'a, I, D>(iter: I) -> Row
2995    where
2996        I: IntoIterator<Item = D>,
2997        D: Borrow<Datum<'a>>,
2998    {
2999        let mut row_builder = Self::get();
3000        let mut row_packer = row_builder.packer();
3001        row_packer.extend(iter);
3002        row_builder.clone()
3003    }
3004}
3005
3006impl std::ops::Deref for SharedRow {
3007    type Target = Row;
3008
3009    fn deref(&self) -> &Self::Target {
3010        &self.0
3011    }
3012}
3013
3014impl std::ops::DerefMut for SharedRow {
3015    fn deref_mut(&mut self) -> &mut Self::Target {
3016        &mut self.0
3017    }
3018}
3019
3020impl Drop for SharedRow {
3021    fn drop(&mut self) {
3022        // Take the Row allocation from this instance and put it back in the thread local slot for
3023        // the next user. The Row in `self` is replaced with an empty Row which does not allocate.
3024        Self::SHARED_ROW.set(Some(std::mem::take(&mut self.0)))
3025    }
3026}
3027
3028#[cfg(test)]
3029mod tests {
3030    use std::cmp::Ordering;
3031    use std::collections::hash_map::DefaultHasher;
3032    use std::hash::{Hash, Hasher};
3033
3034    use chrono::{DateTime, NaiveDate};
3035    use itertools::Itertools;
3036    use mz_ore::{assert_err, assert_none};
3037    use ordered_float::OrderedFloat;
3038
3039    use crate::SqlScalarType;
3040
3041    use super::*;
3042
3043    fn hash<T: Hash>(t: &T) -> u64 {
3044        let mut hasher = DefaultHasher::new();
3045        t.hash(&mut hasher);
3046        hasher.finish()
3047    }
3048
3049    #[mz_ore::test]
3050    fn test_assumptions() {
3051        assert_eq!(size_of::<Tag>(), 1);
3052        #[cfg(target_endian = "big")]
3053        {
3054            // If you want to run this on a big-endian CPU, you'll need big-endian versions of the serialization code.
3055            assert!(false);
3056        }
3057    }
3058
3059    #[mz_ore::test]
3060    fn miri_test_arena() {
3061        let arena = RowArena::new();
3062
3063        assert_eq!(arena.push_string("".to_owned()), "");
3064        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");
3065
3066        let empty: &[u8] = &[];
3067        assert_eq!(arena.push_bytes(vec![]), empty);
3068        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);
3069
3070        let mut row = Row::default();
3071        let mut packer = row.packer();
3072        packer.push_dict_with(|row| {
3073            row.push(Datum::String("a"));
3074            row.push_list_with(|row| {
3075                row.push(Datum::String("one"));
3076                row.push(Datum::String("two"));
3077                row.push(Datum::String("three"));
3078            });
3079            row.push(Datum::String("b"));
3080            row.push(Datum::String("c"));
3081        });
3082        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
3083    }
3084
3085    #[mz_ore::test]
3086    fn miri_test_round_trip() {
3087        fn round_trip(datums: Vec<Datum>) {
3088            let row = Row::pack(datums.clone());
3089
3090            // When run under Miri, this catches undefined bytes written to data,
3091            // e.g. by calling push_copy! on a type which contains undefined padding values.
3092            println!("{:?}", row.data());
3093
3094            let datums2 = row.iter().collect::<Vec<_>>();
3095            let datums3 = row.unpack();
3096            assert_eq!(datums, datums2);
3097            assert_eq!(datums, datums3);
3098        }
3099
3100        round_trip(vec![]);
3101        round_trip(
3102            SqlScalarType::enumerate()
3103                .iter()
3104                .flat_map(|r#type| r#type.interesting_datums())
3105                .collect(),
3106        );
3107        round_trip(vec![
3108            Datum::Null,
3109            Datum::Null,
3110            Datum::False,
3111            Datum::True,
3112            Datum::Int16(-21),
3113            Datum::Int32(-42),
3114            Datum::Int64(-2_147_483_648 - 42),
3115            Datum::UInt8(0),
3116            Datum::UInt8(1),
3117            Datum::UInt16(0),
3118            Datum::UInt16(1),
3119            Datum::UInt16(1 << 8),
3120            Datum::UInt32(0),
3121            Datum::UInt32(1),
3122            Datum::UInt32(1 << 8),
3123            Datum::UInt32(1 << 16),
3124            Datum::UInt32(1 << 24),
3125            Datum::UInt64(0),
3126            Datum::UInt64(1),
3127            Datum::UInt64(1 << 8),
3128            Datum::UInt64(1 << 16),
3129            Datum::UInt64(1 << 24),
3130            Datum::UInt64(1 << 32),
3131            Datum::UInt64(1 << 40),
3132            Datum::UInt64(1 << 48),
3133            Datum::UInt64(1 << 56),
3134            Datum::Float32(OrderedFloat::from(-42.12)),
3135            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
3136            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
3137            Datum::Timestamp(
3138                CheckedTimestamp::from_timestamplike(
3139                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
3140                        .unwrap()
3141                        .and_hms_opt(14, 32, 11)
3142                        .unwrap(),
3143                )
3144                .unwrap(),
3145            ),
3146            Datum::TimestampTz(
3147                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
3148                    .unwrap(),
3149            ),
3150            Datum::Interval(Interval {
3151                months: 312,
3152                ..Default::default()
3153            }),
3154            Datum::Interval(Interval::new(0, 0, 1_012_312)),
3155            Datum::Bytes(&[]),
3156            Datum::Bytes(&[0, 2, 1, 255]),
3157            Datum::String(""),
3158            Datum::String("العَرَبِيَّة"),
3159        ]);
3160    }
3161
3162    #[mz_ore::test]
3163    fn test_array() {
3164        // Construct an array using `Row::push_array` and verify that it unpacks
3165        // correctly.
3166        const DIM: ArrayDimension = ArrayDimension {
3167            lower_bound: 2,
3168            length: 2,
3169        };
3170        let mut row = Row::default();
3171        let mut packer = row.packer();
3172        packer
3173            .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3174            .unwrap();
3175        let arr1 = row.unpack_first().unwrap_array();
3176        assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3177        assert_eq!(
3178            arr1.elements().into_iter().collect::<Vec<_>>(),
3179            vec![Datum::Int32(1), Datum::Int32(2)]
3180        );
3181
3182        // Pack a previously-constructed `Datum::Array` and verify that it
3183        // unpacks correctly.
3184        let row = Row::pack_slice(&[Datum::Array(arr1)]);
3185        let arr2 = row.unpack_first().unwrap_array();
3186        assert_eq!(arr1, arr2);
3187    }
3188
3189    #[mz_ore::test]
3190    fn test_multidimensional_array() {
3191        let datums = vec![
3192            Datum::Int32(1),
3193            Datum::Int32(2),
3194            Datum::Int32(3),
3195            Datum::Int32(4),
3196            Datum::Int32(5),
3197            Datum::Int32(6),
3198            Datum::Int32(7),
3199            Datum::Int32(8),
3200        ];
3201
3202        let mut row = Row::default();
3203        let mut packer = row.packer();
3204        packer
3205            .try_push_array(
3206                &[
3207                    ArrayDimension {
3208                        lower_bound: 1,
3209                        length: 1,
3210                    },
3211                    ArrayDimension {
3212                        lower_bound: 1,
3213                        length: 4,
3214                    },
3215                    ArrayDimension {
3216                        lower_bound: 1,
3217                        length: 2,
3218                    },
3219                ],
3220                &datums,
3221            )
3222            .unwrap();
3223        let array = row.unpack_first().unwrap_array();
3224        assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3225    }
3226
3227    #[mz_ore::test]
3228    fn test_array_max_dimensions() {
3229        let mut row = Row::default();
3230        let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3231
3232        // An array with one too many dimensions should be rejected.
3233        let res = row.packer().try_push_array(
3234            &vec![
3235                ArrayDimension {
3236                    lower_bound: 1,
3237                    length: 1
3238                };
3239                max_dims + 1
3240            ],
3241            vec![Datum::Int32(4)],
3242        );
3243        assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3244        assert!(row.data.is_empty());
3245
3246        // An array with exactly the maximum allowable dimensions should be
3247        // accepted.
3248        row.packer()
3249            .try_push_array(
3250                &vec![
3251                    ArrayDimension {
3252                        lower_bound: 1,
3253                        length: 1
3254                    };
3255                    max_dims
3256                ],
3257                vec![Datum::Int32(4)],
3258            )
3259            .unwrap();
3260    }
3261
3262    #[mz_ore::test]
3263    fn test_array_wrong_cardinality() {
3264        let mut row = Row::default();
3265        let res = row.packer().try_push_array(
3266            &[
3267                ArrayDimension {
3268                    lower_bound: 1,
3269                    length: 2,
3270                },
3271                ArrayDimension {
3272                    lower_bound: 1,
3273                    length: 3,
3274                },
3275            ],
3276            vec![Datum::Int32(1), Datum::Int32(2)],
3277        );
3278        assert_eq!(
3279            res,
3280            Err(InvalidArrayError::WrongCardinality {
3281                actual: 2,
3282                expected: 6,
3283            })
3284        );
3285        assert!(row.data.is_empty());
3286    }
3287
3288    #[mz_ore::test]
3289    fn test_nesting() {
3290        let mut row = Row::default();
3291        row.packer().push_dict_with(|row| {
3292            row.push(Datum::String("favourites"));
3293            row.push_list_with(|row| {
3294                row.push(Datum::String("ice cream"));
3295                row.push(Datum::String("oreos"));
3296                row.push(Datum::String("cheesecake"));
3297            });
3298            row.push(Datum::String("name"));
3299            row.push(Datum::String("bob"));
3300        });
3301
3302        let mut iter = row.unpack_first().unwrap_map().iter();
3303
3304        let (k, v) = iter.next().unwrap();
3305        assert_eq!(k, "favourites");
3306        assert_eq!(
3307            v.unwrap_list().iter().collect::<Vec<_>>(),
3308            vec![
3309                Datum::String("ice cream"),
3310                Datum::String("oreos"),
3311                Datum::String("cheesecake"),
3312            ]
3313        );
3314
3315        let (k, v) = iter.next().unwrap();
3316        assert_eq!(k, "name");
3317        assert_eq!(v, Datum::String("bob"));
3318    }
3319
3320    #[mz_ore::test]
3321    fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3322        let pack = |ok| {
3323            let mut row = Row::default();
3324            row.packer().push_dict_with(|row| {
3325                if ok {
3326                    row.push(Datum::String("key"));
3327                    row.push(Datum::Int32(42));
3328                    Ok(7)
3329                } else {
3330                    Err("fail")
3331                }
3332            })?;
3333            Ok(row)
3334        };
3335
3336        assert_eq!(pack(false), Err("fail"));
3337
3338        let row = pack(true)?;
3339        let mut dict = row.unpack_first().unwrap_map().iter();
3340        assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3341        assert_eq!(dict.next(), None);
3342
3343        Ok(())
3344    }
3345
3346    #[mz_ore::test]
3347    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
3348    fn test_datum_sizes() {
3349        let arena = RowArena::new();
3350
3351        // Test the claims about various datum sizes.
3352        let values_of_interest = vec![
3353            Datum::Null,
3354            Datum::False,
3355            Datum::Int16(0),
3356            Datum::Int32(0),
3357            Datum::Int64(0),
3358            Datum::UInt8(0),
3359            Datum::UInt8(1),
3360            Datum::UInt16(0),
3361            Datum::UInt16(1),
3362            Datum::UInt16(1 << 8),
3363            Datum::UInt32(0),
3364            Datum::UInt32(1),
3365            Datum::UInt32(1 << 8),
3366            Datum::UInt32(1 << 16),
3367            Datum::UInt32(1 << 24),
3368            Datum::UInt64(0),
3369            Datum::UInt64(1),
3370            Datum::UInt64(1 << 8),
3371            Datum::UInt64(1 << 16),
3372            Datum::UInt64(1 << 24),
3373            Datum::UInt64(1 << 32),
3374            Datum::UInt64(1 << 40),
3375            Datum::UInt64(1 << 48),
3376            Datum::UInt64(1 << 56),
3377            Datum::Float32(OrderedFloat(0.0)),
3378            Datum::Float64(OrderedFloat(0.0)),
3379            Datum::from(numeric::Numeric::from(0)),
3380            Datum::from(numeric::Numeric::from(1000)),
3381            Datum::from(numeric::Numeric::from(9999)),
3382            Datum::Date(
3383                NaiveDate::from_ymd_opt(1, 1, 1)
3384                    .unwrap()
3385                    .try_into()
3386                    .unwrap(),
3387            ),
3388            Datum::Timestamp(
3389                CheckedTimestamp::from_timestamplike(
3390                    DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
3391                )
3392                .unwrap(),
3393            ),
3394            Datum::TimestampTz(
3395                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
3396                    .unwrap(),
3397            ),
3398            Datum::Interval(Interval::default()),
3399            Datum::Bytes(&[]),
3400            Datum::String(""),
3401            Datum::JsonNull,
3402            Datum::Range(Range { inner: None }),
3403            arena.make_datum(|packer| {
3404                packer
3405                    .push_range(Range::new(Some((
3406                        RangeLowerBound::new(Datum::Int32(-1), true),
3407                        RangeUpperBound::new(Datum::Int32(1), true),
3408                    ))))
3409                    .unwrap();
3410            }),
3411        ];
3412        for value in values_of_interest {
3413            if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
3414                panic!("Disparity in claimed size for {:?}", value);
3415            }
3416        }
3417    }
3418
3419    #[mz_ore::test]
3420    fn test_range_errors() {
3421        fn test_range_errors_inner<'a>(
3422            datums: Vec<Vec<Datum<'a>>>,
3423        ) -> Result<(), InvalidRangeError> {
3424            let mut row = Row::default();
3425            let row_len = row.byte_len();
3426            let mut packer = row.packer();
3427            let r = packer.push_range_with(
3428                RangeLowerBound {
3429                    inclusive: true,
3430                    bound: Some(|row: &mut RowPacker| {
3431                        for d in &datums[0] {
3432                            row.push(d);
3433                        }
3434                        Ok(())
3435                    }),
3436                },
3437                RangeUpperBound {
3438                    inclusive: true,
3439                    bound: Some(|row: &mut RowPacker| {
3440                        for d in &datums[1] {
3441                            row.push(d);
3442                        }
3443                        Ok(())
3444                    }),
3445                },
3446            );
3447
3448            assert_eq!(row_len, row.byte_len());
3449
3450            r
3451        }
3452
3453        for panicking_case in [
3454            vec![vec![Datum::Int32(1)], vec![]],
3455            vec![
3456                vec![Datum::Int32(1), Datum::Int32(2)],
3457                vec![Datum::Int32(3)],
3458            ],
3459            vec![
3460                vec![Datum::Int32(1)],
3461                vec![Datum::Int32(2), Datum::Int32(3)],
3462            ],
3463            vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
3464            vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
3465            vec![vec![Datum::Null], vec![Datum::Int32(2)]],
3466            vec![vec![Datum::Int32(1)], vec![Datum::Null]],
3467        ] {
3468            #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
3469            let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
3470            assert_err!(result);
3471        }
3472
3473        let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
3474        assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
3475    }
3476
3477    /// Lists have a variable-length encoding for their lengths. We test each case here.
3478    #[mz_ore::test]
3479    #[cfg_attr(miri, ignore)] // slow
3480    fn test_list_encoding() {
3481        fn test_list_encoding_inner(len: usize) {
3482            let list_elem = |i: usize| {
3483                if i % 2 == 0 {
3484                    Datum::False
3485                } else {
3486                    Datum::True
3487                }
3488            };
3489            let mut row = Row::default();
3490            {
3491                // Push some stuff.
3492                let mut packer = row.packer();
3493                packer.push(Datum::String("start"));
3494                packer.push_list_with(|packer| {
3495                    for i in 0..len {
3496                        packer.push(list_elem(i));
3497                    }
3498                });
3499                packer.push(Datum::String("end"));
3500            }
3501            // Check that we read back exactly what we pushed.
3502            let mut row_it = row.iter();
3503            assert_eq!(row_it.next().unwrap(), Datum::String("start"));
3504            match row_it.next().unwrap() {
3505                Datum::List(list) => {
3506                    let mut list_it = list.iter();
3507                    for i in 0..len {
3508                        assert_eq!(list_it.next().unwrap(), list_elem(i));
3509                    }
3510                    assert_none!(list_it.next());
3511                }
3512                _ => panic!("expected Datum::List"),
3513            }
3514            assert_eq!(row_it.next().unwrap(), Datum::String("end"));
3515            assert_none!(row_it.next());
3516        }
3517
3518        test_list_encoding_inner(0);
3519        test_list_encoding_inner(1);
3520        test_list_encoding_inner(10);
3521        test_list_encoding_inner(TINY - 1); // tiny
3522        test_list_encoding_inner(TINY + 1); // short
3523        test_list_encoding_inner(SHORT + 1); // long
3524
3525        // The biggest one takes 40 s on my laptop, probably not worth it.
3526        //test_list_encoding_inner(LONG + 1); // huge
3527    }
3528
3529    /// Demonstrates that DatumList's Eq and Ord are now consistent.
3530    /// A list containing -0.0 and one containing +0.0 have different byte representations
3531    /// (IEEE 754 distinguishes them), so the old bytewise Eq reported them as unequal. Now that
3532    /// Eq goes through Datum::cmp, it reports them as equal, matching Ord, which compares
3533    /// datum-by-datum via iter().cmp(other.iter()).
3534    #[mz_ore::test]
3535    fn test_datum_list_eq_ord_consistency() {
3536        // Build list containing +0.0
3537        let mut row_pos = Row::default();
3538        row_pos.packer().push_list_with(|p| {
3539            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3540        });
3541        let list_pos = row_pos.unpack_first().unwrap_list();
3542
3543        // Build list containing -0.0 (distinct bit pattern from +0.0)
3544        let mut row_neg = Row::default();
3545        row_neg.packer().push_list_with(|p| {
3546            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3547        });
3548        let list_neg = row_neg.unpack_first().unwrap_list();
3549
3550        // Eq compares datum-by-datum, so the different encodings of +0.0 and -0.0 compare equal.
3551        // The old bytewise Eq reported them as unequal, so we test the fix here.
3552        assert_eq!(
3553            list_pos, list_neg,
3554            "Eq should see different encodings as equal"
3555        );
3556
3557        // Ord is datum-by-datum: -0.0 and +0.0 compare equal as Datums
3558        assert_eq!(
3559            list_pos.cmp(&list_neg),
3560            Ordering::Equal,
3561            "Ord (datum-by-datum) should see -0.0 and +0.0 as equal"
3562        );
3563    }
3564
3565    /// Demonstrates that DatumMap's Eq is semantic rather than bytewise: maps with equal keys
3566    /// and values compare equal even when the values have different encodings (e.g. -0.0 vs +0.0).
3567    #[mz_ore::test]
3568    fn test_datum_map_eq_bytewise_consistency() {
3569        // Build map {"k": +0.0}
3570        let mut row_pos = Row::default();
3571        row_pos.packer().push_dict_with(|p| {
3572            p.push(Datum::String("k"));
3573            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3574        });
3575        let map_pos = row_pos.unpack_first().unwrap_map();
3576
3577        // Build map {"k": -0.0}
3578        let mut row_neg = Row::default();
3579        row_neg.packer().push_dict_with(|p| {
3580            p.push(Datum::String("k"));
3581            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3582        });
3583        let map_neg = row_neg.unpack_first().unwrap_map();
3584
3585        // Same keys and semantically equal values: Eq treats the maps as equal despite the differing encodings.
3586        assert_eq!(
3587            map_pos, map_neg,
3588            "DatumMap Eq is semantic; -0.0 and +0.0 have different encodings but are equal"
3589        );
3590        // Verify they have the same logical content
3591        let entries_pos: Vec<_> = map_pos.iter().collect();
3592        let entries_neg: Vec<_> = map_neg.iter().collect();
3593        assert_eq!(entries_pos.len(), entries_neg.len());
3594        for ((k1, v1), (k2, v2)) in entries_pos.iter().zip_eq(entries_neg.iter()) {
3595            assert_eq!(k1, k2);
3596            assert_eq!(
3597                v1, v2,
3598                "Datum-level comparison treats -0.0 and +0.0 as equal"
3599            );
3600        }
3601    }
3602
3603    /// Hash must agree with Eq: equal lists must have the same hash.
3604    #[mz_ore::test]
3605    fn test_datum_list_hash_consistency() {
3606        // Equal lists (including -0.0 vs +0.0) must hash the same
3607        let mut row_pos = Row::default();
3608        row_pos.packer().push_list_with(|p| {
3609            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3610        });
3611        let list_pos = row_pos.unpack_first().unwrap_list();
3612
3613        let mut row_neg = Row::default();
3614        row_neg.packer().push_list_with(|p| {
3615            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3616        });
3617        let list_neg = row_neg.unpack_first().unwrap_list();
3618
3619        assert_eq!(list_pos, list_neg);
3620        assert_eq!(
3621            hash(&list_pos),
3622            hash(&list_neg),
3623            "equal lists must have same hash"
3624        );
3625
3626        // Unequal lists should have different hashes (with asymptotic probability 1)
3627        let mut row_a = Row::default();
3628        row_a.packer().push_list_with(|p| {
3629            p.push(Datum::Int32(1));
3630            p.push(Datum::Int32(2));
3631        });
3632        let list_a = row_a.unpack_first().unwrap_list();
3633
3634        let mut row_b = Row::default();
3635        row_b.packer().push_list_with(|p| {
3636            p.push(Datum::Int32(1));
3637            p.push(Datum::Int32(3));
3638        });
3639        let list_b = row_b.unpack_first().unwrap_list();
3640
3641        assert_ne!(list_a, list_b);
3642        assert_ne!(
3643            hash(&list_a),
3644            hash(&list_b),
3645            "unequal lists must have different hashes"
3646        );
3647    }
3648
3649    /// Ord/PartialOrd for DatumList: less, equal, greater.
3650    #[mz_ore::test]
3651    fn test_datum_list_ordering() {
3652        let mut row_12 = Row::default();
3653        row_12.packer().push_list_with(|p| {
3654            p.push(Datum::Int32(1));
3655            p.push(Datum::Int32(2));
3656        });
3657        let list_12 = row_12.unpack_first().unwrap_list();
3658
3659        let mut row_13 = Row::default();
3660        row_13.packer().push_list_with(|p| {
3661            p.push(Datum::Int32(1));
3662            p.push(Datum::Int32(3));
3663        });
3664        let list_13 = row_13.unpack_first().unwrap_list();
3665
3666        let mut row_123 = Row::default();
3667        row_123.packer().push_list_with(|p| {
3668            p.push(Datum::Int32(1));
3669            p.push(Datum::Int32(2));
3670            p.push(Datum::Int32(3));
3671        });
3672        let list_123 = row_123.unpack_first().unwrap_list();
3673
3674        // [1, 2] < [1, 3] due to the second element being different
3675        assert_eq!(list_12.cmp(&list_13), Ordering::Less);
3676        assert_eq!(list_13.cmp(&list_12), Ordering::Greater);
3677        assert_eq!(list_12.cmp(&list_12), Ordering::Equal);
3678        // shorter prefix compares less
3679        assert_eq!(list_12.cmp(&list_123), Ordering::Less);
3680    }
3681
3682    /// Hash must agree with Eq: equal maps must have the same hash.
3683    #[mz_ore::test]
3684    fn test_datum_map_hash_consistency() {
3685        let mut row_pos = Row::default();
3686        row_pos.packer().push_dict_with(|p| {
3687            p.push(Datum::String("x"));
3688            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3689        });
3690        let map_pos = row_pos.unpack_first().unwrap_map();
3691
3692        let mut row_neg = Row::default();
3693        row_neg.packer().push_dict_with(|p| {
3694            p.push(Datum::String("x"));
3695            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3696        });
3697        let map_neg = row_neg.unpack_first().unwrap_map();
3698
3699        assert_eq!(map_pos, map_neg);
3700        assert_eq!(
3701            hash(&map_pos),
3702            hash(&map_neg),
3703            "equal maps must have same hash"
3704        );
3705
3706        let mut row_a = Row::default();
3707        row_a.packer().push_dict_with(|p| {
3708            p.push(Datum::String("a"));
3709            p.push(Datum::Int32(1));
3710        });
3711        let map_a = row_a.unpack_first().unwrap_map();
3712
3713        let mut row_b = Row::default();
3714        row_b.packer().push_dict_with(|p| {
3715            p.push(Datum::String("a"));
3716            p.push(Datum::Int32(2));
3717        });
3718        let map_b = row_b.unpack_first().unwrap_map();
3719
3720        assert_ne!(map_a, map_b);
3721        assert_ne!(
3722            hash(&map_a),
3723            hash(&map_b),
3724            "unequal maps must have different hashes"
3725        );
3726    }
3727
3728    /// Ord/PartialOrd for DatumMap: less, equal, greater (by key then value).
3729    #[mz_ore::test]
3730    fn test_datum_map_ordering() {
3731        let mut row_a1 = Row::default();
3732        row_a1.packer().push_dict_with(|p| {
3733            p.push(Datum::String("a"));
3734            p.push(Datum::Int32(1));
3735        });
3736        let map_a1 = row_a1.unpack_first().unwrap_map();
3737
3738        let mut row_a2 = Row::default();
3739        row_a2.packer().push_dict_with(|p| {
3740            p.push(Datum::String("a"));
3741            p.push(Datum::Int32(2));
3742        });
3743        let map_a2 = row_a2.unpack_first().unwrap_map();
3744
3745        let mut row_b1 = Row::default();
3746        row_b1.packer().push_dict_with(|p| {
3747            p.push(Datum::String("b"));
3748            p.push(Datum::Int32(1));
3749        });
3750        let map_b1 = row_b1.unpack_first().unwrap_map();
3751
3752        assert_eq!(map_a1.cmp(&map_a2), Ordering::Less);
3753        assert_eq!(map_a2.cmp(&map_a1), Ordering::Greater);
3754        assert_eq!(map_a1.cmp(&map_a1), Ordering::Equal);
3755        assert_eq!(map_a1.cmp(&map_b1), Ordering::Less); // "a" < "b"
3756    }
3757
3758    /// Datum puts Null last in the enum so that nulls sort last (PostgreSQL default).
3759    /// This ordering is used when comparing DatumList/DatumMap (e.g. jsonb_agg tiebreaker).
3760    #[mz_ore::test]
3761    fn test_datum_list_and_map_null_sorts_last() {
3762        // DatumList: [1] < [null] so non-null sorts before null
3763        let mut row_list_1 = Row::default();
3764        row_list_1
3765            .packer()
3766            .push_list_with(|p| p.push(Datum::Int32(1)));
3767        let list_1 = row_list_1.unpack_first().unwrap_list();
3768
3769        let mut row_list_null = Row::default();
3770        row_list_null
3771            .packer()
3772            .push_list_with(|p| p.push(Datum::Null));
3773        let list_null = row_list_null.unpack_first().unwrap_list();
3774
3775        assert_eq!(list_1.cmp(&list_null), Ordering::Less);
3776        assert_eq!(list_null.cmp(&list_1), Ordering::Greater);
3777
3778        // DatumMap: {"k": 1} < {"k": null} so non-null sorts before null (same as jsonb_agg)
3779        let mut row_map_1 = Row::default();
3780        row_map_1.packer().push_dict_with(|p| {
3781            p.push(Datum::String("k"));
3782            p.push(Datum::Int32(1));
3783        });
3784        let map_1 = row_map_1.unpack_first().unwrap_map();
3785
3786        let mut row_map_null = Row::default();
3787        row_map_null.packer().push_dict_with(|p| {
3788            p.push(Datum::String("k"));
3789            p.push(Datum::Null);
3790        });
3791        let map_null = row_map_null.unpack_first().unwrap_map();
3792
3793        assert_eq!(map_1.cmp(&map_null), Ordering::Less);
3794        assert_eq!(map_null.cmp(&map_1), Ordering::Greater);
3795    }
3796}