mz_repr/
row.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Borrow;
11use std::cell::{Cell, RefCell};
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::mem::{size_of, transmute};
16use std::ops::Deref;
17use std::str;
18
19use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
20use compact_bytes::CompactBytes;
21use mz_ore::cast::{CastFrom, ReinterpretCast};
22use mz_ore::soft_assert_no_log;
23use mz_ore::vec::Vector;
24use mz_persist_types::Codec64;
25use num_enum::{IntoPrimitive, TryFromPrimitive};
26use ordered_float::OrderedFloat;
27use proptest::prelude::*;
28use proptest::strategy::{BoxedStrategy, Strategy};
29use serde::{Deserialize, Serialize};
30use uuid::Uuid;
31
32use crate::adt::array::{
33    Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
34};
35use crate::adt::date::Date;
36use crate::adt::interval::Interval;
37use crate::adt::mz_acl_item::{AclItem, MzAclItem};
38use crate::adt::numeric;
39use crate::adt::numeric::Numeric;
40use crate::adt::range::{
41    self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
42};
43use crate::adt::timestamp::CheckedTimestamp;
44use crate::scalar::{DatumKind, arb_datum};
45use crate::{Datum, RelationDesc, Timestamp};
46
// Submodules of the row implementation.
pub(crate) mod encode;
pub mod iter;

// Splice in code that the build writes to `$OUT_DIR/mz_repr.row.rs`.
// NOTE(review): presumably the generated protobuf types (e.g. `ProtoRow`) — confirm against the
// crate's build script.
include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
51
/// A packed representation for `Datum`s.
///
/// `Datum` is easy to work with but very space inefficient. A `Datum::Int32(42)`
/// is laid out in memory like this:
///
///   tag: 3
///   padding: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
///   data: 0 0 0 42
///   padding: 0 0 0 0 0 0 0 0 0 0 0 0
///
/// For a total of 32 bytes! The second set of padding is needed in case we were
/// to write a 16-byte datum into this location. The first set of padding is
/// needed to align that hypothetical decimal to a 16-byte boundary.
///
/// A `Row` stores zero or more `Datum`s without any padding. We avoid the need
/// for the first set of padding by only providing access to the `Datum`s via
/// calls to `ptr::read_unaligned`, which on modern x86 is barely penalized. We
/// avoid the need for the second set of padding by not providing mutable access
/// to the `Datum`. Instead, `Row` is append-only.
///
/// A `Row` can be built from a collection of `Datum`s using `Row::pack`, but it
/// is more efficient to use `Row::pack_slice` so that a right-sized allocation
/// can be created. If that is not possible, consider using the row buffer
/// pattern: allocate one row, pack into it, and then call [`Row::clone`] to
/// receive a copy of that row, leaving behind the original allocation to pack
/// future rows.
///
/// Creating a row via [`Row::pack_slice`]:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// assert_eq!(row.unpack(), vec![Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)])
/// ```
///
/// `Row`s can be unpacked by iterating over them:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// assert_eq!(row.iter().nth(1).unwrap(), Datum::Int32(1));
/// ```
///
/// If you want random access to the `Datum`s in a `Row`, use `Row::unpack` to
/// create a `Vec<Datum>`:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// let datums = row.unpack();
/// assert_eq!(datums[1], Datum::Int32(1));
/// ```
///
/// # Performance
///
/// Rows are dynamically sized, but up to a fixed size their data is stored in-line.
/// It is best to re-use a `Row` across multiple `Row` creation calls, as this
/// avoids the allocations involved in `Row::new()`.
#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
    /// The packed bytes of the row's datums.
    data: CompactBytes,
}
112
113impl Row {
114    const SIZE: usize = CompactBytes::MAX_INLINE;
115
116    /// A variant of `Row::from_proto` that allows for reuse of internal allocs
117    /// and validates the decoding against a provided [`RelationDesc`].
118    pub fn decode_from_proto(
119        &mut self,
120        proto: &ProtoRow,
121        desc: &RelationDesc,
122    ) -> Result<(), String> {
123        let mut packer = self.packer();
124        for (col_idx, _, _) in desc.iter_all() {
125            let d = match proto.datums.get(col_idx.to_raw()) {
126                Some(x) => x,
127                None => {
128                    packer.push(Datum::Null);
129                    continue;
130                }
131            };
132            packer.try_push_proto(d)?;
133        }
134
135        Ok(())
136    }
137
138    /// Allocate an empty `Row` with a pre-allocated capacity.
139    #[inline]
140    pub fn with_capacity(cap: usize) -> Self {
141        Self {
142            data: CompactBytes::with_capacity(cap),
143        }
144    }
145
146    /// Create an empty `Row`.
147    #[inline]
148    pub const fn empty() -> Self {
149        Self {
150            data: CompactBytes::empty(),
151        }
152    }
153
154    /// Creates a new row from supplied bytes.
155    ///
156    /// # Safety
157    ///
158    /// This method relies on `data` being an appropriate row encoding, and can
159    /// result in unsafety if this is not the case.
160    pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
161        Row {
162            data: CompactBytes::new(data),
163        }
164    }
165
166    /// Constructs a [`RowPacker`] that will pack datums into this row's
167    /// allocation.
168    ///
169    /// This method clears the existing contents of the row, but retains the
170    /// allocation.
171    pub fn packer(&mut self) -> RowPacker<'_> {
172        self.clear();
173        RowPacker { row: self }
174    }
175
176    /// Take some `Datum`s and pack them into a `Row`.
177    ///
178    /// This method builds a `Row` by repeatedly increasing the backing
179    /// allocation. If the contents of the iterator are known ahead of
180    /// time, consider [`Row::with_capacity`] to right-size the allocation
181    /// first, and then [`RowPacker::extend`] to populate it with `Datum`s.
182    /// This avoids the repeated allocation resizing and copying.
183    pub fn pack<'a, I, D>(iter: I) -> Row
184    where
185        I: IntoIterator<Item = D>,
186        D: Borrow<Datum<'a>>,
187    {
188        let mut row = Row::default();
189        row.packer().extend(iter);
190        row
191    }
192
193    /// Use `self` to pack `iter`, and then clone the result.
194    ///
195    /// This is a convenience method meant to reduce boilerplate around row
196    /// formation.
197    pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
198    where
199        I: IntoIterator<Item = D>,
200        D: Borrow<Datum<'a>>,
201    {
202        self.packer().extend(iter);
203        self.clone()
204    }
205
206    /// Like [`Row::pack`], but the provided iterator is allowed to produce an
207    /// error, in which case the packing operation is aborted and the error
208    /// returned.
209    pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
210    where
211        I: IntoIterator<Item = Result<D, E>>,
212        D: Borrow<Datum<'a>>,
213    {
214        let mut row = Row::default();
215        row.packer().try_extend(iter)?;
216        Ok(row)
217    }
218
219    /// Pack a slice of `Datum`s into a `Row`.
220    ///
221    /// This method has the advantage over `pack` that it can determine the required
222    /// allocation before packing the elements, ensuring only one allocation and no
223    /// redundant copies required.
224    pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
225        // Pre-allocate the needed number of bytes.
226        let mut row = Row::with_capacity(datums_size(slice.iter()));
227        row.packer().extend(slice.iter());
228        row
229    }
230
231    /// Returns the total amount of bytes used by this row.
232    pub fn byte_len(&self) -> usize {
233        let heap_size = if self.data.spilled() {
234            self.data.len()
235        } else {
236            0
237        };
238        let inline_size = std::mem::size_of::<Self>();
239        inline_size.saturating_add(heap_size)
240    }
241
242    /// The length of the encoded row in bytes. Does not include the size of the `Row` struct itself.
243    pub fn data_len(&self) -> usize {
244        self.data.len()
245    }
246
247    /// Returns the total capacity in bytes used by this row.
248    pub fn byte_capacity(&self) -> usize {
249        self.data.capacity()
250    }
251
252    /// Extracts a Row slice containing the entire [`Row`].
253    #[inline]
254    pub fn as_row_ref(&self) -> &RowRef {
255        RowRef::from_slice(self.data.as_slice())
256    }
257
258    /// Clear the contents of the [`Row`], leaving any allocation in place.
259    #[inline]
260    fn clear(&mut self) {
261        self.data.clear();
262    }
263}
264
265impl Borrow<RowRef> for Row {
266    #[inline]
267    fn borrow(&self) -> &RowRef {
268        self.as_row_ref()
269    }
270}
271
272impl AsRef<RowRef> for Row {
273    #[inline]
274    fn as_ref(&self) -> &RowRef {
275        self.as_row_ref()
276    }
277}
278
279impl Deref for Row {
280    type Target = RowRef;
281
282    #[inline]
283    fn deref(&self) -> &Self::Target {
284        self.as_row_ref()
285    }
286}
287
// Nothing depends on `Row` being exactly 24 bytes; this assertion only exists so that a change
// to its size is noticed.
static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
290
291impl Clone for Row {
292    fn clone(&self) -> Self {
293        Row {
294            data: self.data.clone(),
295        }
296    }
297
298    fn clone_from(&mut self, source: &Self) {
299        self.data.clone_from(&source.data);
300    }
301}
302
303// Row's `Hash` implementation defers to `RowRef` to ensure they hash equivalently.
304impl std::hash::Hash for Row {
305    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
306        self.as_row_ref().hash(state)
307    }
308}
309
310impl Arbitrary for Row {
311    type Parameters = prop::collection::SizeRange;
312    type Strategy = BoxedStrategy<Row>;
313
314    fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
315        prop::collection::vec(arb_datum(), size)
316            .prop_map(|items| {
317                let mut row = Row::default();
318                let mut packer = row.packer();
319                for item in items.iter() {
320                    let datum: Datum<'_> = item.into();
321                    packer.push(datum);
322                }
323                row
324            })
325            .boxed()
326    }
327}
328
329impl PartialOrd for Row {
330    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
331        Some(self.cmp(other))
332    }
333}
334
335impl Ord for Row {
336    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
337        self.as_ref().cmp(other.as_ref())
338    }
339}
340
341#[allow(missing_debug_implementations)]
342mod columnation {
343    use columnation::{Columnation, Region};
344    use mz_ore::region::LgAllocRegion;
345
346    use crate::Row;
347
348    /// Region allocation for `Row` data.
349    ///
350    /// Content bytes are stored in stable contiguous memory locations,
351    /// and then a `Row` referencing them is falsified.
352    pub struct RowStack {
353        region: LgAllocRegion<u8>,
354    }
355
356    impl RowStack {
357        const LIMIT: usize = 2 << 20;
358    }
359
360    // Implement `Default` manually to specify a region allocation limit.
361    impl Default for RowStack {
362        fn default() -> Self {
363            Self {
364                // Limit the region size to 2MiB.
365                region: LgAllocRegion::with_limit(Self::LIMIT),
366            }
367        }
368    }
369
370    impl Columnation for Row {
371        type InnerRegion = RowStack;
372    }
373
374    impl Region for RowStack {
375        type Item = Row;
376        #[inline]
377        fn clear(&mut self) {
378            self.region.clear();
379        }
380        #[inline(always)]
381        unsafe fn copy(&mut self, item: &Row) -> Row {
382            if item.data.spilled() {
383                let bytes = self.region.copy_slice(&item.data[..]);
384                Row {
385                    data: compact_bytes::CompactBytes::from_raw_parts(
386                        bytes.as_mut_ptr(),
387                        item.data.len(),
388                        item.data.capacity(),
389                    ),
390                }
391            } else {
392                item.clone()
393            }
394        }
395
396        fn reserve_items<'a, I>(&mut self, items: I)
397        where
398            Self: 'a,
399            I: Iterator<Item = &'a Self::Item> + Clone,
400        {
401            let size = items
402                .filter(|row| row.data.spilled())
403                .map(|row| row.data.len())
404                .sum();
405            let size = std::cmp::min(size, Self::LIMIT);
406            self.region.reserve(size);
407        }
408
409        fn reserve_regions<'a, I>(&mut self, regions: I)
410        where
411            Self: 'a,
412            I: Iterator<Item = &'a Self> + Clone,
413        {
414            let size = regions.map(|r| r.region.len()).sum();
415            let size = std::cmp::min(size, Self::LIMIT);
416            self.region.reserve(size);
417        }
418
419        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
420            self.region.heap_size(callback)
421        }
422    }
423}
424
425mod columnar {
426    use columnar::common::PushIndexAs;
427    use columnar::{
428        AsBytes, Borrow, Clear, Columnar, Container, FromBytes, HeapSize, Index, IndexAs, Len, Push,
429    };
430    use mz_ore::cast::CastFrom;
431    use std::ops::Range;
432
433    use crate::{Row, RowRef};
434
435    #[derive(Copy, Clone, Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
436    pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
437        /// Bounds container; provides indexed access to offsets.
438        pub bounds: BC,
439        /// Values container; provides slice access to bytes.
440        pub values: VC,
441    }
442
443    impl Columnar for Row {
444        #[inline(always)]
445        fn copy_from(&mut self, other: columnar::Ref<'_, Self>) {
446            self.clear();
447            self.data.extend_from_slice(other.data());
448        }
449        #[inline(always)]
450        fn into_owned(other: columnar::Ref<'_, Self>) -> Self {
451            other.to_owned()
452        }
453        type Container = Rows;
454        #[inline(always)]
455        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
456        where
457            Self: 'a,
458        {
459            thing
460        }
461    }
462
463    impl<BC: PushIndexAs<u64>> Borrow for Rows<BC, Vec<u8>> {
464        type Ref<'a> = &'a RowRef;
465        type Borrowed<'a>
466            = Rows<BC::Borrowed<'a>, &'a [u8]>
467        where
468            Self: 'a;
469        #[inline(always)]
470        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
471            Rows {
472                bounds: self.bounds.borrow(),
473                values: self.values.borrow(),
474            }
475        }
476        #[inline(always)]
477        fn reborrow<'c, 'a: 'c>(item: Self::Borrowed<'a>) -> Self::Borrowed<'c>
478        where
479            Self: 'a,
480        {
481            Rows {
482                bounds: BC::reborrow(item.bounds),
483                values: item.values,
484            }
485        }
486
487        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
488        where
489            Self: 'a,
490        {
491            item
492        }
493    }
494
495    impl<BC: PushIndexAs<u64>> Container for Rows<BC, Vec<u8>> {
496        fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
497            if !range.is_empty() {
498                // Imported bounds will be relative to this starting offset.
499                let values_len: u64 = self.values.len().try_into().expect("must fit");
500
501                // Push all bytes that we can, all at once.
502                let other_lower = if range.start == 0 {
503                    0
504                } else {
505                    other.bounds.index_as(range.start - 1)
506                };
507                let other_upper = other.bounds.index_as(range.end - 1);
508                self.values.extend_from_self(
509                    other.values,
510                    usize::try_from(other_lower).expect("must fit")
511                        ..usize::try_from(other_upper).expect("must fit"),
512                );
513
514                // Each bound needs to be shifted by `values_len - other_lower`.
515                if values_len == other_lower {
516                    self.bounds.extend_from_self(other.bounds, range);
517                } else {
518                    for index in range {
519                        let shifted = other.bounds.index_as(index) - other_lower + values_len;
520                        self.bounds.push(&shifted)
521                    }
522                }
523            }
524        }
525        fn reserve_for<'a, I>(&mut self, selves: I)
526        where
527            Self: 'a,
528            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
529        {
530            self.bounds.reserve_for(selves.clone().map(|r| r.bounds));
531            self.values.reserve_for(selves.map(|r| r.values));
532        }
533    }
534
535    impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
536        #[inline(always)]
537        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
538            columnar::chain(self.bounds.as_bytes(), self.values.as_bytes())
539        }
540    }
541    impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
542        #[inline(always)]
543        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
544            Self {
545                bounds: FromBytes::from_bytes(bytes),
546                values: FromBytes::from_bytes(bytes),
547            }
548        }
549    }
550
551    impl<BC: Len, VC> Len for Rows<BC, VC> {
552        #[inline(always)]
553        fn len(&self) -> usize {
554            self.bounds.len()
555        }
556    }
557
558    impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
559        type Ref = &'a RowRef;
560        #[inline(always)]
561        fn get(&self, index: usize) -> Self::Ref {
562            let lower = if index == 0 {
563                0
564            } else {
565                self.bounds.index_as(index - 1)
566            };
567            let upper = self.bounds.index_as(index);
568            let lower = usize::cast_from(lower);
569            let upper = usize::cast_from(upper);
570            RowRef::from_slice(&self.values[lower..upper])
571        }
572    }
573    impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
574        type Ref = &'a RowRef;
575        #[inline(always)]
576        fn get(&self, index: usize) -> Self::Ref {
577            let lower = if index == 0 {
578                0
579            } else {
580                self.bounds.index_as(index - 1)
581            };
582            let upper = self.bounds.index_as(index);
583            let lower = usize::cast_from(lower);
584            let upper = usize::cast_from(upper);
585            RowRef::from_slice(&self.values[lower..upper])
586        }
587    }
588
589    impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
590        #[inline(always)]
591        fn push(&mut self, item: &Row) {
592            self.values.extend_from_slice(item.data.as_slice());
593            self.bounds.push(u64::cast_from(self.values.len()));
594        }
595    }
596    impl<BC: for<'a> Push<&'a u64>> Push<&RowRef> for Rows<BC> {
597        #[inline(always)]
598        fn push(&mut self, item: &RowRef) {
599            self.values.extend_from_slice(item.data());
600            self.bounds.push(&u64::cast_from(self.values.len()));
601        }
602    }
603    impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
604        #[inline(always)]
605        fn clear(&mut self) {
606            self.bounds.clear();
607            self.values.clear();
608        }
609    }
610    impl<BC: HeapSize, VC: HeapSize> HeapSize for Rows<BC, VC> {
611        #[inline(always)]
612        fn heap_size(&self) -> (usize, usize) {
613            let (l0, c0) = self.bounds.heap_size();
614            let (l1, c1) = self.values.heap_size();
615            (l0 + l1, c0 + c1)
616        }
617    }
618}
619
/// A contiguous slice of bytes that are row data.
///
/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
///
/// The type is a `#[repr(transparent)]` wrapper around `[u8]`, which is what
/// allows [`RowRef::from_slice`] to reinterpret a byte slice as a `RowRef`.
#[derive(PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct RowRef([u8]);
626
627impl RowRef {
628    /// Create a [`RowRef`] from a slice of data.
629    ///
630    /// We do not check that the provided slice is valid [`Row`] data, will panic on read
631    /// if the data is invalid.
632    pub fn from_slice(row: &[u8]) -> &RowRef {
633        #[allow(clippy::as_conversions)]
634        let ptr = row as *const [u8] as *const RowRef;
635        // SAFETY: We know `ptr` is non-null and aligned because it came from a &[u8].
636        unsafe { &*ptr }
637    }
638
639    /// Unpack `self` into a `Vec<Datum>` for efficient random access.
640    pub fn unpack(&self) -> Vec<Datum<'_>> {
641        // It's usually cheaper to unpack twice to figure out the right length than it is to grow the vec as we go
642        let len = self.iter().count();
643        let mut vec = Vec::with_capacity(len);
644        vec.extend(self.iter());
645        vec
646    }
647
648    /// Return the first [`Datum`] in `self`
649    ///
650    /// Panics if the [`RowRef`] is empty.
651    pub fn unpack_first(&self) -> Datum<'_> {
652        self.iter().next().unwrap()
653    }
654
655    /// Iterate the [`Datum`] elements of the [`RowRef`].
656    pub fn iter(&self) -> DatumListIter<'_> {
657        DatumListIter { data: &self.0 }
658    }
659
660    /// Return the byte length of this [`RowRef`].
661    pub fn byte_len(&self) -> usize {
662        self.0.len()
663    }
664
665    /// For debugging only.
666    pub fn data(&self) -> &[u8] {
667        &self.0
668    }
669
670    /// True iff there is no data in this [`RowRef`].
671    pub fn is_empty(&self) -> bool {
672        self.0.is_empty()
673    }
674}
675
676impl ToOwned for RowRef {
677    type Owned = Row;
678
679    fn to_owned(&self) -> Self::Owned {
680        // SAFETY: RowRef has the invariant that the wrapped data must be a valid Row encoding.
681        unsafe { Row::from_bytes_unchecked(&self.0) }
682    }
683}
684
685impl<'a> IntoIterator for &'a RowRef {
686    type Item = Datum<'a>;
687    type IntoIter = DatumListIter<'a>;
688
689    fn into_iter(self) -> DatumListIter<'a> {
690        DatumListIter { data: &self.0 }
691    }
692}
693
694/// These implementations order first by length, and then by slice contents.
695/// This allows many comparisons to complete without dereferencing memory.
696/// Warning: These order by the u8 array representation, and NOT by Datum::cmp.
697impl PartialOrd for RowRef {
698    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
699        Some(self.cmp(other))
700    }
701}
702
703impl Ord for RowRef {
704    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
705        match self.0.len().cmp(&other.0.len()) {
706            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
707            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
708            std::cmp::Ordering::Equal => self.0.cmp(&other.0),
709        }
710    }
711}
712
713impl fmt::Debug for RowRef {
714    /// Debug representation using the internal datums
715    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
716        f.write_str("RowRef{")?;
717        f.debug_list().entries(self.into_iter()).finish()?;
718        f.write_str("}")
719    }
720}
721
/// Packs datums into a [`Row`].
///
/// Creating a `RowPacker` via [`Row::packer`] starts a packing operation on the
/// row. A packing operation always starts from scratch: the existing contents
/// of the underlying row are cleared.
///
/// To complete a packing operation, drop the `RowPacker`.
#[derive(Debug)]
pub struct RowPacker<'a> {
    /// The row being packed into, mutably borrowed for the packer's lifetime.
    row: &'a mut Row,
}
733
/// The iterator returned by [`RowRef::iter`], yielding the [`Datum`]s encoded
/// in a byte slice.
#[derive(Debug, Clone)]
pub struct DatumListIter<'a> {
    // The encoded bytes the iterator was created over.
    // NOTE(review): presumably advanced past each datum as it is yielded — confirm against the
    // `Iterator` impl.
    data: &'a [u8],
}
738
// NOTE(review): presumably the iterator over the key/value entries of a `DatumMap` — its
// `Iterator` impl is defined elsewhere; confirm there.
#[derive(Debug, Clone)]
pub struct DatumDictIter<'a> {
    // The encoded bytes being iterated.
    data: &'a [u8],
    // NOTE(review): presumably the most recently yielded key, kept to check key ordering — confirm.
    prev_key: Option<&'a str>,
}
744
/// `RowArena` is used to hold on to temporary `Row`s for functions like `eval`
/// that need to create complex `Datum`s but don't have a `Row` to put them in
/// yet.
#[derive(Debug)]
pub struct RowArena {
    // Semantically, this field would be better represented by a `Vec<Box<[u8]>>`,
    // as once the arena takes ownership of a byte vector the vector is never
    // modified. But `RowArena::push_bytes` takes ownership of a `Vec<u8>`, so
    // storing that `Vec<u8>` directly avoids an allocation. The cost is
    // additional memory use, as the vector may have spare capacity, but row
    // arenas are short lived so this is the better tradeoff.
    //
    // The `RefCell` provides interior mutability over the list of buffers.
    inner: RefCell<Vec<Vec<u8>>>,
}
756
// `DatumList` and `DatumMap` are defined here rather than near `Datum` because they need private access to the unsafe `data` field.
758
/// A sequence of `Datum`s, stored in serialized form.
///
/// Equality and hashing are derived from the serialized bytes, whereas
/// ordering compares the decoded datums element by element.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumList<'a> {
    /// Points at the serialized datums
    data: &'a [u8],
}
765
766impl<'a> Debug for DatumList<'a> {
767    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
768        f.debug_list().entries(self.iter()).finish()
769    }
770}
771
772impl Ord for DatumList<'_> {
773    fn cmp(&self, other: &DatumList) -> Ordering {
774        self.iter().cmp(other.iter())
775    }
776}
777
778impl PartialOrd for DatumList<'_> {
779    fn partial_cmp(&self, other: &DatumList) -> Option<Ordering> {
780        Some(self.cmp(other))
781    }
782}
783
/// A mapping from string keys to `Datum`s, stored in serialized form.
///
/// All comparison traits are derived, so equality, ordering and hashing
/// operate on the serialized bytes.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct DatumMap<'a> {
    /// Points at the serialized datums, which should be sorted in key order
    data: &'a [u8],
}
790
/// Represents a single `Datum`, appropriate to be nested inside other
/// `Datum`s.
///
/// The datum is not stored directly; `val` holds the encoded bytes from which
/// the datum is decoded on demand.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumNested<'a> {
    // The encoded bytes of exactly one datum.
    val: &'a [u8],
}
797
798impl<'a> std::fmt::Display for DatumNested<'a> {
799    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
800        std::fmt::Display::fmt(&self.datum(), f)
801    }
802}
803
804impl<'a> std::fmt::Debug for DatumNested<'a> {
805    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
806        f.debug_struct("DatumNested")
807            .field("val", &self.datum())
808            .finish()
809    }
810}
811
812impl<'a> DatumNested<'a> {
813    // Figure out which bytes `read_datum` returns (e.g. including the tag),
814    // and then store a reference to those bytes, so we can "replay" this same
815    // call later on without storing the datum itself.
816    pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
817        let prev = *data;
818        let _ = unsafe { read_datum(data) };
819        DatumNested {
820            val: &prev[..(prev.len() - data.len())],
821        }
822    }
823
824    /// Returns the datum `self` contains.
825    pub fn datum(&self) -> Datum<'a> {
826        let mut temp = self.val;
827        unsafe { read_datum(&mut temp) }
828    }
829}
830
831impl<'a> Ord for DatumNested<'a> {
832    fn cmp(&self, other: &Self) -> Ordering {
833        self.datum().cmp(&other.datum())
834    }
835}
836
837impl<'a> PartialOrd for DatumNested<'a> {
838    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
839        Some(self.cmp(other))
840    }
841}
842
// The one-byte (`#[repr(u8)]`) tag identifying the kind of an encoded datum.
//
// Prefer adding new tags to the end of the enum. Certain behavior, like row ordering and EXPLAIN
// PHYSICAL PLAN, rely on the ordering of this enum. Neither of these are breaking changes, but
// it's annoying when they change.
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
enum Tag {
    Null,
    False,
    True,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt32,
    Float32,
    Float64,
    Date,
    Time,
    Timestamp,
    TimestampTz,
    Interval,
    BytesTiny,
    BytesShort,
    BytesLong,
    BytesHuge,
    StringTiny,
    StringShort,
    StringLong,
    StringHuge,
    Uuid,
    Array,
    ListTiny,
    ListShort,
    ListLong,
    ListHuge,
    Dict,
    JsonNull,
    Dummy,
    Numeric,
    UInt16,
    UInt64,
    MzTimestamp,
    Range,
    MzAclItem,
    AclItem,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestamp,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestampTz,
    // The next several tags are for variable-length signed integer encoding.
    // The basic idea is that `NonNegativeIntN_K` is used to encode a datum of type
    // IntN whose actual value is positive or zero and fits in K bits, and similarly for
    // NegativeIntN_K with negative values.
    //
    // The order of these tags matters, because we want to be able to choose the
    // tag for a given datum quickly, with arithmetic, rather than slowly, with a
    // stack of `if` statements.
    //
    // Separate tags for non-negative and negative numbers are used to avoid having to
    // waste one bit in the actual data space to encode the sign.
    NonNegativeInt16_0, // i.e., 0
    NonNegativeInt16_8,
    NonNegativeInt16_16,

    NonNegativeInt32_0,
    NonNegativeInt32_8,
    NonNegativeInt32_16,
    NonNegativeInt32_24,
    NonNegativeInt32_32,

    NonNegativeInt64_0,
    NonNegativeInt64_8,
    NonNegativeInt64_16,
    NonNegativeInt64_24,
    NonNegativeInt64_32,
    NonNegativeInt64_40,
    NonNegativeInt64_48,
    NonNegativeInt64_56,
    NonNegativeInt64_64,

    NegativeInt16_0, // i.e., -1
    NegativeInt16_8,
    NegativeInt16_16,

    NegativeInt32_0,
    NegativeInt32_8,
    NegativeInt32_16,
    NegativeInt32_24,
    NegativeInt32_32,

    NegativeInt64_0,
    NegativeInt64_8,
    NegativeInt64_16,
    NegativeInt64_24,
    NegativeInt64_32,
    NegativeInt64_40,
    NegativeInt64_48,
    NegativeInt64_56,
    NegativeInt64_64,

    // These are like the ones above, but for unsigned types. The
    // situation is slightly simpler as we don't have negatives.
    UInt8_0, // i.e., 0
    UInt8_8,

    UInt16_0,
    UInt16_8,
    UInt16_16,

    UInt32_0,
    UInt32_8,
    UInt32_16,
    UInt32_24,
    UInt32_32,

    UInt64_0,
    UInt64_8,
    UInt64_16,
    UInt64_24,
    UInt64_32,
    UInt64_40,
    UInt64_48,
    UInt64_56,
    UInt64_64,
}
972
973impl Tag {
974    fn actual_int_length(self) -> Option<usize> {
975        use Tag::*;
976        let val = match self {
977            NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
978            | UInt32_0 | UInt64_0 => 0,
979            NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
980            | UInt32_8 | UInt64_8 => 1,
981            NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
982            | UInt32_16 | UInt64_16 => 2,
983            NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
984            NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
985            NonNegativeInt64_40 | UInt64_40 => 5,
986            NonNegativeInt64_48 | UInt64_48 => 6,
987            NonNegativeInt64_56 | UInt64_56 => 7,
988            NonNegativeInt64_64 | UInt64_64 => 8,
989            NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
990            NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
991            NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
992            NegativeInt32_24 | NegativeInt64_24 => 3,
993            NegativeInt32_32 | NegativeInt64_32 => 4,
994            NegativeInt64_40 => 5,
995            NegativeInt64_48 => 6,
996            NegativeInt64_56 => 7,
997            NegativeInt64_64 => 8,
998
999            _ => return None,
1000        };
1001        Some(val)
1002    }
1003}
1004
1005// --------------------------------------------------------------------------------
1006// reading data
1007
1008/// Read a byte slice starting at byte `offset`.
1009///
1010/// Updates `offset` to point to the first byte after the end of the read region.
1011fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
1012    let len = u64::from_le_bytes(read_byte_array(data));
1013    let len = usize::cast_from(len);
1014    let (bytes, next) = data.split_at(len);
1015    *data = next;
1016    bytes
1017}
1018
1019/// Read a data whose length is encoded in the row before its contents.
1020///
1021/// Updates `offset` to point to the first byte after the end of the read region.
1022///
1023/// # Safety
1024///
1025/// This function is safe if the datum's length and contents were previously written by `push_lengthed_bytes`,
1026/// and it was only written with a `String` tag if it was indeed UTF-8.
1027unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
1028    let len = match tag {
1029        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
1030        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1031            usize::from(u16::from_le_bytes(read_byte_array(data)))
1032        }
1033        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1034            usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
1035        }
1036        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1037            usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
1038        }
1039        _ => unreachable!(),
1040    };
1041    let (bytes, next) = data.split_at(len);
1042    *data = next;
1043    match tag {
1044        Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
1045        Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
1046            Datum::String(str::from_utf8_unchecked(bytes))
1047        }
1048        Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
1049            Datum::List(DatumList { data: bytes })
1050        }
1051        _ => unreachable!(),
1052    }
1053}
1054
/// Pops the first byte off the front of `data` and returns it.
///
/// # Panics
///
/// Panics if `data` is empty.
fn read_byte(data: &mut &[u8]) -> u8 {
    let remaining = *data;
    let first = remaining[0];
    *data = &remaining[1..];
    first
}
1060
/// Reads `length` bytes from the front of `data`, advancing `data` past them,
/// and widens them to an `N`-byte array.
///
/// The bytes that were read occupy the low `length` positions of the result;
/// the remaining `N - length` (most significant, little-endian) positions are
/// set to `FILL`.
///
/// # Panics
///
/// Panics if `length > data.len()` or if `length > N`.
fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    let (payload, rest) = data.split_at(length);
    let mut widened = [FILL; N];
    widened[..length].copy_from_slice(payload);
    *data = rest;
    widened
}
1078/// Read `length` bytes from `data` at `offset`, updating the
1079/// latter. Extend the resulting buffer to a negative `N`-byte
1080/// twos complement integer by filling the remaining bits with 1.
1081///
1082/// SAFETY:
1083///   * length <= N
1084///   * offset + length <= data.len()
1085fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
1086    read_byte_array_sign_extending::<N, 255>(data, length)
1087}
1088
1089/// Read `length` bytes from `data` at `offset`, updating the
1090/// latter. Extend the resulting buffer to a positive or zero `N`-byte
1091/// twos complement integer by filling the remaining bits with 0.
1092///
1093/// SAFETY:
1094///   * length <= N
1095///   * offset + length <= data.len()
1096fn read_byte_array_extending_nonnegative<const N: usize>(
1097    data: &mut &[u8],
1098    length: usize,
1099) -> [u8; N] {
1100    read_byte_array_sign_extending::<N, 0>(data, length)
1101}
1102
1103pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1104    let (prev, next) = data.split_first_chunk().unwrap();
1105    *data = next;
1106    *prev
1107}
1108
1109pub(super) fn read_date(data: &mut &[u8]) -> Date {
1110    let days = i32::from_le_bytes(read_byte_array(data));
1111    Date::from_pg_epoch(days).expect("unexpected date")
1112}
1113
1114pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1115    let year = i32::from_le_bytes(read_byte_array(data));
1116    let ordinal = u32::from_le_bytes(read_byte_array(data));
1117    NaiveDate::from_yo_opt(year, ordinal).unwrap()
1118}
1119
1120pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1121    let secs = u32::from_le_bytes(read_byte_array(data));
1122    let nanos = u32::from_le_bytes(read_byte_array(data));
1123    NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1124}
1125
1126/// Read a datum starting at byte `offset`.
1127///
1128/// Updates `offset` to point to the first byte after the end of the read region.
1129///
1130/// # Safety
1131///
1132/// This function is safe if a `Datum` was previously written at this offset by `push_datum`.
1133/// Otherwise it could return invalid values, which is Undefined Behavior.
1134pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
1135    let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
1136    match tag {
1137        Tag::Null => Datum::Null,
1138        Tag::False => Datum::False,
1139        Tag::True => Datum::True,
1140        Tag::UInt8_0 | Tag::UInt8_8 => {
1141            let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
1142                data,
1143                tag.actual_int_length()
1144                    .expect("returns a value for variable-length-encoded integer tags"),
1145            ));
1146            Datum::UInt8(i)
1147        }
1148        Tag::Int16 => {
1149            let i = i16::from_le_bytes(read_byte_array(data));
1150            Datum::Int16(i)
1151        }
1152        Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
1153            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
1154            // and `data` is big enough because it was encoded validly. These assumptions
1155            // are checked in debug asserts.
1156            let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
1157                data,
1158                tag.actual_int_length()
1159                    .expect("returns a value for variable-length-encoded integer tags"),
1160            ));
1161            Datum::Int16(i)
1162        }
1163        Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
1164            let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
1165                data,
1166                tag.actual_int_length()
1167                    .expect("returns a value for variable-length-encoded integer tags"),
1168            ));
1169            Datum::UInt16(i)
1170        }
1171        Tag::Int32 => {
1172            let i = i32::from_le_bytes(read_byte_array(data));
1173            Datum::Int32(i)
1174        }
1175        Tag::NonNegativeInt32_0
1176        | Tag::NonNegativeInt32_32
1177        | Tag::NonNegativeInt32_8
1178        | Tag::NonNegativeInt32_16
1179        | Tag::NonNegativeInt32_24 => {
1180            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
1181            // and `data` is big enough because it was encoded validly. These assumptions
1182            // are checked in debug asserts.
1183            let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
1184                data,
1185                tag.actual_int_length()
1186                    .expect("returns a value for variable-length-encoded integer tags"),
1187            ));
1188            Datum::Int32(i)
1189        }
1190        Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
1191            let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
1192                data,
1193                tag.actual_int_length()
1194                    .expect("returns a value for variable-length-encoded integer tags"),
1195            ));
1196            Datum::UInt32(i)
1197        }
1198        Tag::Int64 => {
1199            let i = i64::from_le_bytes(read_byte_array(data));
1200            Datum::Int64(i)
1201        }
1202        Tag::NonNegativeInt64_0
1203        | Tag::NonNegativeInt64_64
1204        | Tag::NonNegativeInt64_8
1205        | Tag::NonNegativeInt64_16
1206        | Tag::NonNegativeInt64_24
1207        | Tag::NonNegativeInt64_32
1208        | Tag::NonNegativeInt64_40
1209        | Tag::NonNegativeInt64_48
1210        | Tag::NonNegativeInt64_56 => {
1211            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
1212            // and `data` is big enough because it was encoded validly. These assumptions
1213            // are checked in debug asserts.
1214
1215            let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
1216                data,
1217                tag.actual_int_length()
1218                    .expect("returns a value for variable-length-encoded integer tags"),
1219            ));
1220            Datum::Int64(i)
1221        }
1222        Tag::UInt64_0
1223        | Tag::UInt64_8
1224        | Tag::UInt64_16
1225        | Tag::UInt64_24
1226        | Tag::UInt64_32
1227        | Tag::UInt64_40
1228        | Tag::UInt64_48
1229        | Tag::UInt64_56
1230        | Tag::UInt64_64 => {
1231            let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
1232                data,
1233                tag.actual_int_length()
1234                    .expect("returns a value for variable-length-encoded integer tags"),
1235            ));
1236            Datum::UInt64(i)
1237        }
1238        Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
1239            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
1240            // and `data` is big enough because it was encoded validly. These assumptions
1241            // are checked in debug asserts.
1242            let i = i16::from_le_bytes(read_byte_array_extending_negative(
1243                data,
1244                tag.actual_int_length()
1245                    .expect("returns a value for variable-length-encoded integer tags"),
1246            ));
1247            Datum::Int16(i)
1248        }
1249        Tag::NegativeInt32_0
1250        | Tag::NegativeInt32_32
1251        | Tag::NegativeInt32_8
1252        | Tag::NegativeInt32_16
1253        | Tag::NegativeInt32_24 => {
1254            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
1255            // and `data` is big enough because it was encoded validly. These assumptions
1256            // are checked in debug asserts.
1257            let i = i32::from_le_bytes(read_byte_array_extending_negative(
1258                data,
1259                tag.actual_int_length()
1260                    .expect("returns a value for variable-length-encoded integer tags"),
1261            ));
1262            Datum::Int32(i)
1263        }
1264        Tag::NegativeInt64_0
1265        | Tag::NegativeInt64_64
1266        | Tag::NegativeInt64_8
1267        | Tag::NegativeInt64_16
1268        | Tag::NegativeInt64_24
1269        | Tag::NegativeInt64_32
1270        | Tag::NegativeInt64_40
1271        | Tag::NegativeInt64_48
1272        | Tag::NegativeInt64_56 => {
1273            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
1274            // and `data` is big enough because the row was encoded validly. These assumptions
1275            // are checked in debug asserts.
1276            let i = i64::from_le_bytes(read_byte_array_extending_negative(
1277                data,
1278                tag.actual_int_length()
1279                    .expect("returns a value for variable-length-encoded integer tags"),
1280            ));
1281            Datum::Int64(i)
1282        }
1283
1284        Tag::UInt8 => {
1285            let i = u8::from_le_bytes(read_byte_array(data));
1286            Datum::UInt8(i)
1287        }
1288        Tag::UInt16 => {
1289            let i = u16::from_le_bytes(read_byte_array(data));
1290            Datum::UInt16(i)
1291        }
1292        Tag::UInt32 => {
1293            let i = u32::from_le_bytes(read_byte_array(data));
1294            Datum::UInt32(i)
1295        }
1296        Tag::UInt64 => {
1297            let i = u64::from_le_bytes(read_byte_array(data));
1298            Datum::UInt64(i)
1299        }
1300        Tag::Float32 => {
1301            let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
1302            Datum::Float32(OrderedFloat::from(f))
1303        }
1304        Tag::Float64 => {
1305            let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
1306            Datum::Float64(OrderedFloat::from(f))
1307        }
1308        Tag::Date => Datum::Date(read_date(data)),
1309        Tag::Time => Datum::Time(read_time(data)),
1310        Tag::CheapTimestamp => {
1311            let ts = i64::from_le_bytes(read_byte_array(data));
1312            let secs = ts.div_euclid(1_000_000_000);
1313            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1314            let ndt = DateTime::from_timestamp(secs, nsecs)
1315                .expect("We only write round-trippable timestamps")
1316                .naive_utc();
1317            Datum::Timestamp(
1318                CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
1319            )
1320        }
1321        Tag::CheapTimestampTz => {
1322            let ts = i64::from_le_bytes(read_byte_array(data));
1323            let secs = ts.div_euclid(1_000_000_000);
1324            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1325            let dt = DateTime::from_timestamp(secs, nsecs)
1326                .expect("We only write round-trippable timestamps");
1327            Datum::TimestampTz(
1328                CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
1329            )
1330        }
1331        Tag::Timestamp => {
1332            let date = read_naive_date(data);
1333            let time = read_time(data);
1334            Datum::Timestamp(
1335                CheckedTimestamp::from_timestamplike(date.and_time(time))
1336                    .expect("unexpected timestamp"),
1337            )
1338        }
1339        Tag::TimestampTz => {
1340            let date = read_naive_date(data);
1341            let time = read_time(data);
1342            Datum::TimestampTz(
1343                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
1344                    date.and_time(time),
1345                    Utc,
1346                ))
1347                .expect("unexpected timestamptz"),
1348            )
1349        }
1350        Tag::Interval => {
1351            let months = i32::from_le_bytes(read_byte_array(data));
1352            let days = i32::from_le_bytes(read_byte_array(data));
1353            let micros = i64::from_le_bytes(read_byte_array(data));
1354            Datum::Interval(Interval {
1355                months,
1356                days,
1357                micros,
1358            })
1359        }
1360        Tag::BytesTiny
1361        | Tag::BytesShort
1362        | Tag::BytesLong
1363        | Tag::BytesHuge
1364        | Tag::StringTiny
1365        | Tag::StringShort
1366        | Tag::StringLong
1367        | Tag::StringHuge
1368        | Tag::ListTiny
1369        | Tag::ListShort
1370        | Tag::ListLong
1371        | Tag::ListHuge => read_lengthed_datum(data, tag),
1372        Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
1373        Tag::Array => {
1374            // See the comment in `Row::push_array` for details on the encoding
1375            // of arrays.
1376            let ndims = read_byte(data);
1377            let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
1378            let (dims, next) = data.split_at(dims_size);
1379            *data = next;
1380            let bytes = read_untagged_bytes(data);
1381            Datum::Array(Array {
1382                dims: ArrayDimensions { data: dims },
1383                elements: DatumList { data: bytes },
1384            })
1385        }
1386        Tag::Dict => {
1387            let bytes = read_untagged_bytes(data);
1388            Datum::Map(DatumMap { data: bytes })
1389        }
1390        Tag::JsonNull => Datum::JsonNull,
1391        Tag::Dummy => Datum::Dummy,
1392        Tag::Numeric => {
1393            let digits = read_byte(data).into();
1394            let exponent = i8::reinterpret_cast(read_byte(data));
1395            let bits = read_byte(data);
1396
1397            let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
1398            let lsu_u8_len = lsu_u16_len * 2;
1399            let (lsu_u8, next) = data.split_at(lsu_u8_len);
1400            *data = next;
1401
1402            // TODO: if we refactor the decimal library to accept the owned
1403            // array as a parameter to `from_raw_parts` below, we could likely
1404            // avoid a copy because it is exactly the value we want
1405            let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
1406            for (i, c) in lsu_u8.chunks(2).enumerate() {
1407                lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
1408            }
1409
1410            let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
1411            Datum::from(d)
1412        }
1413        Tag::MzTimestamp => {
1414            let t = Timestamp::decode(read_byte_array(data));
1415            Datum::MzTimestamp(t)
1416        }
1417        Tag::Range => {
1418            // See notes on `push_range_with` for details about encoding.
1419            let flag_byte = read_byte(data);
1420            let flags = range::InternalFlags::from_bits(flag_byte)
1421                .expect("range flags must be encoded validly");
1422
1423            if flags.contains(range::InternalFlags::EMPTY) {
1424                assert!(
1425                    flags == range::InternalFlags::EMPTY,
1426                    "empty ranges contain only RANGE_EMPTY flag"
1427                );
1428
1429                return Datum::Range(Range { inner: None });
1430            }
1431
1432            let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
1433                None
1434            } else {
1435                Some(DatumNested::extract(data))
1436            };
1437
1438            let lower = RangeBound {
1439                inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
1440                bound: lower_bound,
1441            };
1442
1443            let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
1444                None
1445            } else {
1446                Some(DatumNested::extract(data))
1447            };
1448
1449            let upper = RangeBound {
1450                inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
1451                bound: upper_bound,
1452            };
1453
1454            Datum::Range(Range {
1455                inner: Some(RangeInner { lower, upper }),
1456            })
1457        }
1458        Tag::MzAclItem => {
1459            const N: usize = MzAclItem::binary_size();
1460            let mz_acl_item =
1461                MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
1462            Datum::MzAclItem(mz_acl_item)
1463        }
1464        Tag::AclItem => {
1465            const N: usize = AclItem::binary_size();
1466            let acl_item =
1467                AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
1468            Datum::AclItem(acl_item)
1469        }
1470    }
1471}
1472
1473// --------------------------------------------------------------------------------
1474// writing data
1475
1476fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1477where
1478    D: Vector<u8>,
1479{
1480    let len = u64::cast_from(bytes.len());
1481    data.extend_from_slice(&len.to_le_bytes());
1482    data.extend_from_slice(bytes);
1483}
1484
1485fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1486where
1487    D: Vector<u8>,
1488{
1489    match tag {
1490        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1491            let len = bytes.len().to_le_bytes();
1492            data.push(len[0]);
1493        }
1494        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1495            let len = bytes.len().to_le_bytes();
1496            data.extend_from_slice(&len[0..2]);
1497        }
1498        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1499            let len = bytes.len().to_le_bytes();
1500            data.extend_from_slice(&len[0..4]);
1501        }
1502        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1503            let len = bytes.len().to_le_bytes();
1504            data.extend_from_slice(&len);
1505        }
1506        _ => unreachable!(),
1507    }
1508    data.extend_from_slice(bytes);
1509}
1510
1511pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1512    i32::to_le_bytes(date.pg_epoch_days())
1513}
1514
1515fn push_date<D>(data: &mut D, date: Date)
1516where
1517    D: Vector<u8>,
1518{
1519    data.extend_from_slice(&date_to_array(date));
1520}
1521
1522pub(super) fn naive_date_to_arrays(
1523    date: NaiveDate,
1524) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1525    (
1526        i32::to_le_bytes(date.year()),
1527        u32::to_le_bytes(date.ordinal()),
1528    )
1529}
1530
1531fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1532where
1533    D: Vector<u8>,
1534{
1535    let (ds1, ds2) = naive_date_to_arrays(date);
1536    data.extend_from_slice(&ds1);
1537    data.extend_from_slice(&ds2);
1538}
1539
1540pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1541    (
1542        u32::to_le_bytes(time.num_seconds_from_midnight()),
1543        u32::to_le_bytes(time.nanosecond()),
1544    )
1545}
1546
1547fn push_time<D>(data: &mut D, time: NaiveTime)
1548where
1549    D: Vector<u8>,
1550{
1551    let (ts1, ts2) = time_to_arrays(time);
1552    data.extend_from_slice(&ts1);
1553    data.extend_from_slice(&ts2);
1554}
1555
1556/// Returns an i64 representing a `NaiveDateTime`, if
1557/// said i64 can be round-tripped back to a `NaiveDateTime`.
1558///
1559/// The only exotic NDTs for which this can't happen are those that
1560/// are hundreds of years in the future or past, or those that
1561/// represent a leap second. (Note that Materialize does not support
1562/// leap seconds, but this module does).
1563// This function is inspired by `NaiveDateTime::timestamp_nanos`,
1564// with extra checking.
1565fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1566    let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1567    if subsec_nanos >= 1_000_000_000 {
1568        return None;
1569    }
1570    let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1571    as_ns.checked_add(i64::from(subsec_nanos))
1572}
1573
// Returns how many low-order bytes of `i` must be stored so that the value
// can be rebuilt by sign-extension. The sign itself is not counted: 0 and -1
// need no bytes at all.
//
// This function is extremely hot, so the bit counts are narrowed with `as`
// rather than `try_into` followed by `unwrap`. `leading_ones` and
// `leading_zeros` on an `i64` never exceed 64, so the narrowing is lossless.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_signed<T>(i: T) -> u8
where
    T: Into<i64>,
{
    let value: i64 = i.into();

    // Length of the run of leading bits that merely repeat the sign.
    let sign_run = if value < 0 {
        value.leading_ones()
    } else {
        value.leading_zeros()
    };

    // Everything below that run has to fit in `n * 8` bits; round up to whole
    // bytes.
    let payload_bits = 64 - sign_run as u8;
    (payload_bits + 7) / 8
}
1598
// Returns how many low-order bytes of `i` must be stored so that the value
// can be rebuilt by zero-extension; 0 needs no bytes at all.
//
// In principle `min_bytes_signed` could serve here too if it took
// `T: Into<i128>` instead of `Into<i64>`, but LLVM doesn't seem smart enough
// to realize that the two would be the same, and generates worse code.
//
// `leading_zeros` on a `u64` never exceeds 64, so narrowing it with `as` is
// lossless.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_unsigned<T>(i: T) -> u8
where
    T: Into<u64>,
{
    let value: u64 = i.into();

    // Bits that remain once the leading zeros are dropped, rounded up to
    // whole bytes.
    let payload_bits = 64 - value.leading_zeros() as u8;
    (payload_bits + 7) / 8
}
1618
// Exclusive upper bounds on the byte length of a bytes/string/list datum for
// each length-prefix width. `push_datum` matches the length against
// `0..TINY`, `TINY..SHORT` and `SHORT..LONG` to choose the `*Tiny` (1-byte
// prefix), `*Short` (2-byte) or `*Long` (4-byte) tag; anything longer gets
// the `*Huge` tag.
const TINY: usize = 1 << 8;
const SHORT: usize = 1 << 16;
const LONG: usize = 1 << 32;
1622
1623fn push_datum<D>(data: &mut D, datum: Datum)
1624where
1625    D: Vector<u8>,
1626{
1627    match datum {
1628        Datum::Null => data.push(Tag::Null.into()),
1629        Datum::False => data.push(Tag::False.into()),
1630        Datum::True => data.push(Tag::True.into()),
1631        Datum::Int16(i) => {
1632            let mbs = min_bytes_signed(i);
1633            let tag = u8::from(if i.is_negative() {
1634                Tag::NegativeInt16_0
1635            } else {
1636                Tag::NonNegativeInt16_0
1637            }) + mbs;
1638
1639            data.push(tag);
1640            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1641        }
1642        Datum::Int32(i) => {
1643            let mbs = min_bytes_signed(i);
1644            let tag = u8::from(if i.is_negative() {
1645                Tag::NegativeInt32_0
1646            } else {
1647                Tag::NonNegativeInt32_0
1648            }) + mbs;
1649
1650            data.push(tag);
1651            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1652        }
1653        Datum::Int64(i) => {
1654            let mbs = min_bytes_signed(i);
1655            let tag = u8::from(if i.is_negative() {
1656                Tag::NegativeInt64_0
1657            } else {
1658                Tag::NonNegativeInt64_0
1659            }) + mbs;
1660
1661            data.push(tag);
1662            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1663        }
1664        Datum::UInt8(i) => {
1665            let mbu = min_bytes_unsigned(i);
1666            let tag = u8::from(Tag::UInt8_0) + mbu;
1667            data.push(tag);
1668            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1669        }
1670        Datum::UInt16(i) => {
1671            let mbu = min_bytes_unsigned(i);
1672            let tag = u8::from(Tag::UInt16_0) + mbu;
1673            data.push(tag);
1674            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1675        }
1676        Datum::UInt32(i) => {
1677            let mbu = min_bytes_unsigned(i);
1678            let tag = u8::from(Tag::UInt32_0) + mbu;
1679            data.push(tag);
1680            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1681        }
1682        Datum::UInt64(i) => {
1683            let mbu = min_bytes_unsigned(i);
1684            let tag = u8::from(Tag::UInt64_0) + mbu;
1685            data.push(tag);
1686            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1687        }
1688        Datum::Float32(f) => {
1689            data.push(Tag::Float32.into());
1690            data.extend_from_slice(&f.to_bits().to_le_bytes());
1691        }
1692        Datum::Float64(f) => {
1693            data.push(Tag::Float64.into());
1694            data.extend_from_slice(&f.to_bits().to_le_bytes());
1695        }
1696        Datum::Date(d) => {
1697            data.push(Tag::Date.into());
1698            push_date(data, d);
1699        }
1700        Datum::Time(t) => {
1701            data.push(Tag::Time.into());
1702            push_time(data, t);
1703        }
1704        Datum::Timestamp(t) => {
1705            let datetime = t.to_naive();
1706            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1707                data.push(Tag::CheapTimestamp.into());
1708                data.extend_from_slice(&nanos.to_le_bytes());
1709            } else {
1710                data.push(Tag::Timestamp.into());
1711                push_naive_date(data, datetime.date());
1712                push_time(data, datetime.time());
1713            }
1714        }
1715        Datum::TimestampTz(t) => {
1716            let datetime = t.to_naive();
1717            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1718                data.push(Tag::CheapTimestampTz.into());
1719                data.extend_from_slice(&nanos.to_le_bytes());
1720            } else {
1721                data.push(Tag::TimestampTz.into());
1722                push_naive_date(data, datetime.date());
1723                push_time(data, datetime.time());
1724            }
1725        }
1726        Datum::Interval(i) => {
1727            data.push(Tag::Interval.into());
1728            data.extend_from_slice(&i.months.to_le_bytes());
1729            data.extend_from_slice(&i.days.to_le_bytes());
1730            data.extend_from_slice(&i.micros.to_le_bytes());
1731        }
1732        Datum::Bytes(bytes) => {
1733            let tag = match bytes.len() {
1734                0..TINY => Tag::BytesTiny,
1735                TINY..SHORT => Tag::BytesShort,
1736                SHORT..LONG => Tag::BytesLong,
1737                _ => Tag::BytesHuge,
1738            };
1739            data.push(tag.into());
1740            push_lengthed_bytes(data, bytes, tag);
1741        }
1742        Datum::String(string) => {
1743            let tag = match string.len() {
1744                0..TINY => Tag::StringTiny,
1745                TINY..SHORT => Tag::StringShort,
1746                SHORT..LONG => Tag::StringLong,
1747                _ => Tag::StringHuge,
1748            };
1749            data.push(tag.into());
1750            push_lengthed_bytes(data, string.as_bytes(), tag);
1751        }
1752        Datum::List(list) => {
1753            let tag = match list.data.len() {
1754                0..TINY => Tag::ListTiny,
1755                TINY..SHORT => Tag::ListShort,
1756                SHORT..LONG => Tag::ListLong,
1757                _ => Tag::ListHuge,
1758            };
1759            data.push(tag.into());
1760            push_lengthed_bytes(data, list.data, tag);
1761        }
1762        Datum::Uuid(u) => {
1763            data.push(Tag::Uuid.into());
1764            data.extend_from_slice(u.as_bytes());
1765        }
1766        Datum::Array(array) => {
1767            // See the comment in `Row::push_array` for details on the encoding
1768            // of arrays.
1769            data.push(Tag::Array.into());
1770            data.push(array.dims.ndims());
1771            data.extend_from_slice(array.dims.data);
1772            push_untagged_bytes(data, array.elements.data);
1773        }
1774        Datum::Map(dict) => {
1775            data.push(Tag::Dict.into());
1776            push_untagged_bytes(data, dict.data);
1777        }
1778        Datum::JsonNull => data.push(Tag::JsonNull.into()),
1779        Datum::MzTimestamp(t) => {
1780            data.push(Tag::MzTimestamp.into());
1781            data.extend_from_slice(&t.encode());
1782        }
1783        Datum::Dummy => data.push(Tag::Dummy.into()),
1784        Datum::Numeric(mut n) => {
1785            // Pseudo-canonical representation of decimal values with
1786            // insignificant zeroes trimmed. This compresses the number further
1787            // than `Numeric::trim` by removing all zeroes, and not only those in
1788            // the fractional component.
1789            numeric::cx_datum().reduce(&mut n.0);
1790            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
1791            data.push(Tag::Numeric.into());
1792            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
1793            data.push(
1794                i8::try_from(exponent)
1795                    .expect("exponent to fit within i8; should not exceed +/- 39")
1796                    .to_le_bytes()[0],
1797            );
1798            data.push(bits);
1799
1800            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];
1801
1802            // Little endian machines can take the lsu directly from u16 to u8.
1803            if cfg!(target_endian = "little") {
1804                // SAFETY: `lsu` (returned by `coefficient_units()`) is a `&[u16]`, so
1805                // each element can safely be transmuted into two `u8`s.
1806                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
1807                // The `u8` aligned version of the `lsu` should have twice as many
1808                // elements as we expect for the `u16` version.
1809                soft_assert_no_log!(
1810                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
1811                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
1812                    Numeric::digits_to_lsu_elements_len(digits) * 2,
1813                    lsu_bytes.len()
1814                );
1815                // There should be no unaligned elements in the prefix or suffix.
1816                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
1817                data.extend_from_slice(lsu_bytes);
1818            } else {
1819                for u in lsu {
1820                    data.extend_from_slice(&u.to_le_bytes());
1821                }
1822            }
1823        }
1824        Datum::Range(range) => {
1825            // See notes on `push_range_with` for details about encoding.
1826            data.push(Tag::Range.into());
1827            data.push(range.internal_flag_bits());
1828
1829            if let Some(RangeInner { lower, upper }) = range.inner {
1830                for bound in [lower.bound, upper.bound] {
1831                    if let Some(bound) = bound {
1832                        match bound.datum() {
1833                            Datum::Null => panic!("cannot push Datum::Null into range"),
1834                            d => push_datum::<D>(data, d),
1835                        }
1836                    }
1837                }
1838            }
1839        }
1840        Datum::MzAclItem(mz_acl_item) => {
1841            data.push(Tag::MzAclItem.into());
1842            data.extend_from_slice(&mz_acl_item.encode_binary());
1843        }
1844        Datum::AclItem(acl_item) => {
1845            data.push(Tag::AclItem.into());
1846            data.extend_from_slice(&acl_item.encode_binary());
1847        }
1848    }
1849}
1850
1851/// Return the number of bytes these Datums would use if packed as a Row.
1852pub fn row_size<'a, I>(a: I) -> usize
1853where
1854    I: IntoIterator<Item = Datum<'a>>,
1855{
1856    // Using datums_size instead of a.data().len() here is safer because it will
1857    // return the size of the datums if they were packed into a Row. Although
1858    // a.data().len() happens to give the correct answer (and is faster), data()
1859    // is documented as for debugging only.
1860    let sz = datums_size::<_, _>(a);
1861    let size_of_row = std::mem::size_of::<Row>();
1862    // The Row struct attempts to inline data until it can't fit in the
1863    // preallocated size. Otherwise it spills to heap, and uses the Row to point
1864    // to that.
1865    if sz > Row::SIZE {
1866        sz + size_of_row
1867    } else {
1868        size_of_row
1869    }
1870}
1871
1872/// Number of bytes required by the datum.
1873/// This is used to optimistically pre-allocate buffers for packing rows.
1874pub fn datum_size(datum: &Datum) -> usize {
1875    match datum {
1876        Datum::Null => 1,
1877        Datum::False => 1,
1878        Datum::True => 1,
1879        Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
1880        Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
1881        Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
1882        Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1883        Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1884        Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1885        Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1886        Datum::Float32(_) => 1 + size_of::<f32>(),
1887        Datum::Float64(_) => 1 + size_of::<f64>(),
1888        Datum::Date(_) => 1 + size_of::<i32>(),
1889        Datum::Time(_) => 1 + 8,
1890        Datum::Timestamp(t) => {
1891            1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
1892                8
1893            } else {
1894                16
1895            }
1896        }
1897        Datum::TimestampTz(t) => {
1898            1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
1899                8
1900            } else {
1901                16
1902            }
1903        }
1904        Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
1905        Datum::Bytes(bytes) => {
1906            // We use a variable length representation of slice length.
1907            let bytes_for_length = match bytes.len() {
1908                0..TINY => 1,
1909                TINY..SHORT => 2,
1910                SHORT..LONG => 4,
1911                _ => 8,
1912            };
1913            1 + bytes_for_length + bytes.len()
1914        }
1915        Datum::String(string) => {
1916            // We use a variable length representation of slice length.
1917            let bytes_for_length = match string.len() {
1918                0..TINY => 1,
1919                TINY..SHORT => 2,
1920                SHORT..LONG => 4,
1921                _ => 8,
1922            };
1923            1 + bytes_for_length + string.len()
1924        }
1925        Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
1926        Datum::Array(array) => {
1927            1 + size_of::<u8>()
1928                + array.dims.data.len()
1929                + size_of::<u64>()
1930                + array.elements.data.len()
1931        }
1932        Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
1933        Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
1934        Datum::JsonNull => 1,
1935        Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
1936        Datum::Dummy => 1,
1937        Datum::Numeric(d) => {
1938            let mut d = d.0.clone();
1939            // Values must be reduced to determine appropriate number of
1940            // coefficient units.
1941            numeric::cx_datum().reduce(&mut d);
1942            // 4 = 1 bit each for tag, digits, exponent, bits
1943            4 + (d.coefficient_units().len() * 2)
1944        }
1945        Datum::Range(Range { inner }) => {
1946            // Tag + flags
1947            2 + match inner {
1948                None => 0,
1949                Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
1950                    .iter()
1951                    .map(|bound| match bound {
1952                        None => 0,
1953                        Some(bound) => bound.val.len(),
1954                    })
1955                    .sum(),
1956            }
1957        }
1958        Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
1959        Datum::AclItem(_) => 1 + AclItem::binary_size(),
1960    }
1961}
1962
1963/// Number of bytes required by a sequence of datums.
1964///
1965/// This method can be used to right-size the allocation for a `Row`
1966/// before calling [`RowPacker::extend`].
1967pub fn datums_size<'a, I, D>(iter: I) -> usize
1968where
1969    I: IntoIterator<Item = D>,
1970    D: Borrow<Datum<'a>>,
1971{
1972    iter.into_iter().map(|d| datum_size(d.borrow())).sum()
1973}
1974
1975/// Number of bytes required by a list of datums. This computes the size that would be required if
1976/// the given datums were packed into a list.
1977///
1978/// This is used to optimistically pre-allocate buffers for packing rows.
1979pub fn datum_list_size<'a, I, D>(iter: I) -> usize
1980where
1981    I: IntoIterator<Item = D>,
1982    D: Borrow<Datum<'a>>,
1983{
1984    1 + size_of::<u64>() + datums_size(iter)
1985}
1986
1987impl RowPacker<'_> {
1988    /// Constructs a row packer that will pack additional datums into the
1989    /// provided row.
1990    ///
1991    /// This function is intentionally somewhat inconvenient to call. You
1992    /// usually want to call [`Row::packer`] instead to start packing from
1993    /// scratch.
1994    pub fn for_existing_row(row: &mut Row) -> RowPacker<'_> {
1995        RowPacker { row }
1996    }
1997
1998    /// Extend an existing `Row` with a `Datum`.
1999    #[inline]
2000    pub fn push<'a, D>(&mut self, datum: D)
2001    where
2002        D: Borrow<Datum<'a>>,
2003    {
2004        push_datum(&mut self.row.data, *datum.borrow());
2005    }
2006
2007    /// Extend an existing `Row` with additional `Datum`s.
2008    #[inline]
2009    pub fn extend<'a, I, D>(&mut self, iter: I)
2010    where
2011        I: IntoIterator<Item = D>,
2012        D: Borrow<Datum<'a>>,
2013    {
2014        for datum in iter {
2015            push_datum(&mut self.row.data, *datum.borrow())
2016        }
2017    }
2018
2019    /// Extend an existing `Row` with additional `Datum`s.
2020    ///
2021    /// In the case the iterator produces an error, the pushing of
2022    /// datums in terminated and the error returned. The `Row` will
2023    /// be incomplete, but it will be safe to read datums from it.
2024    #[inline]
2025    pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
2026    where
2027        I: IntoIterator<Item = Result<D, E>>,
2028        D: Borrow<Datum<'a>>,
2029    {
2030        for datum in iter {
2031            push_datum(&mut self.row.data, *datum?.borrow());
2032        }
2033        Ok(())
2034    }
2035
2036    /// Appends the datums of an entire `Row`.
2037    pub fn extend_by_row(&mut self, row: &Row) {
2038        self.row.data.extend_from_slice(row.data.as_slice());
2039    }
2040
2041    /// Appends the slice of data representing an entire `Row`. The data is not validated.
2042    ///
2043    /// # Safety
2044    ///
2045    /// The requirements from [`Row::from_bytes_unchecked`] apply here, too:
2046    /// This method relies on `data` being an appropriate row encoding, and can
2047    /// result in unsafety if this is not the case.
2048    #[inline]
2049    pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
2050        self.row.data.extend_from_slice(data)
2051    }
2052
2053    /// Pushes a [`DatumList`] that is built from a closure.
2054    ///
2055    /// The supplied closure will be invoked once with a `Row` that can be used
2056    /// to populate the list. It is valid to call any method on the
2057    /// [`RowPacker`] except for [`RowPacker::clear`], [`RowPacker::truncate`],
2058    /// or [`RowPacker::truncate_datums`].
2059    ///
2060    /// Returns the value returned by the closure, if any.
2061    ///
2062    /// ```
2063    /// # use mz_repr::{Row, Datum};
2064    /// let mut row = Row::default();
2065    /// row.packer().push_list_with(|row| {
2066    ///     row.push(Datum::String("age"));
2067    ///     row.push(Datum::Int64(42));
2068    /// });
2069    /// assert_eq!(
2070    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
2071    ///     vec![Datum::String("age"), Datum::Int64(42)],
2072    /// );
2073    /// ```
2074    #[inline]
2075    pub fn push_list_with<F, R>(&mut self, f: F) -> R
2076    where
2077        F: FnOnce(&mut RowPacker) -> R,
2078    {
2079        // First, assume that the list will fit in 255 bytes, and thus the length will fit in
2080        // 1 byte. If not, we'll fix it up later.
2081        let start = self.row.data.len();
2082        self.row.data.push(Tag::ListTiny.into());
2083        // Write a dummy len, will fix it up later.
2084        self.row.data.push(0);
2085
2086        let out = f(self);
2087
2088        // The `- 1 - 1` is for the tag and the len.
2089        let len = self.row.data.len() - start - 1 - 1;
2090        // We now know the real len.
2091        if len < TINY {
2092            // If the len fits in 1 byte, we just need to fix up the len.
2093            self.row.data[start + 1] = len.to_le_bytes()[0];
2094        } else {
2095            // Note: We move this code path into its own function, so that the common case can be
2096            // inlined.
2097            long_list(&mut self.row.data, start, len);
2098        }
2099
2100        /// 1. Fix up the tag.
2101        /// 2. Move the actual data a bit (for which we also need to make room at the end).
2102        /// 3. Fix up the len.
2103        /// `data`: The row's backing data.
2104        /// `start`: where `push_list_with` started writing in `data`.
2105        /// `len`: the length of the data, excluding the tag and the length.
2106        #[cold]
2107        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
2108            // `len_len`: the length of the length. (Possible values are: 2, 4, 8. 1 is handled
2109            // elsewhere.) The other parameters are the same as for `long_list`.
2110            let long_list_inner = |data: &mut CompactBytes, len_len| {
2111                // We'll need memory for the new, bigger length, so make the `CompactBytes` bigger.
2112                // The `- 1` is because the old length was 1 byte.
2113                const ZEROS: [u8; 8] = [0; 8];
2114                data.extend_from_slice(&ZEROS[0..len_len - 1]);
2115                // Move the data to the end of the `CompactBytes`, to make space for the new length.
2116                // Originally, it started after the 1-byte tag and the 1-byte length, now it will
2117                // start after the 1-byte tag and the len_len-byte length.
2118                //
2119                // Note that this is the only operation in `long_list` whose cost is proportional
2120                // to `len`. Since `len` is at least 256 here, the other operations' cost are
2121                // negligible. `copy_within` is a memmove, which is probably a fair bit faster per
2122                // Datum than a Datum encoding in the `f` closure.
2123                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
2124                // Write the new length.
2125                data[start + 1..start + 1 + len_len]
2126                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
2127            };
2128            match len {
2129                0..TINY => {
2130                    unreachable!()
2131                }
2132                TINY..SHORT => {
2133                    data[start] = Tag::ListShort.into();
2134                    long_list_inner(data, 2);
2135                }
2136                SHORT..LONG => {
2137                    data[start] = Tag::ListLong.into();
2138                    long_list_inner(data, 4);
2139                }
2140                _ => {
2141                    data[start] = Tag::ListHuge.into();
2142                    long_list_inner(data, 8);
2143                }
2144            };
2145        }
2146
2147        out
2148    }
2149
2150    /// Pushes a [`DatumMap`] that is built from a closure.
2151    ///
2152    /// The supplied closure will be invoked once with a `Row` that can be used
2153    /// to populate the dict.
2154    ///
2155    /// The closure **must** alternate pushing string keys and arbitrary values,
2156    /// otherwise reading the dict will cause a panic.
2157    ///
2158    /// The closure **must** push keys in ascending order, otherwise equality
2159    /// checks on the resulting `Row` may be wrong and reading the dict IN DEBUG
2160    /// MODE will cause a panic.
2161    ///
2162    /// The closure **must not** call [`RowPacker::clear`],
2163    /// [`RowPacker::truncate`], or [`RowPacker::truncate_datums`].
2164    ///
2165    /// # Example
2166    ///
2167    /// ```
2168    /// # use mz_repr::{Row, Datum};
2169    /// let mut row = Row::default();
2170    /// row.packer().push_dict_with(|row| {
2171    ///
2172    ///     // key
2173    ///     row.push(Datum::String("age"));
2174    ///     // value
2175    ///     row.push(Datum::Int64(42));
2176    ///
2177    ///     // key
2178    ///     row.push(Datum::String("name"));
2179    ///     // value
2180    ///     row.push(Datum::String("bob"));
2181    /// });
2182    /// assert_eq!(
2183    ///     row.unpack_first().unwrap_map().iter().collect::<Vec<_>>(),
2184    ///     vec![("age", Datum::Int64(42)), ("name", Datum::String("bob"))]
2185    /// );
2186    /// ```
2187    pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2188    where
2189        F: FnOnce(&mut RowPacker) -> R,
2190    {
2191        self.row.data.push(Tag::Dict.into());
2192        let start = self.row.data.len();
2193        // write a dummy len, will fix it up later
2194        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2195
2196        let res = f(self);
2197
2198        let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2199        // fix up the len
2200        self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2201
2202        res
2203    }
2204
2205    /// Convenience function to construct an array from an iter of `Datum`s.
2206    ///
2207    /// Returns an error if the number of elements in `iter` does not match
2208    /// the cardinality of the array as described by `dims`, or if the
2209    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2210    /// occurs, the packer's state will be unchanged.
2211    pub fn try_push_array<'a, I, D>(
2212        &mut self,
2213        dims: &[ArrayDimension],
2214        iter: I,
2215    ) -> Result<(), InvalidArrayError>
2216    where
2217        I: IntoIterator<Item = D>,
2218        D: Borrow<Datum<'a>>,
2219    {
2220        // SAFETY: The function returns the exact number of elements pushed into the array.
2221        unsafe {
2222            self.push_array_with_unchecked(dims, |packer| {
2223                let mut nelements = 0;
2224                for datum in iter {
2225                    packer.push(datum);
2226                    nelements += 1;
2227                }
2228                Ok::<_, InvalidArrayError>(nelements)
2229            })
2230        }
2231    }
2232
2233    /// Convenience function to construct an array from a function. The function must return the
2234    /// number of elements it pushed into the array. It is undefined behavior if the function returns
2235    /// a number different to the number of elements it pushed.
2236    ///
2237    /// Returns an error if the number of elements pushed by `f` does not match
2238    /// the cardinality of the array as described by `dims`, or if the
2239    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`], or if `f` errors. If an error
2240    /// occurs, the packer's state will be unchanged.
2241    pub unsafe fn push_array_with_unchecked<F, E>(
2242        &mut self,
2243        dims: &[ArrayDimension],
2244        f: F,
2245    ) -> Result<(), E>
2246    where
2247        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
2248        E: From<InvalidArrayError>,
2249    {
2250        // Arrays are encoded as follows.
2251        //
2252        // u8    ndims
2253        // u64   dim_0 lower bound
2254        // u64   dim_0 length
2255        // ...
2256        // u64   dim_n lower bound
2257        // u64   dim_n length
2258        // u64   element data size in bytes
2259        // u8    element data, where elements are encoded in row-major order
2260
2261        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
2262            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
2263        }
2264
2265        let start = self.row.data.len();
2266        self.row.data.push(Tag::Array.into());
2267
2268        // Write dimension information.
2269        self.row
2270            .data
2271            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
2272        for dim in dims {
2273            self.row
2274                .data
2275                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2276            self.row
2277                .data
2278                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2279        }
2280
2281        // Write elements.
2282        let off = self.row.data.len();
2283        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2284        let nelements = match f(self) {
2285            Ok(nelements) => nelements,
2286            Err(e) => {
2287                self.row.data.truncate(start);
2288                return Err(e);
2289            }
2290        };
2291        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2292        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2293
2294        // Check that the number of elements written matches the dimension
2295        // information.
2296        let cardinality = match dims {
2297            [] => 0,
2298            dims => dims.iter().map(|d| d.length).product(),
2299        };
2300        if nelements != cardinality {
2301            self.row.data.truncate(start);
2302            return Err(InvalidArrayError::WrongCardinality {
2303                actual: nelements,
2304                expected: cardinality,
2305            }
2306            .into());
2307        }
2308
2309        Ok(())
2310    }
2311
2312    /// Pushes an [`Array`] that is built from a closure.
2313    ///
2314    /// __WARNING__: This is fairly "sharp" tool that is easy to get wrong. You
2315    /// should prefer [`RowPacker::try_push_array`] when possible.
2316    ///
2317    /// Returns an error if the number of elements pushed does not match
2318    /// the cardinality of the array as described by `dims`, or if the
2319    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2320    /// occurs, the packer's state will be unchanged.
2321    pub fn push_array_with_row_major<F, I>(
2322        &mut self,
2323        dims: I,
2324        f: F,
2325    ) -> Result<(), InvalidArrayError>
2326    where
2327        I: IntoIterator<Item = ArrayDimension>,
2328        F: FnOnce(&mut RowPacker) -> usize,
2329    {
2330        let start = self.row.data.len();
2331        self.row.data.push(Tag::Array.into());
2332
2333        // Write dummy dimension length for now, we'll fix it up.
2334        let dims_start = self.row.data.len();
2335        self.row.data.push(42);
2336
2337        let mut num_dims: u8 = 0;
2338        let mut cardinality: usize = 1;
2339        for dim in dims {
2340            num_dims += 1;
2341            cardinality *= dim.length;
2342
2343            self.row
2344                .data
2345                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2346            self.row
2347                .data
2348                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2349        }
2350
2351        if num_dims > MAX_ARRAY_DIMENSIONS {
2352            // Reset the packer state so we don't have invalid data.
2353            self.row.data.truncate(start);
2354            return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2355        }
2356        // Fix up our dimension length.
2357        self.row.data[dims_start..dims_start + size_of::<u8>()]
2358            .copy_from_slice(&num_dims.to_le_bytes());
2359
2360        // Write elements.
2361        let off = self.row.data.len();
2362        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2363
2364        let nelements = f(self);
2365
2366        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2367        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2368
2369        // Check that the number of elements written matches the dimension
2370        // information.
2371        let cardinality = match num_dims {
2372            0 => 0,
2373            _ => cardinality,
2374        };
2375        if nelements != cardinality {
2376            self.row.data.truncate(start);
2377            return Err(InvalidArrayError::WrongCardinality {
2378                actual: nelements,
2379                expected: cardinality,
2380            });
2381        }
2382
2383        Ok(())
2384    }
2385
2386    /// Convenience function to push a `DatumList` from an iter of `Datum`s
2387    ///
2388    /// See [`RowPacker::push_dict_with`] if you need to be able to handle errors
2389    pub fn push_list<'a, I, D>(&mut self, iter: I)
2390    where
2391        I: IntoIterator<Item = D>,
2392        D: Borrow<Datum<'a>>,
2393    {
2394        self.push_list_with(|packer| {
2395            for elem in iter {
2396                packer.push(*elem.borrow())
2397            }
2398        });
2399    }
2400
2401    /// Convenience function to push a `DatumMap` from an iter of `(&str, Datum)` pairs
2402    pub fn push_dict<'a, I, D>(&mut self, iter: I)
2403    where
2404        I: IntoIterator<Item = (&'a str, D)>,
2405        D: Borrow<Datum<'a>>,
2406    {
2407        self.push_dict_with(|packer| {
2408            for (k, v) in iter {
2409                packer.push(Datum::String(k));
2410                packer.push(*v.borrow())
2411            }
2412        })
2413    }
2414
2415    /// Pushes a `Datum::Range` derived from the `Range<Datum<'a>`.
2416    ///
2417    /// # Panics
2418    /// - If lower and upper express finite values and they are datums of
2419    ///   different types.
2420    /// - If lower or upper express finite values and are equal to
2421    ///   `Datum::Null`. To handle `Datum::Null` properly, use
2422    ///   [`RangeBound::new`].
2423    ///
2424    /// # Notes
2425    /// - This function canonicalizes the range before pushing it to the row.
2426    /// - Prefer this function over `push_range_with` because of its
2427    ///   canonicaliztion.
2428    /// - Prefer creating [`RangeBound`]s using [`RangeBound::new`], which
2429    ///   handles `Datum::Null` in a SQL-friendly way.
2430    pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
2431        range.canonicalize()?;
2432        match range.inner {
2433            None => {
2434                self.row.data.push(Tag::Range.into());
2435                // Untagged bytes only contains the `RANGE_EMPTY` flag value.
2436                self.row.data.push(range::InternalFlags::EMPTY.bits());
2437                Ok(())
2438            }
2439            Some(inner) => self.push_range_with(
2440                RangeLowerBound {
2441                    inclusive: inner.lower.inclusive,
2442                    bound: inner
2443                        .lower
2444                        .bound
2445                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2446                },
2447                RangeUpperBound {
2448                    inclusive: inner.upper.inclusive,
2449                    bound: inner
2450                        .upper
2451                        .bound
2452                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2453                },
2454            ),
2455        }
2456    }
2457
2458    /// Pushes a `DatumRange` built from the specified arguments.
2459    ///
2460    /// # Warning
2461    /// Unlike `push_range`, `push_range_with` _does not_ canonicalize its
2462    /// inputs. Consequentially, this means it's possible to generate ranges
2463    /// that will not reflect the proper ordering and equality.
2464    ///
2465    /// # Panics
2466    /// - If lower or upper expresses a finite value and does not push exactly
2467    ///   one value into the `RowPacker`.
2468    /// - If lower and upper express finite values and they are datums of
2469    ///   different types.
2470    /// - If lower or upper express finite values and push `Datum::Null`.
2471    ///
2472    /// # Notes
2473    /// - Prefer `push_range_with` over this function. This function should be
2474    ///   used only when you are not pushing `Datum`s to the inner row.
2475    /// - Range encoding is `[<flag bytes>,<lower>?,<upper>?]`, where `lower`
2476    ///   and `upper` are optional, contingent on the flag value expressing an
2477    ///   empty range (where neither will be present) or infinite bounds (where
2478    ///   each infinite bound will be absent).
2479    /// - To push an emtpy range, use `push_range` using `Range { inner: None }`.
2480    pub fn push_range_with<L, U, E>(
2481        &mut self,
2482        lower: RangeLowerBound<L>,
2483        upper: RangeUpperBound<U>,
2484    ) -> Result<(), E>
2485    where
2486        L: FnOnce(&mut RowPacker) -> Result<(), E>,
2487        U: FnOnce(&mut RowPacker) -> Result<(), E>,
2488        E: From<InvalidRangeError>,
2489    {
2490        let start = self.row.data.len();
2491        self.row.data.push(Tag::Range.into());
2492
2493        let mut flags = range::InternalFlags::empty();
2494
2495        flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
2496        flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
2497        flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
2498        flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);
2499
2500        let mut expected_datums = 0;
2501
2502        self.row.data.push(flags.bits());
2503
2504        let datum_check = self.row.data.len();
2505
2506        if let Some(value) = lower.bound {
2507            let start = self.row.data.len();
2508            value(self)?;
2509            assert!(
2510                start < self.row.data.len(),
2511                "finite values must each push exactly one value; expected 1 but got 0"
2512            );
2513            expected_datums += 1;
2514        }
2515
2516        if let Some(value) = upper.bound {
2517            let start = self.row.data.len();
2518            value(self)?;
2519            assert!(
2520                start < self.row.data.len(),
2521                "finite values must each push exactly one value; expected 1 but got 0"
2522            );
2523            expected_datums += 1;
2524        }
2525
2526        // Validate the invariants that 0, 1, or 2 elements were pushed, none are Null,
2527        // and if two are pushed then the second is not less than the first. Panic in
2528        // some cases and error in others.
2529        let mut actual_datums = 0;
2530        let mut seen = None;
2531        let mut dataz = &self.row.data[datum_check..];
2532        while !dataz.is_empty() {
2533            let d = unsafe { read_datum(&mut dataz) };
2534            assert!(d != Datum::Null, "cannot push Datum::Null into range");
2535
2536            match seen {
2537                None => seen = Some(d),
2538                Some(seen) => {
2539                    let seen_kind = DatumKind::from(seen);
2540                    let d_kind = DatumKind::from(d);
2541                    assert!(
2542                        seen_kind == d_kind,
2543                        "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
2544                    );
2545
2546                    if seen > d {
2547                        self.row.data.truncate(start);
2548                        return Err(InvalidRangeError::MisorderedRangeBounds.into());
2549                    }
2550                }
2551            }
2552            actual_datums += 1;
2553        }
2554
2555        assert!(
2556            actual_datums == expected_datums,
2557            "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
2558        );
2559
2560        Ok(())
2561    }
2562
2563    /// Clears the contents of the packer without de-allocating its backing memory.
2564    pub fn clear(&mut self) {
2565        self.row.data.clear();
2566    }
2567
2568    /// Truncates the underlying storage to the specified byte position.
2569    ///
2570    /// # Safety
2571    ///
2572    /// `pos` MUST specify a byte offset that lies on a datum boundary.
2573    /// If `pos` specifies a byte offset that is *within* a datum, the row
2574    /// packer will produce an invalid row, the unpacking of which may
2575    /// trigger undefined behavior!
2576    ///
2577    /// To find the byte offset of a datum boundary, inspect the packer's
2578    /// byte length by calling `packer.data().len()` after pushing the desired
2579    /// number of datums onto the packer.
2580    pub unsafe fn truncate(&mut self, pos: usize) {
2581        self.row.data.truncate(pos)
2582    }
2583
2584    /// Truncates the underlying row to contain at most the first `n` datums.
2585    pub fn truncate_datums(&mut self, n: usize) {
2586        let prev_len = self.row.data.len();
2587        let mut iter = self.row.iter();
2588        for _ in iter.by_ref().take(n) {}
2589        let next_len = iter.data.len();
2590        // SAFETY: iterator offsets always lie on a datum boundary.
2591        unsafe { self.truncate(prev_len - next_len) }
2592    }
2593
2594    /// Returns the total amount of bytes used by the underlying row.
2595    pub fn byte_len(&self) -> usize {
2596        self.row.byte_len()
2597    }
2598}
2599
2600impl<'a> IntoIterator for &'a Row {
2601    type Item = Datum<'a>;
2602    type IntoIter = DatumListIter<'a>;
2603    fn into_iter(self) -> DatumListIter<'a> {
2604        self.iter()
2605    }
2606}
2607
2608impl fmt::Debug for Row {
2609    /// Debug representation using the internal datums
2610    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2611        f.write_str("Row{")?;
2612        f.debug_list().entries(self.iter()).finish()?;
2613        f.write_str("}")
2614    }
2615}
2616
2617impl fmt::Display for Row {
2618    /// Display representation using the internal datums
2619    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2620        f.write_str("(")?;
2621        for (i, datum) in self.iter().enumerate() {
2622            if i != 0 {
2623                f.write_str(", ")?;
2624            }
2625            write!(f, "{}", datum)?;
2626        }
2627        f.write_str(")")
2628    }
2629}
2630
2631impl<'a> DatumList<'a> {
2632    pub fn empty() -> DatumList<'static> {
2633        DatumList { data: &[] }
2634    }
2635
2636    pub fn iter(&self) -> DatumListIter<'a> {
2637        DatumListIter { data: self.data }
2638    }
2639
2640    /// For debugging only
2641    pub fn data(&self) -> &'a [u8] {
2642        self.data
2643    }
2644}
2645
2646impl<'a> IntoIterator for &'a DatumList<'a> {
2647    type Item = Datum<'a>;
2648    type IntoIter = DatumListIter<'a>;
2649    fn into_iter(self) -> DatumListIter<'a> {
2650        self.iter()
2651    }
2652}
2653
2654impl<'a> Iterator for DatumListIter<'a> {
2655    type Item = Datum<'a>;
2656    fn next(&mut self) -> Option<Self::Item> {
2657        if self.data.is_empty() {
2658            None
2659        } else {
2660            Some(unsafe { read_datum(&mut self.data) })
2661        }
2662    }
2663}
2664
2665impl<'a> DatumMap<'a> {
2666    pub fn empty() -> DatumMap<'static> {
2667        DatumMap { data: &[] }
2668    }
2669
2670    pub fn iter(&self) -> DatumDictIter<'a> {
2671        DatumDictIter {
2672            data: self.data,
2673            prev_key: None,
2674        }
2675    }
2676
2677    /// For debugging only
2678    pub fn data(&self) -> &'a [u8] {
2679        self.data
2680    }
2681}
2682
2683impl<'a> Debug for DatumMap<'a> {
2684    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2685        f.debug_map().entries(self.iter()).finish()
2686    }
2687}
2688
2689impl<'a> IntoIterator for &'a DatumMap<'a> {
2690    type Item = (&'a str, Datum<'a>);
2691    type IntoIter = DatumDictIter<'a>;
2692    fn into_iter(self) -> DatumDictIter<'a> {
2693        self.iter()
2694    }
2695}
2696
2697impl<'a> Iterator for DatumDictIter<'a> {
2698    type Item = (&'a str, Datum<'a>);
2699    fn next(&mut self) -> Option<Self::Item> {
2700        if self.data.is_empty() {
2701            None
2702        } else {
2703            let key_tag =
2704                Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
2705            assert!(
2706                key_tag == Tag::StringTiny
2707                    || key_tag == Tag::StringShort
2708                    || key_tag == Tag::StringLong
2709                    || key_tag == Tag::StringHuge,
2710                "Dict keys must be strings, got {:?}",
2711                key_tag
2712            );
2713            let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
2714            let val = unsafe { read_datum(&mut self.data) };
2715
2716            // if in debug mode, sanity check keys
2717            if cfg!(debug_assertions) {
2718                if let Some(prev_key) = self.prev_key {
2719                    debug_assert!(
2720                        prev_key < key,
2721                        "Dict keys must be unique and given in ascending order: {} came before {}",
2722                        prev_key,
2723                        key
2724                    );
2725                }
2726                self.prev_key = Some(key);
2727            }
2728
2729            Some((key, val))
2730        }
2731    }
2732}
2733
2734impl RowArena {
2735    pub fn new() -> Self {
2736        RowArena {
2737            inner: RefCell::new(vec![]),
2738        }
2739    }
2740
2741    /// Creates a `RowArena` with a hint of how many rows will be created in the arena, to avoid
2742    /// reallocations of its internal vector.
2743    pub fn with_capacity(capacity: usize) -> Self {
2744        RowArena {
2745            inner: RefCell::new(Vec::with_capacity(capacity)),
2746        }
2747    }
2748
2749    /// Does a `reserve` on the underlying `Vec`. Call this when you expect `additional` more datums
2750    /// to be created in this arena.
2751    pub fn reserve(&self, additional: usize) {
2752        self.inner.borrow_mut().reserve(additional);
2753    }
2754
2755    /// Take ownership of `bytes` for the lifetime of the arena.
2756    #[allow(clippy::transmute_ptr_to_ptr)]
2757    pub fn push_bytes<'a>(&'a self, bytes: Vec<u8>) -> &'a [u8] {
2758        let mut inner = self.inner.borrow_mut();
2759        inner.push(bytes);
2760        let owned_bytes = &inner[inner.len() - 1];
2761        unsafe {
2762            // This is safe because:
2763            //   * We only ever append to self.inner, so the byte vector
2764            //     will live as long as the arena.
2765            //   * We return a reference to the byte vector's contents, so it's
2766            //     okay if self.inner reallocates and moves the byte
2767            //     vector.
2768            //   * We don't allow access to the byte vector itself, so it will
2769            //     never reallocate.
2770            transmute::<&[u8], &'a [u8]>(owned_bytes)
2771        }
2772    }
2773
2774    /// Take ownership of `string` for the lifetime of the arena.
2775    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
2776        let owned_bytes = self.push_bytes(string.into_bytes());
2777        unsafe {
2778            // This is safe because we know it was a `String` just before.
2779            std::str::from_utf8_unchecked(owned_bytes)
2780        }
2781    }
2782
2783    /// Take ownership of `row` for the lifetime of the arena, returning a
2784    /// reference to the first datum in the row.
2785    ///
2786    /// If we had an owned datum type, this method would be much clearer, and
2787    /// would be called `push_owned_datum`.
2788    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
2789        let mut inner = self.inner.borrow_mut();
2790        inner.push(row.data.into_vec());
2791        unsafe {
2792            // This is safe because:
2793            //   * We only ever append to self.inner, so the row data will live
2794            //     as long as the arena.
2795            //   * We force the row data into its own heap allocation--
2796            //     importantly, we do NOT store the SmallVec, which might be
2797            //     storing data inline--so it's okay if self.inner reallocates
2798            //     and moves the row.
2799            //   * We don't allow access to the byte vector itself, so it will
2800            //     never reallocate.
2801            let datum = read_datum(&mut &inner[inner.len() - 1][..]);
2802            transmute::<Datum<'_>, Datum<'a>>(datum)
2803        }
2804    }
2805
2806    /// Equivalent to `push_unary_row` but returns a `DatumNested` rather than a
2807    /// `Datum`.
2808    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
2809        let mut inner = self.inner.borrow_mut();
2810        inner.push(row.data.into_vec());
2811        unsafe {
2812            // This is safe because:
2813            //   * We only ever append to self.inner, so the row data will live
2814            //     as long as the arena.
2815            //   * We force the row data into its own heap allocation--
2816            //     importantly, we do NOT store the SmallVec, which might be
2817            //     storing data inline--so it's okay if self.inner reallocates
2818            //     and moves the row.
2819            //   * We don't allow access to the byte vector itself, so it will
2820            //     never reallocate.
2821            let nested = DatumNested::extract(&mut &inner[inner.len() - 1][..]);
2822            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
2823        }
2824    }
2825
2826    /// Convenience function to make a new `Row` containing a single datum, and
2827    /// take ownership of it for the lifetime of the arena
2828    ///
2829    /// ```
2830    /// # use mz_repr::{RowArena, Datum};
2831    /// let arena = RowArena::new();
2832    /// let datum = arena.make_datum(|packer| {
2833    ///   packer.push_list(&[Datum::String("hello"), Datum::String("world")]);
2834    /// });
2835    /// assert_eq!(datum.unwrap_list().iter().collect::<Vec<_>>(), vec![Datum::String("hello"), Datum::String("world")]);
2836    /// ```
2837    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
2838    where
2839        F: FnOnce(&mut RowPacker),
2840    {
2841        let mut row = Row::default();
2842        f(&mut row.packer());
2843        self.push_unary_row(row)
2844    }
2845
2846    /// Convenience function identical to `make_datum` but instead returns a
2847    /// `DatumNested`.
2848    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
2849    where
2850        F: FnOnce(&mut RowPacker),
2851    {
2852        let mut row = Row::default();
2853        f(&mut row.packer());
2854        self.push_unary_row_datum_nested(row)
2855    }
2856
2857    /// Like [`RowArena::make_datum`], but the provided closure can return an error.
2858    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
2859    where
2860        F: FnOnce(&mut RowPacker) -> Result<(), E>,
2861    {
2862        let mut row = Row::default();
2863        f(&mut row.packer())?;
2864        Ok(self.push_unary_row(row))
2865    }
2866
2867    /// Clear the contents of the arena.
2868    pub fn clear(&mut self) {
2869        self.inner.borrow_mut().clear();
2870    }
2871}
2872
2873impl Default for RowArena {
2874    fn default() -> RowArena {
2875        RowArena::new()
2876    }
2877}
2878
2879/// A thread-local row, which can be borrowed and returned.
2880/// # Example
2881///
2882/// Use this type instead of creating a new row:
2883/// ```
2884/// use mz_repr::SharedRow;
2885///
2886/// let mut row_builder = SharedRow::get();
2887/// ```
2888///
2889/// This allows us to reuse an existing row allocation instead of creating a new one or retaining
2890/// an allocation locally. Additionally, we can observe the size of the local row in a central
2891/// place and potentially reallocate to reduce memory needs.
2892///
2893/// # Panic
2894///
2895/// [`SharedRow::get`] panics when trying to obtain multiple references to the shared row.
2896#[derive(Debug)]
2897pub struct SharedRow(Row);
2898
2899impl SharedRow {
2900    thread_local! {
2901        /// A thread-local slot containing a shared Row that can be temporarily used by a function.
2902        /// There can be at most one active user of this Row, which is tracked by the state of the
2903        /// `Option<_>` wrapper. When it is `Some(..)`, the row is available for using. When it
2904        /// is `None`, it is not, and the constructor will panic if a thread attempts to use it.
2905        static SHARED_ROW: Cell<Option<Row>> = const { Cell::new(Some(Row::empty())) }
2906    }
2907
2908    /// Get the shared row.
2909    ///
2910    /// The row's contents are cleared before returning it.
2911    ///
2912    /// # Panic
2913    ///
2914    /// Panics when the row is already borrowed elsewhere.
2915    pub fn get() -> Self {
2916        let mut row = Self::SHARED_ROW
2917            .take()
2918            .expect("attempted to borrow already borrowed SharedRow");
2919        // Clear row
2920        row.packer();
2921        Self(row)
2922    }
2923
2924    /// Gets the shared row and uses it to pack `iter`.
2925    pub fn pack<'a, I, D>(iter: I) -> Row
2926    where
2927        I: IntoIterator<Item = D>,
2928        D: Borrow<Datum<'a>>,
2929    {
2930        let mut row_builder = Self::get();
2931        let mut row_packer = row_builder.packer();
2932        row_packer.extend(iter);
2933        row_builder.clone()
2934    }
2935}
2936
2937impl std::ops::Deref for SharedRow {
2938    type Target = Row;
2939
2940    fn deref(&self) -> &Self::Target {
2941        &self.0
2942    }
2943}
2944
2945impl std::ops::DerefMut for SharedRow {
2946    fn deref_mut(&mut self) -> &mut Self::Target {
2947        &mut self.0
2948    }
2949}
2950
2951impl Drop for SharedRow {
2952    fn drop(&mut self) {
2953        // Take the Row allocation from this instance and put it back in the thread local slot for
2954        // the next user. The Row in `self` is replaced with an empty Row which does not allocate.
2955        Self::SHARED_ROW.set(Some(std::mem::take(&mut self.0)))
2956    }
2957}
2958
2959#[cfg(test)]
2960mod tests {
2961    use chrono::{DateTime, NaiveDate};
2962    use mz_ore::{assert_err, assert_none};
2963
2964    use crate::SqlScalarType;
2965
2966    use super::*;
2967
2968    #[mz_ore::test]
2969    fn test_assumptions() {
2970        assert_eq!(size_of::<Tag>(), 1);
2971        #[cfg(target_endian = "big")]
2972        {
2973            // if you want to run this on a big-endian cpu, we'll need big-endian versions of the serialization code
2974            assert!(false);
2975        }
2976    }
2977
2978    #[mz_ore::test]
2979    fn miri_test_arena() {
2980        let arena = RowArena::new();
2981
2982        assert_eq!(arena.push_string("".to_owned()), "");
2983        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");
2984
2985        let empty: &[u8] = &[];
2986        assert_eq!(arena.push_bytes(vec![]), empty);
2987        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);
2988
2989        let mut row = Row::default();
2990        let mut packer = row.packer();
2991        packer.push_dict_with(|row| {
2992            row.push(Datum::String("a"));
2993            row.push_list_with(|row| {
2994                row.push(Datum::String("one"));
2995                row.push(Datum::String("two"));
2996                row.push(Datum::String("three"));
2997            });
2998            row.push(Datum::String("b"));
2999            row.push(Datum::String("c"));
3000        });
3001        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
3002    }
3003
3004    #[mz_ore::test]
3005    fn miri_test_round_trip() {
3006        fn round_trip(datums: Vec<Datum>) {
3007            let row = Row::pack(datums.clone());
3008
3009            // When run under miri this catches undefined bytes written to data
3010            // eg by calling push_copy! on a type which contains undefined padding values
3011            println!("{:?}", row.data());
3012
3013            let datums2 = row.iter().collect::<Vec<_>>();
3014            let datums3 = row.unpack();
3015            assert_eq!(datums, datums2);
3016            assert_eq!(datums, datums3);
3017        }
3018
3019        round_trip(vec![]);
3020        round_trip(
3021            SqlScalarType::enumerate()
3022                .iter()
3023                .flat_map(|r#type| r#type.interesting_datums())
3024                .collect(),
3025        );
3026        round_trip(vec![
3027            Datum::Null,
3028            Datum::Null,
3029            Datum::False,
3030            Datum::True,
3031            Datum::Int16(-21),
3032            Datum::Int32(-42),
3033            Datum::Int64(-2_147_483_648 - 42),
3034            Datum::UInt8(0),
3035            Datum::UInt8(1),
3036            Datum::UInt16(0),
3037            Datum::UInt16(1),
3038            Datum::UInt16(1 << 8),
3039            Datum::UInt32(0),
3040            Datum::UInt32(1),
3041            Datum::UInt32(1 << 8),
3042            Datum::UInt32(1 << 16),
3043            Datum::UInt32(1 << 24),
3044            Datum::UInt64(0),
3045            Datum::UInt64(1),
3046            Datum::UInt64(1 << 8),
3047            Datum::UInt64(1 << 16),
3048            Datum::UInt64(1 << 24),
3049            Datum::UInt64(1 << 32),
3050            Datum::UInt64(1 << 40),
3051            Datum::UInt64(1 << 48),
3052            Datum::UInt64(1 << 56),
3053            Datum::Float32(OrderedFloat::from(-42.12)),
3054            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
3055            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
3056            Datum::Timestamp(
3057                CheckedTimestamp::from_timestamplike(
3058                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
3059                        .unwrap()
3060                        .and_hms_opt(14, 32, 11)
3061                        .unwrap(),
3062                )
3063                .unwrap(),
3064            ),
3065            Datum::TimestampTz(
3066                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
3067                    .unwrap(),
3068            ),
3069            Datum::Interval(Interval {
3070                months: 312,
3071                ..Default::default()
3072            }),
3073            Datum::Interval(Interval::new(0, 0, 1_012_312)),
3074            Datum::Bytes(&[]),
3075            Datum::Bytes(&[0, 2, 1, 255]),
3076            Datum::String(""),
3077            Datum::String("العَرَبِيَّة"),
3078        ]);
3079    }
3080
3081    #[mz_ore::test]
3082    fn test_array() {
3083        // Construct an array using `Row::push_array` and verify that it unpacks
3084        // correctly.
3085        const DIM: ArrayDimension = ArrayDimension {
3086            lower_bound: 2,
3087            length: 2,
3088        };
3089        let mut row = Row::default();
3090        let mut packer = row.packer();
3091        packer
3092            .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3093            .unwrap();
3094        let arr1 = row.unpack_first().unwrap_array();
3095        assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3096        assert_eq!(
3097            arr1.elements().into_iter().collect::<Vec<_>>(),
3098            vec![Datum::Int32(1), Datum::Int32(2)]
3099        );
3100
3101        // Pack a previously-constructed `Datum::Array` and verify that it
3102        // unpacks correctly.
3103        let row = Row::pack_slice(&[Datum::Array(arr1)]);
3104        let arr2 = row.unpack_first().unwrap_array();
3105        assert_eq!(arr1, arr2);
3106    }
3107
3108    #[mz_ore::test]
3109    fn test_multidimensional_array() {
3110        let datums = vec![
3111            Datum::Int32(1),
3112            Datum::Int32(2),
3113            Datum::Int32(3),
3114            Datum::Int32(4),
3115            Datum::Int32(5),
3116            Datum::Int32(6),
3117            Datum::Int32(7),
3118            Datum::Int32(8),
3119        ];
3120
3121        let mut row = Row::default();
3122        let mut packer = row.packer();
3123        packer
3124            .try_push_array(
3125                &[
3126                    ArrayDimension {
3127                        lower_bound: 1,
3128                        length: 1,
3129                    },
3130                    ArrayDimension {
3131                        lower_bound: 1,
3132                        length: 4,
3133                    },
3134                    ArrayDimension {
3135                        lower_bound: 1,
3136                        length: 2,
3137                    },
3138                ],
3139                &datums,
3140            )
3141            .unwrap();
3142        let array = row.unpack_first().unwrap_array();
3143        assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3144    }
3145
3146    #[mz_ore::test]
3147    fn test_array_max_dimensions() {
3148        let mut row = Row::default();
3149        let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3150
3151        // An array with one too many dimensions should be rejected.
3152        let res = row.packer().try_push_array(
3153            &vec![
3154                ArrayDimension {
3155                    lower_bound: 1,
3156                    length: 1
3157                };
3158                max_dims + 1
3159            ],
3160            vec![Datum::Int32(4)],
3161        );
3162        assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3163        assert!(row.data.is_empty());
3164
3165        // An array with exactly the maximum allowable dimensions should be
3166        // accepted.
3167        row.packer()
3168            .try_push_array(
3169                &vec![
3170                    ArrayDimension {
3171                        lower_bound: 1,
3172                        length: 1
3173                    };
3174                    max_dims
3175                ],
3176                vec![Datum::Int32(4)],
3177            )
3178            .unwrap();
3179    }
3180
3181    #[mz_ore::test]
3182    fn test_array_wrong_cardinality() {
3183        let mut row = Row::default();
3184        let res = row.packer().try_push_array(
3185            &[
3186                ArrayDimension {
3187                    lower_bound: 1,
3188                    length: 2,
3189                },
3190                ArrayDimension {
3191                    lower_bound: 1,
3192                    length: 3,
3193                },
3194            ],
3195            vec![Datum::Int32(1), Datum::Int32(2)],
3196        );
3197        assert_eq!(
3198            res,
3199            Err(InvalidArrayError::WrongCardinality {
3200                actual: 2,
3201                expected: 6,
3202            })
3203        );
3204        assert!(row.data.is_empty());
3205    }
3206
3207    #[mz_ore::test]
3208    fn test_nesting() {
3209        let mut row = Row::default();
3210        row.packer().push_dict_with(|row| {
3211            row.push(Datum::String("favourites"));
3212            row.push_list_with(|row| {
3213                row.push(Datum::String("ice cream"));
3214                row.push(Datum::String("oreos"));
3215                row.push(Datum::String("cheesecake"));
3216            });
3217            row.push(Datum::String("name"));
3218            row.push(Datum::String("bob"));
3219        });
3220
3221        let mut iter = row.unpack_first().unwrap_map().iter();
3222
3223        let (k, v) = iter.next().unwrap();
3224        assert_eq!(k, "favourites");
3225        assert_eq!(
3226            v.unwrap_list().iter().collect::<Vec<_>>(),
3227            vec![
3228                Datum::String("ice cream"),
3229                Datum::String("oreos"),
3230                Datum::String("cheesecake"),
3231            ]
3232        );
3233
3234        let (k, v) = iter.next().unwrap();
3235        assert_eq!(k, "name");
3236        assert_eq!(v, Datum::String("bob"));
3237    }
3238
3239    #[mz_ore::test]
3240    fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3241        let pack = |ok| {
3242            let mut row = Row::default();
3243            row.packer().push_dict_with(|row| {
3244                if ok {
3245                    row.push(Datum::String("key"));
3246                    row.push(Datum::Int32(42));
3247                    Ok(7)
3248                } else {
3249                    Err("fail")
3250                }
3251            })?;
3252            Ok(row)
3253        };
3254
3255        assert_eq!(pack(false), Err("fail"));
3256
3257        let row = pack(true)?;
3258        let mut dict = row.unpack_first().unwrap_map().iter();
3259        assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3260        assert_eq!(dict.next(), None);
3261
3262        Ok(())
3263    }
3264
3265    #[mz_ore::test]
3266    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
3267    fn test_datum_sizes() {
3268        let arena = RowArena::new();
3269
3270        // Test the claims about various datum sizes.
3271        let values_of_interest = vec![
3272            Datum::Null,
3273            Datum::False,
3274            Datum::Int16(0),
3275            Datum::Int32(0),
3276            Datum::Int64(0),
3277            Datum::UInt8(0),
3278            Datum::UInt8(1),
3279            Datum::UInt16(0),
3280            Datum::UInt16(1),
3281            Datum::UInt16(1 << 8),
3282            Datum::UInt32(0),
3283            Datum::UInt32(1),
3284            Datum::UInt32(1 << 8),
3285            Datum::UInt32(1 << 16),
3286            Datum::UInt32(1 << 24),
3287            Datum::UInt64(0),
3288            Datum::UInt64(1),
3289            Datum::UInt64(1 << 8),
3290            Datum::UInt64(1 << 16),
3291            Datum::UInt64(1 << 24),
3292            Datum::UInt64(1 << 32),
3293            Datum::UInt64(1 << 40),
3294            Datum::UInt64(1 << 48),
3295            Datum::UInt64(1 << 56),
3296            Datum::Float32(OrderedFloat(0.0)),
3297            Datum::Float64(OrderedFloat(0.0)),
3298            Datum::from(numeric::Numeric::from(0)),
3299            Datum::from(numeric::Numeric::from(1000)),
3300            Datum::from(numeric::Numeric::from(9999)),
3301            Datum::Date(
3302                NaiveDate::from_ymd_opt(1, 1, 1)
3303                    .unwrap()
3304                    .try_into()
3305                    .unwrap(),
3306            ),
3307            Datum::Timestamp(
3308                CheckedTimestamp::from_timestamplike(
3309                    DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
3310                )
3311                .unwrap(),
3312            ),
3313            Datum::TimestampTz(
3314                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
3315                    .unwrap(),
3316            ),
3317            Datum::Interval(Interval::default()),
3318            Datum::Bytes(&[]),
3319            Datum::String(""),
3320            Datum::JsonNull,
3321            Datum::Range(Range { inner: None }),
3322            arena.make_datum(|packer| {
3323                packer
3324                    .push_range(Range::new(Some((
3325                        RangeLowerBound::new(Datum::Int32(-1), true),
3326                        RangeUpperBound::new(Datum::Int32(1), true),
3327                    ))))
3328                    .unwrap();
3329            }),
3330        ];
3331        for value in values_of_interest {
3332            if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
3333                panic!("Disparity in claimed size for {:?}", value);
3334            }
3335        }
3336    }
3337
3338    #[mz_ore::test]
3339    fn test_range_errors() {
3340        fn test_range_errors_inner<'a>(
3341            datums: Vec<Vec<Datum<'a>>>,
3342        ) -> Result<(), InvalidRangeError> {
3343            let mut row = Row::default();
3344            let row_len = row.byte_len();
3345            let mut packer = row.packer();
3346            let r = packer.push_range_with(
3347                RangeLowerBound {
3348                    inclusive: true,
3349                    bound: Some(|row: &mut RowPacker| {
3350                        for d in &datums[0] {
3351                            row.push(d);
3352                        }
3353                        Ok(())
3354                    }),
3355                },
3356                RangeUpperBound {
3357                    inclusive: true,
3358                    bound: Some(|row: &mut RowPacker| {
3359                        for d in &datums[1] {
3360                            row.push(d);
3361                        }
3362                        Ok(())
3363                    }),
3364                },
3365            );
3366
3367            assert_eq!(row_len, row.byte_len());
3368
3369            r
3370        }
3371
3372        for panicking_case in [
3373            vec![vec![Datum::Int32(1)], vec![]],
3374            vec![
3375                vec![Datum::Int32(1), Datum::Int32(2)],
3376                vec![Datum::Int32(3)],
3377            ],
3378            vec![
3379                vec![Datum::Int32(1)],
3380                vec![Datum::Int32(2), Datum::Int32(3)],
3381            ],
3382            vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
3383            vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
3384            vec![vec![Datum::Null], vec![Datum::Int32(2)]],
3385            vec![vec![Datum::Int32(1)], vec![Datum::Null]],
3386        ] {
3387            #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
3388            let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
3389            assert_err!(result);
3390        }
3391
3392        let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
3393        assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
3394    }
3395
3396    /// Lists have a variable-length encoding for their lengths. We test each case here.
3397    #[mz_ore::test]
3398    #[cfg_attr(miri, ignore)] // slow
3399    fn test_list_encoding() {
3400        fn test_list_encoding_inner(len: usize) {
3401            let list_elem = |i: usize| {
3402                if i % 2 == 0 {
3403                    Datum::False
3404                } else {
3405                    Datum::True
3406                }
3407            };
3408            let mut row = Row::default();
3409            {
3410                // Push some stuff.
3411                let mut packer = row.packer();
3412                packer.push(Datum::String("start"));
3413                packer.push_list_with(|packer| {
3414                    for i in 0..len {
3415                        packer.push(list_elem(i));
3416                    }
3417                });
3418                packer.push(Datum::String("end"));
3419            }
3420            // Check that we read back exactly what we pushed.
3421            let mut row_it = row.iter();
3422            assert_eq!(row_it.next().unwrap(), Datum::String("start"));
3423            match row_it.next().unwrap() {
3424                Datum::List(list) => {
3425                    let mut list_it = list.iter();
3426                    for i in 0..len {
3427                        assert_eq!(list_it.next().unwrap(), list_elem(i));
3428                    }
3429                    assert_none!(list_it.next());
3430                }
3431                _ => panic!("expected Datum::List"),
3432            }
3433            assert_eq!(row_it.next().unwrap(), Datum::String("end"));
3434            assert_none!(row_it.next());
3435        }
3436
3437        test_list_encoding_inner(0);
3438        test_list_encoding_inner(1);
3439        test_list_encoding_inner(10);
3440        test_list_encoding_inner(TINY - 1); // tiny
3441        test_list_encoding_inner(TINY + 1); // short
3442        test_list_encoding_inner(SHORT + 1); // long
3443
3444        // The biggest one takes 40 s on my laptop, probably not worth it.
3445        //test_list_encoding_inner(LONG + 1); // huge
3446    }
3447}