mz_repr/
row.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Borrow;
11use std::cell::{Cell, RefCell};
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::mem::{size_of, transmute};
16use std::ops::Deref;
17use std::str;
18
19use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
20use compact_bytes::CompactBytes;
21use mz_ore::cast::{CastFrom, ReinterpretCast};
22use mz_ore::soft_assert_no_log;
23use mz_ore::vec::Vector;
24use mz_persist_types::Codec64;
25use num_enum::{IntoPrimitive, TryFromPrimitive};
26use ordered_float::OrderedFloat;
27use proptest::prelude::*;
28use proptest::strategy::{BoxedStrategy, Strategy};
29use serde::{Deserialize, Serialize};
30use uuid::Uuid;
31
32use crate::adt::array::{
33    Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
34};
35use crate::adt::date::Date;
36use crate::adt::interval::Interval;
37use crate::adt::mz_acl_item::{AclItem, MzAclItem};
38use crate::adt::numeric;
39use crate::adt::numeric::Numeric;
40use crate::adt::range::{
41    self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
42};
43use crate::adt::timestamp::CheckedTimestamp;
44use crate::scalar::{DatumKind, arb_datum};
45use crate::{Datum, RelationDesc, Timestamp};
46
47pub(crate) mod encode;
48pub mod iter;
49
50include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
51
52/// A packed representation for `Datum`s.
53///
54/// `Datum` is easy to work with but very space inefficient. A `Datum::Int32(42)`
55/// is laid out in memory like this:
56///
57///   tag: 3
58///   padding: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
59///   data: 0 0 0 42
60///   padding: 0 0 0 0 0 0 0 0 0 0 0 0
61///
62/// For a total of 32 bytes! The second set of padding is needed in case we were
63/// to write a 16-byte datum into this location. The first set of padding is
64/// needed to align that hypothetical decimal to a 16 bytes boundary.
65///
66/// A `Row` stores zero or more `Datum`s without any padding. We avoid the need
67/// for the first set of padding by only providing access to the `Datum`s via
68/// calls to `ptr::read_unaligned`, which on modern x86 is barely penalized. We
69/// avoid the need for the second set of padding by not providing mutable access
70/// to the `Datum`. Instead, `Row` is append-only.
71///
72/// A `Row` can be built from a collection of `Datum`s using `Row::pack`, but it
73/// is more efficient to use `Row::pack_slice` so that a right-sized allocation
74/// can be created. If that is not possible, consider using the row buffer
75/// pattern: allocate one row, pack into it, and then call [`Row::clone`] to
76/// receive a copy of that row, leaving behind the original allocation to pack
77/// future rows.
78///
79/// Creating a row via [`Row::pack_slice`]:
80///
81/// ```
82/// # use mz_repr::{Row, Datum};
83/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
84/// assert_eq!(row.unpack(), vec![Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)])
85/// ```
86///
87/// `Row`s can be unpacked by iterating over them:
88///
89/// ```
90/// # use mz_repr::{Row, Datum};
91/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
92/// assert_eq!(row.iter().nth(1).unwrap(), Datum::Int32(1));
93/// ```
94///
95/// If you want random access to the `Datum`s in a `Row`, use `Row::unpack` to create a `Vec<Datum>`
96/// ```
97/// # use mz_repr::{Row, Datum};
98/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
99/// let datums = row.unpack();
100/// assert_eq!(datums[1], Datum::Int32(1));
101/// ```
102///
103/// # Performance
104///
105/// Rows are dynamically sized, but up to a fixed size their data is stored in-line.
106/// It is best to re-use a `Row` across multiple `Row` creation calls, as this
107/// avoids the allocations involved in `Row::new()`.
#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
    /// The packed, padding-free encoding of this row's datums.
    ///
    /// `Row::byte_len` only counts these bytes as heap usage when
    /// `CompactBytes::spilled` reports that they are no longer stored inline.
    data: CompactBytes,
}
112
113impl Row {
114    const SIZE: usize = CompactBytes::MAX_INLINE;
115
116    /// A variant of `Row::from_proto` that allows for reuse of internal allocs
117    /// and validates the decoding against a provided [`RelationDesc`].
118    pub fn decode_from_proto(
119        &mut self,
120        proto: &ProtoRow,
121        desc: &RelationDesc,
122    ) -> Result<(), String> {
123        let mut packer = self.packer();
124        for (col_idx, _, _) in desc.iter_all() {
125            let d = match proto.datums.get(col_idx.to_raw()) {
126                Some(x) => x,
127                None => {
128                    packer.push(Datum::Null);
129                    continue;
130                }
131            };
132            packer.try_push_proto(d)?;
133        }
134
135        Ok(())
136    }
137
138    /// Allocate an empty `Row` with a pre-allocated capacity.
139    #[inline]
140    pub fn with_capacity(cap: usize) -> Self {
141        Self {
142            data: CompactBytes::with_capacity(cap),
143        }
144    }
145
146    /// Create an empty `Row`.
147    #[inline]
148    pub const fn empty() -> Self {
149        Self {
150            data: CompactBytes::empty(),
151        }
152    }
153
154    /// Creates a new row from supplied bytes.
155    ///
156    /// # Safety
157    ///
158    /// This method relies on `data` being an appropriate row encoding, and can
159    /// result in unsafety if this is not the case.
160    pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
161        Row {
162            data: CompactBytes::new(data),
163        }
164    }
165
166    /// Constructs a [`RowPacker`] that will pack datums into this row's
167    /// allocation.
168    ///
169    /// This method clears the existing contents of the row, but retains the
170    /// allocation.
171    pub fn packer(&mut self) -> RowPacker<'_> {
172        self.clear();
173        RowPacker { row: self }
174    }
175
176    /// Take some `Datum`s and pack them into a `Row`.
177    ///
178    /// This method builds a `Row` by repeatedly increasing the backing
179    /// allocation. If the contents of the iterator are known ahead of
180    /// time, consider [`Row::with_capacity`] to right-size the allocation
181    /// first, and then [`RowPacker::extend`] to populate it with `Datum`s.
182    /// This avoids the repeated allocation resizing and copying.
183    pub fn pack<'a, I, D>(iter: I) -> Row
184    where
185        I: IntoIterator<Item = D>,
186        D: Borrow<Datum<'a>>,
187    {
188        let mut row = Row::default();
189        row.packer().extend(iter);
190        row
191    }
192
193    /// Use `self` to pack `iter`, and then clone the result.
194    ///
195    /// This is a convenience method meant to reduce boilerplate around row
196    /// formation.
197    pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
198    where
199        I: IntoIterator<Item = D>,
200        D: Borrow<Datum<'a>>,
201    {
202        self.packer().extend(iter);
203        self.clone()
204    }
205
206    /// Like [`Row::pack`], but the provided iterator is allowed to produce an
207    /// error, in which case the packing operation is aborted and the error
208    /// returned.
209    pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
210    where
211        I: IntoIterator<Item = Result<D, E>>,
212        D: Borrow<Datum<'a>>,
213    {
214        let mut row = Row::default();
215        row.packer().try_extend(iter)?;
216        Ok(row)
217    }
218
219    /// Pack a slice of `Datum`s into a `Row`.
220    ///
221    /// This method has the advantage over `pack` that it can determine the required
222    /// allocation before packing the elements, ensuring only one allocation and no
223    /// redundant copies required.
224    pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
225        // Pre-allocate the needed number of bytes.
226        let mut row = Row::with_capacity(datums_size(slice.iter()));
227        row.packer().extend(slice.iter());
228        row
229    }
230
231    /// Returns the total amount of bytes used by this row.
232    pub fn byte_len(&self) -> usize {
233        let heap_size = if self.data.spilled() {
234            self.data.len()
235        } else {
236            0
237        };
238        let inline_size = std::mem::size_of::<Self>();
239        inline_size.saturating_add(heap_size)
240    }
241
242    /// The length of the encoded row in bytes. Does not include the size of the `Row` struct itself.
243    pub fn data_len(&self) -> usize {
244        self.data.len()
245    }
246
247    /// Returns the total capacity in bytes used by this row.
248    pub fn byte_capacity(&self) -> usize {
249        self.data.capacity()
250    }
251
252    /// Extracts a Row slice containing the entire [`Row`].
253    #[inline]
254    pub fn as_row_ref(&self) -> &RowRef {
255        RowRef::from_slice(self.data.as_slice())
256    }
257
258    /// Clear the contents of the [`Row`], leaving any allocation in place.
259    #[inline]
260    fn clear(&mut self) {
261        self.data.clear();
262    }
263}
264
265impl Borrow<RowRef> for Row {
266    #[inline]
267    fn borrow(&self) -> &RowRef {
268        self.as_row_ref()
269    }
270}
271
272impl AsRef<RowRef> for Row {
273    #[inline]
274    fn as_ref(&self) -> &RowRef {
275        self.as_row_ref()
276    }
277}
278
279impl Deref for Row {
280    type Target = RowRef;
281
282    #[inline]
283    fn deref(&self) -> &Self::Target {
284        self.as_row_ref()
285    }
286}
287
// Nothing depends on Row being exactly 24, we just want to add visibility to the size.
// A change to the size of `Row` (whose only field is a `CompactBytes`) fails
// this assertion at compile time, so it cannot go unnoticed.
static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
290
291impl Clone for Row {
292    fn clone(&self) -> Self {
293        Row {
294            data: self.data.clone(),
295        }
296    }
297
298    fn clone_from(&mut self, source: &Self) {
299        self.data.clone_from(&source.data);
300    }
301}
302
303// Row's `Hash` implementation defers to `RowRef` to ensure they hash equivalently.
304impl std::hash::Hash for Row {
305    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
306        self.as_row_ref().hash(state)
307    }
308}
309
310impl Arbitrary for Row {
311    type Parameters = prop::collection::SizeRange;
312    type Strategy = BoxedStrategy<Row>;
313
314    fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
315        prop::collection::vec(arb_datum(), size)
316            .prop_map(|items| {
317                let mut row = Row::default();
318                let mut packer = row.packer();
319                for item in items.iter() {
320                    let datum: Datum<'_> = item.into();
321                    packer.push(datum);
322                }
323                row
324            })
325            .boxed()
326    }
327}
328
329impl PartialOrd for Row {
330    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
331        Some(self.cmp(other))
332    }
333}
334
335impl Ord for Row {
336    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
337        self.as_ref().cmp(other.as_ref())
338    }
339}
340
341#[allow(missing_debug_implementations)]
342mod columnation {
343    use columnation::{Columnation, Region};
344    use mz_ore::region::LgAllocRegion;
345
346    use crate::Row;
347
348    /// Region allocation for `Row` data.
349    ///
350    /// Content bytes are stored in stable contiguous memory locations,
351    /// and then a `Row` referencing them is falsified.
352    pub struct RowStack {
353        region: LgAllocRegion<u8>,
354    }
355
356    impl RowStack {
357        const LIMIT: usize = 2 << 20;
358    }
359
360    // Implement `Default` manually to specify a region allocation limit.
361    impl Default for RowStack {
362        fn default() -> Self {
363            Self {
364                // Limit the region size to 2MiB.
365                region: LgAllocRegion::with_limit(Self::LIMIT),
366            }
367        }
368    }
369
370    impl Columnation for Row {
371        type InnerRegion = RowStack;
372    }
373
374    impl Region for RowStack {
375        type Item = Row;
376        #[inline]
377        fn clear(&mut self) {
378            self.region.clear();
379        }
380        #[inline(always)]
381        unsafe fn copy(&mut self, item: &Row) -> Row {
382            if item.data.spilled() {
383                let bytes = self.region.copy_slice(&item.data[..]);
384                Row {
385                    data: compact_bytes::CompactBytes::from_raw_parts(
386                        bytes.as_mut_ptr(),
387                        item.data.len(),
388                        item.data.capacity(),
389                    ),
390                }
391            } else {
392                item.clone()
393            }
394        }
395
396        fn reserve_items<'a, I>(&mut self, items: I)
397        where
398            Self: 'a,
399            I: Iterator<Item = &'a Self::Item> + Clone,
400        {
401            let size = items
402                .filter(|row| row.data.spilled())
403                .map(|row| row.data.len())
404                .sum();
405            let size = std::cmp::min(size, Self::LIMIT);
406            self.region.reserve(size);
407        }
408
409        fn reserve_regions<'a, I>(&mut self, regions: I)
410        where
411            Self: 'a,
412            I: Iterator<Item = &'a Self> + Clone,
413        {
414            let size = regions.map(|r| r.region.len()).sum();
415            let size = std::cmp::min(size, Self::LIMIT);
416            self.region.reserve(size);
417        }
418
419        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
420            self.region.heap_size(callback)
421        }
422    }
423}
424
425mod columnar {
426    use columnar::{
427        AsBytes, Clear, Columnar, Container, FromBytes, HeapSize, Index, IndexAs, Len, Push,
428    };
429    use mz_ore::cast::CastFrom;
430
431    use crate::{Row, RowRef};
432
433    #[derive(Copy, Clone, Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
434    pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
435        /// Bounds container; provides indexed access to offsets.
436        pub bounds: BC,
437        /// Values container; provides slice access to bytes.
438        pub values: VC,
439    }
440
441    impl Columnar for Row {
442        type Ref<'a> = &'a RowRef;
443        fn copy_from(&mut self, other: Self::Ref<'_>) {
444            self.clear();
445            self.data.extend_from_slice(other.data());
446        }
447        fn into_owned(other: Self::Ref<'_>) -> Self {
448            other.to_owned()
449        }
450        type Container = Rows;
451    }
452
453    impl<'b, BC: Container<u64>> Container<Row> for Rows<BC, &'b [u8]> {
454        type Borrowed<'a>
455            = Rows<BC::Borrowed<'a>, &'a [u8]>
456        where
457            Self: 'a;
458        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
459            Rows {
460                bounds: self.bounds.borrow(),
461                values: self.values,
462            }
463        }
464    }
465    impl<BC: Container<u64>> Container<Row> for Rows<BC, Vec<u8>> {
466        type Borrowed<'a>
467            = Rows<BC::Borrowed<'a>, &'a [u8]>
468        where
469            BC: 'a;
470        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
471            Rows {
472                bounds: self.bounds.borrow(),
473                values: self.values.borrow(),
474            }
475        }
476    }
477
478    impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
479        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
480            self.bounds.as_bytes().chain(self.values.as_bytes())
481        }
482    }
483    impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
484        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
485            Self {
486                bounds: FromBytes::from_bytes(bytes),
487                values: FromBytes::from_bytes(bytes),
488            }
489        }
490    }
491
492    impl<BC: Len, VC> Len for Rows<BC, VC> {
493        #[inline(always)]
494        fn len(&self) -> usize {
495            self.bounds.len()
496        }
497    }
498
499    impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
500        type Ref = &'a RowRef;
501        #[inline(always)]
502        fn get(&self, index: usize) -> Self::Ref {
503            let lower = if index == 0 {
504                0
505            } else {
506                self.bounds.index_as(index - 1)
507            };
508            let upper = self.bounds.index_as(index);
509            let lower = usize::cast_from(lower);
510            let upper = usize::cast_from(upper);
511            RowRef::from_slice(&self.values[lower..upper])
512        }
513    }
514    impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
515        type Ref = &'a RowRef;
516        #[inline(always)]
517        fn get(&self, index: usize) -> Self::Ref {
518            let lower = if index == 0 {
519                0
520            } else {
521                self.bounds.index_as(index - 1)
522            };
523            let upper = self.bounds.index_as(index);
524            let lower = usize::cast_from(lower);
525            let upper = usize::cast_from(upper);
526            RowRef::from_slice(&self.values[lower..upper])
527        }
528    }
529
530    impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
531        #[inline(always)]
532        fn push(&mut self, item: &Row) {
533            self.values.extend_from_slice(item.data.as_slice());
534            self.bounds.push(u64::cast_from(self.values.len()));
535        }
536    }
537    impl<BC: Push<u64>> Push<&RowRef> for Rows<BC> {
538        fn push(&mut self, item: &RowRef) {
539            self.values.extend_from_slice(item.data());
540            self.bounds.push(u64::cast_from(self.values.len()));
541        }
542    }
543    impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
544        fn clear(&mut self) {
545            self.bounds.clear();
546            self.values.clear();
547        }
548    }
549    impl<BC: HeapSize, VC: HeapSize> HeapSize for Rows<BC, VC> {
550        fn heap_size(&self) -> (usize, usize) {
551            let (l0, c0) = self.bounds.heap_size();
552            let (l1, c1) = self.values.heap_size();
553            (l0 + l1, c0 + c1)
554        }
555    }
556}
557
/// A contiguous slice of bytes that are row data.
///
/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
///
/// Equality and hashing are derived and therefore operate on the wrapped
/// bytes.
#[derive(PartialEq, Eq, Hash)]
// `repr(transparent)` gives `RowRef` the same layout as `[u8]`, which
// `RowRef::from_slice` relies on when it casts a `*const [u8]` to a
// `*const RowRef`.
#[repr(transparent)]
pub struct RowRef([u8]);
564
565impl RowRef {
566    /// Create a [`RowRef`] from a slice of data.
567    ///
568    /// We do not check that the provided slice is valid [`Row`] data, will panic on read
569    /// if the data is invalid.
570    pub fn from_slice(row: &[u8]) -> &RowRef {
571        #[allow(clippy::as_conversions)]
572        let ptr = row as *const [u8] as *const RowRef;
573        // SAFETY: We know `ptr` is non-null and aligned because it came from a &[u8].
574        unsafe { &*ptr }
575    }
576
577    /// Unpack `self` into a `Vec<Datum>` for efficient random access.
578    pub fn unpack(&self) -> Vec<Datum> {
579        // It's usually cheaper to unpack twice to figure out the right length than it is to grow the vec as we go
580        let len = self.iter().count();
581        let mut vec = Vec::with_capacity(len);
582        vec.extend(self.iter());
583        vec
584    }
585
586    /// Return the first [`Datum`] in `self`
587    ///
588    /// Panics if the [`RowRef`] is empty.
589    pub fn unpack_first(&self) -> Datum {
590        self.iter().next().unwrap()
591    }
592
593    /// Iterate the [`Datum`] elements of the [`RowRef`].
594    pub fn iter(&self) -> DatumListIter {
595        DatumListIter { data: &self.0 }
596    }
597
598    /// Return the byte length of this [`RowRef`].
599    pub fn byte_len(&self) -> usize {
600        self.0.len()
601    }
602
603    /// For debugging only.
604    pub fn data(&self) -> &[u8] {
605        &self.0
606    }
607
608    /// True iff there is no data in this [`RowRef`].
609    pub fn is_empty(&self) -> bool {
610        self.0.is_empty()
611    }
612}
613
614impl ToOwned for RowRef {
615    type Owned = Row;
616
617    fn to_owned(&self) -> Self::Owned {
618        // SAFETY: RowRef has the invariant that the wrapped data must be a valid Row encoding.
619        unsafe { Row::from_bytes_unchecked(&self.0) }
620    }
621}
622
623impl<'a> IntoIterator for &'a RowRef {
624    type Item = Datum<'a>;
625    type IntoIter = DatumListIter<'a>;
626
627    fn into_iter(self) -> DatumListIter<'a> {
628        DatumListIter { data: &self.0 }
629    }
630}
631
632/// These implementations order first by length, and then by slice contents.
633/// This allows many comparisons to complete without dereferencing memory.
634/// Warning: These order by the u8 array representation, and NOT by Datum::cmp.
635impl PartialOrd for RowRef {
636    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
637        Some(self.cmp(other))
638    }
639}
640
641impl Ord for RowRef {
642    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
643        match self.0.len().cmp(&other.0.len()) {
644            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
645            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
646            std::cmp::Ordering::Equal => self.0.cmp(&other.0),
647        }
648    }
649}
650
651impl fmt::Debug for RowRef {
652    /// Debug representation using the internal datums
653    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
654        f.write_str("RowRef{")?;
655        f.debug_list().entries(self.into_iter()).finish()?;
656        f.write_str("}")
657    }
658}
659
/// Packs datums into a [`Row`].
///
/// Creating a `RowPacker` via [`Row::packer`] starts a packing operation on the
/// row. A packing operation always starts from scratch: the existing contents
/// of the underlying row are cleared.
///
/// To complete a packing operation, drop the `RowPacker`.
#[derive(Debug)]
pub struct RowPacker<'a> {
    /// The row being packed into. It is mutably borrowed for the lifetime of
    /// the packer, so the row cannot be used until the packer is dropped.
    row: &'a mut Row,
}
671
/// Iterator over the `Datum`s encoded in a byte slice.
///
/// This is the iterator type returned by `RowRef::iter` and by the
/// `IntoIterator` implementation for `&RowRef`.
#[derive(Debug, Clone)]
pub struct DatumListIter<'a> {
    /// The encoded datums to iterate over.
    // NOTE(review): presumably this holds the not-yet-decoded remainder and
    // shrinks as datums are read; confirm against the `Iterator` impl.
    data: &'a [u8],
}
676
/// Iterator state over an encoded dictionary.
// NOTE(review): the `Iterator` impl is not visible from here; presumably it
// yields `(key, Datum)` pairs — confirm.
#[derive(Debug, Clone)]
pub struct DatumDictIter<'a> {
    /// The encoded dictionary bytes.
    data: &'a [u8],
    /// A previously seen key, if any.
    // NOTE(review): presumably used to check that keys arrive in sorted
    // order; confirm against the `Iterator` impl.
    prev_key: Option<&'a str>,
}
682
/// `RowArena` is used to hold on to temporary `Row`s for functions like `eval`
/// that need to create complex `Datum`s but don't have a `Row` to put them in
/// yet.
#[derive(Debug)]
pub struct RowArena {
    // Semantically, this field would be better represented by a `Vec<Box<[u8]>>`,
    // as once the arena takes ownership of a byte vector the vector is never
    // modified. But `RowArena::push_bytes` takes ownership of a `Vec<u8>`, so
    // storing that `Vec<u8>` directly avoids an allocation. The cost is
    // additional memory use, as the vector may have spare capacity, but row
    // arenas are short lived so this is the better tradeoff.
    //
    // The `RefCell` provides interior mutability for the list of byte vectors.
    inner: RefCell<Vec<Vec<u8>>>,
}
694
695// DatumList and DatumDict defined here rather than near Datum because we need private access to the unsafe data field
696
/// A sequence of Datums
///
/// Equality and hashing are derived and so operate on the serialized bytes,
/// whereas the `Ord` implementation compares the decoded datums element by
/// element.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumList<'a> {
    /// Points at the serialized datums
    data: &'a [u8],
}
703
704impl<'a> Debug for DatumList<'a> {
705    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706        f.debug_list().entries(self.iter()).finish()
707    }
708}
709
710impl Ord for DatumList<'_> {
711    fn cmp(&self, other: &DatumList) -> Ordering {
712        self.iter().cmp(other.iter())
713    }
714}
715
716impl PartialOrd for DatumList<'_> {
717    fn partial_cmp(&self, other: &DatumList) -> Option<Ordering> {
718        Some(self.cmp(other))
719    }
720}
721
/// A mapping from string keys to Datums
///
/// All comparison traits, including `Ord`, are derived and therefore operate
/// on the serialized bytes rather than on the decoded entries.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct DatumMap<'a> {
    /// Points at the serialized datums, which should be sorted in key order
    data: &'a [u8],
}
728
/// Represents a single `Datum`, appropriate to be nested inside other
/// `Datum`s.
///
/// Equality and hashing are derived and so operate on the stored bytes; the
/// `Ord` implementation compares the decoded datums instead.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumNested<'a> {
    /// The bytes that `read_datum` consumes to decode the datum, as captured
    /// by `DatumNested::extract`.
    val: &'a [u8],
}
735
736impl<'a> std::fmt::Display for DatumNested<'a> {
737    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
738        std::fmt::Display::fmt(&self.datum(), f)
739    }
740}
741
742impl<'a> std::fmt::Debug for DatumNested<'a> {
743    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
744        f.debug_struct("DatumNested")
745            .field("val", &self.datum())
746            .finish()
747    }
748}
749
750impl<'a> DatumNested<'a> {
751    // Figure out which bytes `read_datum` returns (e.g. including the tag),
752    // and then store a reference to those bytes, so we can "replay" this same
753    // call later on without storing the datum itself.
754    pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
755        let prev = *data;
756        let _ = unsafe { read_datum(data) };
757        DatumNested {
758            val: &prev[..(prev.len() - data.len())],
759        }
760    }
761
762    /// Returns the datum `self` contains.
763    pub fn datum(&self) -> Datum<'a> {
764        let mut temp = self.val;
765        unsafe { read_datum(&mut temp) }
766    }
767}
768
769impl<'a> Ord for DatumNested<'a> {
770    fn cmp(&self, other: &Self) -> Ordering {
771        self.datum().cmp(&other.datum())
772    }
773}
774
775impl<'a> PartialOrd for DatumNested<'a> {
776    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
777        Some(self.cmp(other))
778    }
779}
780
// One-byte tag identifying how a datum is encoded within a row.
//
// The enum is `repr(u8)` and derives `IntoPrimitive`/`TryFromPrimitive`, so a
// tag converts to and from its `u8` discriminant. Discriminants are assigned
// implicitly by declaration order.
//
// Prefer adding new tags to the end of the enum. Certain behavior, like row ordering and EXPLAIN
// PHYSICAL PLAN, rely on the ordering of this enum. Neither of these are breaking changes, but
// it's annoying when they change.
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
enum Tag {
    Null,
    False,
    True,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt32,
    Float32,
    Float64,
    Date,
    Time,
    Timestamp,
    TimestampTz,
    Interval,
    // Bytes, strings and lists each come in four flavors; `read_lengthed_datum`
    // reads their length prefix as a u8 (Tiny), u16 (Short), u32 (Long) or
    // u64 (Huge).
    BytesTiny,
    BytesShort,
    BytesLong,
    BytesHuge,
    StringTiny,
    StringShort,
    StringLong,
    StringHuge,
    Uuid,
    Array,
    ListTiny,
    ListShort,
    ListLong,
    ListHuge,
    Dict,
    JsonNull,
    Dummy,
    Numeric,
    UInt16,
    UInt64,
    MzTimestamp,
    Range,
    MzAclItem,
    AclItem,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestamp,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestampTz,
    // The next several tags are for variable-length signed integer encoding.
    // The basic idea is that `NonNegativeIntN_K` is used to encode a datum of type
    // IntN whose actual value is positive or zero and fits in K bits, and similarly for
    // NegativeIntN_K with negative values.
    //
    // The order of these tags matters, because we want to be able to choose the
    // tag for a given datum quickly, with arithmetic, rather than slowly, with a
    // stack of `if` statements.
    //
    // Separate tags for non-negative and negative numbers are used to avoid having to
    // waste one bit in the actual data space to encode the sign.
    //
    // `Tag::actual_int_length` maps each of these tags to K / 8 bytes.
    NonNegativeInt16_0, // i.e., 0
    NonNegativeInt16_8,
    NonNegativeInt16_16,

    NonNegativeInt32_0,
    NonNegativeInt32_8,
    NonNegativeInt32_16,
    NonNegativeInt32_24,
    NonNegativeInt32_32,

    NonNegativeInt64_0,
    NonNegativeInt64_8,
    NonNegativeInt64_16,
    NonNegativeInt64_24,
    NonNegativeInt64_32,
    NonNegativeInt64_40,
    NonNegativeInt64_48,
    NonNegativeInt64_56,
    NonNegativeInt64_64,

    NegativeInt16_0, // i.e., -1
    NegativeInt16_8,
    NegativeInt16_16,

    NegativeInt32_0,
    NegativeInt32_8,
    NegativeInt32_16,
    NegativeInt32_24,
    NegativeInt32_32,

    NegativeInt64_0,
    NegativeInt64_8,
    NegativeInt64_16,
    NegativeInt64_24,
    NegativeInt64_32,
    NegativeInt64_40,
    NegativeInt64_48,
    NegativeInt64_56,
    NegativeInt64_64,

    // These are like the ones above, but for unsigned types. The
    // situation is slightly simpler as we don't have negatives.
    UInt8_0, // i.e., 0
    UInt8_8,

    UInt16_0,
    UInt16_8,
    UInt16_16,

    UInt32_0,
    UInt32_8,
    UInt32_16,
    UInt32_24,
    UInt32_32,

    UInt64_0,
    UInt64_8,
    UInt64_16,
    UInt64_24,
    UInt64_32,
    UInt64_40,
    UInt64_48,
    UInt64_56,
    UInt64_64,
}
910
911impl Tag {
912    fn actual_int_length(self) -> Option<usize> {
913        use Tag::*;
914        let val = match self {
915            NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
916            | UInt32_0 | UInt64_0 => 0,
917            NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
918            | UInt32_8 | UInt64_8 => 1,
919            NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
920            | UInt32_16 | UInt64_16 => 2,
921            NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
922            NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
923            NonNegativeInt64_40 | UInt64_40 => 5,
924            NonNegativeInt64_48 | UInt64_48 => 6,
925            NonNegativeInt64_56 | UInt64_56 => 7,
926            NonNegativeInt64_64 | UInt64_64 => 8,
927            NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
928            NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
929            NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
930            NegativeInt32_24 | NegativeInt64_24 => 3,
931            NegativeInt32_32 | NegativeInt64_32 => 4,
932            NegativeInt64_40 => 5,
933            NegativeInt64_48 => 6,
934            NegativeInt64_56 => 7,
935            NegativeInt64_64 => 8,
936
937            _ => return None,
938        };
939        Some(val)
940    }
941}
942
943// --------------------------------------------------------------------------------
944// reading data
945
946/// Read a byte slice starting at byte `offset`.
947///
948/// Updates `offset` to point to the first byte after the end of the read region.
949fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
950    let len = u64::from_le_bytes(read_byte_array(data));
951    let len = usize::cast_from(len);
952    let (bytes, next) = data.split_at(len);
953    *data = next;
954    bytes
955}
956
957/// Read a data whose length is encoded in the row before its contents.
958///
959/// Updates `offset` to point to the first byte after the end of the read region.
960///
961/// # Safety
962///
963/// This function is safe if the datum's length and contents were previously written by `push_lengthed_bytes`,
964/// and it was only written with a `String` tag if it was indeed UTF-8.
965unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
966    let len = match tag {
967        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
968        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
969            usize::from(u16::from_le_bytes(read_byte_array(data)))
970        }
971        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
972            usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
973        }
974        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
975            usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
976        }
977        _ => unreachable!(),
978    };
979    let (bytes, next) = data.split_at(len);
980    *data = next;
981    match tag {
982        Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
983        Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
984            Datum::String(str::from_utf8_unchecked(bytes))
985        }
986        Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
987            Datum::List(DatumList { data: bytes })
988        }
989        _ => unreachable!(),
990    }
991}
992
/// Removes the first byte from the front of `data` and returns it.
///
/// # Panics
///
/// Panics if `data` is empty.
fn read_byte(data: &mut &[u8]) -> u8 {
    let remaining = *data;
    let first = remaining[0];
    *data = &remaining[1..];
    first
}
998
/// Reads `length` bytes from the front of `data`, advancing `data` past them,
/// and widens them to an `N`-byte array by storing `FILL` in each of the
/// `N - length` trailing (most significant, for little-endian integers) bytes.
///
/// # Panics
///
/// Panics if either of the following does not hold:
///   * `length <= N`
///   * `length <= data.len()`
fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    let (payload, rest) = data.split_at(length);
    // Start from an array made entirely of the fill byte, then overwrite the
    // low-order positions with the bytes that were actually stored.
    let mut widened = [FILL; N];
    widened[..length].copy_from_slice(payload);
    *data = rest;
    widened
}
1016/// Read `length` bytes from `data` at `offset`, updating the
1017/// latter. Extend the resulting buffer to a negative `N`-byte
1018/// twos complement integer by filling the remaining bits with 1.
1019///
1020/// SAFETY:
1021///   * length <= N
1022///   * offset + length <= data.len()
1023fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
1024    read_byte_array_sign_extending::<N, 255>(data, length)
1025}
1026
1027/// Read `length` bytes from `data` at `offset`, updating the
1028/// latter. Extend the resulting buffer to a positive or zero `N`-byte
1029/// twos complement integer by filling the remaining bits with 0.
1030///
1031/// SAFETY:
1032///   * length <= N
1033///   * offset + length <= data.len()
1034fn read_byte_array_extending_nonnegative<const N: usize>(
1035    data: &mut &[u8],
1036    length: usize,
1037) -> [u8; N] {
1038    read_byte_array_sign_extending::<N, 0>(data, length)
1039}
1040
1041pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1042    let (prev, next) = data.split_first_chunk().unwrap();
1043    *data = next;
1044    *prev
1045}
1046
1047pub(super) fn read_date(data: &mut &[u8]) -> Date {
1048    let days = i32::from_le_bytes(read_byte_array(data));
1049    Date::from_pg_epoch(days).expect("unexpected date")
1050}
1051
1052pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1053    let year = i32::from_le_bytes(read_byte_array(data));
1054    let ordinal = u32::from_le_bytes(read_byte_array(data));
1055    NaiveDate::from_yo_opt(year, ordinal).unwrap()
1056}
1057
1058pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1059    let secs = u32::from_le_bytes(read_byte_array(data));
1060    let nanos = u32::from_le_bytes(read_byte_array(data));
1061    NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1062}
1063
/// Decodes a single datum from the front of `data`.
///
/// The first byte is interpreted as a `Tag`, which determines how the bytes
/// that follow are decoded. On return, `data` has been advanced to the first
/// byte after the decoded datum.
///
/// # Panics
///
/// Panics if the tag byte is not a known `Tag`, if `data` ends before the
/// datum does, or if a decoded payload is rejected by one of the
/// `expect`/`unwrap`/`assert!` checks in the body.
///
/// # Safety
///
/// This function is safe if `data` begins with a `Datum` that was previously written by `push_datum`.
/// Otherwise it could return invalid values, which is Undefined Behavior.
pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
    let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
    match tag {
        // Tags that carry no payload at all.
        Tag::Null => Datum::Null,
        Tag::False => Datum::False,
        Tag::True => Datum::True,
        // Variable-length integers: the tag encodes how many payload bytes
        // follow (`tag.actual_int_length()`), and the omitted high-order bytes
        // are reconstructed by zero-filling (non-negative/unsigned tags) or
        // one-filling (negative tags).
        Tag::UInt8_0 | Tag::UInt8_8 => {
            let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt8(i)
        }
        // Fixed-width 2-byte encoding.
        Tag::Int16 => {
            let i = i16::from_le_bytes(read_byte_array(data));
            Datum::Int16(i)
        }
        Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
            // `tag.actual_int_length()` is at most 2 (bytes) for these tags, which
            // fits the 2-byte target array. If `data` were too short, the slice
            // operations in `read_byte_array_sign_extending` would panic.
            let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int16(i)
        }
        Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
            let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt16(i)
        }
        // Fixed-width 4-byte encoding.
        Tag::Int32 => {
            let i = i32::from_le_bytes(read_byte_array(data));
            Datum::Int32(i)
        }
        Tag::NonNegativeInt32_0
        | Tag::NonNegativeInt32_32
        | Tag::NonNegativeInt32_8
        | Tag::NonNegativeInt32_16
        | Tag::NonNegativeInt32_24 => {
            // `tag.actual_int_length()` is at most 4 (bytes) for these tags, which
            // fits the 4-byte target array. If `data` were too short, the slice
            // operations in `read_byte_array_sign_extending` would panic.
            let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int32(i)
        }
        Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
            let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt32(i)
        }
        // Fixed-width 8-byte encoding.
        Tag::Int64 => {
            let i = i64::from_le_bytes(read_byte_array(data));
            Datum::Int64(i)
        }
        Tag::NonNegativeInt64_0
        | Tag::NonNegativeInt64_64
        | Tag::NonNegativeInt64_8
        | Tag::NonNegativeInt64_16
        | Tag::NonNegativeInt64_24
        | Tag::NonNegativeInt64_32
        | Tag::NonNegativeInt64_40
        | Tag::NonNegativeInt64_48
        | Tag::NonNegativeInt64_56 => {
            // `tag.actual_int_length()` is at most 8 (bytes) for these tags, which
            // fits the 8-byte target array. If `data` were too short, the slice
            // operations in `read_byte_array_sign_extending` would panic.

            let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int64(i)
        }
        Tag::UInt64_0
        | Tag::UInt64_8
        | Tag::UInt64_16
        | Tag::UInt64_24
        | Tag::UInt64_32
        | Tag::UInt64_40
        | Tag::UInt64_48
        | Tag::UInt64_56
        | Tag::UInt64_64 => {
            let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt64(i)
        }
        Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
            // `tag.actual_int_length()` is at most 2 (bytes) for these tags, which
            // fits the 2-byte target array. If `data` were too short, the slice
            // operations in `read_byte_array_sign_extending` would panic.
            let i = i16::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int16(i)
        }
        Tag::NegativeInt32_0
        | Tag::NegativeInt32_32
        | Tag::NegativeInt32_8
        | Tag::NegativeInt32_16
        | Tag::NegativeInt32_24 => {
            // `tag.actual_int_length()` is at most 4 (bytes) for these tags, which
            // fits the 4-byte target array. If `data` were too short, the slice
            // operations in `read_byte_array_sign_extending` would panic.
            let i = i32::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int32(i)
        }
        Tag::NegativeInt64_0
        | Tag::NegativeInt64_64
        | Tag::NegativeInt64_8
        | Tag::NegativeInt64_16
        | Tag::NegativeInt64_24
        | Tag::NegativeInt64_32
        | Tag::NegativeInt64_40
        | Tag::NegativeInt64_48
        | Tag::NegativeInt64_56 => {
            // `tag.actual_int_length()` is at most 8 (bytes) for these tags, which
            // fits the 8-byte target array. If `data` were too short, the slice
            // operations in `read_byte_array_sign_extending` would panic.
            let i = i64::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int64(i)
        }

        // Fixed-width unsigned encodings.
        Tag::UInt8 => {
            let i = u8::from_le_bytes(read_byte_array(data));
            Datum::UInt8(i)
        }
        Tag::UInt16 => {
            let i = u16::from_le_bytes(read_byte_array(data));
            Datum::UInt16(i)
        }
        Tag::UInt32 => {
            let i = u32::from_le_bytes(read_byte_array(data));
            Datum::UInt32(i)
        }
        Tag::UInt64 => {
            let i = u64::from_le_bytes(read_byte_array(data));
            Datum::UInt64(i)
        }
        // Floats are stored as the little-endian bytes of their bit patterns.
        Tag::Float32 => {
            let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
            Datum::Float32(OrderedFloat::from(f))
        }
        Tag::Float64 => {
            let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
            Datum::Float64(OrderedFloat::from(f))
        }
        Tag::Date => Datum::Date(read_date(data)),
        Tag::Time => Datum::Time(read_time(data)),
        Tag::CheapTimestamp => {
            // Stored as a single `i64` count of nanoseconds. Euclidean
            // division keeps the sub-second remainder non-negative even when
            // `ts` is negative, so it always converts to a `u32`.
            let ts = i64::from_le_bytes(read_byte_array(data));
            let secs = ts.div_euclid(1_000_000_000);
            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
            let ndt = DateTime::from_timestamp(secs, nsecs)
                .expect("We only write round-trippable timestamps")
                .naive_utc();
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
            )
        }
        Tag::CheapTimestampTz => {
            // Same nanosecond encoding as `CheapTimestamp`, but the result is
            // kept as a `DateTime` rather than converted to a naive one.
            let ts = i64::from_le_bytes(read_byte_array(data));
            let secs = ts.div_euclid(1_000_000_000);
            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
            let dt = DateTime::from_timestamp(secs, nsecs)
                .expect("We only write round-trippable timestamps");
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
            )
        }
        // The non-"cheap" timestamp encodings store a date followed by a time.
        Tag::Timestamp => {
            let date = read_naive_date(data);
            let time = read_time(data);
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(date.and_time(time))
                    .expect("unexpected timestamp"),
            )
        }
        Tag::TimestampTz => {
            let date = read_naive_date(data);
            let time = read_time(data);
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
                    date.and_time(time),
                    Utc,
                ))
                .expect("unexpected timestamptz"),
            )
        }
        Tag::Interval => {
            // Three consecutive little-endian fields: months, days, micros.
            let months = i32::from_le_bytes(read_byte_array(data));
            let days = i32::from_le_bytes(read_byte_array(data));
            let micros = i64::from_le_bytes(read_byte_array(data));
            Datum::Interval(Interval {
                months,
                days,
                micros,
            })
        }
        // Length-prefixed payloads; the tag selects the width of the prefix.
        Tag::BytesTiny
        | Tag::BytesShort
        | Tag::BytesLong
        | Tag::BytesHuge
        | Tag::StringTiny
        | Tag::StringShort
        | Tag::StringLong
        | Tag::StringHuge
        | Tag::ListTiny
        | Tag::ListShort
        | Tag::ListLong
        | Tag::ListHuge => read_lengthed_datum(data, tag),
        Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
        Tag::Array => {
            // See the comment in `Row::push_array` for details on the encoding
            // of arrays.
            let ndims = read_byte(data);
            // The dimension region is sized as two `u64`-sized values per
            // dimension and is handed to `ArrayDimensions` undecoded.
            let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
            let (dims, next) = data.split_at(dims_size);
            *data = next;
            let bytes = read_untagged_bytes(data);
            Datum::Array(Array {
                dims: ArrayDimensions { data: dims },
                elements: DatumList { data: bytes },
            })
        }
        Tag::Dict => {
            let bytes = read_untagged_bytes(data);
            Datum::Map(DatumMap { data: bytes })
        }
        Tag::JsonNull => Datum::JsonNull,
        Tag::Dummy => Datum::Dummy,
        Tag::Numeric => {
            // Header: digit count, exponent (an `i8` stored as a raw byte),
            // and the `bits` byte, followed by the coefficient units.
            let digits = read_byte(data).into();
            let exponent = i8::reinterpret_cast(read_byte(data));
            let bits = read_byte(data);

            // Each coefficient unit is a `u16`, i.e. two bytes in the row.
            let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
            let lsu_u8_len = lsu_u16_len * 2;
            let (lsu_u8, next) = data.split_at(lsu_u8_len);
            *data = next;

            // TODO: if we refactor the decimal library to accept the owned
            // array as a parameter to `from_raw_parts` below, we could likely
            // avoid a copy because it is exactly the value we want
            let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
            for (i, c) in lsu_u8.chunks(2).enumerate() {
                lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
            }

            let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
            Datum::from(d)
        }
        Tag::MzTimestamp => {
            let t = Timestamp::decode(read_byte_array(data));
            Datum::MzTimestamp(t)
        }
        Tag::Range => {
            // See notes on `push_range_with` for details about encoding.
            let flag_byte = read_byte(data);
            let flags = range::InternalFlags::from_bits(flag_byte)
                .expect("range flags must be encoded validly");

            // An empty range has no bounds encoded after the flag byte.
            if flags.contains(range::InternalFlags::EMPTY) {
                assert!(
                    flags == range::InternalFlags::EMPTY,
                    "empty ranges contain only RANGE_EMPTY flag"
                );

                return Datum::Range(Range { inner: None });
            }

            // A finite bound is stored as a nested datum; an infinite bound
            // is represented only by its flag and occupies no bytes.
            let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
                None
            } else {
                Some(DatumNested::extract(data))
            };

            let lower = RangeBound {
                inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
                bound: lower_bound,
            };

            let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
                None
            } else {
                Some(DatumNested::extract(data))
            };

            let upper = RangeBound {
                inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
                bound: upper_bound,
            };

            Datum::Range(Range {
                inner: Some(RangeInner { lower, upper }),
            })
        }
        Tag::MzAclItem => {
            // Fixed-size binary encoding whose width is given by `binary_size()`.
            const N: usize = MzAclItem::binary_size();
            let mz_acl_item =
                MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
            Datum::MzAclItem(mz_acl_item)
        }
        Tag::AclItem => {
            // Fixed-size binary encoding whose width is given by `binary_size()`.
            const N: usize = AclItem::binary_size();
            let acl_item =
                AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
            Datum::AclItem(acl_item)
        }
    }
}
1410
1411// --------------------------------------------------------------------------------
1412// writing data
1413
1414fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1415where
1416    D: Vector<u8>,
1417{
1418    let len = u64::cast_from(bytes.len());
1419    data.extend_from_slice(&len.to_le_bytes());
1420    data.extend_from_slice(bytes);
1421}
1422
1423fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1424where
1425    D: Vector<u8>,
1426{
1427    match tag {
1428        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1429            let len = bytes.len().to_le_bytes();
1430            data.push(len[0]);
1431        }
1432        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1433            let len = bytes.len().to_le_bytes();
1434            data.extend_from_slice(&len[0..2]);
1435        }
1436        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1437            let len = bytes.len().to_le_bytes();
1438            data.extend_from_slice(&len[0..4]);
1439        }
1440        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1441            let len = bytes.len().to_le_bytes();
1442            data.extend_from_slice(&len);
1443        }
1444        _ => unreachable!(),
1445    }
1446    data.extend_from_slice(bytes);
1447}
1448
1449pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1450    i32::to_le_bytes(date.pg_epoch_days())
1451}
1452
1453fn push_date<D>(data: &mut D, date: Date)
1454where
1455    D: Vector<u8>,
1456{
1457    data.extend_from_slice(&date_to_array(date));
1458}
1459
1460pub(super) fn naive_date_to_arrays(
1461    date: NaiveDate,
1462) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1463    (
1464        i32::to_le_bytes(date.year()),
1465        u32::to_le_bytes(date.ordinal()),
1466    )
1467}
1468
1469fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1470where
1471    D: Vector<u8>,
1472{
1473    let (ds1, ds2) = naive_date_to_arrays(date);
1474    data.extend_from_slice(&ds1);
1475    data.extend_from_slice(&ds2);
1476}
1477
1478pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1479    (
1480        u32::to_le_bytes(time.num_seconds_from_midnight()),
1481        u32::to_le_bytes(time.nanosecond()),
1482    )
1483}
1484
1485fn push_time<D>(data: &mut D, time: NaiveTime)
1486where
1487    D: Vector<u8>,
1488{
1489    let (ts1, ts2) = time_to_arrays(time);
1490    data.extend_from_slice(&ts1);
1491    data.extend_from_slice(&ts2);
1492}
1493
1494/// Returns an i64 representing a `NaiveDateTime`, if
1495/// said i64 can be round-tripped back to a `NaiveDateTime`.
1496///
1497/// The only exotic NDTs for which this can't happen are those that
1498/// are hundreds of years in the future or past, or those that
1499/// represent a leap second. (Note that Materialize does not support
1500/// leap seconds, but this module does).
1501// This function is inspired by `NaiveDateTime::timestamp_nanos`,
1502// with extra checking.
1503fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1504    let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1505    if subsec_nanos >= 1_000_000_000 {
1506        return None;
1507    }
1508    let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1509    as_ns.checked_add(i64::from(subsec_nanos))
1510}
1511
// Returns the number of low-order bytes of `i` that must be stored so that the
// value can be rebuilt by filling the remaining high-order bytes with copies of
// the sign bit (all zeros for non-negative values, all ones for negative ones).
//
// This function is extremely hot, so we use `as` instead of paying for a
// `try_into` followed by `unwrap`. `leading_ones` and `leading_zeros` never
// return more than 64, so the result is at most 8 and the cast is lossless.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_signed<T>(i: T) -> u8
where
    T: Into<i64>,
{
    let value: i64 = i.into();

    // Count the run of leading bits that merely repeat the sign.
    let redundant_bits = if value < 0 {
        value.leading_ones()
    } else {
        value.leading_zeros()
    };

    // Everything below that run must be stored; round up to whole bytes.
    let payload_bits = 64 - redundant_bits;
    ((payload_bits + 7) / 8) as u8
}
1536
// Returns the number of low-order bytes of `i` that must be stored so that the
// value can be rebuilt by zero-filling the remaining high-order bytes.
//
// In principle `min_bytes_signed` could serve here as well if it accepted
// `T: Into<i128>` instead of `Into<i64>`, but LLVM doesn't seem smart enough to
// realize that the two would be equivalent, and generates worse code.
//
// The `as` cast is justified for the same reason as in `min_bytes_signed`.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_unsigned<T>(i: T) -> u8
where
    T: Into<u64>,
{
    let value: u64 = i.into();

    // Leading zero bits carry no information; round the rest up to whole bytes.
    let payload_bits = 64 - value.leading_zeros();
    ((payload_bits + 7) / 8) as u8
}
1556
// Exclusive upper bounds on the payload length for each length-prefix width:
// a length below `TINY` fits in a `u8` prefix, below `SHORT` in a `u16`, and
// below `LONG` in a `u32`.
const TINY: usize = 1_usize << u8::BITS;
const SHORT: usize = 1_usize << u16::BITS;
const LONG: usize = 1_usize << u32::BITS;
1560
1561fn push_datum<D>(data: &mut D, datum: Datum)
1562where
1563    D: Vector<u8>,
1564{
1565    match datum {
1566        Datum::Null => data.push(Tag::Null.into()),
1567        Datum::False => data.push(Tag::False.into()),
1568        Datum::True => data.push(Tag::True.into()),
1569        Datum::Int16(i) => {
1570            let mbs = min_bytes_signed(i);
1571            let tag = u8::from(if i.is_negative() {
1572                Tag::NegativeInt16_0
1573            } else {
1574                Tag::NonNegativeInt16_0
1575            }) + mbs;
1576
1577            data.push(tag);
1578            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1579        }
1580        Datum::Int32(i) => {
1581            let mbs = min_bytes_signed(i);
1582            let tag = u8::from(if i.is_negative() {
1583                Tag::NegativeInt32_0
1584            } else {
1585                Tag::NonNegativeInt32_0
1586            }) + mbs;
1587
1588            data.push(tag);
1589            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1590        }
1591        Datum::Int64(i) => {
1592            let mbs = min_bytes_signed(i);
1593            let tag = u8::from(if i.is_negative() {
1594                Tag::NegativeInt64_0
1595            } else {
1596                Tag::NonNegativeInt64_0
1597            }) + mbs;
1598
1599            data.push(tag);
1600            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1601        }
1602        Datum::UInt8(i) => {
1603            let mbu = min_bytes_unsigned(i);
1604            let tag = u8::from(Tag::UInt8_0) + mbu;
1605            data.push(tag);
1606            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1607        }
1608        Datum::UInt16(i) => {
1609            let mbu = min_bytes_unsigned(i);
1610            let tag = u8::from(Tag::UInt16_0) + mbu;
1611            data.push(tag);
1612            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1613        }
1614        Datum::UInt32(i) => {
1615            let mbu = min_bytes_unsigned(i);
1616            let tag = u8::from(Tag::UInt32_0) + mbu;
1617            data.push(tag);
1618            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1619        }
1620        Datum::UInt64(i) => {
1621            let mbu = min_bytes_unsigned(i);
1622            let tag = u8::from(Tag::UInt64_0) + mbu;
1623            data.push(tag);
1624            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1625        }
1626        Datum::Float32(f) => {
1627            data.push(Tag::Float32.into());
1628            data.extend_from_slice(&f.to_bits().to_le_bytes());
1629        }
1630        Datum::Float64(f) => {
1631            data.push(Tag::Float64.into());
1632            data.extend_from_slice(&f.to_bits().to_le_bytes());
1633        }
1634        Datum::Date(d) => {
1635            data.push(Tag::Date.into());
1636            push_date(data, d);
1637        }
1638        Datum::Time(t) => {
1639            data.push(Tag::Time.into());
1640            push_time(data, t);
1641        }
1642        Datum::Timestamp(t) => {
1643            let datetime = t.to_naive();
1644            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1645                data.push(Tag::CheapTimestamp.into());
1646                data.extend_from_slice(&nanos.to_le_bytes());
1647            } else {
1648                data.push(Tag::Timestamp.into());
1649                push_naive_date(data, datetime.date());
1650                push_time(data, datetime.time());
1651            }
1652        }
1653        Datum::TimestampTz(t) => {
1654            let datetime = t.to_naive();
1655            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1656                data.push(Tag::CheapTimestampTz.into());
1657                data.extend_from_slice(&nanos.to_le_bytes());
1658            } else {
1659                data.push(Tag::TimestampTz.into());
1660                push_naive_date(data, datetime.date());
1661                push_time(data, datetime.time());
1662            }
1663        }
1664        Datum::Interval(i) => {
1665            data.push(Tag::Interval.into());
1666            data.extend_from_slice(&i.months.to_le_bytes());
1667            data.extend_from_slice(&i.days.to_le_bytes());
1668            data.extend_from_slice(&i.micros.to_le_bytes());
1669        }
1670        Datum::Bytes(bytes) => {
1671            let tag = match bytes.len() {
1672                0..TINY => Tag::BytesTiny,
1673                TINY..SHORT => Tag::BytesShort,
1674                SHORT..LONG => Tag::BytesLong,
1675                _ => Tag::BytesHuge,
1676            };
1677            data.push(tag.into());
1678            push_lengthed_bytes(data, bytes, tag);
1679        }
1680        Datum::String(string) => {
1681            let tag = match string.len() {
1682                0..TINY => Tag::StringTiny,
1683                TINY..SHORT => Tag::StringShort,
1684                SHORT..LONG => Tag::StringLong,
1685                _ => Tag::StringHuge,
1686            };
1687            data.push(tag.into());
1688            push_lengthed_bytes(data, string.as_bytes(), tag);
1689        }
1690        Datum::List(list) => {
1691            let tag = match list.data.len() {
1692                0..TINY => Tag::ListTiny,
1693                TINY..SHORT => Tag::ListShort,
1694                SHORT..LONG => Tag::ListLong,
1695                _ => Tag::ListHuge,
1696            };
1697            data.push(tag.into());
1698            push_lengthed_bytes(data, list.data, tag);
1699        }
1700        Datum::Uuid(u) => {
1701            data.push(Tag::Uuid.into());
1702            data.extend_from_slice(u.as_bytes());
1703        }
1704        Datum::Array(array) => {
1705            // See the comment in `Row::push_array` for details on the encoding
1706            // of arrays.
1707            data.push(Tag::Array.into());
1708            data.push(array.dims.ndims());
1709            data.extend_from_slice(array.dims.data);
1710            push_untagged_bytes(data, array.elements.data);
1711        }
1712        Datum::Map(dict) => {
1713            data.push(Tag::Dict.into());
1714            push_untagged_bytes(data, dict.data);
1715        }
1716        Datum::JsonNull => data.push(Tag::JsonNull.into()),
1717        Datum::MzTimestamp(t) => {
1718            data.push(Tag::MzTimestamp.into());
1719            data.extend_from_slice(&t.encode());
1720        }
1721        Datum::Dummy => data.push(Tag::Dummy.into()),
1722        Datum::Numeric(mut n) => {
1723            // Pseudo-canonical representation of decimal values with
1724            // insignificant zeroes trimmed. This compresses the number further
1725            // than `Numeric::trim` by removing all zeroes, and not only those in
1726            // the fractional component.
1727            numeric::cx_datum().reduce(&mut n.0);
1728            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
1729            data.push(Tag::Numeric.into());
1730            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
1731            data.push(
1732                i8::try_from(exponent)
1733                    .expect("exponent to fit within i8; should not exceed +/- 39")
1734                    .to_le_bytes()[0],
1735            );
1736            data.push(bits);
1737
1738            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];
1739
1740            // Little endian machines can take the lsu directly from u16 to u8.
1741            if cfg!(target_endian = "little") {
1742                // SAFETY: `lsu` (returned by `coefficient_units()`) is a `&[u16]`, so
1743                // each element can safely be transmuted into two `u8`s.
1744                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
1745                // The `u8` aligned version of the `lsu` should have twice as many
1746                // elements as we expect for the `u16` version.
1747                soft_assert_no_log!(
1748                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
1749                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
1750                    Numeric::digits_to_lsu_elements_len(digits) * 2,
1751                    lsu_bytes.len()
1752                );
1753                // There should be no unaligned elements in the prefix or suffix.
1754                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
1755                data.extend_from_slice(lsu_bytes);
1756            } else {
1757                for u in lsu {
1758                    data.extend_from_slice(&u.to_le_bytes());
1759                }
1760            }
1761        }
1762        Datum::Range(range) => {
1763            // See notes on `push_range_with` for details about encoding.
1764            data.push(Tag::Range.into());
1765            data.push(range.internal_flag_bits());
1766
1767            if let Some(RangeInner { lower, upper }) = range.inner {
1768                for bound in [lower.bound, upper.bound] {
1769                    if let Some(bound) = bound {
1770                        match bound.datum() {
1771                            Datum::Null => panic!("cannot push Datum::Null into range"),
1772                            d => push_datum::<D>(data, d),
1773                        }
1774                    }
1775                }
1776            }
1777        }
1778        Datum::MzAclItem(mz_acl_item) => {
1779            data.push(Tag::MzAclItem.into());
1780            data.extend_from_slice(&mz_acl_item.encode_binary());
1781        }
1782        Datum::AclItem(acl_item) => {
1783            data.push(Tag::AclItem.into());
1784            data.extend_from_slice(&acl_item.encode_binary());
1785        }
1786    }
1787}
1788
1789/// Return the number of bytes these Datums would use if packed as a Row.
1790pub fn row_size<'a, I>(a: I) -> usize
1791where
1792    I: IntoIterator<Item = Datum<'a>>,
1793{
1794    // Using datums_size instead of a.data().len() here is safer because it will
1795    // return the size of the datums if they were packed into a Row. Although
1796    // a.data().len() happens to give the correct answer (and is faster), data()
1797    // is documented as for debugging only.
1798    let sz = datums_size::<_, _>(a);
1799    let size_of_row = std::mem::size_of::<Row>();
1800    // The Row struct attempts to inline data until it can't fit in the
1801    // preallocated size. Otherwise it spills to heap, and uses the Row to point
1802    // to that.
1803    if sz > Row::SIZE {
1804        sz + size_of_row
1805    } else {
1806        size_of_row
1807    }
1808}
1809
1810/// Number of bytes required by the datum.
1811/// This is used to optimistically pre-allocate buffers for packing rows.
1812pub fn datum_size(datum: &Datum) -> usize {
1813    match datum {
1814        Datum::Null => 1,
1815        Datum::False => 1,
1816        Datum::True => 1,
1817        Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
1818        Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
1819        Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
1820        Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1821        Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1822        Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1823        Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1824        Datum::Float32(_) => 1 + size_of::<f32>(),
1825        Datum::Float64(_) => 1 + size_of::<f64>(),
1826        Datum::Date(_) => 1 + size_of::<i32>(),
1827        Datum::Time(_) => 1 + 8,
1828        Datum::Timestamp(t) => {
1829            1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
1830                8
1831            } else {
1832                16
1833            }
1834        }
1835        Datum::TimestampTz(t) => {
1836            1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
1837                8
1838            } else {
1839                16
1840            }
1841        }
1842        Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
1843        Datum::Bytes(bytes) => {
1844            // We use a variable length representation of slice length.
1845            let bytes_for_length = match bytes.len() {
1846                0..TINY => 1,
1847                TINY..SHORT => 2,
1848                SHORT..LONG => 4,
1849                _ => 8,
1850            };
1851            1 + bytes_for_length + bytes.len()
1852        }
1853        Datum::String(string) => {
1854            // We use a variable length representation of slice length.
1855            let bytes_for_length = match string.len() {
1856                0..TINY => 1,
1857                TINY..SHORT => 2,
1858                SHORT..LONG => 4,
1859                _ => 8,
1860            };
1861            1 + bytes_for_length + string.len()
1862        }
1863        Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
1864        Datum::Array(array) => {
1865            1 + size_of::<u8>()
1866                + array.dims.data.len()
1867                + size_of::<u64>()
1868                + array.elements.data.len()
1869        }
1870        Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
1871        Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
1872        Datum::JsonNull => 1,
1873        Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
1874        Datum::Dummy => 1,
1875        Datum::Numeric(d) => {
1876            let mut d = d.0.clone();
1877            // Values must be reduced to determine appropriate number of
1878            // coefficient units.
1879            numeric::cx_datum().reduce(&mut d);
1880            // 4 = 1 bit each for tag, digits, exponent, bits
1881            4 + (d.coefficient_units().len() * 2)
1882        }
1883        Datum::Range(Range { inner }) => {
1884            // Tag + flags
1885            2 + match inner {
1886                None => 0,
1887                Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
1888                    .iter()
1889                    .map(|bound| match bound {
1890                        None => 0,
1891                        Some(bound) => bound.val.len(),
1892                    })
1893                    .sum(),
1894            }
1895        }
1896        Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
1897        Datum::AclItem(_) => 1 + AclItem::binary_size(),
1898    }
1899}
1900
1901/// Number of bytes required by a sequence of datums.
1902///
1903/// This method can be used to right-size the allocation for a `Row`
1904/// before calling [`RowPacker::extend`].
1905pub fn datums_size<'a, I, D>(iter: I) -> usize
1906where
1907    I: IntoIterator<Item = D>,
1908    D: Borrow<Datum<'a>>,
1909{
1910    iter.into_iter().map(|d| datum_size(d.borrow())).sum()
1911}
1912
1913/// Number of bytes required by a list of datums. This computes the size that would be required if
1914/// the given datums were packed into a list.
1915///
1916/// This is used to optimistically pre-allocate buffers for packing rows.
1917pub fn datum_list_size<'a, I, D>(iter: I) -> usize
1918where
1919    I: IntoIterator<Item = D>,
1920    D: Borrow<Datum<'a>>,
1921{
1922    1 + size_of::<u64>() + datums_size(iter)
1923}
1924
1925impl RowPacker<'_> {
1926    /// Constructs a row packer that will pack additional datums into the
1927    /// provided row.
1928    ///
1929    /// This function is intentionally somewhat inconvenient to call. You
1930    /// usually want to call [`Row::packer`] instead to start packing from
1931    /// scratch.
1932    pub fn for_existing_row(row: &mut Row) -> RowPacker {
1933        RowPacker { row }
1934    }
1935
1936    /// Extend an existing `Row` with a `Datum`.
1937    #[inline]
1938    pub fn push<'a, D>(&mut self, datum: D)
1939    where
1940        D: Borrow<Datum<'a>>,
1941    {
1942        push_datum(&mut self.row.data, *datum.borrow());
1943    }
1944
1945    /// Extend an existing `Row` with additional `Datum`s.
1946    #[inline]
1947    pub fn extend<'a, I, D>(&mut self, iter: I)
1948    where
1949        I: IntoIterator<Item = D>,
1950        D: Borrow<Datum<'a>>,
1951    {
1952        for datum in iter {
1953            push_datum(&mut self.row.data, *datum.borrow())
1954        }
1955    }
1956
1957    /// Extend an existing `Row` with additional `Datum`s.
1958    ///
1959    /// In the case the iterator produces an error, the pushing of
1960    /// datums in terminated and the error returned. The `Row` will
1961    /// be incomplete, but it will be safe to read datums from it.
1962    #[inline]
1963    pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
1964    where
1965        I: IntoIterator<Item = Result<D, E>>,
1966        D: Borrow<Datum<'a>>,
1967    {
1968        for datum in iter {
1969            push_datum(&mut self.row.data, *datum?.borrow());
1970        }
1971        Ok(())
1972    }
1973
1974    /// Appends the datums of an entire `Row`.
1975    pub fn extend_by_row(&mut self, row: &Row) {
1976        self.row.data.extend_from_slice(row.data.as_slice());
1977    }
1978
1979    /// Appends the slice of data representing an entire `Row`. The data is not validated.
1980    ///
1981    /// # Safety
1982    ///
1983    /// The requirements from [`Row::from_bytes_unchecked`] apply here, too:
1984    /// This method relies on `data` being an appropriate row encoding, and can
1985    /// result in unsafety if this is not the case.
1986    #[inline]
1987    pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
1988        self.row.data.extend_from_slice(data)
1989    }
1990
1991    /// Pushes a [`DatumList`] that is built from a closure.
1992    ///
1993    /// The supplied closure will be invoked once with a `Row` that can be used
1994    /// to populate the list. It is valid to call any method on the
1995    /// [`RowPacker`] except for [`RowPacker::clear`], [`RowPacker::truncate`],
1996    /// or [`RowPacker::truncate_datums`].
1997    ///
1998    /// Returns the value returned by the closure, if any.
1999    ///
2000    /// ```
2001    /// # use mz_repr::{Row, Datum};
2002    /// let mut row = Row::default();
2003    /// row.packer().push_list_with(|row| {
2004    ///     row.push(Datum::String("age"));
2005    ///     row.push(Datum::Int64(42));
2006    /// });
2007    /// assert_eq!(
2008    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
2009    ///     vec![Datum::String("age"), Datum::Int64(42)],
2010    /// );
2011    /// ```
2012    #[inline]
2013    pub fn push_list_with<F, R>(&mut self, f: F) -> R
2014    where
2015        F: FnOnce(&mut RowPacker) -> R,
2016    {
2017        // First, assume that the list will fit in 255 bytes, and thus the length will fit in
2018        // 1 byte. If not, we'll fix it up later.
2019        let start = self.row.data.len();
2020        self.row.data.push(Tag::ListTiny.into());
2021        // Write a dummy len, will fix it up later.
2022        self.row.data.push(0);
2023
2024        let out = f(self);
2025
2026        // The `- 1 - 1` is for the tag and the len.
2027        let len = self.row.data.len() - start - 1 - 1;
2028        // We now know the real len.
2029        if len < TINY {
2030            // If the len fits in 1 byte, we just need to fix up the len.
2031            self.row.data[start + 1] = len.to_le_bytes()[0];
2032        } else {
2033            // Note: We move this code path into its own function, so that the common case can be
2034            // inlined.
2035            long_list(&mut self.row.data, start, len);
2036        }
2037
2038        /// 1. Fix up the tag.
2039        /// 2. Move the actual data a bit (for which we also need to make room at the end).
2040        /// 3. Fix up the len.
2041        /// `data`: The row's backing data.
2042        /// `start`: where `push_list_with` started writing in `data`.
2043        /// `len`: the length of the data, excluding the tag and the length.
2044        #[cold]
2045        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
2046            // `len_len`: the length of the length. (Possible values are: 2, 4, 8. 1 is handled
2047            // elsewhere.) The other parameters are the same as for `long_list`.
2048            let long_list_inner = |data: &mut CompactBytes, len_len| {
2049                // We'll need memory for the new, bigger length, so make the `CompactBytes` bigger.
2050                // The `- 1` is because the old length was 1 byte.
2051                const ZEROS: [u8; 8] = [0; 8];
2052                data.extend_from_slice(&ZEROS[0..len_len - 1]);
2053                // Move the data to the end of the `CompactBytes`, to make space for the new length.
2054                // Originally, it started after the 1-byte tag and the 1-byte length, now it will
2055                // start after the 1-byte tag and the len_len-byte length.
2056                //
2057                // Note that this is the only operation in `long_list` whose cost is proportional
2058                // to `len`. Since `len` is at least 256 here, the other operations' cost are
2059                // negligible. `copy_within` is a memmove, which is probably a fair bit faster per
2060                // Datum than a Datum encoding in the `f` closure.
2061                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
2062                // Write the new length.
2063                data[start + 1..start + 1 + len_len]
2064                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
2065            };
2066            match len {
2067                0..TINY => {
2068                    unreachable!()
2069                }
2070                TINY..SHORT => {
2071                    data[start] = Tag::ListShort.into();
2072                    long_list_inner(data, 2);
2073                }
2074                SHORT..LONG => {
2075                    data[start] = Tag::ListLong.into();
2076                    long_list_inner(data, 4);
2077                }
2078                _ => {
2079                    data[start] = Tag::ListHuge.into();
2080                    long_list_inner(data, 8);
2081                }
2082            };
2083        }
2084
2085        out
2086    }
2087
2088    /// Pushes a [`DatumMap`] that is built from a closure.
2089    ///
2090    /// The supplied closure will be invoked once with a `Row` that can be used
2091    /// to populate the dict.
2092    ///
2093    /// The closure **must** alternate pushing string keys and arbitrary values,
2094    /// otherwise reading the dict will cause a panic.
2095    ///
2096    /// The closure **must** push keys in ascending order, otherwise equality
2097    /// checks on the resulting `Row` may be wrong and reading the dict IN DEBUG
2098    /// MODE will cause a panic.
2099    ///
2100    /// The closure **must not** call [`RowPacker::clear`],
2101    /// [`RowPacker::truncate`], or [`RowPacker::truncate_datums`].
2102    ///
2103    /// # Example
2104    ///
2105    /// ```
2106    /// # use mz_repr::{Row, Datum};
2107    /// let mut row = Row::default();
2108    /// row.packer().push_dict_with(|row| {
2109    ///
2110    ///     // key
2111    ///     row.push(Datum::String("age"));
2112    ///     // value
2113    ///     row.push(Datum::Int64(42));
2114    ///
2115    ///     // key
2116    ///     row.push(Datum::String("name"));
2117    ///     // value
2118    ///     row.push(Datum::String("bob"));
2119    /// });
2120    /// assert_eq!(
2121    ///     row.unpack_first().unwrap_map().iter().collect::<Vec<_>>(),
2122    ///     vec![("age", Datum::Int64(42)), ("name", Datum::String("bob"))]
2123    /// );
2124    /// ```
2125    pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2126    where
2127        F: FnOnce(&mut RowPacker) -> R,
2128    {
2129        self.row.data.push(Tag::Dict.into());
2130        let start = self.row.data.len();
2131        // write a dummy len, will fix it up later
2132        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2133
2134        let res = f(self);
2135
2136        let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2137        // fix up the len
2138        self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2139
2140        res
2141    }
2142
2143    /// Convenience function to construct an array from an iter of `Datum`s.
2144    ///
2145    /// Returns an error if the number of elements in `iter` does not match
2146    /// the cardinality of the array as described by `dims`, or if the
2147    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2148    /// occurs, the packer's state will be unchanged.
2149    pub fn try_push_array<'a, I, D>(
2150        &mut self,
2151        dims: &[ArrayDimension],
2152        iter: I,
2153    ) -> Result<(), InvalidArrayError>
2154    where
2155        I: IntoIterator<Item = D>,
2156        D: Borrow<Datum<'a>>,
2157    {
2158        // SAFETY: The function returns the exact number of elements pushed into the array.
2159        unsafe {
2160            self.push_array_with_unchecked(dims, |packer| {
2161                let mut nelements = 0;
2162                for datum in iter {
2163                    packer.push(datum);
2164                    nelements += 1;
2165                }
2166                Ok::<_, InvalidArrayError>(nelements)
2167            })
2168        }
2169    }
2170
2171    /// Convenience function to construct an array from a function. The function must return the
2172    /// number of elements it pushed into the array. It is undefined behavior if the function returns
2173    /// a number different to the number of elements it pushed.
2174    ///
2175    /// Returns an error if the number of elements pushed by `f` does not match
2176    /// the cardinality of the array as described by `dims`, or if the
2177    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`], or if `f` errors. If an error
2178    /// occurs, the packer's state will be unchanged.
2179    pub unsafe fn push_array_with_unchecked<F, E>(
2180        &mut self,
2181        dims: &[ArrayDimension],
2182        f: F,
2183    ) -> Result<(), E>
2184    where
2185        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
2186        E: From<InvalidArrayError>,
2187    {
2188        // Arrays are encoded as follows.
2189        //
2190        // u8    ndims
2191        // u64   dim_0 lower bound
2192        // u64   dim_0 length
2193        // ...
2194        // u64   dim_n lower bound
2195        // u64   dim_n length
2196        // u64   element data size in bytes
2197        // u8    element data, where elements are encoded in row-major order
2198
2199        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
2200            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
2201        }
2202
2203        let start = self.row.data.len();
2204        self.row.data.push(Tag::Array.into());
2205
2206        // Write dimension information.
2207        self.row
2208            .data
2209            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
2210        for dim in dims {
2211            self.row
2212                .data
2213                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2214            self.row
2215                .data
2216                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2217        }
2218
2219        // Write elements.
2220        let off = self.row.data.len();
2221        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2222        let nelements = match f(self) {
2223            Ok(nelements) => nelements,
2224            Err(e) => {
2225                self.row.data.truncate(start);
2226                return Err(e);
2227            }
2228        };
2229        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2230        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2231
2232        // Check that the number of elements written matches the dimension
2233        // information.
2234        let cardinality = match dims {
2235            [] => 0,
2236            dims => dims.iter().map(|d| d.length).product(),
2237        };
2238        if nelements != cardinality {
2239            self.row.data.truncate(start);
2240            return Err(InvalidArrayError::WrongCardinality {
2241                actual: nelements,
2242                expected: cardinality,
2243            }
2244            .into());
2245        }
2246
2247        Ok(())
2248    }
2249
2250    /// Pushes an [`Array`] that is built from a closure.
2251    ///
2252    /// __WARNING__: This is fairly "sharp" tool that is easy to get wrong. You
2253    /// should prefer [`RowPacker::try_push_array`] when possible.
2254    ///
2255    /// Returns an error if the number of elements pushed does not match
2256    /// the cardinality of the array as described by `dims`, or if the
2257    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2258    /// occurs, the packer's state will be unchanged.
2259    pub fn push_array_with_row_major<F, I>(
2260        &mut self,
2261        dims: I,
2262        f: F,
2263    ) -> Result<(), InvalidArrayError>
2264    where
2265        I: IntoIterator<Item = ArrayDimension>,
2266        F: FnOnce(&mut RowPacker) -> usize,
2267    {
2268        let start = self.row.data.len();
2269        self.row.data.push(Tag::Array.into());
2270
2271        // Write dummy dimension length for now, we'll fix it up.
2272        let dims_start = self.row.data.len();
2273        self.row.data.push(42);
2274
2275        let mut num_dims: u8 = 0;
2276        let mut cardinality: usize = 1;
2277        for dim in dims {
2278            num_dims += 1;
2279            cardinality *= dim.length;
2280
2281            self.row
2282                .data
2283                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2284            self.row
2285                .data
2286                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2287        }
2288
2289        if num_dims > MAX_ARRAY_DIMENSIONS {
2290            // Reset the packer state so we don't have invalid data.
2291            self.row.data.truncate(start);
2292            return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2293        }
2294        // Fix up our dimension length.
2295        self.row.data[dims_start..dims_start + size_of::<u8>()]
2296            .copy_from_slice(&num_dims.to_le_bytes());
2297
2298        // Write elements.
2299        let off = self.row.data.len();
2300        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2301
2302        let nelements = f(self);
2303
2304        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2305        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2306
2307        // Check that the number of elements written matches the dimension
2308        // information.
2309        let cardinality = match num_dims {
2310            0 => 0,
2311            _ => cardinality,
2312        };
2313        if nelements != cardinality {
2314            self.row.data.truncate(start);
2315            return Err(InvalidArrayError::WrongCardinality {
2316                actual: nelements,
2317                expected: cardinality,
2318            });
2319        }
2320
2321        Ok(())
2322    }
2323
2324    /// Convenience function to push a `DatumList` from an iter of `Datum`s
2325    ///
2326    /// See [`RowPacker::push_dict_with`] if you need to be able to handle errors
2327    pub fn push_list<'a, I, D>(&mut self, iter: I)
2328    where
2329        I: IntoIterator<Item = D>,
2330        D: Borrow<Datum<'a>>,
2331    {
2332        self.push_list_with(|packer| {
2333            for elem in iter {
2334                packer.push(*elem.borrow())
2335            }
2336        });
2337    }
2338
2339    /// Convenience function to push a `DatumMap` from an iter of `(&str, Datum)` pairs
2340    pub fn push_dict<'a, I, D>(&mut self, iter: I)
2341    where
2342        I: IntoIterator<Item = (&'a str, D)>,
2343        D: Borrow<Datum<'a>>,
2344    {
2345        self.push_dict_with(|packer| {
2346            for (k, v) in iter {
2347                packer.push(Datum::String(k));
2348                packer.push(*v.borrow())
2349            }
2350        })
2351    }
2352
2353    /// Pushes a `Datum::Range` derived from the `Range<Datum<'a>`.
2354    ///
2355    /// # Panics
2356    /// - If lower and upper express finite values and they are datums of
2357    ///   different types.
2358    /// - If lower or upper express finite values and are equal to
2359    ///   `Datum::Null`. To handle `Datum::Null` properly, use
2360    ///   [`RangeBound::new`].
2361    ///
2362    /// # Notes
2363    /// - This function canonicalizes the range before pushing it to the row.
2364    /// - Prefer this function over `push_range_with` because of its
2365    ///   canonicaliztion.
2366    /// - Prefer creating [`RangeBound`]s using [`RangeBound::new`], which
2367    ///   handles `Datum::Null` in a SQL-friendly way.
2368    pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
2369        range.canonicalize()?;
2370        match range.inner {
2371            None => {
2372                self.row.data.push(Tag::Range.into());
2373                // Untagged bytes only contains the `RANGE_EMPTY` flag value.
2374                self.row.data.push(range::InternalFlags::EMPTY.bits());
2375                Ok(())
2376            }
2377            Some(inner) => self.push_range_with(
2378                RangeLowerBound {
2379                    inclusive: inner.lower.inclusive,
2380                    bound: inner
2381                        .lower
2382                        .bound
2383                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2384                },
2385                RangeUpperBound {
2386                    inclusive: inner.upper.inclusive,
2387                    bound: inner
2388                        .upper
2389                        .bound
2390                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2391                },
2392            ),
2393        }
2394    }
2395
2396    /// Pushes a `DatumRange` built from the specified arguments.
2397    ///
2398    /// # Warning
2399    /// Unlike `push_range`, `push_range_with` _does not_ canonicalize its
2400    /// inputs. Consequentially, this means it's possible to generate ranges
2401    /// that will not reflect the proper ordering and equality.
2402    ///
2403    /// # Panics
2404    /// - If lower or upper expresses a finite value and does not push exactly
2405    ///   one value into the `RowPacker`.
2406    /// - If lower and upper express finite values and they are datums of
2407    ///   different types.
2408    /// - If lower or upper express finite values and push `Datum::Null`.
2409    ///
2410    /// # Notes
2411    /// - Prefer `push_range_with` over this function. This function should be
2412    ///   used only when you are not pushing `Datum`s to the inner row.
2413    /// - Range encoding is `[<flag bytes>,<lower>?,<upper>?]`, where `lower`
2414    ///   and `upper` are optional, contingent on the flag value expressing an
2415    ///   empty range (where neither will be present) or infinite bounds (where
2416    ///   each infinite bound will be absent).
2417    /// - To push an emtpy range, use `push_range` using `Range { inner: None }`.
2418    pub fn push_range_with<L, U, E>(
2419        &mut self,
2420        lower: RangeLowerBound<L>,
2421        upper: RangeUpperBound<U>,
2422    ) -> Result<(), E>
2423    where
2424        L: FnOnce(&mut RowPacker) -> Result<(), E>,
2425        U: FnOnce(&mut RowPacker) -> Result<(), E>,
2426        E: From<InvalidRangeError>,
2427    {
2428        let start = self.row.data.len();
2429        self.row.data.push(Tag::Range.into());
2430
2431        let mut flags = range::InternalFlags::empty();
2432
2433        flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
2434        flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
2435        flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
2436        flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);
2437
2438        let mut expected_datums = 0;
2439
2440        self.row.data.push(flags.bits());
2441
2442        let datum_check = self.row.data.len();
2443
2444        if let Some(value) = lower.bound {
2445            let start = self.row.data.len();
2446            value(self)?;
2447            assert!(
2448                start < self.row.data.len(),
2449                "finite values must each push exactly one value; expected 1 but got 0"
2450            );
2451            expected_datums += 1;
2452        }
2453
2454        if let Some(value) = upper.bound {
2455            let start = self.row.data.len();
2456            value(self)?;
2457            assert!(
2458                start < self.row.data.len(),
2459                "finite values must each push exactly one value; expected 1 but got 0"
2460            );
2461            expected_datums += 1;
2462        }
2463
2464        // Validate the invariants that 0, 1, or 2 elements were pushed, none are Null,
2465        // and if two are pushed then the second is not less than the first. Panic in
2466        // some cases and error in others.
2467        let mut actual_datums = 0;
2468        let mut seen = None;
2469        let mut dataz = &self.row.data[datum_check..];
2470        while !dataz.is_empty() {
2471            let d = unsafe { read_datum(&mut dataz) };
2472            assert!(d != Datum::Null, "cannot push Datum::Null into range");
2473
2474            match seen {
2475                None => seen = Some(d),
2476                Some(seen) => {
2477                    let seen_kind = DatumKind::from(seen);
2478                    let d_kind = DatumKind::from(d);
2479                    assert!(
2480                        seen_kind == d_kind,
2481                        "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
2482                    );
2483
2484                    if seen > d {
2485                        self.row.data.truncate(start);
2486                        return Err(InvalidRangeError::MisorderedRangeBounds.into());
2487                    }
2488                }
2489            }
2490            actual_datums += 1;
2491        }
2492
2493        assert!(
2494            actual_datums == expected_datums,
2495            "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
2496        );
2497
2498        Ok(())
2499    }
2500
2501    /// Clears the contents of the packer without de-allocating its backing memory.
2502    pub fn clear(&mut self) {
2503        self.row.data.clear();
2504    }
2505
2506    /// Truncates the underlying storage to the specified byte position.
2507    ///
2508    /// # Safety
2509    ///
2510    /// `pos` MUST specify a byte offset that lies on a datum boundary.
2511    /// If `pos` specifies a byte offset that is *within* a datum, the row
2512    /// packer will produce an invalid row, the unpacking of which may
2513    /// trigger undefined behavior!
2514    ///
2515    /// To find the byte offset of a datum boundary, inspect the packer's
2516    /// byte length by calling `packer.data().len()` after pushing the desired
2517    /// number of datums onto the packer.
2518    pub unsafe fn truncate(&mut self, pos: usize) {
2519        self.row.data.truncate(pos)
2520    }
2521
2522    /// Truncates the underlying row to contain at most the first `n` datums.
2523    pub fn truncate_datums(&mut self, n: usize) {
2524        let prev_len = self.row.data.len();
2525        let mut iter = self.row.iter();
2526        for _ in iter.by_ref().take(n) {}
2527        let next_len = iter.data.len();
2528        // SAFETY: iterator offsets always lie on a datum boundary.
2529        unsafe { self.truncate(prev_len - next_len) }
2530    }
2531
    /// Returns the total amount of bytes used by the underlying row.
    ///
    /// This simply forwards to the `byte_len` method of the row being packed.
    pub fn byte_len(&self) -> usize {
        self.row.byte_len()
    }
2536}
2537
2538impl<'a> IntoIterator for &'a Row {
2539    type Item = Datum<'a>;
2540    type IntoIter = DatumListIter<'a>;
2541    fn into_iter(self) -> DatumListIter<'a> {
2542        self.iter()
2543    }
2544}
2545
2546impl fmt::Debug for Row {
2547    /// Debug representation using the internal datums
2548    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2549        f.write_str("Row{")?;
2550        f.debug_list().entries(self.iter()).finish()?;
2551        f.write_str("}")
2552    }
2553}
2554
2555impl fmt::Display for Row {
2556    /// Display representation using the internal datums
2557    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2558        f.write_str("(")?;
2559        for (i, datum) in self.iter().enumerate() {
2560            if i != 0 {
2561                f.write_str(", ")?;
2562            }
2563            write!(f, "{}", datum)?;
2564        }
2565        f.write_str(")")
2566    }
2567}
2568
2569impl<'a> DatumList<'a> {
2570    pub fn empty() -> DatumList<'static> {
2571        DatumList { data: &[] }
2572    }
2573
2574    pub fn iter(&self) -> DatumListIter<'a> {
2575        DatumListIter { data: self.data }
2576    }
2577
2578    /// For debugging only
2579    pub fn data(&self) -> &'a [u8] {
2580        self.data
2581    }
2582}
2583
2584impl<'a> IntoIterator for &'a DatumList<'a> {
2585    type Item = Datum<'a>;
2586    type IntoIter = DatumListIter<'a>;
2587    fn into_iter(self) -> DatumListIter<'a> {
2588        self.iter()
2589    }
2590}
2591
2592impl<'a> Iterator for DatumListIter<'a> {
2593    type Item = Datum<'a>;
2594    fn next(&mut self) -> Option<Self::Item> {
2595        if self.data.is_empty() {
2596            None
2597        } else {
2598            Some(unsafe { read_datum(&mut self.data) })
2599        }
2600    }
2601}
2602
2603impl<'a> DatumMap<'a> {
2604    pub fn empty() -> DatumMap<'static> {
2605        DatumMap { data: &[] }
2606    }
2607
2608    pub fn iter(&self) -> DatumDictIter<'a> {
2609        DatumDictIter {
2610            data: self.data,
2611            prev_key: None,
2612        }
2613    }
2614
2615    /// For debugging only
2616    pub fn data(&self) -> &'a [u8] {
2617        self.data
2618    }
2619}
2620
2621impl<'a> Debug for DatumMap<'a> {
2622    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2623        f.debug_map().entries(self.iter()).finish()
2624    }
2625}
2626
2627impl<'a> IntoIterator for &'a DatumMap<'a> {
2628    type Item = (&'a str, Datum<'a>);
2629    type IntoIter = DatumDictIter<'a>;
2630    fn into_iter(self) -> DatumDictIter<'a> {
2631        self.iter()
2632    }
2633}
2634
2635impl<'a> Iterator for DatumDictIter<'a> {
2636    type Item = (&'a str, Datum<'a>);
2637    fn next(&mut self) -> Option<Self::Item> {
2638        if self.data.is_empty() {
2639            None
2640        } else {
2641            let key_tag =
2642                Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
2643            assert!(
2644                key_tag == Tag::StringTiny
2645                    || key_tag == Tag::StringShort
2646                    || key_tag == Tag::StringLong
2647                    || key_tag == Tag::StringHuge,
2648                "Dict keys must be strings, got {:?}",
2649                key_tag
2650            );
2651            let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
2652            let val = unsafe { read_datum(&mut self.data) };
2653
2654            // if in debug mode, sanity check keys
2655            if cfg!(debug_assertions) {
2656                if let Some(prev_key) = self.prev_key {
2657                    debug_assert!(
2658                        prev_key < key,
2659                        "Dict keys must be unique and given in ascending order: {} came before {}",
2660                        prev_key,
2661                        key
2662                    );
2663                }
2664                self.prev_key = Some(key);
2665            }
2666
2667            Some((key, val))
2668        }
2669    }
2670}
2671
2672impl RowArena {
2673    pub fn new() -> Self {
2674        RowArena {
2675            inner: RefCell::new(vec![]),
2676        }
2677    }
2678
2679    /// Creates a `RowArena` with a hint of how many rows will be created in the arena, to avoid
2680    /// reallocations of its internal vector.
2681    pub fn with_capacity(capacity: usize) -> Self {
2682        RowArena {
2683            inner: RefCell::new(Vec::with_capacity(capacity)),
2684        }
2685    }
2686
2687    /// Does a `reserve` on the underlying `Vec`. Call this when you expect `additional` more datums
2688    /// to be created in this arena.
2689    pub fn reserve(&self, additional: usize) {
2690        self.inner.borrow_mut().reserve(additional);
2691    }
2692
2693    /// Take ownership of `bytes` for the lifetime of the arena.
2694    #[allow(clippy::transmute_ptr_to_ptr)]
2695    pub fn push_bytes<'a>(&'a self, bytes: Vec<u8>) -> &'a [u8] {
2696        let mut inner = self.inner.borrow_mut();
2697        inner.push(bytes);
2698        let owned_bytes = &inner[inner.len() - 1];
2699        unsafe {
2700            // This is safe because:
2701            //   * We only ever append to self.inner, so the byte vector
2702            //     will live as long as the arena.
2703            //   * We return a reference to the byte vector's contents, so it's
2704            //     okay if self.inner reallocates and moves the byte
2705            //     vector.
2706            //   * We don't allow access to the byte vector itself, so it will
2707            //     never reallocate.
2708            transmute::<&[u8], &'a [u8]>(owned_bytes)
2709        }
2710    }
2711
2712    /// Take ownership of `string` for the lifetime of the arena.
2713    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
2714        let owned_bytes = self.push_bytes(string.into_bytes());
2715        unsafe {
2716            // This is safe because we know it was a `String` just before.
2717            std::str::from_utf8_unchecked(owned_bytes)
2718        }
2719    }
2720
2721    /// Take ownership of `row` for the lifetime of the arena, returning a
2722    /// reference to the first datum in the row.
2723    ///
2724    /// If we had an owned datum type, this method would be much clearer, and
2725    /// would be called `push_owned_datum`.
2726    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
2727        let mut inner = self.inner.borrow_mut();
2728        inner.push(row.data.into_vec());
2729        unsafe {
2730            // This is safe because:
2731            //   * We only ever append to self.inner, so the row data will live
2732            //     as long as the arena.
2733            //   * We force the row data into its own heap allocation--
2734            //     importantly, we do NOT store the SmallVec, which might be
2735            //     storing data inline--so it's okay if self.inner reallocates
2736            //     and moves the row.
2737            //   * We don't allow access to the byte vector itself, so it will
2738            //     never reallocate.
2739            let datum = read_datum(&mut &inner[inner.len() - 1][..]);
2740            transmute::<Datum<'_>, Datum<'a>>(datum)
2741        }
2742    }
2743
2744    /// Equivalent to `push_unary_row` but returns a `DatumNested` rather than a
2745    /// `Datum`.
2746    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
2747        let mut inner = self.inner.borrow_mut();
2748        inner.push(row.data.into_vec());
2749        unsafe {
2750            // This is safe because:
2751            //   * We only ever append to self.inner, so the row data will live
2752            //     as long as the arena.
2753            //   * We force the row data into its own heap allocation--
2754            //     importantly, we do NOT store the SmallVec, which might be
2755            //     storing data inline--so it's okay if self.inner reallocates
2756            //     and moves the row.
2757            //   * We don't allow access to the byte vector itself, so it will
2758            //     never reallocate.
2759            let nested = DatumNested::extract(&mut &inner[inner.len() - 1][..]);
2760            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
2761        }
2762    }
2763
2764    /// Convenience function to make a new `Row` containing a single datum, and
2765    /// take ownership of it for the lifetime of the arena
2766    ///
2767    /// ```
2768    /// # use mz_repr::{RowArena, Datum};
2769    /// let arena = RowArena::new();
2770    /// let datum = arena.make_datum(|packer| {
2771    ///   packer.push_list(&[Datum::String("hello"), Datum::String("world")]);
2772    /// });
2773    /// assert_eq!(datum.unwrap_list().iter().collect::<Vec<_>>(), vec![Datum::String("hello"), Datum::String("world")]);
2774    /// ```
2775    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
2776    where
2777        F: FnOnce(&mut RowPacker),
2778    {
2779        let mut row = Row::default();
2780        f(&mut row.packer());
2781        self.push_unary_row(row)
2782    }
2783
2784    /// Convenience function identical to `make_datum` but instead returns a
2785    /// `DatumNested`.
2786    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
2787    where
2788        F: FnOnce(&mut RowPacker),
2789    {
2790        let mut row = Row::default();
2791        f(&mut row.packer());
2792        self.push_unary_row_datum_nested(row)
2793    }
2794
2795    /// Like [`RowArena::make_datum`], but the provided closure can return an error.
2796    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
2797    where
2798        F: FnOnce(&mut RowPacker) -> Result<(), E>,
2799    {
2800        let mut row = Row::default();
2801        f(&mut row.packer())?;
2802        Ok(self.push_unary_row(row))
2803    }
2804
2805    /// Clear the contents of the arena.
2806    pub fn clear(&mut self) {
2807        self.inner.borrow_mut().clear();
2808    }
2809}
2810
2811impl Default for RowArena {
2812    fn default() -> RowArena {
2813        RowArena::new()
2814    }
2815}
2816
/// A thread-local row, which can be borrowed and returned.
///
/// # Example
///
/// Use this type instead of creating a new row:
/// ```
/// use mz_repr::SharedRow;
///
/// let mut row_builder = SharedRow::get();
/// ```
///
/// This allows us to reuse an existing row allocation instead of creating a new one or retaining
/// an allocation locally. Additionally, we can observe the size of the local row in a central
/// place and potentially reallocate to reduce memory needs.
///
/// # Panics
///
/// [`SharedRow::get`] panics when trying to obtain multiple references to the shared row.
#[derive(Debug)]
pub struct SharedRow(Row);
2836
2837impl SharedRow {
2838    thread_local! {
2839        /// A thread-local slot containing a shared Row that can be temporarily used by a function.
2840        /// There can be at most one active user of this Row, which is tracked by the state of the
2841        /// `Option<_>` wrapper. When it is `Some(..)`, the row is available for using. When it
2842        /// is `None`, it is not, and the constructor will panic if a thread attempts to use it.
2843        static SHARED_ROW: Cell<Option<Row>> = const { Cell::new(Some(Row::empty())) }
2844    }
2845
2846    /// Get the shared row.
2847    ///
2848    /// The row's contents are cleared before returning it.
2849    ///
2850    /// # Panic
2851    ///
2852    /// Panics when the row is already borrowed elsewhere.
2853    pub fn get() -> Self {
2854        let mut row = Self::SHARED_ROW
2855            .take()
2856            .expect("attempted to borrow already borrowed SharedRow");
2857        // Clear row
2858        row.packer();
2859        Self(row)
2860    }
2861
2862    /// Gets the shared row and uses it to pack `iter`.
2863    pub fn pack<'a, I, D>(iter: I) -> Row
2864    where
2865        I: IntoIterator<Item = D>,
2866        D: Borrow<Datum<'a>>,
2867    {
2868        let mut row_builder = Self::get();
2869        let mut row_packer = row_builder.packer();
2870        row_packer.extend(iter);
2871        row_builder.clone()
2872    }
2873}
2874
2875impl std::ops::Deref for SharedRow {
2876    type Target = Row;
2877
2878    fn deref(&self) -> &Self::Target {
2879        &self.0
2880    }
2881}
2882
2883impl std::ops::DerefMut for SharedRow {
2884    fn deref_mut(&mut self) -> &mut Self::Target {
2885        &mut self.0
2886    }
2887}
2888
2889impl Drop for SharedRow {
2890    fn drop(&mut self) {
2891        // Take the Row allocation from this instance and put it back in the thread local slot for
2892        // the next user. The Row in `self` is replaced with an empty Row which does not allocate.
2893        Self::SHARED_ROW.set(Some(std::mem::take(&mut self.0)))
2894    }
2895}
2896
2897#[cfg(test)]
2898mod tests {
2899    use chrono::{DateTime, NaiveDate};
2900    use mz_ore::{assert_err, assert_none};
2901
2902    use crate::ScalarType;
2903
2904    use super::*;
2905
2906    #[mz_ore::test]
2907    fn test_assumptions() {
2908        assert_eq!(size_of::<Tag>(), 1);
2909        #[cfg(target_endian = "big")]
2910        {
2911            // if you want to run this on a big-endian cpu, we'll need big-endian versions of the serialization code
2912            assert!(false);
2913        }
2914    }
2915
2916    #[mz_ore::test]
2917    fn miri_test_arena() {
2918        let arena = RowArena::new();
2919
2920        assert_eq!(arena.push_string("".to_owned()), "");
2921        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");
2922
2923        let empty: &[u8] = &[];
2924        assert_eq!(arena.push_bytes(vec![]), empty);
2925        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);
2926
2927        let mut row = Row::default();
2928        let mut packer = row.packer();
2929        packer.push_dict_with(|row| {
2930            row.push(Datum::String("a"));
2931            row.push_list_with(|row| {
2932                row.push(Datum::String("one"));
2933                row.push(Datum::String("two"));
2934                row.push(Datum::String("three"));
2935            });
2936            row.push(Datum::String("b"));
2937            row.push(Datum::String("c"));
2938        });
2939        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
2940    }
2941
2942    #[mz_ore::test]
2943    fn miri_test_round_trip() {
2944        fn round_trip(datums: Vec<Datum>) {
2945            let row = Row::pack(datums.clone());
2946
2947            // When run under miri this catches undefined bytes written to data
2948            // eg by calling push_copy! on a type which contains undefined padding values
2949            println!("{:?}", row.data());
2950
2951            let datums2 = row.iter().collect::<Vec<_>>();
2952            let datums3 = row.unpack();
2953            assert_eq!(datums, datums2);
2954            assert_eq!(datums, datums3);
2955        }
2956
2957        round_trip(vec![]);
2958        round_trip(
2959            ScalarType::enumerate()
2960                .iter()
2961                .flat_map(|r#type| r#type.interesting_datums())
2962                .collect(),
2963        );
2964        round_trip(vec![
2965            Datum::Null,
2966            Datum::Null,
2967            Datum::False,
2968            Datum::True,
2969            Datum::Int16(-21),
2970            Datum::Int32(-42),
2971            Datum::Int64(-2_147_483_648 - 42),
2972            Datum::UInt8(0),
2973            Datum::UInt8(1),
2974            Datum::UInt16(0),
2975            Datum::UInt16(1),
2976            Datum::UInt16(1 << 8),
2977            Datum::UInt32(0),
2978            Datum::UInt32(1),
2979            Datum::UInt32(1 << 8),
2980            Datum::UInt32(1 << 16),
2981            Datum::UInt32(1 << 24),
2982            Datum::UInt64(0),
2983            Datum::UInt64(1),
2984            Datum::UInt64(1 << 8),
2985            Datum::UInt64(1 << 16),
2986            Datum::UInt64(1 << 24),
2987            Datum::UInt64(1 << 32),
2988            Datum::UInt64(1 << 40),
2989            Datum::UInt64(1 << 48),
2990            Datum::UInt64(1 << 56),
2991            Datum::Float32(OrderedFloat::from(-42.12)),
2992            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
2993            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
2994            Datum::Timestamp(
2995                CheckedTimestamp::from_timestamplike(
2996                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
2997                        .unwrap()
2998                        .and_hms_opt(14, 32, 11)
2999                        .unwrap(),
3000                )
3001                .unwrap(),
3002            ),
3003            Datum::TimestampTz(
3004                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
3005                    .unwrap(),
3006            ),
3007            Datum::Interval(Interval {
3008                months: 312,
3009                ..Default::default()
3010            }),
3011            Datum::Interval(Interval::new(0, 0, 1_012_312)),
3012            Datum::Bytes(&[]),
3013            Datum::Bytes(&[0, 2, 1, 255]),
3014            Datum::String(""),
3015            Datum::String("العَرَبِيَّة"),
3016        ]);
3017    }
3018
3019    #[mz_ore::test]
3020    fn test_array() {
3021        // Construct an array using `Row::push_array` and verify that it unpacks
3022        // correctly.
3023        const DIM: ArrayDimension = ArrayDimension {
3024            lower_bound: 2,
3025            length: 2,
3026        };
3027        let mut row = Row::default();
3028        let mut packer = row.packer();
3029        packer
3030            .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3031            .unwrap();
3032        let arr1 = row.unpack_first().unwrap_array();
3033        assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3034        assert_eq!(
3035            arr1.elements().into_iter().collect::<Vec<_>>(),
3036            vec![Datum::Int32(1), Datum::Int32(2)]
3037        );
3038
3039        // Pack a previously-constructed `Datum::Array` and verify that it
3040        // unpacks correctly.
3041        let row = Row::pack_slice(&[Datum::Array(arr1)]);
3042        let arr2 = row.unpack_first().unwrap_array();
3043        assert_eq!(arr1, arr2);
3044    }
3045
3046    #[mz_ore::test]
3047    fn test_multidimensional_array() {
3048        let datums = vec![
3049            Datum::Int32(1),
3050            Datum::Int32(2),
3051            Datum::Int32(3),
3052            Datum::Int32(4),
3053            Datum::Int32(5),
3054            Datum::Int32(6),
3055            Datum::Int32(7),
3056            Datum::Int32(8),
3057        ];
3058
3059        let mut row = Row::default();
3060        let mut packer = row.packer();
3061        packer
3062            .try_push_array(
3063                &[
3064                    ArrayDimension {
3065                        lower_bound: 1,
3066                        length: 1,
3067                    },
3068                    ArrayDimension {
3069                        lower_bound: 1,
3070                        length: 4,
3071                    },
3072                    ArrayDimension {
3073                        lower_bound: 1,
3074                        length: 2,
3075                    },
3076                ],
3077                &datums,
3078            )
3079            .unwrap();
3080        let array = row.unpack_first().unwrap_array();
3081        assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3082    }
3083
3084    #[mz_ore::test]
3085    fn test_array_max_dimensions() {
3086        let mut row = Row::default();
3087        let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3088
3089        // An array with one too many dimensions should be rejected.
3090        let res = row.packer().try_push_array(
3091            &vec![
3092                ArrayDimension {
3093                    lower_bound: 1,
3094                    length: 1
3095                };
3096                max_dims + 1
3097            ],
3098            vec![Datum::Int32(4)],
3099        );
3100        assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3101        assert!(row.data.is_empty());
3102
3103        // An array with exactly the maximum allowable dimensions should be
3104        // accepted.
3105        row.packer()
3106            .try_push_array(
3107                &vec![
3108                    ArrayDimension {
3109                        lower_bound: 1,
3110                        length: 1
3111                    };
3112                    max_dims
3113                ],
3114                vec![Datum::Int32(4)],
3115            )
3116            .unwrap();
3117    }
3118
3119    #[mz_ore::test]
3120    fn test_array_wrong_cardinality() {
3121        let mut row = Row::default();
3122        let res = row.packer().try_push_array(
3123            &[
3124                ArrayDimension {
3125                    lower_bound: 1,
3126                    length: 2,
3127                },
3128                ArrayDimension {
3129                    lower_bound: 1,
3130                    length: 3,
3131                },
3132            ],
3133            vec![Datum::Int32(1), Datum::Int32(2)],
3134        );
3135        assert_eq!(
3136            res,
3137            Err(InvalidArrayError::WrongCardinality {
3138                actual: 2,
3139                expected: 6,
3140            })
3141        );
3142        assert!(row.data.is_empty());
3143    }
3144
3145    #[mz_ore::test]
3146    fn test_nesting() {
3147        let mut row = Row::default();
3148        row.packer().push_dict_with(|row| {
3149            row.push(Datum::String("favourites"));
3150            row.push_list_with(|row| {
3151                row.push(Datum::String("ice cream"));
3152                row.push(Datum::String("oreos"));
3153                row.push(Datum::String("cheesecake"));
3154            });
3155            row.push(Datum::String("name"));
3156            row.push(Datum::String("bob"));
3157        });
3158
3159        let mut iter = row.unpack_first().unwrap_map().iter();
3160
3161        let (k, v) = iter.next().unwrap();
3162        assert_eq!(k, "favourites");
3163        assert_eq!(
3164            v.unwrap_list().iter().collect::<Vec<_>>(),
3165            vec![
3166                Datum::String("ice cream"),
3167                Datum::String("oreos"),
3168                Datum::String("cheesecake"),
3169            ]
3170        );
3171
3172        let (k, v) = iter.next().unwrap();
3173        assert_eq!(k, "name");
3174        assert_eq!(v, Datum::String("bob"));
3175    }
3176
3177    #[mz_ore::test]
3178    fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3179        let pack = |ok| {
3180            let mut row = Row::default();
3181            row.packer().push_dict_with(|row| {
3182                if ok {
3183                    row.push(Datum::String("key"));
3184                    row.push(Datum::Int32(42));
3185                    Ok(7)
3186                } else {
3187                    Err("fail")
3188                }
3189            })?;
3190            Ok(row)
3191        };
3192
3193        assert_eq!(pack(false), Err("fail"));
3194
3195        let row = pack(true)?;
3196        let mut dict = row.unpack_first().unwrap_map().iter();
3197        assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3198        assert_eq!(dict.next(), None);
3199
3200        Ok(())
3201    }
3202
3203    #[mz_ore::test]
3204    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
3205    fn test_datum_sizes() {
3206        let arena = RowArena::new();
3207
3208        // Test the claims about various datum sizes.
3209        let values_of_interest = vec![
3210            Datum::Null,
3211            Datum::False,
3212            Datum::Int16(0),
3213            Datum::Int32(0),
3214            Datum::Int64(0),
3215            Datum::UInt8(0),
3216            Datum::UInt8(1),
3217            Datum::UInt16(0),
3218            Datum::UInt16(1),
3219            Datum::UInt16(1 << 8),
3220            Datum::UInt32(0),
3221            Datum::UInt32(1),
3222            Datum::UInt32(1 << 8),
3223            Datum::UInt32(1 << 16),
3224            Datum::UInt32(1 << 24),
3225            Datum::UInt64(0),
3226            Datum::UInt64(1),
3227            Datum::UInt64(1 << 8),
3228            Datum::UInt64(1 << 16),
3229            Datum::UInt64(1 << 24),
3230            Datum::UInt64(1 << 32),
3231            Datum::UInt64(1 << 40),
3232            Datum::UInt64(1 << 48),
3233            Datum::UInt64(1 << 56),
3234            Datum::Float32(OrderedFloat(0.0)),
3235            Datum::Float64(OrderedFloat(0.0)),
3236            Datum::from(numeric::Numeric::from(0)),
3237            Datum::from(numeric::Numeric::from(1000)),
3238            Datum::from(numeric::Numeric::from(9999)),
3239            Datum::Date(
3240                NaiveDate::from_ymd_opt(1, 1, 1)
3241                    .unwrap()
3242                    .try_into()
3243                    .unwrap(),
3244            ),
3245            Datum::Timestamp(
3246                CheckedTimestamp::from_timestamplike(
3247                    DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
3248                )
3249                .unwrap(),
3250            ),
3251            Datum::TimestampTz(
3252                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
3253                    .unwrap(),
3254            ),
3255            Datum::Interval(Interval::default()),
3256            Datum::Bytes(&[]),
3257            Datum::String(""),
3258            Datum::JsonNull,
3259            Datum::Range(Range { inner: None }),
3260            arena.make_datum(|packer| {
3261                packer
3262                    .push_range(Range::new(Some((
3263                        RangeLowerBound::new(Datum::Int32(-1), true),
3264                        RangeUpperBound::new(Datum::Int32(1), true),
3265                    ))))
3266                    .unwrap();
3267            }),
3268        ];
3269        for value in values_of_interest {
3270            if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
3271                panic!("Disparity in claimed size for {:?}", value);
3272            }
3273        }
3274    }
3275
3276    #[mz_ore::test]
3277    fn test_range_errors() {
3278        fn test_range_errors_inner<'a>(
3279            datums: Vec<Vec<Datum<'a>>>,
3280        ) -> Result<(), InvalidRangeError> {
3281            let mut row = Row::default();
3282            let row_len = row.byte_len();
3283            let mut packer = row.packer();
3284            let r = packer.push_range_with(
3285                RangeLowerBound {
3286                    inclusive: true,
3287                    bound: Some(|row: &mut RowPacker| {
3288                        for d in &datums[0] {
3289                            row.push(d);
3290                        }
3291                        Ok(())
3292                    }),
3293                },
3294                RangeUpperBound {
3295                    inclusive: true,
3296                    bound: Some(|row: &mut RowPacker| {
3297                        for d in &datums[1] {
3298                            row.push(d);
3299                        }
3300                        Ok(())
3301                    }),
3302                },
3303            );
3304
3305            assert_eq!(row_len, row.byte_len());
3306
3307            r
3308        }
3309
3310        for panicking_case in [
3311            vec![vec![Datum::Int32(1)], vec![]],
3312            vec![
3313                vec![Datum::Int32(1), Datum::Int32(2)],
3314                vec![Datum::Int32(3)],
3315            ],
3316            vec![
3317                vec![Datum::Int32(1)],
3318                vec![Datum::Int32(2), Datum::Int32(3)],
3319            ],
3320            vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
3321            vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
3322            vec![vec![Datum::Null], vec![Datum::Int32(2)]],
3323            vec![vec![Datum::Int32(1)], vec![Datum::Null]],
3324        ] {
3325            #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
3326            let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
3327            assert_err!(result);
3328        }
3329
3330        let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
3331        assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
3332    }
3333
3334    /// Lists have a variable-length encoding for their lengths. We test each case here.
3335    #[mz_ore::test]
3336    #[cfg_attr(miri, ignore)] // slow
3337    fn test_list_encoding() {
3338        fn test_list_encoding_inner(len: usize) {
3339            let list_elem = |i: usize| {
3340                if i % 2 == 0 {
3341                    Datum::False
3342                } else {
3343                    Datum::True
3344                }
3345            };
3346            let mut row = Row::default();
3347            {
3348                // Push some stuff.
3349                let mut packer = row.packer();
3350                packer.push(Datum::String("start"));
3351                packer.push_list_with(|packer| {
3352                    for i in 0..len {
3353                        packer.push(list_elem(i));
3354                    }
3355                });
3356                packer.push(Datum::String("end"));
3357            }
3358            // Check that we read back exactly what we pushed.
3359            let mut row_it = row.iter();
3360            assert_eq!(row_it.next().unwrap(), Datum::String("start"));
3361            match row_it.next().unwrap() {
3362                Datum::List(list) => {
3363                    let mut list_it = list.iter();
3364                    for i in 0..len {
3365                        assert_eq!(list_it.next().unwrap(), list_elem(i));
3366                    }
3367                    assert_none!(list_it.next());
3368                }
3369                _ => panic!("expected Datum::List"),
3370            }
3371            assert_eq!(row_it.next().unwrap(), Datum::String("end"));
3372            assert_none!(row_it.next());
3373        }
3374
3375        test_list_encoding_inner(0);
3376        test_list_encoding_inner(1);
3377        test_list_encoding_inner(10);
3378        test_list_encoding_inner(TINY - 1); // tiny
3379        test_list_encoding_inner(TINY + 1); // short
3380        test_list_encoding_inner(SHORT + 1); // long
3381
3382        // The biggest one takes 40 s on my laptop, probably not worth it.
3383        //test_list_encoding_inner(LONG + 1); // huge
3384    }
3385}