mz_repr/
row.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Borrow;
11use std::cell::RefCell;
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::mem::{size_of, transmute};
16use std::ops::Deref;
17use std::rc::Rc;
18use std::str;
19
20use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
21use compact_bytes::CompactBytes;
22use mz_ore::cast::{CastFrom, ReinterpretCast};
23use mz_ore::soft_assert_no_log;
24use mz_ore::vec::Vector;
25use mz_persist_types::Codec64;
26use num_enum::{IntoPrimitive, TryFromPrimitive};
27use ordered_float::OrderedFloat;
28use proptest::prelude::*;
29use proptest::strategy::{BoxedStrategy, Strategy};
30use serde::{Deserialize, Serialize};
31use uuid::Uuid;
32
33use crate::adt::array::{
34    Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
35};
36use crate::adt::date::Date;
37use crate::adt::interval::Interval;
38use crate::adt::mz_acl_item::{AclItem, MzAclItem};
39use crate::adt::numeric;
40use crate::adt::numeric::Numeric;
41use crate::adt::range::{
42    self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
43};
44use crate::adt::timestamp::CheckedTimestamp;
45use crate::scalar::{DatumKind, arb_datum};
46use crate::{Datum, RelationDesc, Timestamp};
47
// Submodules of this module: `encode` is crate-private, `iter` is public.
pub(crate) mod encode;
pub mod iter;

// Textually includes the file `mz_repr.row.rs` from the build's `OUT_DIR`.
// NOTE(review): presumably build-script-generated protobuf definitions (such as
// the `ProtoRow` type used below) — confirm against the crate's build script.
include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
52
53/// A packed representation for `Datum`s.
54///
55/// `Datum` is easy to work with but very space inefficient. A `Datum::Int32(42)`
56/// is laid out in memory like this:
57///
58///   tag: 3
59///   padding: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
60///   data: 0 0 0 42
61///   padding: 0 0 0 0 0 0 0 0 0 0 0 0
62///
63/// For a total of 32 bytes! The second set of padding is needed in case we were
64/// to write a 16-byte datum into this location. The first set of padding is
/// needed to align that hypothetical decimal to a 16-byte boundary.
66///
67/// A `Row` stores zero or more `Datum`s without any padding. We avoid the need
68/// for the first set of padding by only providing access to the `Datum`s via
69/// calls to `ptr::read_unaligned`, which on modern x86 is barely penalized. We
70/// avoid the need for the second set of padding by not providing mutable access
71/// to the `Datum`. Instead, `Row` is append-only.
72///
73/// A `Row` can be built from a collection of `Datum`s using `Row::pack`, but it
74/// is more efficient to use `Row::pack_slice` so that a right-sized allocation
75/// can be created. If that is not possible, consider using the row buffer
76/// pattern: allocate one row, pack into it, and then call [`Row::clone`] to
77/// receive a copy of that row, leaving behind the original allocation to pack
78/// future rows.
79///
80/// Creating a row via [`Row::pack_slice`]:
81///
82/// ```
83/// # use mz_repr::{Row, Datum};
84/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
85/// assert_eq!(row.unpack(), vec![Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)])
86/// ```
87///
88/// `Row`s can be unpacked by iterating over them:
89///
90/// ```
91/// # use mz_repr::{Row, Datum};
92/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
93/// assert_eq!(row.iter().nth(1).unwrap(), Datum::Int32(1));
94/// ```
95///
/// If you want random access to the `Datum`s in a `Row`, use `Row::unpack` to create a `Vec<Datum>`:
97/// ```
98/// # use mz_repr::{Row, Datum};
99/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
100/// let datums = row.unpack();
101/// assert_eq!(datums[1], Datum::Int32(1));
102/// ```
103///
104/// # Performance
105///
106/// Rows are dynamically sized, but up to a fixed size their data is stored in-line.
107/// It is best to re-use a `Row` across multiple `Row` creation calls, as this
108/// avoids the allocations involved in `Row::new()`.
#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
    /// The packed encoding of this row's datums.
    data: CompactBytes,
}
113
114impl Row {
115    const SIZE: usize = CompactBytes::MAX_INLINE;
116
117    /// A variant of `Row::from_proto` that allows for reuse of internal allocs
118    /// and validates the decoding against a provided [`RelationDesc`].
119    pub fn decode_from_proto(
120        &mut self,
121        proto: &ProtoRow,
122        desc: &RelationDesc,
123    ) -> Result<(), String> {
124        let mut packer = self.packer();
125        for (col_idx, _, _) in desc.iter_all() {
126            let d = match proto.datums.get(col_idx.to_raw()) {
127                Some(x) => x,
128                None => {
129                    packer.push(Datum::Null);
130                    continue;
131                }
132            };
133            packer.try_push_proto(d)?;
134        }
135
136        Ok(())
137    }
138
139    /// Allocate an empty `Row` with a pre-allocated capacity.
140    #[inline]
141    pub fn with_capacity(cap: usize) -> Self {
142        Self {
143            data: CompactBytes::with_capacity(cap),
144        }
145    }
146
147    /// Creates a new row from supplied bytes.
148    ///
149    /// # Safety
150    ///
151    /// This method relies on `data` being an appropriate row encoding, and can
152    /// result in unsafety if this is not the case.
153    pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
154        Row {
155            data: CompactBytes::new(data),
156        }
157    }
158
159    /// Constructs a [`RowPacker`] that will pack datums into this row's
160    /// allocation.
161    ///
162    /// This method clears the existing contents of the row, but retains the
163    /// allocation.
164    pub fn packer(&mut self) -> RowPacker<'_> {
165        self.clear();
166        RowPacker { row: self }
167    }
168
169    /// Take some `Datum`s and pack them into a `Row`.
170    ///
171    /// This method builds a `Row` by repeatedly increasing the backing
172    /// allocation. If the contents of the iterator are known ahead of
173    /// time, consider [`Row::with_capacity`] to right-size the allocation
174    /// first, and then [`RowPacker::extend`] to populate it with `Datum`s.
175    /// This avoids the repeated allocation resizing and copying.
176    pub fn pack<'a, I, D>(iter: I) -> Row
177    where
178        I: IntoIterator<Item = D>,
179        D: Borrow<Datum<'a>>,
180    {
181        let mut row = Row::default();
182        row.packer().extend(iter);
183        row
184    }
185
186    /// Use `self` to pack `iter`, and then clone the result.
187    ///
188    /// This is a convenience method meant to reduce boilerplate around row
189    /// formation.
190    pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
191    where
192        I: IntoIterator<Item = D>,
193        D: Borrow<Datum<'a>>,
194    {
195        self.packer().extend(iter);
196        self.clone()
197    }
198
199    /// Like [`Row::pack`], but the provided iterator is allowed to produce an
200    /// error, in which case the packing operation is aborted and the error
201    /// returned.
202    pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
203    where
204        I: IntoIterator<Item = Result<D, E>>,
205        D: Borrow<Datum<'a>>,
206    {
207        let mut row = Row::default();
208        row.packer().try_extend(iter)?;
209        Ok(row)
210    }
211
212    /// Pack a slice of `Datum`s into a `Row`.
213    ///
214    /// This method has the advantage over `pack` that it can determine the required
215    /// allocation before packing the elements, ensuring only one allocation and no
216    /// redundant copies required.
217    pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
218        // Pre-allocate the needed number of bytes.
219        let mut row = Row::with_capacity(datums_size(slice.iter()));
220        row.packer().extend(slice.iter());
221        row
222    }
223
224    /// Returns the total amount of bytes used by this row.
225    pub fn byte_len(&self) -> usize {
226        let heap_size = if self.data.spilled() {
227            self.data.len()
228        } else {
229            0
230        };
231        let inline_size = std::mem::size_of::<Self>();
232        inline_size.saturating_add(heap_size)
233    }
234
235    /// The length of the encoded row in bytes. Does not include the size of the `Row` struct itself.
236    pub fn data_len(&self) -> usize {
237        self.data.len()
238    }
239
240    /// Returns the total capacity in bytes used by this row.
241    pub fn byte_capacity(&self) -> usize {
242        self.data.capacity()
243    }
244
245    /// Extracts a Row slice containing the entire [`Row`].
246    #[inline]
247    pub fn as_row_ref(&self) -> &RowRef {
248        RowRef::from_slice(self.data.as_slice())
249    }
250
251    /// Clear the contents of the [`Row`], leaving any allocation in place.
252    #[inline]
253    fn clear(&mut self) {
254        self.data.clear();
255    }
256}
257
258impl Borrow<RowRef> for Row {
259    #[inline]
260    fn borrow(&self) -> &RowRef {
261        self.as_row_ref()
262    }
263}
264
265impl AsRef<RowRef> for Row {
266    #[inline]
267    fn as_ref(&self) -> &RowRef {
268        self.as_row_ref()
269    }
270}
271
272impl Deref for Row {
273    type Target = RowRef;
274
275    #[inline]
276    fn deref(&self) -> &Self::Target {
277        self.as_row_ref()
278    }
279}
280
// Compile-time check that `Row` is 24 bytes. Nothing depends on the size being
// exactly 24; the assertion exists so that a change in size is noticed.
static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
283
284impl Clone for Row {
285    fn clone(&self) -> Self {
286        Row {
287            data: self.data.clone(),
288        }
289    }
290
291    fn clone_from(&mut self, source: &Self) {
292        self.data.clone_from(&source.data);
293    }
294}
295
296// Row's `Hash` implementation defers to `RowRef` to ensure they hash equivalently.
297impl std::hash::Hash for Row {
298    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
299        self.as_row_ref().hash(state)
300    }
301}
302
303impl Arbitrary for Row {
304    type Parameters = prop::collection::SizeRange;
305    type Strategy = BoxedStrategy<Row>;
306
307    fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
308        prop::collection::vec(arb_datum(), size)
309            .prop_map(|items| {
310                let mut row = Row::default();
311                let mut packer = row.packer();
312                for item in items.iter() {
313                    let datum: Datum<'_> = item.into();
314                    packer.push(datum);
315                }
316                row
317            })
318            .boxed()
319    }
320}
321
322impl PartialOrd for Row {
323    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
324        Some(self.cmp(other))
325    }
326}
327
328impl Ord for Row {
329    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
330        self.as_ref().cmp(other.as_ref())
331    }
332}
333
334#[allow(missing_debug_implementations)]
335mod columnation {
336    use columnation::{Columnation, Region};
337    use mz_ore::region::LgAllocRegion;
338
339    use crate::Row;
340
341    /// Region allocation for `Row` data.
342    ///
343    /// Content bytes are stored in stable contiguous memory locations,
344    /// and then a `Row` referencing them is falsified.
345    pub struct RowStack {
346        region: LgAllocRegion<u8>,
347    }
348
349    impl RowStack {
350        const LIMIT: usize = 2 << 20;
351    }
352
353    // Implement `Default` manually to specify a region allocation limit.
354    impl Default for RowStack {
355        fn default() -> Self {
356            Self {
357                // Limit the region size to 2MiB.
358                region: LgAllocRegion::with_limit(Self::LIMIT),
359            }
360        }
361    }
362
363    impl Columnation for Row {
364        type InnerRegion = RowStack;
365    }
366
367    impl Region for RowStack {
368        type Item = Row;
369        #[inline]
370        fn clear(&mut self) {
371            self.region.clear();
372        }
373        #[inline(always)]
374        unsafe fn copy(&mut self, item: &Row) -> Row {
375            if item.data.spilled() {
376                let bytes = self.region.copy_slice(&item.data[..]);
377                Row {
378                    data: compact_bytes::CompactBytes::from_raw_parts(
379                        bytes.as_mut_ptr(),
380                        item.data.len(),
381                        item.data.capacity(),
382                    ),
383                }
384            } else {
385                item.clone()
386            }
387        }
388
389        fn reserve_items<'a, I>(&mut self, items: I)
390        where
391            Self: 'a,
392            I: Iterator<Item = &'a Self::Item> + Clone,
393        {
394            let size = items
395                .filter(|row| row.data.spilled())
396                .map(|row| row.data.len())
397                .sum();
398            let size = std::cmp::min(size, Self::LIMIT);
399            self.region.reserve(size);
400        }
401
402        fn reserve_regions<'a, I>(&mut self, regions: I)
403        where
404            Self: 'a,
405            I: Iterator<Item = &'a Self> + Clone,
406        {
407            let size = regions.map(|r| r.region.len()).sum();
408            let size = std::cmp::min(size, Self::LIMIT);
409            self.region.reserve(size);
410        }
411
412        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
413            self.region.heap_size(callback)
414        }
415    }
416}
417
418mod columnar {
419    use columnar::{
420        AsBytes, Clear, Columnar, Container, FromBytes, HeapSize, Index, IndexAs, Len, Push,
421    };
422    use mz_ore::cast::CastFrom;
423
424    use crate::{Row, RowRef};
425
426    #[derive(Copy, Clone, Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
427    pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
428        /// Bounds container; provides indexed access to offsets.
429        pub bounds: BC,
430        /// Values container; provides slice access to bytes.
431        pub values: VC,
432    }
433
434    impl Columnar for Row {
435        type Ref<'a> = &'a RowRef;
436        fn copy_from(&mut self, other: Self::Ref<'_>) {
437            self.clear();
438            self.data.extend_from_slice(other.data());
439        }
440        fn into_owned(other: Self::Ref<'_>) -> Self {
441            other.to_owned()
442        }
443        type Container = Rows;
444    }
445
446    impl<'b, BC: Container<u64>> Container<Row> for Rows<BC, &'b [u8]> {
447        type Borrowed<'a>
448            = Rows<BC::Borrowed<'a>, &'a [u8]>
449        where
450            Self: 'a;
451        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
452            Rows {
453                bounds: self.bounds.borrow(),
454                values: self.values,
455            }
456        }
457    }
458    impl<BC: Container<u64>> Container<Row> for Rows<BC, Vec<u8>> {
459        type Borrowed<'a>
460            = Rows<BC::Borrowed<'a>, &'a [u8]>
461        where
462            BC: 'a;
463        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
464            Rows {
465                bounds: self.bounds.borrow(),
466                values: self.values.borrow(),
467            }
468        }
469    }
470
471    impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
472        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
473            self.bounds.as_bytes().chain(self.values.as_bytes())
474        }
475    }
476    impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
477        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
478            Self {
479                bounds: FromBytes::from_bytes(bytes),
480                values: FromBytes::from_bytes(bytes),
481            }
482        }
483    }
484
485    impl<BC: Len, VC> Len for Rows<BC, VC> {
486        #[inline(always)]
487        fn len(&self) -> usize {
488            self.bounds.len()
489        }
490    }
491
492    impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
493        type Ref = &'a RowRef;
494        #[inline(always)]
495        fn get(&self, index: usize) -> Self::Ref {
496            let lower = if index == 0 {
497                0
498            } else {
499                self.bounds.index_as(index - 1)
500            };
501            let upper = self.bounds.index_as(index);
502            let lower = usize::cast_from(lower);
503            let upper = usize::cast_from(upper);
504            RowRef::from_slice(&self.values[lower..upper])
505        }
506    }
507    impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
508        type Ref = &'a RowRef;
509        #[inline(always)]
510        fn get(&self, index: usize) -> Self::Ref {
511            let lower = if index == 0 {
512                0
513            } else {
514                self.bounds.index_as(index - 1)
515            };
516            let upper = self.bounds.index_as(index);
517            let lower = usize::cast_from(lower);
518            let upper = usize::cast_from(upper);
519            RowRef::from_slice(&self.values[lower..upper])
520        }
521    }
522
523    impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
524        #[inline(always)]
525        fn push(&mut self, item: &Row) {
526            self.values.extend_from_slice(item.data.as_slice());
527            self.bounds.push(u64::cast_from(self.values.len()));
528        }
529    }
530    impl<BC: Push<u64>> Push<&RowRef> for Rows<BC> {
531        fn push(&mut self, item: &RowRef) {
532            self.values.extend_from_slice(item.data());
533            self.bounds.push(u64::cast_from(self.values.len()));
534        }
535    }
536    impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
537        fn clear(&mut self) {
538            self.bounds.clear();
539            self.values.clear();
540        }
541    }
542    impl<BC: HeapSize, VC: HeapSize> HeapSize for Rows<BC, VC> {
543        fn heap_size(&self) -> (usize, usize) {
544            let (l0, c0) = self.bounds.heap_size();
545            let (l1, c1) = self.values.heap_size();
546            (l0 + l1, c0 + c1)
547        }
548    }
549}
550
/// A contiguous slice of bytes that are row data.
///
/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
///
/// Equality and hashing are derived, so they operate on the raw bytes.
#[derive(PartialEq, Eq, Hash)]
// `repr(transparent)` gives `RowRef` the same layout as `[u8]`, which
// `RowRef::from_slice` relies on when it casts a `*const [u8]` to a
// `*const RowRef`.
#[repr(transparent)]
pub struct RowRef([u8]);
557
558impl RowRef {
559    /// Create a [`RowRef`] from a slice of data.
560    ///
561    /// We do not check that the provided slice is valid [`Row`] data, will panic on read
562    /// if the data is invalid.
563    pub fn from_slice(row: &[u8]) -> &RowRef {
564        #[allow(clippy::as_conversions)]
565        let ptr = row as *const [u8] as *const RowRef;
566        // SAFETY: We know `ptr` is non-null and aligned because it came from a &[u8].
567        unsafe { &*ptr }
568    }
569
570    /// Unpack `self` into a `Vec<Datum>` for efficient random access.
571    pub fn unpack(&self) -> Vec<Datum> {
572        // It's usually cheaper to unpack twice to figure out the right length than it is to grow the vec as we go
573        let len = self.iter().count();
574        let mut vec = Vec::with_capacity(len);
575        vec.extend(self.iter());
576        vec
577    }
578
579    /// Return the first [`Datum`] in `self`
580    ///
581    /// Panics if the [`RowRef`] is empty.
582    pub fn unpack_first(&self) -> Datum {
583        self.iter().next().unwrap()
584    }
585
586    /// Iterate the [`Datum`] elements of the [`RowRef`].
587    pub fn iter(&self) -> DatumListIter {
588        DatumListIter { data: &self.0 }
589    }
590
591    /// Return the byte length of this [`RowRef`].
592    pub fn byte_len(&self) -> usize {
593        self.0.len()
594    }
595
596    /// For debugging only.
597    pub fn data(&self) -> &[u8] {
598        &self.0
599    }
600
601    /// True iff there is no data in this [`RowRef`].
602    pub fn is_empty(&self) -> bool {
603        self.0.is_empty()
604    }
605}
606
607impl ToOwned for RowRef {
608    type Owned = Row;
609
610    fn to_owned(&self) -> Self::Owned {
611        // SAFETY: RowRef has the invariant that the wrapped data must be a valid Row encoding.
612        unsafe { Row::from_bytes_unchecked(&self.0) }
613    }
614}
615
616impl<'a> IntoIterator for &'a RowRef {
617    type Item = Datum<'a>;
618    type IntoIter = DatumListIter<'a>;
619
620    fn into_iter(self) -> DatumListIter<'a> {
621        DatumListIter { data: &self.0 }
622    }
623}
624
625/// These implementations order first by length, and then by slice contents.
626/// This allows many comparisons to complete without dereferencing memory.
627/// Warning: These order by the u8 array representation, and NOT by Datum::cmp.
628impl PartialOrd for RowRef {
629    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
630        Some(self.cmp(other))
631    }
632}
633
634impl Ord for RowRef {
635    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
636        match self.0.len().cmp(&other.0.len()) {
637            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
638            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
639            std::cmp::Ordering::Equal => self.0.cmp(&other.0),
640        }
641    }
642}
643
644impl fmt::Debug for RowRef {
645    /// Debug representation using the internal datums
646    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
647        f.write_str("RowRef{")?;
648        f.debug_list().entries(self.into_iter()).finish()?;
649        f.write_str("}")
650    }
651}
652
/// Packs datums into a [`Row`].
///
/// Creating a `RowPacker` via [`Row::packer`] starts a packing operation on the
/// row. A packing operation always starts from scratch: the existing contents
/// of the underlying row are cleared.
///
/// To complete a packing operation, drop the `RowPacker`.
#[derive(Debug)]
pub struct RowPacker<'a> {
    /// The row being packed into, mutably borrowed for the packer's lifetime.
    row: &'a mut Row,
}
664
/// Iterator over packed datums, as returned by [`RowRef::iter`].
#[derive(Debug, Clone)]
pub struct DatumListIter<'a> {
    /// The encoded bytes being iterated.
    // NOTE(review): presumably narrowed to the not-yet-read bytes as iteration
    // advances — confirm against the `Iterator` impl.
    data: &'a [u8],
}
669
/// Iterator state over an encoded dictionary.
// NOTE(review): presumably yields `(key, datum)` pairs — confirm against the
// `Iterator` impl, which is not visible here.
#[derive(Debug, Clone)]
pub struct DatumDictIter<'a> {
    /// The encoded bytes being iterated.
    data: &'a [u8],
    /// NOTE(review): presumably the most recently yielded key, kept so key
    /// ordering can be checked — confirm against the `Iterator` impl.
    prev_key: Option<&'a str>,
}
675
/// `RowArena` is used to hold on to temporary `Row`s for functions like `eval`
/// that need to create complex `Datum`s but don't have a `Row` to put them in
/// yet.
#[derive(Debug)]
pub struct RowArena {
    // Semantically, this field would be better represented by a `Vec<Box<[u8]>>`,
    // as once the arena takes ownership of a byte vector the vector is never
    // modified. But `RowArena::push_bytes` takes ownership of a `Vec<u8>`, so
    // storing that `Vec<u8>` directly avoids an allocation. The cost is
    // additional memory use, as the vector may have spare capacity, but row
    // arenas are short lived so this is the better tradeoff.
    //
    // The `RefCell` provides interior mutability for the list of byte vectors.
    inner: RefCell<Vec<Vec<u8>>>,
}
687
// `DatumList` and `DatumMap` are defined here rather than near `Datum` because
// they need private access to the unsafe `data` field.

/// A sequence of Datums
///
/// Equality and hashing are derived and therefore operate on the encoded
/// bytes.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumList<'a> {
    /// Points at the serialized datums
    data: &'a [u8],
}
696
697impl<'a> Debug for DatumList<'a> {
698    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
699        f.debug_list().entries(self.iter()).finish()
700    }
701}
702
703impl Ord for DatumList<'_> {
704    fn cmp(&self, other: &DatumList) -> Ordering {
705        self.iter().cmp(other.iter())
706    }
707}
708
709impl PartialOrd for DatumList<'_> {
710    fn partial_cmp(&self, other: &DatumList) -> Option<Ordering> {
711        Some(self.cmp(other))
712    }
713}
714
/// A mapping from string keys to Datums
///
/// All comparison traits are derived, so equality, hashing and ordering
/// operate on the encoded bytes.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct DatumMap<'a> {
    /// Points at the serialized datums, which should be sorted in key order
    data: &'a [u8],
}
721
/// Represents a single `Datum`, appropriate to be nested inside other
/// `Datum`s.
///
/// Equality and hashing are derived and therefore operate on the encoded
/// bytes in `val`.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumNested<'a> {
    /// The encoded bytes of the datum, exactly as consumed by `read_datum`.
    val: &'a [u8],
}
728
729impl<'a> std::fmt::Display for DatumNested<'a> {
730    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
731        std::fmt::Display::fmt(&self.datum(), f)
732    }
733}
734
735impl<'a> std::fmt::Debug for DatumNested<'a> {
736    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
737        f.debug_struct("DatumNested")
738            .field("val", &self.datum())
739            .finish()
740    }
741}
742
743impl<'a> DatumNested<'a> {
744    // Figure out which bytes `read_datum` returns (e.g. including the tag),
745    // and then store a reference to those bytes, so we can "replay" this same
746    // call later on without storing the datum itself.
747    pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
748        let prev = *data;
749        let _ = unsafe { read_datum(data) };
750        DatumNested {
751            val: &prev[..(prev.len() - data.len())],
752        }
753    }
754
755    /// Returns the datum `self` contains.
756    pub fn datum(&self) -> Datum<'a> {
757        let mut temp = self.val;
758        unsafe { read_datum(&mut temp) }
759    }
760}
761
762impl<'a> Ord for DatumNested<'a> {
763    fn cmp(&self, other: &Self) -> Ordering {
764        self.datum().cmp(&other.datum())
765    }
766}
767
768impl<'a> PartialOrd for DatumNested<'a> {
769    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
770        Some(self.cmp(other))
771    }
772}
773
// Prefer adding new tags to the end of the enum. Certain behavior, like row ordering and EXPLAIN
// PHYSICAL PLAN, rely on the ordering of this enum. Neither of these are breaking changes, but
// it's annoying when they change.
//
// The enum is `repr(u8)`, so every tag is a single byte; conversions to and
// from `u8` are provided by the `IntoPrimitive` / `TryFromPrimitive` derives.
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
enum Tag {
    Null,
    False,
    True,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt32,
    Float32,
    Float64,
    Date,
    Time,
    Timestamp,
    TimestampTz,
    Interval,
    // The `Tiny`/`Short`/`Long`/`Huge` suffixes select the width of the length
    // prefix: `read_lengthed_datum` reads it as a `u8`, `u16`, `u32` or `u64`
    // respectively.
    BytesTiny,
    BytesShort,
    BytesLong,
    BytesHuge,
    StringTiny,
    StringShort,
    StringLong,
    StringHuge,
    Uuid,
    Array,
    ListTiny,
    ListShort,
    ListLong,
    ListHuge,
    Dict,
    JsonNull,
    Dummy,
    Numeric,
    UInt16,
    UInt64,
    MzTimestamp,
    Range,
    MzAclItem,
    AclItem,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestamp,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestampTz,
    // The next several tags are for variable-length signed integer encoding.
    // The basic idea is that `NonNegativeIntN_K` is used to encode a datum of type
    // IntN whose actual value is positive or zero and fits in K bits, and similarly for
    // NegativeIntN_K with negative values.
    //
    // The order of these tags matters, because we want to be able to choose the
    // tag for a given datum quickly, with arithmetic, rather than slowly, with a
    // stack of `if` statements.
    //
    // Separate tags for non-negative and negative numbers are used to avoid having to
    // waste one bit in the actual data space to encode the sign.
    //
    // `Tag::actual_int_length` maps each of these tags to K / 8, i.e. a
    // number of bytes.
    NonNegativeInt16_0, // i.e., 0
    NonNegativeInt16_8,
    NonNegativeInt16_16,

    NonNegativeInt32_0,
    NonNegativeInt32_8,
    NonNegativeInt32_16,
    NonNegativeInt32_24,
    NonNegativeInt32_32,

    NonNegativeInt64_0,
    NonNegativeInt64_8,
    NonNegativeInt64_16,
    NonNegativeInt64_24,
    NonNegativeInt64_32,
    NonNegativeInt64_40,
    NonNegativeInt64_48,
    NonNegativeInt64_56,
    NonNegativeInt64_64,

    NegativeInt16_0, // i.e., -1
    NegativeInt16_8,
    NegativeInt16_16,

    NegativeInt32_0,
    NegativeInt32_8,
    NegativeInt32_16,
    NegativeInt32_24,
    NegativeInt32_32,

    NegativeInt64_0,
    NegativeInt64_8,
    NegativeInt64_16,
    NegativeInt64_24,
    NegativeInt64_32,
    NegativeInt64_40,
    NegativeInt64_48,
    NegativeInt64_56,
    NegativeInt64_64,

    // These are like the ones above, but for unsigned types. The
    // situation is slightly simpler as we don't have negatives.
    UInt8_0, // i.e., 0
    UInt8_8,

    UInt16_0,
    UInt16_8,
    UInt16_16,

    UInt32_0,
    UInt32_8,
    UInt32_16,
    UInt32_24,
    UInt32_32,

    UInt64_0,
    UInt64_8,
    UInt64_16,
    UInt64_24,
    UInt64_32,
    UInt64_40,
    UInt64_48,
    UInt64_56,
    UInt64_64,
}
903
904impl Tag {
905    fn actual_int_length(self) -> Option<usize> {
906        use Tag::*;
907        let val = match self {
908            NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
909            | UInt32_0 | UInt64_0 => 0,
910            NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
911            | UInt32_8 | UInt64_8 => 1,
912            NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
913            | UInt32_16 | UInt64_16 => 2,
914            NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
915            NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
916            NonNegativeInt64_40 | UInt64_40 => 5,
917            NonNegativeInt64_48 | UInt64_48 => 6,
918            NonNegativeInt64_56 | UInt64_56 => 7,
919            NonNegativeInt64_64 | UInt64_64 => 8,
920            NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
921            NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
922            NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
923            NegativeInt32_24 | NegativeInt64_24 => 3,
924            NegativeInt32_32 | NegativeInt64_32 => 4,
925            NegativeInt64_40 => 5,
926            NegativeInt64_48 => 6,
927            NegativeInt64_56 => 7,
928            NegativeInt64_64 => 8,
929
930            _ => return None,
931        };
932        Some(val)
933    }
934}
935
936// --------------------------------------------------------------------------------
937// reading data
938
939/// Read a byte slice starting at byte `offset`.
940///
941/// Updates `offset` to point to the first byte after the end of the read region.
942fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
943    let len = u64::from_le_bytes(read_byte_array(data));
944    let len = usize::cast_from(len);
945    let (bytes, next) = data.split_at(len);
946    *data = next;
947    bytes
948}
949
950/// Read a data whose length is encoded in the row before its contents.
951///
952/// Updates `offset` to point to the first byte after the end of the read region.
953///
954/// # Safety
955///
956/// This function is safe if the datum's length and contents were previously written by `push_lengthed_bytes`,
957/// and it was only written with a `String` tag if it was indeed UTF-8.
958unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
959    let len = match tag {
960        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
961        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
962            usize::from(u16::from_le_bytes(read_byte_array(data)))
963        }
964        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
965            usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
966        }
967        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
968            usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
969        }
970        _ => unreachable!(),
971    };
972    let (bytes, next) = data.split_at(len);
973    *data = next;
974    match tag {
975        Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
976        Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
977            Datum::String(str::from_utf8_unchecked(bytes))
978        }
979        Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
980            Datum::List(DatumList { data: bytes })
981        }
982        _ => unreachable!(),
983    }
984}
985
/// Pops the first byte off the front of `data` and returns it.
///
/// Panics if `data` is empty.
fn read_byte(data: &mut &[u8]) -> u8 {
    let first = data[0];
    let rest = &data[1..];
    *data = rest;
    first
}
991
/// Takes `length` bytes off the front of `data` and widens them to `N` bytes.
///
/// The bytes read become the low-order (leading, little-endian) bytes of the
/// result; the remaining `N - length` high-order bytes are set to `FILL`. On
/// return, `data` has been advanced by `length` bytes.
///
/// Requirements (violating either one panics):
///   * length <= N
///   * length <= data.len()
fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    let mut widened = [FILL; N];
    let (payload, rest) = data.split_at(length);
    // Overwrite only the low-order bytes; everything above keeps the fill value.
    widened[..payload.len()].copy_from_slice(payload);
    *data = rest;
    widened
}
1009/// Read `length` bytes from `data` at `offset`, updating the
1010/// latter. Extend the resulting buffer to a negative `N`-byte
1011/// twos complement integer by filling the remaining bits with 1.
1012///
1013/// SAFETY:
1014///   * length <= N
1015///   * offset + length <= data.len()
1016fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
1017    read_byte_array_sign_extending::<N, 255>(data, length)
1018}
1019
1020/// Read `length` bytes from `data` at `offset`, updating the
1021/// latter. Extend the resulting buffer to a positive or zero `N`-byte
1022/// twos complement integer by filling the remaining bits with 0.
1023///
1024/// SAFETY:
1025///   * length <= N
1026///   * offset + length <= data.len()
1027fn read_byte_array_extending_nonnegative<const N: usize>(
1028    data: &mut &[u8],
1029    length: usize,
1030) -> [u8; N] {
1031    read_byte_array_sign_extending::<N, 0>(data, length)
1032}
1033
1034pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1035    let (prev, next) = data.split_first_chunk().unwrap();
1036    *data = next;
1037    *prev
1038}
1039
1040pub(super) fn read_date(data: &mut &[u8]) -> Date {
1041    let days = i32::from_le_bytes(read_byte_array(data));
1042    Date::from_pg_epoch(days).expect("unexpected date")
1043}
1044
1045pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1046    let year = i32::from_le_bytes(read_byte_array(data));
1047    let ordinal = u32::from_le_bytes(read_byte_array(data));
1048    NaiveDate::from_yo_opt(year, ordinal).unwrap()
1049}
1050
1051pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1052    let secs = u32::from_le_bytes(read_byte_array(data));
1053    let nanos = u32::from_le_bytes(read_byte_array(data));
1054    NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1055}
1056
1057/// Read a datum starting at byte `offset`.
1058///
1059/// Updates `offset` to point to the first byte after the end of the read region.
1060///
1061/// # Safety
1062///
1063/// This function is safe if a `Datum` was previously written at this offset by `push_datum`.
1064/// Otherwise it could return invalid values, which is Undefined Behavior.
1065pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
1066    let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
1067    match tag {
1068        Tag::Null => Datum::Null,
1069        Tag::False => Datum::False,
1070        Tag::True => Datum::True,
1071        Tag::UInt8_0 | Tag::UInt8_8 => {
1072            let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
1073                data,
1074                tag.actual_int_length()
1075                    .expect("returns a value for variable-length-encoded integer tags"),
1076            ));
1077            Datum::UInt8(i)
1078        }
1079        Tag::Int16 => {
1080            let i = i16::from_le_bytes(read_byte_array(data));
1081            Datum::Int16(i)
1082        }
1083        Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
1084            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
1085            // and `data` is big enough because it was encoded validly. These assumptions
1086            // are checked in debug asserts.
1087            let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
1088                data,
1089                tag.actual_int_length()
1090                    .expect("returns a value for variable-length-encoded integer tags"),
1091            ));
1092            Datum::Int16(i)
1093        }
1094        Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
1095            let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
1096                data,
1097                tag.actual_int_length()
1098                    .expect("returns a value for variable-length-encoded integer tags"),
1099            ));
1100            Datum::UInt16(i)
1101        }
1102        Tag::Int32 => {
1103            let i = i32::from_le_bytes(read_byte_array(data));
1104            Datum::Int32(i)
1105        }
1106        Tag::NonNegativeInt32_0
1107        | Tag::NonNegativeInt32_32
1108        | Tag::NonNegativeInt32_8
1109        | Tag::NonNegativeInt32_16
1110        | Tag::NonNegativeInt32_24 => {
1111            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
1112            // and `data` is big enough because it was encoded validly. These assumptions
1113            // are checked in debug asserts.
1114            let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
1115                data,
1116                tag.actual_int_length()
1117                    .expect("returns a value for variable-length-encoded integer tags"),
1118            ));
1119            Datum::Int32(i)
1120        }
1121        Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
1122            let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
1123                data,
1124                tag.actual_int_length()
1125                    .expect("returns a value for variable-length-encoded integer tags"),
1126            ));
1127            Datum::UInt32(i)
1128        }
1129        Tag::Int64 => {
1130            let i = i64::from_le_bytes(read_byte_array(data));
1131            Datum::Int64(i)
1132        }
1133        Tag::NonNegativeInt64_0
1134        | Tag::NonNegativeInt64_64
1135        | Tag::NonNegativeInt64_8
1136        | Tag::NonNegativeInt64_16
1137        | Tag::NonNegativeInt64_24
1138        | Tag::NonNegativeInt64_32
1139        | Tag::NonNegativeInt64_40
1140        | Tag::NonNegativeInt64_48
1141        | Tag::NonNegativeInt64_56 => {
1142            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
1143            // and `data` is big enough because it was encoded validly. These assumptions
1144            // are checked in debug asserts.
1145
1146            let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
1147                data,
1148                tag.actual_int_length()
1149                    .expect("returns a value for variable-length-encoded integer tags"),
1150            ));
1151            Datum::Int64(i)
1152        }
1153        Tag::UInt64_0
1154        | Tag::UInt64_8
1155        | Tag::UInt64_16
1156        | Tag::UInt64_24
1157        | Tag::UInt64_32
1158        | Tag::UInt64_40
1159        | Tag::UInt64_48
1160        | Tag::UInt64_56
1161        | Tag::UInt64_64 => {
1162            let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
1163                data,
1164                tag.actual_int_length()
1165                    .expect("returns a value for variable-length-encoded integer tags"),
1166            ));
1167            Datum::UInt64(i)
1168        }
1169        Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
1170            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
1171            // and `data` is big enough because it was encoded validly. These assumptions
1172            // are checked in debug asserts.
1173            let i = i16::from_le_bytes(read_byte_array_extending_negative(
1174                data,
1175                tag.actual_int_length()
1176                    .expect("returns a value for variable-length-encoded integer tags"),
1177            ));
1178            Datum::Int16(i)
1179        }
1180        Tag::NegativeInt32_0
1181        | Tag::NegativeInt32_32
1182        | Tag::NegativeInt32_8
1183        | Tag::NegativeInt32_16
1184        | Tag::NegativeInt32_24 => {
1185            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
1186            // and `data` is big enough because it was encoded validly. These assumptions
1187            // are checked in debug asserts.
1188            let i = i32::from_le_bytes(read_byte_array_extending_negative(
1189                data,
1190                tag.actual_int_length()
1191                    .expect("returns a value for variable-length-encoded integer tags"),
1192            ));
1193            Datum::Int32(i)
1194        }
1195        Tag::NegativeInt64_0
1196        | Tag::NegativeInt64_64
1197        | Tag::NegativeInt64_8
1198        | Tag::NegativeInt64_16
1199        | Tag::NegativeInt64_24
1200        | Tag::NegativeInt64_32
1201        | Tag::NegativeInt64_40
1202        | Tag::NegativeInt64_48
1203        | Tag::NegativeInt64_56 => {
1204            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
1205            // and `data` is big enough because the row was encoded validly. These assumptions
1206            // are checked in debug asserts.
1207            let i = i64::from_le_bytes(read_byte_array_extending_negative(
1208                data,
1209                tag.actual_int_length()
1210                    .expect("returns a value for variable-length-encoded integer tags"),
1211            ));
1212            Datum::Int64(i)
1213        }
1214
1215        Tag::UInt8 => {
1216            let i = u8::from_le_bytes(read_byte_array(data));
1217            Datum::UInt8(i)
1218        }
1219        Tag::UInt16 => {
1220            let i = u16::from_le_bytes(read_byte_array(data));
1221            Datum::UInt16(i)
1222        }
1223        Tag::UInt32 => {
1224            let i = u32::from_le_bytes(read_byte_array(data));
1225            Datum::UInt32(i)
1226        }
1227        Tag::UInt64 => {
1228            let i = u64::from_le_bytes(read_byte_array(data));
1229            Datum::UInt64(i)
1230        }
1231        Tag::Float32 => {
1232            let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
1233            Datum::Float32(OrderedFloat::from(f))
1234        }
1235        Tag::Float64 => {
1236            let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
1237            Datum::Float64(OrderedFloat::from(f))
1238        }
1239        Tag::Date => Datum::Date(read_date(data)),
1240        Tag::Time => Datum::Time(read_time(data)),
1241        Tag::CheapTimestamp => {
1242            let ts = i64::from_le_bytes(read_byte_array(data));
1243            let secs = ts.div_euclid(1_000_000_000);
1244            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1245            let ndt = DateTime::from_timestamp(secs, nsecs)
1246                .expect("We only write round-trippable timestamps")
1247                .naive_utc();
1248            Datum::Timestamp(
1249                CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
1250            )
1251        }
1252        Tag::CheapTimestampTz => {
1253            let ts = i64::from_le_bytes(read_byte_array(data));
1254            let secs = ts.div_euclid(1_000_000_000);
1255            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1256            let dt = DateTime::from_timestamp(secs, nsecs)
1257                .expect("We only write round-trippable timestamps");
1258            Datum::TimestampTz(
1259                CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
1260            )
1261        }
1262        Tag::Timestamp => {
1263            let date = read_naive_date(data);
1264            let time = read_time(data);
1265            Datum::Timestamp(
1266                CheckedTimestamp::from_timestamplike(date.and_time(time))
1267                    .expect("unexpected timestamp"),
1268            )
1269        }
1270        Tag::TimestampTz => {
1271            let date = read_naive_date(data);
1272            let time = read_time(data);
1273            Datum::TimestampTz(
1274                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
1275                    date.and_time(time),
1276                    Utc,
1277                ))
1278                .expect("unexpected timestamptz"),
1279            )
1280        }
1281        Tag::Interval => {
1282            let months = i32::from_le_bytes(read_byte_array(data));
1283            let days = i32::from_le_bytes(read_byte_array(data));
1284            let micros = i64::from_le_bytes(read_byte_array(data));
1285            Datum::Interval(Interval {
1286                months,
1287                days,
1288                micros,
1289            })
1290        }
1291        Tag::BytesTiny
1292        | Tag::BytesShort
1293        | Tag::BytesLong
1294        | Tag::BytesHuge
1295        | Tag::StringTiny
1296        | Tag::StringShort
1297        | Tag::StringLong
1298        | Tag::StringHuge
1299        | Tag::ListTiny
1300        | Tag::ListShort
1301        | Tag::ListLong
1302        | Tag::ListHuge => read_lengthed_datum(data, tag),
1303        Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
1304        Tag::Array => {
1305            // See the comment in `Row::push_array` for details on the encoding
1306            // of arrays.
1307            let ndims = read_byte(data);
1308            let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
1309            let (dims, next) = data.split_at(dims_size);
1310            *data = next;
1311            let bytes = read_untagged_bytes(data);
1312            Datum::Array(Array {
1313                dims: ArrayDimensions { data: dims },
1314                elements: DatumList { data: bytes },
1315            })
1316        }
1317        Tag::Dict => {
1318            let bytes = read_untagged_bytes(data);
1319            Datum::Map(DatumMap { data: bytes })
1320        }
1321        Tag::JsonNull => Datum::JsonNull,
1322        Tag::Dummy => Datum::Dummy,
1323        Tag::Numeric => {
1324            let digits = read_byte(data).into();
1325            let exponent = i8::reinterpret_cast(read_byte(data));
1326            let bits = read_byte(data);
1327
1328            let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
1329            let lsu_u8_len = lsu_u16_len * 2;
1330            let (lsu_u8, next) = data.split_at(lsu_u8_len);
1331            *data = next;
1332
1333            // TODO: if we refactor the decimal library to accept the owned
1334            // array as a parameter to `from_raw_parts` below, we could likely
1335            // avoid a copy because it is exactly the value we want
1336            let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
1337            for (i, c) in lsu_u8.chunks(2).enumerate() {
1338                lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
1339            }
1340
1341            let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
1342            Datum::from(d)
1343        }
1344        Tag::MzTimestamp => {
1345            let t = Timestamp::decode(read_byte_array(data));
1346            Datum::MzTimestamp(t)
1347        }
1348        Tag::Range => {
1349            // See notes on `push_range_with` for details about encoding.
1350            let flag_byte = read_byte(data);
1351            let flags = range::InternalFlags::from_bits(flag_byte)
1352                .expect("range flags must be encoded validly");
1353
1354            if flags.contains(range::InternalFlags::EMPTY) {
1355                assert!(
1356                    flags == range::InternalFlags::EMPTY,
1357                    "empty ranges contain only RANGE_EMPTY flag"
1358                );
1359
1360                return Datum::Range(Range { inner: None });
1361            }
1362
1363            let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
1364                None
1365            } else {
1366                Some(DatumNested::extract(data))
1367            };
1368
1369            let lower = RangeBound {
1370                inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
1371                bound: lower_bound,
1372            };
1373
1374            let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
1375                None
1376            } else {
1377                Some(DatumNested::extract(data))
1378            };
1379
1380            let upper = RangeBound {
1381                inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
1382                bound: upper_bound,
1383            };
1384
1385            Datum::Range(Range {
1386                inner: Some(RangeInner { lower, upper }),
1387            })
1388        }
1389        Tag::MzAclItem => {
1390            const N: usize = MzAclItem::binary_size();
1391            let mz_acl_item =
1392                MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
1393            Datum::MzAclItem(mz_acl_item)
1394        }
1395        Tag::AclItem => {
1396            const N: usize = AclItem::binary_size();
1397            let acl_item =
1398                AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
1399            Datum::AclItem(acl_item)
1400        }
1401    }
1402}
1403
1404// --------------------------------------------------------------------------------
1405// writing data
1406
1407fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1408where
1409    D: Vector<u8>,
1410{
1411    let len = u64::cast_from(bytes.len());
1412    data.extend_from_slice(&len.to_le_bytes());
1413    data.extend_from_slice(bytes);
1414}
1415
1416fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1417where
1418    D: Vector<u8>,
1419{
1420    match tag {
1421        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1422            let len = bytes.len().to_le_bytes();
1423            data.push(len[0]);
1424        }
1425        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1426            let len = bytes.len().to_le_bytes();
1427            data.extend_from_slice(&len[0..2]);
1428        }
1429        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1430            let len = bytes.len().to_le_bytes();
1431            data.extend_from_slice(&len[0..4]);
1432        }
1433        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1434            let len = bytes.len().to_le_bytes();
1435            data.extend_from_slice(&len);
1436        }
1437        _ => unreachable!(),
1438    }
1439    data.extend_from_slice(bytes);
1440}
1441
1442pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1443    i32::to_le_bytes(date.pg_epoch_days())
1444}
1445
1446fn push_date<D>(data: &mut D, date: Date)
1447where
1448    D: Vector<u8>,
1449{
1450    data.extend_from_slice(&date_to_array(date));
1451}
1452
1453pub(super) fn naive_date_to_arrays(
1454    date: NaiveDate,
1455) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1456    (
1457        i32::to_le_bytes(date.year()),
1458        u32::to_le_bytes(date.ordinal()),
1459    )
1460}
1461
1462fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1463where
1464    D: Vector<u8>,
1465{
1466    let (ds1, ds2) = naive_date_to_arrays(date);
1467    data.extend_from_slice(&ds1);
1468    data.extend_from_slice(&ds2);
1469}
1470
1471pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1472    (
1473        u32::to_le_bytes(time.num_seconds_from_midnight()),
1474        u32::to_le_bytes(time.nanosecond()),
1475    )
1476}
1477
1478fn push_time<D>(data: &mut D, time: NaiveTime)
1479where
1480    D: Vector<u8>,
1481{
1482    let (ts1, ts2) = time_to_arrays(time);
1483    data.extend_from_slice(&ts1);
1484    data.extend_from_slice(&ts2);
1485}
1486
1487/// Returns an i64 representing a `NaiveDateTime`, if
1488/// said i64 can be round-tripped back to a `NaiveDateTime`.
1489///
1490/// The only exotic NDTs for which this can't happen are those that
1491/// are hundreds of years in the future or past, or those that
1492/// represent a leap second. (Note that Materialize does not support
1493/// leap seconds, but this module does).
1494// This function is inspired by `NaiveDateTime::timestamp_nanos`,
1495// with extra checking.
1496fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1497    let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1498    if subsec_nanos >= 1_000_000_000 {
1499        return None;
1500    }
1501    let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1502    as_ns.checked_add(i64::from(subsec_nanos))
1503}
1504
// Returns the smallest number of little-endian bytes that still lets the reader
// rebuild `i` by filling the missing high-order bytes with the sign (0x00 for
// non-negative values, 0xFF for negative ones). Both 0 and -1 need zero bytes.
//
// This function is extremely hot, so we use `as` rather than paying for
// `try_into` followed by `unwrap`. `leading_ones` and `leading_zeros` never
// return more than 64, so the conversion cannot truncate.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_signed<T>(i: T) -> u8
where
    T: Into<i64>,
{
    let value: i64 = i.into();

    // Leading bits that merely repeat the sign carry no information; only the
    // bits below that run have to be stored.
    let redundant_bits = (if value.is_negative() {
        value.leading_ones()
    } else {
        value.leading_zeros()
    }) as u8;
    let payload_bits = 64 - redundant_bits;

    // Round up to whole bytes.
    (payload_bits + 7) / 8
}
1529
// Returns the smallest number of little-endian bytes that still lets the reader
// rebuild `i` by zero-filling the missing high-order bytes. Zero needs no bytes.
//
// In principle `min_bytes_signed` could serve here too if it took
// `T: Into<i128>` instead of `Into<i64>`, but LLVM doesn't seem smart enough to
// realize that such a function is the same as the current version, and
// generates worse code. Hence the separate function.
//
// Justification for `as` is the same as in `min_bytes_signed`.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_unsigned<T>(i: T) -> u8
where
    T: Into<u64>,
{
    let value: u64 = i.into();

    // Leading zero bits carry no information; only the bits below them are stored.
    let redundant_bits = value.leading_zeros() as u8;
    let payload_bits = 64 - redundant_bits;

    // Round up to whole bytes.
    (payload_bits + 7) / 8
}
1549
// Exclusive upper bounds on the payload length for each length-prefix size class:
// lengths below `TINY` fit in a one-byte prefix, below `SHORT` in two bytes, and
// below `LONG` in four bytes. Anything larger uses the widest prefix.
const TINY: usize = 1 << u8::BITS;
const SHORT: usize = 1 << u16::BITS;
const LONG: usize = 1 << u32::BITS;
1553
1554fn push_datum<D>(data: &mut D, datum: Datum)
1555where
1556    D: Vector<u8>,
1557{
1558    match datum {
1559        Datum::Null => data.push(Tag::Null.into()),
1560        Datum::False => data.push(Tag::False.into()),
1561        Datum::True => data.push(Tag::True.into()),
1562        Datum::Int16(i) => {
1563            let mbs = min_bytes_signed(i);
1564            let tag = u8::from(if i.is_negative() {
1565                Tag::NegativeInt16_0
1566            } else {
1567                Tag::NonNegativeInt16_0
1568            }) + mbs;
1569
1570            data.push(tag);
1571            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1572        }
1573        Datum::Int32(i) => {
1574            let mbs = min_bytes_signed(i);
1575            let tag = u8::from(if i.is_negative() {
1576                Tag::NegativeInt32_0
1577            } else {
1578                Tag::NonNegativeInt32_0
1579            }) + mbs;
1580
1581            data.push(tag);
1582            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1583        }
1584        Datum::Int64(i) => {
1585            let mbs = min_bytes_signed(i);
1586            let tag = u8::from(if i.is_negative() {
1587                Tag::NegativeInt64_0
1588            } else {
1589                Tag::NonNegativeInt64_0
1590            }) + mbs;
1591
1592            data.push(tag);
1593            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1594        }
1595        Datum::UInt8(i) => {
1596            let mbu = min_bytes_unsigned(i);
1597            let tag = u8::from(Tag::UInt8_0) + mbu;
1598            data.push(tag);
1599            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1600        }
1601        Datum::UInt16(i) => {
1602            let mbu = min_bytes_unsigned(i);
1603            let tag = u8::from(Tag::UInt16_0) + mbu;
1604            data.push(tag);
1605            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1606        }
1607        Datum::UInt32(i) => {
1608            let mbu = min_bytes_unsigned(i);
1609            let tag = u8::from(Tag::UInt32_0) + mbu;
1610            data.push(tag);
1611            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1612        }
1613        Datum::UInt64(i) => {
1614            let mbu = min_bytes_unsigned(i);
1615            let tag = u8::from(Tag::UInt64_0) + mbu;
1616            data.push(tag);
1617            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1618        }
1619        Datum::Float32(f) => {
1620            data.push(Tag::Float32.into());
1621            data.extend_from_slice(&f.to_bits().to_le_bytes());
1622        }
1623        Datum::Float64(f) => {
1624            data.push(Tag::Float64.into());
1625            data.extend_from_slice(&f.to_bits().to_le_bytes());
1626        }
1627        Datum::Date(d) => {
1628            data.push(Tag::Date.into());
1629            push_date(data, d);
1630        }
1631        Datum::Time(t) => {
1632            data.push(Tag::Time.into());
1633            push_time(data, t);
1634        }
1635        Datum::Timestamp(t) => {
1636            let datetime = t.to_naive();
1637            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1638                data.push(Tag::CheapTimestamp.into());
1639                data.extend_from_slice(&nanos.to_le_bytes());
1640            } else {
1641                data.push(Tag::Timestamp.into());
1642                push_naive_date(data, datetime.date());
1643                push_time(data, datetime.time());
1644            }
1645        }
1646        Datum::TimestampTz(t) => {
1647            let datetime = t.to_naive();
1648            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1649                data.push(Tag::CheapTimestampTz.into());
1650                data.extend_from_slice(&nanos.to_le_bytes());
1651            } else {
1652                data.push(Tag::TimestampTz.into());
1653                push_naive_date(data, datetime.date());
1654                push_time(data, datetime.time());
1655            }
1656        }
1657        Datum::Interval(i) => {
1658            data.push(Tag::Interval.into());
1659            data.extend_from_slice(&i.months.to_le_bytes());
1660            data.extend_from_slice(&i.days.to_le_bytes());
1661            data.extend_from_slice(&i.micros.to_le_bytes());
1662        }
1663        Datum::Bytes(bytes) => {
1664            let tag = match bytes.len() {
1665                0..TINY => Tag::BytesTiny,
1666                TINY..SHORT => Tag::BytesShort,
1667                SHORT..LONG => Tag::BytesLong,
1668                _ => Tag::BytesHuge,
1669            };
1670            data.push(tag.into());
1671            push_lengthed_bytes(data, bytes, tag);
1672        }
1673        Datum::String(string) => {
1674            let tag = match string.len() {
1675                0..TINY => Tag::StringTiny,
1676                TINY..SHORT => Tag::StringShort,
1677                SHORT..LONG => Tag::StringLong,
1678                _ => Tag::StringHuge,
1679            };
1680            data.push(tag.into());
1681            push_lengthed_bytes(data, string.as_bytes(), tag);
1682        }
1683        Datum::List(list) => {
1684            let tag = match list.data.len() {
1685                0..TINY => Tag::ListTiny,
1686                TINY..SHORT => Tag::ListShort,
1687                SHORT..LONG => Tag::ListLong,
1688                _ => Tag::ListHuge,
1689            };
1690            data.push(tag.into());
1691            push_lengthed_bytes(data, list.data, tag);
1692        }
1693        Datum::Uuid(u) => {
1694            data.push(Tag::Uuid.into());
1695            data.extend_from_slice(u.as_bytes());
1696        }
1697        Datum::Array(array) => {
1698            // See the comment in `Row::push_array` for details on the encoding
1699            // of arrays.
1700            data.push(Tag::Array.into());
1701            data.push(array.dims.ndims());
1702            data.extend_from_slice(array.dims.data);
1703            push_untagged_bytes(data, array.elements.data);
1704        }
1705        Datum::Map(dict) => {
1706            data.push(Tag::Dict.into());
1707            push_untagged_bytes(data, dict.data);
1708        }
1709        Datum::JsonNull => data.push(Tag::JsonNull.into()),
1710        Datum::MzTimestamp(t) => {
1711            data.push(Tag::MzTimestamp.into());
1712            data.extend_from_slice(&t.encode());
1713        }
1714        Datum::Dummy => data.push(Tag::Dummy.into()),
1715        Datum::Numeric(mut n) => {
1716            // Pseudo-canonical representation of decimal values with
1717            // insignificant zeroes trimmed. This compresses the number further
1718            // than `Numeric::trim` by removing all zeroes, and not only those in
1719            // the fractional component.
1720            numeric::cx_datum().reduce(&mut n.0);
1721            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
1722            data.push(Tag::Numeric.into());
1723            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
1724            data.push(
1725                i8::try_from(exponent)
1726                    .expect("exponent to fit within i8; should not exceed +/- 39")
1727                    .to_le_bytes()[0],
1728            );
1729            data.push(bits);
1730
1731            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];
1732
1733            // Little endian machines can take the lsu directly from u16 to u8.
1734            if cfg!(target_endian = "little") {
1735                // SAFETY: `lsu` (returned by `coefficient_units()`) is a `&[u16]`, so
1736                // each element can safely be transmuted into two `u8`s.
1737                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
1738                // The `u8` aligned version of the `lsu` should have twice as many
1739                // elements as we expect for the `u16` version.
1740                soft_assert_no_log!(
1741                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
1742                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
1743                    Numeric::digits_to_lsu_elements_len(digits) * 2,
1744                    lsu_bytes.len()
1745                );
1746                // There should be no unaligned elements in the prefix or suffix.
1747                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
1748                data.extend_from_slice(lsu_bytes);
1749            } else {
1750                for u in lsu {
1751                    data.extend_from_slice(&u.to_le_bytes());
1752                }
1753            }
1754        }
1755        Datum::Range(range) => {
1756            // See notes on `push_range_with` for details about encoding.
1757            data.push(Tag::Range.into());
1758            data.push(range.internal_flag_bits());
1759
1760            if let Some(RangeInner { lower, upper }) = range.inner {
1761                for bound in [lower.bound, upper.bound] {
1762                    if let Some(bound) = bound {
1763                        match bound.datum() {
1764                            Datum::Null => panic!("cannot push Datum::Null into range"),
1765                            d => push_datum::<D>(data, d),
1766                        }
1767                    }
1768                }
1769            }
1770        }
1771        Datum::MzAclItem(mz_acl_item) => {
1772            data.push(Tag::MzAclItem.into());
1773            data.extend_from_slice(&mz_acl_item.encode_binary());
1774        }
1775        Datum::AclItem(acl_item) => {
1776            data.push(Tag::AclItem.into());
1777            data.extend_from_slice(&acl_item.encode_binary());
1778        }
1779    }
1780}
1781
1782/// Return the number of bytes these Datums would use if packed as a Row.
1783pub fn row_size<'a, I>(a: I) -> usize
1784where
1785    I: IntoIterator<Item = Datum<'a>>,
1786{
1787    // Using datums_size instead of a.data().len() here is safer because it will
1788    // return the size of the datums if they were packed into a Row. Although
1789    // a.data().len() happens to give the correct answer (and is faster), data()
1790    // is documented as for debugging only.
1791    let sz = datums_size::<_, _>(a);
1792    let size_of_row = std::mem::size_of::<Row>();
1793    // The Row struct attempts to inline data until it can't fit in the
1794    // preallocated size. Otherwise it spills to heap, and uses the Row to point
1795    // to that.
1796    if sz > Row::SIZE {
1797        sz + size_of_row
1798    } else {
1799        size_of_row
1800    }
1801}
1802
1803/// Number of bytes required by the datum.
1804/// This is used to optimistically pre-allocate buffers for packing rows.
1805pub fn datum_size(datum: &Datum) -> usize {
1806    match datum {
1807        Datum::Null => 1,
1808        Datum::False => 1,
1809        Datum::True => 1,
1810        Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
1811        Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
1812        Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
1813        Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1814        Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1815        Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1816        Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1817        Datum::Float32(_) => 1 + size_of::<f32>(),
1818        Datum::Float64(_) => 1 + size_of::<f64>(),
1819        Datum::Date(_) => 1 + size_of::<i32>(),
1820        Datum::Time(_) => 1 + 8,
1821        Datum::Timestamp(t) => {
1822            1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
1823                8
1824            } else {
1825                16
1826            }
1827        }
1828        Datum::TimestampTz(t) => {
1829            1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
1830                8
1831            } else {
1832                16
1833            }
1834        }
1835        Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
1836        Datum::Bytes(bytes) => {
1837            // We use a variable length representation of slice length.
1838            let bytes_for_length = match bytes.len() {
1839                0..TINY => 1,
1840                TINY..SHORT => 2,
1841                SHORT..LONG => 4,
1842                _ => 8,
1843            };
1844            1 + bytes_for_length + bytes.len()
1845        }
1846        Datum::String(string) => {
1847            // We use a variable length representation of slice length.
1848            let bytes_for_length = match string.len() {
1849                0..TINY => 1,
1850                TINY..SHORT => 2,
1851                SHORT..LONG => 4,
1852                _ => 8,
1853            };
1854            1 + bytes_for_length + string.len()
1855        }
1856        Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
1857        Datum::Array(array) => {
1858            1 + size_of::<u8>()
1859                + array.dims.data.len()
1860                + size_of::<u64>()
1861                + array.elements.data.len()
1862        }
1863        Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
1864        Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
1865        Datum::JsonNull => 1,
1866        Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
1867        Datum::Dummy => 1,
1868        Datum::Numeric(d) => {
1869            let mut d = d.0.clone();
1870            // Values must be reduced to determine appropriate number of
1871            // coefficient units.
1872            numeric::cx_datum().reduce(&mut d);
1873            // 4 = 1 bit each for tag, digits, exponent, bits
1874            4 + (d.coefficient_units().len() * 2)
1875        }
1876        Datum::Range(Range { inner }) => {
1877            // Tag + flags
1878            2 + match inner {
1879                None => 0,
1880                Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
1881                    .iter()
1882                    .map(|bound| match bound {
1883                        None => 0,
1884                        Some(bound) => bound.val.len(),
1885                    })
1886                    .sum(),
1887            }
1888        }
1889        Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
1890        Datum::AclItem(_) => 1 + AclItem::binary_size(),
1891    }
1892}
1893
1894/// Number of bytes required by a sequence of datums.
1895///
1896/// This method can be used to right-size the allocation for a `Row`
1897/// before calling [`RowPacker::extend`].
1898pub fn datums_size<'a, I, D>(iter: I) -> usize
1899where
1900    I: IntoIterator<Item = D>,
1901    D: Borrow<Datum<'a>>,
1902{
1903    iter.into_iter().map(|d| datum_size(d.borrow())).sum()
1904}
1905
1906/// Number of bytes required by a list of datums. This computes the size that would be required if
1907/// the given datums were packed into a list.
1908///
1909/// This is used to optimistically pre-allocate buffers for packing rows.
1910pub fn datum_list_size<'a, I, D>(iter: I) -> usize
1911where
1912    I: IntoIterator<Item = D>,
1913    D: Borrow<Datum<'a>>,
1914{
1915    1 + size_of::<u64>() + datums_size(iter)
1916}
1917
1918impl RowPacker<'_> {
1919    /// Constructs a row packer that will pack additional datums into the
1920    /// provided row.
1921    ///
1922    /// This function is intentionally somewhat inconvenient to call. You
1923    /// usually want to call [`Row::packer`] instead to start packing from
1924    /// scratch.
1925    pub fn for_existing_row(row: &mut Row) -> RowPacker {
1926        RowPacker { row }
1927    }
1928
1929    /// Extend an existing `Row` with a `Datum`.
1930    #[inline]
1931    pub fn push<'a, D>(&mut self, datum: D)
1932    where
1933        D: Borrow<Datum<'a>>,
1934    {
1935        push_datum(&mut self.row.data, *datum.borrow());
1936    }
1937
1938    /// Extend an existing `Row` with additional `Datum`s.
1939    #[inline]
1940    pub fn extend<'a, I, D>(&mut self, iter: I)
1941    where
1942        I: IntoIterator<Item = D>,
1943        D: Borrow<Datum<'a>>,
1944    {
1945        for datum in iter {
1946            push_datum(&mut self.row.data, *datum.borrow())
1947        }
1948    }
1949
1950    /// Extend an existing `Row` with additional `Datum`s.
1951    ///
1952    /// In the case the iterator produces an error, the pushing of
1953    /// datums in terminated and the error returned. The `Row` will
1954    /// be incomplete, but it will be safe to read datums from it.
1955    #[inline]
1956    pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
1957    where
1958        I: IntoIterator<Item = Result<D, E>>,
1959        D: Borrow<Datum<'a>>,
1960    {
1961        for datum in iter {
1962            push_datum(&mut self.row.data, *datum?.borrow());
1963        }
1964        Ok(())
1965    }
1966
1967    /// Appends the datums of an entire `Row`.
1968    pub fn extend_by_row(&mut self, row: &Row) {
1969        self.row.data.extend_from_slice(row.data.as_slice());
1970    }
1971
1972    /// Appends the slice of data representing an entire `Row`. The data is not validated.
1973    ///
1974    /// # Safety
1975    ///
1976    /// The requirements from [`Row::from_bytes_unchecked`] apply here, too:
1977    /// This method relies on `data` being an appropriate row encoding, and can
1978    /// result in unsafety if this is not the case.
1979    #[inline]
1980    pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
1981        self.row.data.extend_from_slice(data)
1982    }
1983
    /// Pushes a [`DatumList`] that is built from a closure.
    ///
    /// The supplied closure will be invoked once with a [`RowPacker`] that can
    /// be used to populate the list. It is valid to call any method on the
    /// [`RowPacker`] except for [`RowPacker::clear`], [`RowPacker::truncate`],
    /// or [`RowPacker::truncate_datums`].
    ///
    /// The list is first written with a one-byte length. If the closure wrote
    /// `TINY` bytes or more, the tag is replaced and the data is shifted to
    /// make room for a wider (2, 4 or 8 byte) length.
    ///
    /// Returns the value returned by the closure, if any.
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::default();
    /// row.packer().push_list_with(|row| {
    ///     row.push(Datum::String("age"));
    ///     row.push(Datum::Int64(42));
    /// });
    /// assert_eq!(
    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
    ///     vec![Datum::String("age"), Datum::Int64(42)],
    /// );
    /// ```
    #[inline]
    pub fn push_list_with<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&mut RowPacker) -> R,
    {
        // First, assume that the list will fit in 255 bytes, and thus the length will fit in
        // 1 byte. If not, we'll fix it up later.
        // `start` is the offset of the list's tag byte.
        let start = self.row.data.len();
        self.row.data.push(Tag::ListTiny.into());
        // Write a dummy len, will fix it up later.
        self.row.data.push(0);

        // Let the caller append the list's elements directly after the placeholder.
        let out = f(self);

        // The `- 1 - 1` is for the tag and the len.
        let len = self.row.data.len() - start - 1 - 1;
        // We now know the real len.
        if len < TINY {
            // If the len fits in 1 byte, we just need to fix up the len.
            self.row.data[start + 1] = len.to_le_bytes()[0];
        } else {
            // Note: We move this code path into its own function, so that the common case can be
            // inlined.
            long_list(&mut self.row.data, start, len);
        }

        /// Rewrites a list whose length does not fit in one byte:
        ///
        /// 1. Fix up the tag.
        /// 2. Move the actual data a bit (for which we also need to make room at the end).
        /// 3. Fix up the len.
        ///
        /// `data`: The row's backing data.
        /// `start`: where `push_list_with` started writing in `data`.
        /// `len`: the length of the data, excluding the tag and the length.
        #[cold]
        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
            // `len_len`: the length of the length. (Possible values are: 2, 4, 8. 1 is handled
            // elsewhere.) The other parameters are the same as for `long_list`.
            let long_list_inner = |data: &mut CompactBytes, len_len| {
                // We'll need memory for the new, bigger length, so make the `CompactBytes` bigger.
                // The `- 1` is because the old length was 1 byte.
                const ZEROS: [u8; 8] = [0; 8];
                data.extend_from_slice(&ZEROS[0..len_len - 1]);
                // Move the data to the end of the `CompactBytes`, to make space for the new length.
                // Originally, it started after the 1-byte tag and the 1-byte length, now it will
                // start after the 1-byte tag and the len_len-byte length.
                //
                // Note that this is the only operation in `long_list` whose cost is proportional
                // to `len`. Since `len` is at least 256 here, the other operations' cost are
                // negligible. `copy_within` is a memmove, which is probably a fair bit faster per
                // Datum than a Datum encoding in the `f` closure.
                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
                // Write the new length: the low `len_len` bytes of `len`, little endian.
                data[start + 1..start + 1 + len_len]
                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
            };
            // Pick the tag and the length width from the size class of `len`.
            match len {
                0..TINY => {
                    // The caller only gets here when `len >= TINY`.
                    unreachable!()
                }
                TINY..SHORT => {
                    data[start] = Tag::ListShort.into();
                    long_list_inner(data, 2);
                }
                SHORT..LONG => {
                    data[start] = Tag::ListLong.into();
                    long_list_inner(data, 4);
                }
                _ => {
                    data[start] = Tag::ListHuge.into();
                    long_list_inner(data, 8);
                }
            };
        }

        out
    }
2080
2081    /// Pushes a [`DatumMap`] that is built from a closure.
2082    ///
2083    /// The supplied closure will be invoked once with a `Row` that can be used
2084    /// to populate the dict.
2085    ///
2086    /// The closure **must** alternate pushing string keys and arbitrary values,
2087    /// otherwise reading the dict will cause a panic.
2088    ///
2089    /// The closure **must** push keys in ascending order, otherwise equality
2090    /// checks on the resulting `Row` may be wrong and reading the dict IN DEBUG
2091    /// MODE will cause a panic.
2092    ///
2093    /// The closure **must not** call [`RowPacker::clear`],
2094    /// [`RowPacker::truncate`], or [`RowPacker::truncate_datums`].
2095    ///
2096    /// # Example
2097    ///
2098    /// ```
2099    /// # use mz_repr::{Row, Datum};
2100    /// let mut row = Row::default();
2101    /// row.packer().push_dict_with(|row| {
2102    ///
2103    ///     // key
2104    ///     row.push(Datum::String("age"));
2105    ///     // value
2106    ///     row.push(Datum::Int64(42));
2107    ///
2108    ///     // key
2109    ///     row.push(Datum::String("name"));
2110    ///     // value
2111    ///     row.push(Datum::String("bob"));
2112    /// });
2113    /// assert_eq!(
2114    ///     row.unpack_first().unwrap_map().iter().collect::<Vec<_>>(),
2115    ///     vec![("age", Datum::Int64(42)), ("name", Datum::String("bob"))]
2116    /// );
2117    /// ```
2118    pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2119    where
2120        F: FnOnce(&mut RowPacker) -> R,
2121    {
2122        self.row.data.push(Tag::Dict.into());
2123        let start = self.row.data.len();
2124        // write a dummy len, will fix it up later
2125        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2126
2127        let res = f(self);
2128
2129        let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2130        // fix up the len
2131        self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2132
2133        res
2134    }
2135
2136    /// Convenience function to construct an array from an iter of `Datum`s.
2137    ///
2138    /// Returns an error if the number of elements in `iter` does not match
2139    /// the cardinality of the array as described by `dims`, or if the
2140    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2141    /// occurs, the packer's state will be unchanged.
2142    pub fn try_push_array<'a, I, D>(
2143        &mut self,
2144        dims: &[ArrayDimension],
2145        iter: I,
2146    ) -> Result<(), InvalidArrayError>
2147    where
2148        I: IntoIterator<Item = D>,
2149        D: Borrow<Datum<'a>>,
2150    {
2151        // SAFETY: The function returns the exact number of elements pushed into the array.
2152        unsafe {
2153            self.push_array_with_unchecked(dims, |packer| {
2154                let mut nelements = 0;
2155                for datum in iter {
2156                    packer.push(datum);
2157                    nelements += 1;
2158                }
2159                Ok::<_, InvalidArrayError>(nelements)
2160            })
2161        }
2162    }
2163
    /// Convenience function to construct an array from a function. The function must return the
    /// number of elements it pushed into the array.
    ///
    /// # Safety
    ///
    /// It is undefined behavior if the function returns a number different to the number of
    /// elements it pushed.
    ///
    /// # Errors
    ///
    /// Returns an error if the number of elements pushed by `f` does not match
    /// the cardinality of the array as described by `dims`, or if the
    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`], or if `f` errors. If an error
    /// occurs, the packer's state will be unchanged.
    pub unsafe fn push_array_with_unchecked<F, E>(
        &mut self,
        dims: &[ArrayDimension],
        f: F,
    ) -> Result<(), E>
    where
        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
        E: From<InvalidArrayError>,
    {
        // Arrays are encoded as follows (after the tag byte).
        //
        // u8    ndims
        // i64   dim_0 lower bound
        // u64   dim_0 length
        // ...
        // i64   dim_n lower bound
        // u64   dim_n length
        // u64   element data size in bytes
        // u8    element data, where elements are encoded in row-major order

        // Reject oversized dimension lists before anything is written.
        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
        }

        // Remember where the array begins so it can be rolled back on error.
        let start = self.row.data.len();
        self.row.data.push(Tag::Array.into());

        // Write dimension information.
        self.row
            .data
            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
        for dim in dims {
            self.row
                .data
                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
            self.row
                .data
                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
        }

        // Write elements.
        // `off` marks the placeholder for the element data size, patched below.
        let off = self.row.data.len();
        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
        let nelements = match f(self) {
            Ok(nelements) => nelements,
            Err(e) => {
                // Discard the partially written array.
                self.row.data.truncate(start);
                return Err(e);
            }
        };
        // Back-patch the size of the element data now that it is known.
        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());

        // Check that the number of elements written matches the dimension
        // information. An array without dimensions is expected to be empty.
        let cardinality = match dims {
            [] => 0,
            dims => dims.iter().map(|d| d.length).product(),
        };
        if nelements != cardinality {
            // Discard the array so the packer holds no inconsistent data.
            self.row.data.truncate(start);
            return Err(InvalidArrayError::WrongCardinality {
                actual: nelements,
                expected: cardinality,
            }
            .into());
        }

        Ok(())
    }
2242
2243    /// Pushes an [`Array`] that is built from a closure.
2244    ///
2245    /// __WARNING__: This is fairly "sharp" tool that is easy to get wrong. You
2246    /// should prefer [`RowPacker::try_push_array`] when possible.
2247    ///
2248    /// Returns an error if the number of elements pushed does not match
2249    /// the cardinality of the array as described by `dims`, or if the
2250    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2251    /// occurs, the packer's state will be unchanged.
2252    pub fn push_array_with_row_major<F, I>(
2253        &mut self,
2254        dims: I,
2255        f: F,
2256    ) -> Result<(), InvalidArrayError>
2257    where
2258        I: IntoIterator<Item = ArrayDimension>,
2259        F: FnOnce(&mut RowPacker) -> usize,
2260    {
2261        let start = self.row.data.len();
2262        self.row.data.push(Tag::Array.into());
2263
2264        // Write dummy dimension length for now, we'll fix it up.
2265        let dims_start = self.row.data.len();
2266        self.row.data.push(42);
2267
2268        let mut num_dims: u8 = 0;
2269        let mut cardinality: usize = 1;
2270        for dim in dims {
2271            num_dims += 1;
2272            cardinality *= dim.length;
2273
2274            self.row
2275                .data
2276                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2277            self.row
2278                .data
2279                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2280        }
2281
2282        if num_dims > MAX_ARRAY_DIMENSIONS {
2283            // Reset the packer state so we don't have invalid data.
2284            self.row.data.truncate(start);
2285            return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2286        }
2287        // Fix up our dimension length.
2288        self.row.data[dims_start..dims_start + size_of::<u8>()]
2289            .copy_from_slice(&num_dims.to_le_bytes());
2290
2291        // Write elements.
2292        let off = self.row.data.len();
2293        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2294
2295        let nelements = f(self);
2296
2297        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2298        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2299
2300        // Check that the number of elements written matches the dimension
2301        // information.
2302        let cardinality = match num_dims {
2303            0 => 0,
2304            _ => cardinality,
2305        };
2306        if nelements != cardinality {
2307            self.row.data.truncate(start);
2308            return Err(InvalidArrayError::WrongCardinality {
2309                actual: nelements,
2310                expected: cardinality,
2311            });
2312        }
2313
2314        Ok(())
2315    }
2316
2317    /// Convenience function to push a `DatumList` from an iter of `Datum`s
2318    ///
2319    /// See [`RowPacker::push_dict_with`] if you need to be able to handle errors
2320    pub fn push_list<'a, I, D>(&mut self, iter: I)
2321    where
2322        I: IntoIterator<Item = D>,
2323        D: Borrow<Datum<'a>>,
2324    {
2325        self.push_list_with(|packer| {
2326            for elem in iter {
2327                packer.push(*elem.borrow())
2328            }
2329        });
2330    }
2331
2332    /// Convenience function to push a `DatumMap` from an iter of `(&str, Datum)` pairs
2333    pub fn push_dict<'a, I, D>(&mut self, iter: I)
2334    where
2335        I: IntoIterator<Item = (&'a str, D)>,
2336        D: Borrow<Datum<'a>>,
2337    {
2338        self.push_dict_with(|packer| {
2339            for (k, v) in iter {
2340                packer.push(Datum::String(k));
2341                packer.push(*v.borrow())
2342            }
2343        })
2344    }
2345
2346    /// Pushes a `Datum::Range` derived from the `Range<Datum<'a>`.
2347    ///
2348    /// # Panics
2349    /// - If lower and upper express finite values and they are datums of
2350    ///   different types.
2351    /// - If lower or upper express finite values and are equal to
2352    ///   `Datum::Null`. To handle `Datum::Null` properly, use
2353    ///   [`RangeBound::new`].
2354    ///
2355    /// # Notes
2356    /// - This function canonicalizes the range before pushing it to the row.
2357    /// - Prefer this function over `push_range_with` because of its
2358    ///   canonicaliztion.
2359    /// - Prefer creating [`RangeBound`]s using [`RangeBound::new`], which
2360    ///   handles `Datum::Null` in a SQL-friendly way.
2361    pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
2362        range.canonicalize()?;
2363        match range.inner {
2364            None => {
2365                self.row.data.push(Tag::Range.into());
2366                // Untagged bytes only contains the `RANGE_EMPTY` flag value.
2367                self.row.data.push(range::InternalFlags::EMPTY.bits());
2368                Ok(())
2369            }
2370            Some(inner) => self.push_range_with(
2371                RangeLowerBound {
2372                    inclusive: inner.lower.inclusive,
2373                    bound: inner
2374                        .lower
2375                        .bound
2376                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2377                },
2378                RangeUpperBound {
2379                    inclusive: inner.upper.inclusive,
2380                    bound: inner
2381                        .upper
2382                        .bound
2383                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2384                },
2385            ),
2386        }
2387    }
2388
2389    /// Pushes a `DatumRange` built from the specified arguments.
2390    ///
2391    /// # Warning
2392    /// Unlike `push_range`, `push_range_with` _does not_ canonicalize its
2393    /// inputs. Consequentially, this means it's possible to generate ranges
2394    /// that will not reflect the proper ordering and equality.
2395    ///
2396    /// # Panics
2397    /// - If lower or upper expresses a finite value and does not push exactly
2398    ///   one value into the `RowPacker`.
2399    /// - If lower and upper express finite values and they are datums of
2400    ///   different types.
2401    /// - If lower or upper express finite values and push `Datum::Null`.
2402    ///
2403    /// # Notes
2404    /// - Prefer `push_range_with` over this function. This function should be
2405    ///   used only when you are not pushing `Datum`s to the inner row.
2406    /// - Range encoding is `[<flag bytes>,<lower>?,<upper>?]`, where `lower`
2407    ///   and `upper` are optional, contingent on the flag value expressing an
2408    ///   empty range (where neither will be present) or infinite bounds (where
2409    ///   each infinite bound will be absent).
2410    /// - To push an emtpy range, use `push_range` using `Range { inner: None }`.
2411    pub fn push_range_with<L, U, E>(
2412        &mut self,
2413        lower: RangeLowerBound<L>,
2414        upper: RangeUpperBound<U>,
2415    ) -> Result<(), E>
2416    where
2417        L: FnOnce(&mut RowPacker) -> Result<(), E>,
2418        U: FnOnce(&mut RowPacker) -> Result<(), E>,
2419        E: From<InvalidRangeError>,
2420    {
2421        let start = self.row.data.len();
2422        self.row.data.push(Tag::Range.into());
2423
2424        let mut flags = range::InternalFlags::empty();
2425
2426        flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
2427        flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
2428        flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
2429        flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);
2430
2431        let mut expected_datums = 0;
2432
2433        self.row.data.push(flags.bits());
2434
2435        let datum_check = self.row.data.len();
2436
2437        if let Some(value) = lower.bound {
2438            let start = self.row.data.len();
2439            value(self)?;
2440            assert!(
2441                start < self.row.data.len(),
2442                "finite values must each push exactly one value; expected 1 but got 0"
2443            );
2444            expected_datums += 1;
2445        }
2446
2447        if let Some(value) = upper.bound {
2448            let start = self.row.data.len();
2449            value(self)?;
2450            assert!(
2451                start < self.row.data.len(),
2452                "finite values must each push exactly one value; expected 1 but got 0"
2453            );
2454            expected_datums += 1;
2455        }
2456
2457        // Validate the invariants that 0, 1, or 2 elements were pushed, none are Null,
2458        // and if two are pushed then the second is not less than the first. Panic in
2459        // some cases and error in others.
2460        let mut actual_datums = 0;
2461        let mut seen = None;
2462        let mut dataz = &self.row.data[datum_check..];
2463        while !dataz.is_empty() {
2464            let d = unsafe { read_datum(&mut dataz) };
2465            assert!(d != Datum::Null, "cannot push Datum::Null into range");
2466
2467            match seen {
2468                None => seen = Some(d),
2469                Some(seen) => {
2470                    let seen_kind = DatumKind::from(seen);
2471                    let d_kind = DatumKind::from(d);
2472                    assert!(
2473                        seen_kind == d_kind,
2474                        "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
2475                    );
2476
2477                    if seen > d {
2478                        self.row.data.truncate(start);
2479                        return Err(InvalidRangeError::MisorderedRangeBounds.into());
2480                    }
2481                }
2482            }
2483            actual_datums += 1;
2484        }
2485
2486        assert!(
2487            actual_datums == expected_datums,
2488            "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
2489        );
2490
2491        Ok(())
2492    }
2493
2494    /// Clears the contents of the packer without de-allocating its backing memory.
2495    pub fn clear(&mut self) {
2496        self.row.data.clear();
2497    }
2498
    /// Truncates the underlying storage to the specified byte position.
    ///
    /// This is the byte-level primitive; [`Self::truncate_datums`] offers a
    /// safe alternative that counts datums instead of bytes.
    ///
    /// # Safety
    ///
    /// `pos` MUST specify a byte offset that lies on a datum boundary.
    /// If `pos` specifies a byte offset that is *within* a datum, the row
    /// packer will produce an invalid row, the unpacking of which may
    /// trigger undefined behavior!
    ///
    /// To find the byte offset of a datum boundary, inspect the packer's
    /// byte length by calling `packer.data().len()` after pushing the desired
    /// number of datums onto the packer.
    pub unsafe fn truncate(&mut self, pos: usize) {
        // No validation happens here: the caller's guarantee about `pos` is
        // the only thing keeping the row well-formed.
        self.row.data.truncate(pos)
    }
2514
2515    /// Truncates the underlying row to contain at most the first `n` datums.
2516    pub fn truncate_datums(&mut self, n: usize) {
2517        let prev_len = self.row.data.len();
2518        let mut iter = self.row.iter();
2519        for _ in iter.by_ref().take(n) {}
2520        let next_len = iter.data.len();
2521        // SAFETY: iterator offsets always lie on a datum boundary.
2522        unsafe { self.truncate(prev_len - next_len) }
2523    }
2524
    /// Returns the total amount of bytes used by the underlying row.
    ///
    /// This forwards to the row's own `byte_len`, so it reports whatever that
    /// method counts for the row currently being packed.
    pub fn byte_len(&self) -> usize {
        self.row.byte_len()
    }
2529}
2530
2531impl<'a> IntoIterator for &'a Row {
2532    type Item = Datum<'a>;
2533    type IntoIter = DatumListIter<'a>;
2534    fn into_iter(self) -> DatumListIter<'a> {
2535        self.iter()
2536    }
2537}
2538
2539impl fmt::Debug for Row {
2540    /// Debug representation using the internal datums
2541    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2542        f.write_str("Row{")?;
2543        f.debug_list().entries(self.iter()).finish()?;
2544        f.write_str("}")
2545    }
2546}
2547
2548impl fmt::Display for Row {
2549    /// Display representation using the internal datums
2550    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2551        f.write_str("(")?;
2552        for (i, datum) in self.iter().enumerate() {
2553            if i != 0 {
2554                f.write_str(", ")?;
2555            }
2556            write!(f, "{}", datum)?;
2557        }
2558        f.write_str(")")
2559    }
2560}
2561
2562impl<'a> DatumList<'a> {
2563    pub fn empty() -> DatumList<'static> {
2564        DatumList { data: &[] }
2565    }
2566
2567    pub fn iter(&self) -> DatumListIter<'a> {
2568        DatumListIter { data: self.data }
2569    }
2570
2571    /// For debugging only
2572    pub fn data(&self) -> &'a [u8] {
2573        self.data
2574    }
2575}
2576
2577impl<'a> IntoIterator for &'a DatumList<'a> {
2578    type Item = Datum<'a>;
2579    type IntoIter = DatumListIter<'a>;
2580    fn into_iter(self) -> DatumListIter<'a> {
2581        self.iter()
2582    }
2583}
2584
2585impl<'a> Iterator for DatumListIter<'a> {
2586    type Item = Datum<'a>;
2587    fn next(&mut self) -> Option<Self::Item> {
2588        if self.data.is_empty() {
2589            None
2590        } else {
2591            Some(unsafe { read_datum(&mut self.data) })
2592        }
2593    }
2594}
2595
2596impl<'a> DatumMap<'a> {
2597    pub fn empty() -> DatumMap<'static> {
2598        DatumMap { data: &[] }
2599    }
2600
2601    pub fn iter(&self) -> DatumDictIter<'a> {
2602        DatumDictIter {
2603            data: self.data,
2604            prev_key: None,
2605        }
2606    }
2607
2608    /// For debugging only
2609    pub fn data(&self) -> &'a [u8] {
2610        self.data
2611    }
2612}
2613
2614impl<'a> Debug for DatumMap<'a> {
2615    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2616        f.debug_map().entries(self.iter()).finish()
2617    }
2618}
2619
2620impl<'a> IntoIterator for &'a DatumMap<'a> {
2621    type Item = (&'a str, Datum<'a>);
2622    type IntoIter = DatumDictIter<'a>;
2623    fn into_iter(self) -> DatumDictIter<'a> {
2624        self.iter()
2625    }
2626}
2627
2628impl<'a> Iterator for DatumDictIter<'a> {
2629    type Item = (&'a str, Datum<'a>);
2630    fn next(&mut self) -> Option<Self::Item> {
2631        if self.data.is_empty() {
2632            None
2633        } else {
2634            let key_tag =
2635                Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
2636            assert!(
2637                key_tag == Tag::StringTiny
2638                    || key_tag == Tag::StringShort
2639                    || key_tag == Tag::StringLong
2640                    || key_tag == Tag::StringHuge,
2641                "Dict keys must be strings, got {:?}",
2642                key_tag
2643            );
2644            let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
2645            let val = unsafe { read_datum(&mut self.data) };
2646
2647            // if in debug mode, sanity check keys
2648            if cfg!(debug_assertions) {
2649                if let Some(prev_key) = self.prev_key {
2650                    debug_assert!(
2651                        prev_key < key,
2652                        "Dict keys must be unique and given in ascending order: {} came before {}",
2653                        prev_key,
2654                        key
2655                    );
2656                }
2657                self.prev_key = Some(key);
2658            }
2659
2660            Some((key, val))
2661        }
2662    }
2663}
2664
2665impl RowArena {
2666    pub fn new() -> Self {
2667        RowArena {
2668            inner: RefCell::new(vec![]),
2669        }
2670    }
2671
2672    /// Creates a `RowArena` with a hint of how many rows will be created in the arena, to avoid
2673    /// reallocations of its internal vector.
2674    pub fn with_capacity(capacity: usize) -> Self {
2675        RowArena {
2676            inner: RefCell::new(Vec::with_capacity(capacity)),
2677        }
2678    }
2679
2680    /// Does a `reserve` on the underlying `Vec`. Call this when you expect `additional` more datums
2681    /// to be created in this arena.
2682    pub fn reserve(&self, additional: usize) {
2683        self.inner.borrow_mut().reserve(additional);
2684    }
2685
2686    /// Take ownership of `bytes` for the lifetime of the arena.
2687    #[allow(clippy::transmute_ptr_to_ptr)]
2688    pub fn push_bytes<'a>(&'a self, bytes: Vec<u8>) -> &'a [u8] {
2689        let mut inner = self.inner.borrow_mut();
2690        inner.push(bytes);
2691        let owned_bytes = &inner[inner.len() - 1];
2692        unsafe {
2693            // This is safe because:
2694            //   * We only ever append to self.inner, so the byte vector
2695            //     will live as long as the arena.
2696            //   * We return a reference to the byte vector's contents, so it's
2697            //     okay if self.inner reallocates and moves the byte
2698            //     vector.
2699            //   * We don't allow access to the byte vector itself, so it will
2700            //     never reallocate.
2701            transmute::<&[u8], &'a [u8]>(owned_bytes)
2702        }
2703    }
2704
2705    /// Take ownership of `string` for the lifetime of the arena.
2706    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
2707        let owned_bytes = self.push_bytes(string.into_bytes());
2708        unsafe {
2709            // This is safe because we know it was a `String` just before.
2710            std::str::from_utf8_unchecked(owned_bytes)
2711        }
2712    }
2713
2714    /// Take ownership of `row` for the lifetime of the arena, returning a
2715    /// reference to the first datum in the row.
2716    ///
2717    /// If we had an owned datum type, this method would be much clearer, and
2718    /// would be called `push_owned_datum`.
2719    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
2720        let mut inner = self.inner.borrow_mut();
2721        inner.push(row.data.into_vec());
2722        unsafe {
2723            // This is safe because:
2724            //   * We only ever append to self.inner, so the row data will live
2725            //     as long as the arena.
2726            //   * We force the row data into its own heap allocation--
2727            //     importantly, we do NOT store the SmallVec, which might be
2728            //     storing data inline--so it's okay if self.inner reallocates
2729            //     and moves the row.
2730            //   * We don't allow access to the byte vector itself, so it will
2731            //     never reallocate.
2732            let datum = read_datum(&mut &inner[inner.len() - 1][..]);
2733            transmute::<Datum<'_>, Datum<'a>>(datum)
2734        }
2735    }
2736
2737    /// Equivalent to `push_unary_row` but returns a `DatumNested` rather than a
2738    /// `Datum`.
2739    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
2740        let mut inner = self.inner.borrow_mut();
2741        inner.push(row.data.into_vec());
2742        unsafe {
2743            // This is safe because:
2744            //   * We only ever append to self.inner, so the row data will live
2745            //     as long as the arena.
2746            //   * We force the row data into its own heap allocation--
2747            //     importantly, we do NOT store the SmallVec, which might be
2748            //     storing data inline--so it's okay if self.inner reallocates
2749            //     and moves the row.
2750            //   * We don't allow access to the byte vector itself, so it will
2751            //     never reallocate.
2752            let nested = DatumNested::extract(&mut &inner[inner.len() - 1][..]);
2753            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
2754        }
2755    }
2756
2757    /// Convenience function to make a new `Row` containing a single datum, and
2758    /// take ownership of it for the lifetime of the arena
2759    ///
2760    /// ```
2761    /// # use mz_repr::{RowArena, Datum};
2762    /// let arena = RowArena::new();
2763    /// let datum = arena.make_datum(|packer| {
2764    ///   packer.push_list(&[Datum::String("hello"), Datum::String("world")]);
2765    /// });
2766    /// assert_eq!(datum.unwrap_list().iter().collect::<Vec<_>>(), vec![Datum::String("hello"), Datum::String("world")]);
2767    /// ```
2768    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
2769    where
2770        F: FnOnce(&mut RowPacker),
2771    {
2772        let mut row = Row::default();
2773        f(&mut row.packer());
2774        self.push_unary_row(row)
2775    }
2776
2777    /// Convenience function identical to `make_datum` but instead returns a
2778    /// `DatumNested`.
2779    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
2780    where
2781        F: FnOnce(&mut RowPacker),
2782    {
2783        let mut row = Row::default();
2784        f(&mut row.packer());
2785        self.push_unary_row_datum_nested(row)
2786    }
2787
2788    /// Like [`RowArena::make_datum`], but the provided closure can return an error.
2789    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
2790    where
2791        F: FnOnce(&mut RowPacker) -> Result<(), E>,
2792    {
2793        let mut row = Row::default();
2794        f(&mut row.packer())?;
2795        Ok(self.push_unary_row(row))
2796    }
2797
2798    /// Clear the contents of the arena.
2799    pub fn clear(&mut self) {
2800        self.inner.borrow_mut().clear();
2801    }
2802}
2803
2804impl Default for RowArena {
2805    fn default() -> RowArena {
2806        RowArena::new()
2807    }
2808}
2809
/// A thread-local row, which can be borrowed and returned.
///
/// # Example
///
/// Use this type instead of creating a new row:
/// ```
/// use mz_repr::SharedRow;
///
/// let binding = SharedRow::get();
/// let mut row_builder = binding.borrow_mut();
/// ```
///
/// This allows us to reuse an existing row allocation instead of creating a new one or retaining
/// an allocation locally. Additionally, we can observe the size of the local row in a central
/// place and potentially reallocate to reduce memory needs.
///
/// # Panics
///
/// [`SharedRow::get`] panics if the shared row is currently borrowed, for example through
/// another `SharedRow` handle obtained on the same thread.
#[derive(Debug)]
pub struct SharedRow(Rc<RefCell<Row>>);
2830
2831impl SharedRow {
2832    thread_local! {
2833        static SHARED_ROW: Rc<RefCell<Row>> = Rc::new(RefCell::new(Row::default()));
2834    }
2835
2836    /// Get the shared row.
2837    ///
2838    /// The row's contents are cleared before returning it.
2839    ///
2840    /// # Panic
2841    ///
2842    /// Panics when the row is already borrowed elsewhere.
2843    pub fn get() -> Self {
2844        let row = Self::SHARED_ROW.with(Rc::clone);
2845        // Clear row
2846        row.borrow_mut().packer();
2847        Self(row)
2848    }
2849
2850    /// Gets the shared row and uses it to pack `iter`.
2851    pub fn pack<'a, I, D>(iter: I) -> Row
2852    where
2853        I: IntoIterator<Item = D>,
2854        D: Borrow<Datum<'a>>,
2855    {
2856        let binding = Self::SHARED_ROW.with(Rc::clone);
2857        let mut row_builder = binding.borrow_mut();
2858        let mut row_packer = row_builder.packer();
2859        row_packer.extend(iter);
2860        row_builder.clone()
2861    }
2862
2863    /// Calls the provided closure with a [`RowPacker`] writing the shared row.
2864    ///
2865    /// # Panics
2866    ///
2867    /// Panics when the row is already borrowed elsewhere.
2868    pub fn pack_with<F, R>(&mut self, f: F) -> R
2869    where
2870        for<'a> F: FnOnce(&'a mut RowPacker<'a>) -> R,
2871    {
2872        let mut borrow = self.borrow_mut();
2873        let mut packer = borrow.packer();
2874        (f)(&mut packer)
2875    }
2876}
2877
2878impl std::ops::Deref for SharedRow {
2879    type Target = RefCell<Row>;
2880
2881    fn deref(&self) -> &Self::Target {
2882        &self.0
2883    }
2884}
2885
2886#[cfg(test)]
2887mod tests {
2888    use chrono::{DateTime, NaiveDate};
2889    use mz_ore::{assert_err, assert_none};
2890
2891    use crate::ScalarType;
2892
2893    use super::*;
2894
2895    #[mz_ore::test]
2896    fn test_assumptions() {
2897        assert_eq!(size_of::<Tag>(), 1);
2898        #[cfg(target_endian = "big")]
2899        {
2900            // if you want to run this on a big-endian cpu, we'll need big-endian versions of the serialization code
2901            assert!(false);
2902        }
2903    }
2904
2905    #[mz_ore::test]
2906    fn miri_test_arena() {
2907        let arena = RowArena::new();
2908
2909        assert_eq!(arena.push_string("".to_owned()), "");
2910        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");
2911
2912        let empty: &[u8] = &[];
2913        assert_eq!(arena.push_bytes(vec![]), empty);
2914        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);
2915
2916        let mut row = Row::default();
2917        let mut packer = row.packer();
2918        packer.push_dict_with(|row| {
2919            row.push(Datum::String("a"));
2920            row.push_list_with(|row| {
2921                row.push(Datum::String("one"));
2922                row.push(Datum::String("two"));
2923                row.push(Datum::String("three"));
2924            });
2925            row.push(Datum::String("b"));
2926            row.push(Datum::String("c"));
2927        });
2928        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
2929    }
2930
2931    #[mz_ore::test]
2932    fn miri_test_round_trip() {
2933        fn round_trip(datums: Vec<Datum>) {
2934            let row = Row::pack(datums.clone());
2935
2936            // When run under miri this catches undefined bytes written to data
2937            // eg by calling push_copy! on a type which contains undefined padding values
2938            println!("{:?}", row.data());
2939
2940            let datums2 = row.iter().collect::<Vec<_>>();
2941            let datums3 = row.unpack();
2942            assert_eq!(datums, datums2);
2943            assert_eq!(datums, datums3);
2944        }
2945
2946        round_trip(vec![]);
2947        round_trip(
2948            ScalarType::enumerate()
2949                .iter()
2950                .flat_map(|r#type| r#type.interesting_datums())
2951                .collect(),
2952        );
2953        round_trip(vec![
2954            Datum::Null,
2955            Datum::Null,
2956            Datum::False,
2957            Datum::True,
2958            Datum::Int16(-21),
2959            Datum::Int32(-42),
2960            Datum::Int64(-2_147_483_648 - 42),
2961            Datum::UInt8(0),
2962            Datum::UInt8(1),
2963            Datum::UInt16(0),
2964            Datum::UInt16(1),
2965            Datum::UInt16(1 << 8),
2966            Datum::UInt32(0),
2967            Datum::UInt32(1),
2968            Datum::UInt32(1 << 8),
2969            Datum::UInt32(1 << 16),
2970            Datum::UInt32(1 << 24),
2971            Datum::UInt64(0),
2972            Datum::UInt64(1),
2973            Datum::UInt64(1 << 8),
2974            Datum::UInt64(1 << 16),
2975            Datum::UInt64(1 << 24),
2976            Datum::UInt64(1 << 32),
2977            Datum::UInt64(1 << 40),
2978            Datum::UInt64(1 << 48),
2979            Datum::UInt64(1 << 56),
2980            Datum::Float32(OrderedFloat::from(-42.12)),
2981            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
2982            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
2983            Datum::Timestamp(
2984                CheckedTimestamp::from_timestamplike(
2985                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
2986                        .unwrap()
2987                        .and_hms_opt(14, 32, 11)
2988                        .unwrap(),
2989                )
2990                .unwrap(),
2991            ),
2992            Datum::TimestampTz(
2993                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
2994                    .unwrap(),
2995            ),
2996            Datum::Interval(Interval {
2997                months: 312,
2998                ..Default::default()
2999            }),
3000            Datum::Interval(Interval::new(0, 0, 1_012_312)),
3001            Datum::Bytes(&[]),
3002            Datum::Bytes(&[0, 2, 1, 255]),
3003            Datum::String(""),
3004            Datum::String("العَرَبِيَّة"),
3005        ]);
3006    }
3007
3008    #[mz_ore::test]
3009    fn test_array() {
3010        // Construct an array using `Row::push_array` and verify that it unpacks
3011        // correctly.
3012        const DIM: ArrayDimension = ArrayDimension {
3013            lower_bound: 2,
3014            length: 2,
3015        };
3016        let mut row = Row::default();
3017        let mut packer = row.packer();
3018        packer
3019            .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3020            .unwrap();
3021        let arr1 = row.unpack_first().unwrap_array();
3022        assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3023        assert_eq!(
3024            arr1.elements().into_iter().collect::<Vec<_>>(),
3025            vec![Datum::Int32(1), Datum::Int32(2)]
3026        );
3027
3028        // Pack a previously-constructed `Datum::Array` and verify that it
3029        // unpacks correctly.
3030        let row = Row::pack_slice(&[Datum::Array(arr1)]);
3031        let arr2 = row.unpack_first().unwrap_array();
3032        assert_eq!(arr1, arr2);
3033    }
3034
3035    #[mz_ore::test]
3036    fn test_multidimensional_array() {
3037        let datums = vec![
3038            Datum::Int32(1),
3039            Datum::Int32(2),
3040            Datum::Int32(3),
3041            Datum::Int32(4),
3042            Datum::Int32(5),
3043            Datum::Int32(6),
3044            Datum::Int32(7),
3045            Datum::Int32(8),
3046        ];
3047
3048        let mut row = Row::default();
3049        let mut packer = row.packer();
3050        packer
3051            .try_push_array(
3052                &[
3053                    ArrayDimension {
3054                        lower_bound: 1,
3055                        length: 1,
3056                    },
3057                    ArrayDimension {
3058                        lower_bound: 1,
3059                        length: 4,
3060                    },
3061                    ArrayDimension {
3062                        lower_bound: 1,
3063                        length: 2,
3064                    },
3065                ],
3066                &datums,
3067            )
3068            .unwrap();
3069        let array = row.unpack_first().unwrap_array();
3070        assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3071    }
3072
3073    #[mz_ore::test]
3074    fn test_array_max_dimensions() {
3075        let mut row = Row::default();
3076        let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3077
3078        // An array with one too many dimensions should be rejected.
3079        let res = row.packer().try_push_array(
3080            &vec![
3081                ArrayDimension {
3082                    lower_bound: 1,
3083                    length: 1
3084                };
3085                max_dims + 1
3086            ],
3087            vec![Datum::Int32(4)],
3088        );
3089        assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3090        assert!(row.data.is_empty());
3091
3092        // An array with exactly the maximum allowable dimensions should be
3093        // accepted.
3094        row.packer()
3095            .try_push_array(
3096                &vec![
3097                    ArrayDimension {
3098                        lower_bound: 1,
3099                        length: 1
3100                    };
3101                    max_dims
3102                ],
3103                vec![Datum::Int32(4)],
3104            )
3105            .unwrap();
3106    }
3107
3108    #[mz_ore::test]
3109    fn test_array_wrong_cardinality() {
3110        let mut row = Row::default();
3111        let res = row.packer().try_push_array(
3112            &[
3113                ArrayDimension {
3114                    lower_bound: 1,
3115                    length: 2,
3116                },
3117                ArrayDimension {
3118                    lower_bound: 1,
3119                    length: 3,
3120                },
3121            ],
3122            vec![Datum::Int32(1), Datum::Int32(2)],
3123        );
3124        assert_eq!(
3125            res,
3126            Err(InvalidArrayError::WrongCardinality {
3127                actual: 2,
3128                expected: 6,
3129            })
3130        );
3131        assert!(row.data.is_empty());
3132    }
3133
3134    #[mz_ore::test]
3135    fn test_nesting() {
3136        let mut row = Row::default();
3137        row.packer().push_dict_with(|row| {
3138            row.push(Datum::String("favourites"));
3139            row.push_list_with(|row| {
3140                row.push(Datum::String("ice cream"));
3141                row.push(Datum::String("oreos"));
3142                row.push(Datum::String("cheesecake"));
3143            });
3144            row.push(Datum::String("name"));
3145            row.push(Datum::String("bob"));
3146        });
3147
3148        let mut iter = row.unpack_first().unwrap_map().iter();
3149
3150        let (k, v) = iter.next().unwrap();
3151        assert_eq!(k, "favourites");
3152        assert_eq!(
3153            v.unwrap_list().iter().collect::<Vec<_>>(),
3154            vec![
3155                Datum::String("ice cream"),
3156                Datum::String("oreos"),
3157                Datum::String("cheesecake"),
3158            ]
3159        );
3160
3161        let (k, v) = iter.next().unwrap();
3162        assert_eq!(k, "name");
3163        assert_eq!(v, Datum::String("bob"));
3164    }
3165
3166    #[mz_ore::test]
3167    fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3168        let pack = |ok| {
3169            let mut row = Row::default();
3170            row.packer().push_dict_with(|row| {
3171                if ok {
3172                    row.push(Datum::String("key"));
3173                    row.push(Datum::Int32(42));
3174                    Ok(7)
3175                } else {
3176                    Err("fail")
3177                }
3178            })?;
3179            Ok(row)
3180        };
3181
3182        assert_eq!(pack(false), Err("fail"));
3183
3184        let row = pack(true)?;
3185        let mut dict = row.unpack_first().unwrap_map().iter();
3186        assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3187        assert_eq!(dict.next(), None);
3188
3189        Ok(())
3190    }
3191
3192    #[mz_ore::test]
3193    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
3194    fn test_datum_sizes() {
3195        let arena = RowArena::new();
3196
3197        // Test the claims about various datum sizes.
3198        let values_of_interest = vec![
3199            Datum::Null,
3200            Datum::False,
3201            Datum::Int16(0),
3202            Datum::Int32(0),
3203            Datum::Int64(0),
3204            Datum::UInt8(0),
3205            Datum::UInt8(1),
3206            Datum::UInt16(0),
3207            Datum::UInt16(1),
3208            Datum::UInt16(1 << 8),
3209            Datum::UInt32(0),
3210            Datum::UInt32(1),
3211            Datum::UInt32(1 << 8),
3212            Datum::UInt32(1 << 16),
3213            Datum::UInt32(1 << 24),
3214            Datum::UInt64(0),
3215            Datum::UInt64(1),
3216            Datum::UInt64(1 << 8),
3217            Datum::UInt64(1 << 16),
3218            Datum::UInt64(1 << 24),
3219            Datum::UInt64(1 << 32),
3220            Datum::UInt64(1 << 40),
3221            Datum::UInt64(1 << 48),
3222            Datum::UInt64(1 << 56),
3223            Datum::Float32(OrderedFloat(0.0)),
3224            Datum::Float64(OrderedFloat(0.0)),
3225            Datum::from(numeric::Numeric::from(0)),
3226            Datum::from(numeric::Numeric::from(1000)),
3227            Datum::from(numeric::Numeric::from(9999)),
3228            Datum::Date(
3229                NaiveDate::from_ymd_opt(1, 1, 1)
3230                    .unwrap()
3231                    .try_into()
3232                    .unwrap(),
3233            ),
3234            Datum::Timestamp(
3235                CheckedTimestamp::from_timestamplike(
3236                    DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
3237                )
3238                .unwrap(),
3239            ),
3240            Datum::TimestampTz(
3241                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
3242                    .unwrap(),
3243            ),
3244            Datum::Interval(Interval::default()),
3245            Datum::Bytes(&[]),
3246            Datum::String(""),
3247            Datum::JsonNull,
3248            Datum::Range(Range { inner: None }),
3249            arena.make_datum(|packer| {
3250                packer
3251                    .push_range(Range::new(Some((
3252                        RangeLowerBound::new(Datum::Int32(-1), true),
3253                        RangeUpperBound::new(Datum::Int32(1), true),
3254                    ))))
3255                    .unwrap();
3256            }),
3257        ];
3258        for value in values_of_interest {
3259            if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
3260                panic!("Disparity in claimed size for {:?}", value);
3261            }
3262        }
3263    }
3264
3265    #[mz_ore::test]
3266    fn test_range_errors() {
3267        fn test_range_errors_inner<'a>(
3268            datums: Vec<Vec<Datum<'a>>>,
3269        ) -> Result<(), InvalidRangeError> {
3270            let mut row = Row::default();
3271            let row_len = row.byte_len();
3272            let mut packer = row.packer();
3273            let r = packer.push_range_with(
3274                RangeLowerBound {
3275                    inclusive: true,
3276                    bound: Some(|row: &mut RowPacker| {
3277                        for d in &datums[0] {
3278                            row.push(d);
3279                        }
3280                        Ok(())
3281                    }),
3282                },
3283                RangeUpperBound {
3284                    inclusive: true,
3285                    bound: Some(|row: &mut RowPacker| {
3286                        for d in &datums[1] {
3287                            row.push(d);
3288                        }
3289                        Ok(())
3290                    }),
3291                },
3292            );
3293
3294            assert_eq!(row_len, row.byte_len());
3295
3296            r
3297        }
3298
3299        for panicking_case in [
3300            vec![vec![Datum::Int32(1)], vec![]],
3301            vec![
3302                vec![Datum::Int32(1), Datum::Int32(2)],
3303                vec![Datum::Int32(3)],
3304            ],
3305            vec![
3306                vec![Datum::Int32(1)],
3307                vec![Datum::Int32(2), Datum::Int32(3)],
3308            ],
3309            vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
3310            vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
3311            vec![vec![Datum::Null], vec![Datum::Int32(2)]],
3312            vec![vec![Datum::Int32(1)], vec![Datum::Null]],
3313        ] {
3314            #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
3315            let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
3316            assert_err!(result);
3317        }
3318
3319        let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
3320        assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
3321    }
3322
3323    /// Lists have a variable-length encoding for their lengths. We test each case here.
3324    #[mz_ore::test]
3325    #[cfg_attr(miri, ignore)] // slow
3326    fn test_list_encoding() {
3327        fn test_list_encoding_inner(len: usize) {
3328            let list_elem = |i: usize| {
3329                if i % 2 == 0 {
3330                    Datum::False
3331                } else {
3332                    Datum::True
3333                }
3334            };
3335            let mut row = Row::default();
3336            {
3337                // Push some stuff.
3338                let mut packer = row.packer();
3339                packer.push(Datum::String("start"));
3340                packer.push_list_with(|packer| {
3341                    for i in 0..len {
3342                        packer.push(list_elem(i));
3343                    }
3344                });
3345                packer.push(Datum::String("end"));
3346            }
3347            // Check that we read back exactly what we pushed.
3348            let mut row_it = row.iter();
3349            assert_eq!(row_it.next().unwrap(), Datum::String("start"));
3350            match row_it.next().unwrap() {
3351                Datum::List(list) => {
3352                    let mut list_it = list.iter();
3353                    for i in 0..len {
3354                        assert_eq!(list_it.next().unwrap(), list_elem(i));
3355                    }
3356                    assert_none!(list_it.next());
3357                }
3358                _ => panic!("expected Datum::List"),
3359            }
3360            assert_eq!(row_it.next().unwrap(), Datum::String("end"));
3361            assert_none!(row_it.next());
3362        }
3363
3364        test_list_encoding_inner(0);
3365        test_list_encoding_inner(1);
3366        test_list_encoding_inner(10);
3367        test_list_encoding_inner(TINY - 1); // tiny
3368        test_list_encoding_inner(TINY + 1); // short
3369        test_list_encoding_inner(SHORT + 1); // long
3370
3371        // The biggest one takes 40 s on my laptop, probably not worth it.
3372        //test_list_encoding_inner(LONG + 1); // huge
3373    }
3374}