mz_repr/
row.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Borrow;
11use std::cell::{Cell, RefCell};
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::mem::{size_of, transmute};
16use std::ops::Deref;
17use std::str;
18
19use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
20use compact_bytes::CompactBytes;
21use mz_ore::cast::{CastFrom, ReinterpretCast};
22use mz_ore::soft_assert_no_log;
23use mz_ore::vec::Vector;
24use mz_persist_types::Codec64;
25use num_enum::{IntoPrimitive, TryFromPrimitive};
26use ordered_float::OrderedFloat;
27use proptest::prelude::*;
28use proptest::strategy::{BoxedStrategy, Strategy};
29use serde::{Deserialize, Serialize};
30use uuid::Uuid;
31
32use crate::adt::array::{
33    Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
34};
35use crate::adt::date::Date;
36use crate::adt::interval::Interval;
37use crate::adt::mz_acl_item::{AclItem, MzAclItem};
38use crate::adt::numeric;
39use crate::adt::numeric::Numeric;
40use crate::adt::range::{
41    self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
42};
43use crate::adt::timestamp::CheckedTimestamp;
44use crate::scalar::{DatumKind, arb_datum};
45use crate::{Datum, RelationDesc, Timestamp};
46
47pub(crate) mod encode;
48pub mod iter;
49
50include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
51
/// A packed representation for `Datum`s.
///
/// `Datum` is easy to work with but very space inefficient. A `Datum::Int32(42)`
/// is laid out in memory like this:
///
/// ```text
/// tag: 3
/// padding: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
/// data: 0 0 0 42
/// padding: 0 0 0 0 0 0 0 0 0 0 0 0
/// ```
///
/// For a total of 32 bytes! The second set of padding is needed in case we were
/// to write a 16-byte datum into this location. The first set of padding is
/// needed to align that hypothetical decimal to a 16-byte boundary.
///
/// A `Row` stores zero or more `Datum`s without any padding. We avoid the need
/// for the first set of padding by only providing access to the `Datum`s via
/// calls to `ptr::read_unaligned`, which on modern x86 is barely penalized. We
/// avoid the need for the second set of padding by not providing mutable access
/// to the `Datum`. Instead, `Row` is append-only.
///
/// A `Row` can be built from a collection of `Datum`s using `Row::pack`, but it
/// is more efficient to use `Row::pack_slice` so that a right-sized allocation
/// can be created. If that is not possible, consider using the row buffer
/// pattern: allocate one row, pack into it, and then call [`Row::clone`] to
/// receive a copy of that row, leaving behind the original allocation to pack
/// future rows.
///
/// Creating a row via [`Row::pack_slice`]:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// assert_eq!(row.unpack(), vec![Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)])
/// ```
///
/// `Row`s can be unpacked by iterating over them:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// assert_eq!(row.iter().nth(1).unwrap(), Datum::Int32(1));
/// ```
///
/// If you want random access to the `Datum`s in a `Row`, use `Row::unpack` to create a
/// `Vec<Datum>`:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// let datums = row.unpack();
/// assert_eq!(datums[1], Datum::Int32(1));
/// ```
///
/// # Performance
///
/// Rows are dynamically sized, but up to a fixed size their data is stored in-line.
/// It is best to re-use a `Row` across multiple `Row` creation calls, as this
/// avoids the allocations involved in `Row::new()`.
#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
    /// The encoded datums, stored back to back without padding.
    data: CompactBytes,
}
112
113impl Row {
114    const SIZE: usize = CompactBytes::MAX_INLINE;
115
116    /// A variant of `Row::from_proto` that allows for reuse of internal allocs
117    /// and validates the decoding against a provided [`RelationDesc`].
118    pub fn decode_from_proto(
119        &mut self,
120        proto: &ProtoRow,
121        desc: &RelationDesc,
122    ) -> Result<(), String> {
123        let mut packer = self.packer();
124        for (col_idx, _, _) in desc.iter_all() {
125            let d = match proto.datums.get(col_idx.to_raw()) {
126                Some(x) => x,
127                None => {
128                    packer.push(Datum::Null);
129                    continue;
130                }
131            };
132            packer.try_push_proto(d)?;
133        }
134
135        Ok(())
136    }
137
138    /// Allocate an empty `Row` with a pre-allocated capacity.
139    #[inline]
140    pub fn with_capacity(cap: usize) -> Self {
141        Self {
142            data: CompactBytes::with_capacity(cap),
143        }
144    }
145
146    /// Create an empty `Row`.
147    #[inline]
148    pub const fn empty() -> Self {
149        Self {
150            data: CompactBytes::empty(),
151        }
152    }
153
154    /// Creates a new row from supplied bytes.
155    ///
156    /// # Safety
157    ///
158    /// This method relies on `data` being an appropriate row encoding, and can
159    /// result in unsafety if this is not the case.
160    pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
161        Row {
162            data: CompactBytes::new(data),
163        }
164    }
165
166    /// Constructs a [`RowPacker`] that will pack datums into this row's
167    /// allocation.
168    ///
169    /// This method clears the existing contents of the row, but retains the
170    /// allocation.
171    pub fn packer(&mut self) -> RowPacker<'_> {
172        self.clear();
173        RowPacker { row: self }
174    }
175
176    /// Take some `Datum`s and pack them into a `Row`.
177    ///
178    /// This method builds a `Row` by repeatedly increasing the backing
179    /// allocation. If the contents of the iterator are known ahead of
180    /// time, consider [`Row::with_capacity`] to right-size the allocation
181    /// first, and then [`RowPacker::extend`] to populate it with `Datum`s.
182    /// This avoids the repeated allocation resizing and copying.
183    pub fn pack<'a, I, D>(iter: I) -> Row
184    where
185        I: IntoIterator<Item = D>,
186        D: Borrow<Datum<'a>>,
187    {
188        let mut row = Row::default();
189        row.packer().extend(iter);
190        row
191    }
192
193    /// Use `self` to pack `iter`, and then clone the result.
194    ///
195    /// This is a convenience method meant to reduce boilerplate around row
196    /// formation.
197    pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
198    where
199        I: IntoIterator<Item = D>,
200        D: Borrow<Datum<'a>>,
201    {
202        self.packer().extend(iter);
203        self.clone()
204    }
205
206    /// Like [`Row::pack`], but the provided iterator is allowed to produce an
207    /// error, in which case the packing operation is aborted and the error
208    /// returned.
209    pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
210    where
211        I: IntoIterator<Item = Result<D, E>>,
212        D: Borrow<Datum<'a>>,
213    {
214        let mut row = Row::default();
215        row.packer().try_extend(iter)?;
216        Ok(row)
217    }
218
219    /// Pack a slice of `Datum`s into a `Row`.
220    ///
221    /// This method has the advantage over `pack` that it can determine the required
222    /// allocation before packing the elements, ensuring only one allocation and no
223    /// redundant copies required.
224    pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
225        // Pre-allocate the needed number of bytes.
226        let mut row = Row::with_capacity(datums_size(slice.iter()));
227        row.packer().extend(slice.iter());
228        row
229    }
230
231    /// Returns the total amount of bytes used by this row.
232    pub fn byte_len(&self) -> usize {
233        let heap_size = if self.data.spilled() {
234            self.data.len()
235        } else {
236            0
237        };
238        let inline_size = std::mem::size_of::<Self>();
239        inline_size.saturating_add(heap_size)
240    }
241
242    /// The length of the encoded row in bytes. Does not include the size of the `Row` struct itself.
243    pub fn data_len(&self) -> usize {
244        self.data.len()
245    }
246
247    /// Returns the total capacity in bytes used by this row.
248    pub fn byte_capacity(&self) -> usize {
249        self.data.capacity()
250    }
251
252    /// Extracts a Row slice containing the entire [`Row`].
253    #[inline]
254    pub fn as_row_ref(&self) -> &RowRef {
255        RowRef::from_slice(self.data.as_slice())
256    }
257
258    /// Clear the contents of the [`Row`], leaving any allocation in place.
259    #[inline]
260    fn clear(&mut self) {
261        self.data.clear();
262    }
263}
264
265impl Borrow<RowRef> for Row {
266    #[inline]
267    fn borrow(&self) -> &RowRef {
268        self.as_row_ref()
269    }
270}
271
272impl AsRef<RowRef> for Row {
273    #[inline]
274    fn as_ref(&self) -> &RowRef {
275        self.as_row_ref()
276    }
277}
278
279impl Deref for Row {
280    type Target = RowRef;
281
282    #[inline]
283    fn deref(&self) -> &Self::Target {
284        self.as_row_ref()
285    }
286}
287
// Compile-time check recording that a `Row` is 24 bytes. Nothing depends on `Row` being exactly
// this size; the assertion only exists to make a change in size visible.
static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
290
291impl Clone for Row {
292    fn clone(&self) -> Self {
293        Row {
294            data: self.data.clone(),
295        }
296    }
297
298    fn clone_from(&mut self, source: &Self) {
299        self.data.clone_from(&source.data);
300    }
301}
302
303// Row's `Hash` implementation defers to `RowRef` to ensure they hash equivalently.
304impl std::hash::Hash for Row {
305    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
306        self.as_row_ref().hash(state)
307    }
308}
309
310impl Arbitrary for Row {
311    type Parameters = prop::collection::SizeRange;
312    type Strategy = BoxedStrategy<Row>;
313
314    fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
315        prop::collection::vec(arb_datum(), size)
316            .prop_map(|items| {
317                let mut row = Row::default();
318                let mut packer = row.packer();
319                for item in items.iter() {
320                    let datum: Datum<'_> = item.into();
321                    packer.push(datum);
322                }
323                row
324            })
325            .boxed()
326    }
327}
328
329impl PartialOrd for Row {
330    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
331        Some(self.cmp(other))
332    }
333}
334
335impl Ord for Row {
336    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
337        self.as_ref().cmp(other.as_ref())
338    }
339}
340
341#[allow(missing_debug_implementations)]
342mod columnation {
343    use columnation::{Columnation, Region};
344    use mz_ore::region::LgAllocRegion;
345
346    use crate::Row;
347
348    /// Region allocation for `Row` data.
349    ///
350    /// Content bytes are stored in stable contiguous memory locations,
351    /// and then a `Row` referencing them is falsified.
352    pub struct RowStack {
353        region: LgAllocRegion<u8>,
354    }
355
356    impl RowStack {
357        const LIMIT: usize = 2 << 20;
358    }
359
360    // Implement `Default` manually to specify a region allocation limit.
361    impl Default for RowStack {
362        fn default() -> Self {
363            Self {
364                // Limit the region size to 2MiB.
365                region: LgAllocRegion::with_limit(Self::LIMIT),
366            }
367        }
368    }
369
370    impl Columnation for Row {
371        type InnerRegion = RowStack;
372    }
373
374    impl Region for RowStack {
375        type Item = Row;
376        #[inline]
377        fn clear(&mut self) {
378            self.region.clear();
379        }
380        #[inline(always)]
381        unsafe fn copy(&mut self, item: &Row) -> Row {
382            if item.data.spilled() {
383                let bytes = self.region.copy_slice(&item.data[..]);
384                Row {
385                    data: compact_bytes::CompactBytes::from_raw_parts(
386                        bytes.as_mut_ptr(),
387                        item.data.len(),
388                        item.data.capacity(),
389                    ),
390                }
391            } else {
392                item.clone()
393            }
394        }
395
396        fn reserve_items<'a, I>(&mut self, items: I)
397        where
398            Self: 'a,
399            I: Iterator<Item = &'a Self::Item> + Clone,
400        {
401            let size = items
402                .filter(|row| row.data.spilled())
403                .map(|row| row.data.len())
404                .sum();
405            let size = std::cmp::min(size, Self::LIMIT);
406            self.region.reserve(size);
407        }
408
409        fn reserve_regions<'a, I>(&mut self, regions: I)
410        where
411            Self: 'a,
412            I: Iterator<Item = &'a Self> + Clone,
413        {
414            let size = regions.map(|r| r.region.len()).sum();
415            let size = std::cmp::min(size, Self::LIMIT);
416            self.region.reserve(size);
417        }
418
419        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
420            self.region.heap_size(callback)
421        }
422    }
423}
424
425mod columnar {
426    use columnar::common::PushIndexAs;
427    use columnar::{
428        AsBytes, Clear, Columnar, Container, FromBytes, HeapSize, Index, IndexAs, Len, Push,
429    };
430    use mz_ore::cast::CastFrom;
431
432    use crate::{Row, RowRef};
433
434    #[derive(Copy, Clone, Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
435    pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
436        /// Bounds container; provides indexed access to offsets.
437        pub bounds: BC,
438        /// Values container; provides slice access to bytes.
439        pub values: VC,
440    }
441
442    impl Columnar for Row {
443        #[inline(always)]
444        fn copy_from(&mut self, other: columnar::Ref<'_, Self>) {
445            self.clear();
446            self.data.extend_from_slice(other.data());
447        }
448        #[inline(always)]
449        fn into_owned(other: columnar::Ref<'_, Self>) -> Self {
450            other.to_owned()
451        }
452        type Container = Rows;
453        #[inline(always)]
454        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
455        where
456            Self: 'a,
457        {
458            thing
459        }
460    }
461
462    impl<BC: PushIndexAs<u64>> Container for Rows<BC, Vec<u8>> {
463        type Ref<'a> = &'a RowRef;
464        type Borrowed<'a>
465            = Rows<BC::Borrowed<'a>, &'a [u8]>
466        where
467            Self: 'a;
468        #[inline(always)]
469        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
470            Rows {
471                bounds: self.bounds.borrow(),
472                values: self.values.borrow(),
473            }
474        }
475        #[inline(always)]
476        fn reborrow<'c, 'a: 'c>(item: Self::Borrowed<'a>) -> Self::Borrowed<'c>
477        where
478            Self: 'a,
479        {
480            Rows {
481                bounds: BC::reborrow(item.bounds),
482                values: item.values,
483            }
484        }
485
486        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
487        where
488            Self: 'a,
489        {
490            item
491        }
492
493        fn reserve_for<'a, I>(&mut self, selves: I)
494        where
495            Self: 'a,
496            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
497        {
498            self.bounds.reserve_for(selves.clone().map(|r| r.bounds));
499            self.values.reserve_for(selves.map(|r| r.values));
500        }
501    }
502
503    impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
504        #[inline(always)]
505        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
506            columnar::chain(self.bounds.as_bytes(), self.values.as_bytes())
507        }
508    }
509    impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
510        #[inline(always)]
511        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
512            Self {
513                bounds: FromBytes::from_bytes(bytes),
514                values: FromBytes::from_bytes(bytes),
515            }
516        }
517    }
518
519    impl<BC: Len, VC> Len for Rows<BC, VC> {
520        #[inline(always)]
521        fn len(&self) -> usize {
522            self.bounds.len()
523        }
524    }
525
526    impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
527        type Ref = &'a RowRef;
528        #[inline(always)]
529        fn get(&self, index: usize) -> Self::Ref {
530            let lower = if index == 0 {
531                0
532            } else {
533                self.bounds.index_as(index - 1)
534            };
535            let upper = self.bounds.index_as(index);
536            let lower = usize::cast_from(lower);
537            let upper = usize::cast_from(upper);
538            RowRef::from_slice(&self.values[lower..upper])
539        }
540    }
541    impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
542        type Ref = &'a RowRef;
543        #[inline(always)]
544        fn get(&self, index: usize) -> Self::Ref {
545            let lower = if index == 0 {
546                0
547            } else {
548                self.bounds.index_as(index - 1)
549            };
550            let upper = self.bounds.index_as(index);
551            let lower = usize::cast_from(lower);
552            let upper = usize::cast_from(upper);
553            RowRef::from_slice(&self.values[lower..upper])
554        }
555    }
556
557    impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
558        #[inline(always)]
559        fn push(&mut self, item: &Row) {
560            self.values.extend_from_slice(item.data.as_slice());
561            self.bounds.push(u64::cast_from(self.values.len()));
562        }
563    }
564    impl<BC: for<'a> Push<&'a u64>> Push<&RowRef> for Rows<BC> {
565        #[inline(always)]
566        fn push(&mut self, item: &RowRef) {
567            self.values.extend_from_slice(item.data());
568            self.bounds.push(&u64::cast_from(self.values.len()));
569        }
570    }
571    impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
572        #[inline(always)]
573        fn clear(&mut self) {
574            self.bounds.clear();
575            self.values.clear();
576        }
577    }
578    impl<BC: HeapSize, VC: HeapSize> HeapSize for Rows<BC, VC> {
579        #[inline(always)]
580        fn heap_size(&self) -> (usize, usize) {
581            let (l0, c0) = self.bounds.heap_size();
582            let (l1, c1) = self.values.heap_size();
583            (l0 + l1, c0 + c1)
584        }
585    }
586}
587
/// A contiguous slice of bytes that are row data.
///
/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
///
/// The type is a `#[repr(transparent)]` wrapper around `[u8]`, which is what
/// [`RowRef::from_slice`] relies on when it casts a `&[u8]` to a `&RowRef`.
/// Equality and hashing are derived and therefore operate on the raw bytes.
#[derive(PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct RowRef([u8]);
594
595impl RowRef {
596    /// Create a [`RowRef`] from a slice of data.
597    ///
598    /// We do not check that the provided slice is valid [`Row`] data, will panic on read
599    /// if the data is invalid.
600    pub fn from_slice(row: &[u8]) -> &RowRef {
601        #[allow(clippy::as_conversions)]
602        let ptr = row as *const [u8] as *const RowRef;
603        // SAFETY: We know `ptr` is non-null and aligned because it came from a &[u8].
604        unsafe { &*ptr }
605    }
606
607    /// Unpack `self` into a `Vec<Datum>` for efficient random access.
608    pub fn unpack(&self) -> Vec<Datum<'_>> {
609        // It's usually cheaper to unpack twice to figure out the right length than it is to grow the vec as we go
610        let len = self.iter().count();
611        let mut vec = Vec::with_capacity(len);
612        vec.extend(self.iter());
613        vec
614    }
615
616    /// Return the first [`Datum`] in `self`
617    ///
618    /// Panics if the [`RowRef`] is empty.
619    pub fn unpack_first(&self) -> Datum<'_> {
620        self.iter().next().unwrap()
621    }
622
623    /// Iterate the [`Datum`] elements of the [`RowRef`].
624    pub fn iter(&self) -> DatumListIter<'_> {
625        DatumListIter { data: &self.0 }
626    }
627
628    /// Return the byte length of this [`RowRef`].
629    pub fn byte_len(&self) -> usize {
630        self.0.len()
631    }
632
633    /// For debugging only.
634    pub fn data(&self) -> &[u8] {
635        &self.0
636    }
637
638    /// True iff there is no data in this [`RowRef`].
639    pub fn is_empty(&self) -> bool {
640        self.0.is_empty()
641    }
642}
643
644impl ToOwned for RowRef {
645    type Owned = Row;
646
647    fn to_owned(&self) -> Self::Owned {
648        // SAFETY: RowRef has the invariant that the wrapped data must be a valid Row encoding.
649        unsafe { Row::from_bytes_unchecked(&self.0) }
650    }
651}
652
653impl<'a> IntoIterator for &'a RowRef {
654    type Item = Datum<'a>;
655    type IntoIter = DatumListIter<'a>;
656
657    fn into_iter(self) -> DatumListIter<'a> {
658        DatumListIter { data: &self.0 }
659    }
660}
661
662/// These implementations order first by length, and then by slice contents.
663/// This allows many comparisons to complete without dereferencing memory.
664/// Warning: These order by the u8 array representation, and NOT by Datum::cmp.
665impl PartialOrd for RowRef {
666    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
667        Some(self.cmp(other))
668    }
669}
670
671impl Ord for RowRef {
672    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
673        match self.0.len().cmp(&other.0.len()) {
674            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
675            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
676            std::cmp::Ordering::Equal => self.0.cmp(&other.0),
677        }
678    }
679}
680
681impl fmt::Debug for RowRef {
682    /// Debug representation using the internal datums
683    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
684        f.write_str("RowRef{")?;
685        f.debug_list().entries(self.into_iter()).finish()?;
686        f.write_str("}")
687    }
688}
689
/// Packs datums into a [`Row`].
///
/// Creating a `RowPacker` via [`Row::packer`] starts a packing operation on the
/// row. A packing operation always starts from scratch: the existing contents
/// of the underlying row are cleared.
///
/// To complete a packing operation, drop the `RowPacker`.
#[derive(Debug)]
pub struct RowPacker<'a> {
    /// The row being packed into, mutably borrowed for the duration of the operation.
    row: &'a mut Row,
}
701
/// Iterator over the `Datum`s encoded in a byte slice.
///
/// This is the iterator type returned by [`RowRef::iter`].
#[derive(Debug, Clone)]
pub struct DatumListIter<'a> {
    /// The encoded bytes being iterated (presumably the not-yet-decoded remainder — confirm
    /// against the `Iterator` impl).
    data: &'a [u8],
}
706
/// Iterator state over an encoded dictionary.
// NOTE(review): the `Iterator` impl is not visible here; the field notes below are assumptions.
#[derive(Debug, Clone)]
pub struct DatumDictIter<'a> {
    /// The encoded bytes being iterated (presumably the not-yet-decoded remainder — TODO confirm).
    data: &'a [u8],
    /// Presumably the most recently yielded key, kept to check key ordering — TODO confirm.
    prev_key: Option<&'a str>,
}
712
/// `RowArena` is used to hold on to temporary `Row`s for functions like `eval` that need to
/// create complex `Datum`s but don't have a `Row` to put them in yet.
#[derive(Debug)]
pub struct RowArena {
    // Semantically, this field would be better represented by a `Vec<Box<[u8]>>`,
    // as once the arena takes ownership of a byte vector the vector is never
    // modified. But `RowArena::push_bytes` takes ownership of a `Vec<u8>`, so
    // storing that `Vec<u8>` directly avoids an allocation. The cost is
    // additional memory use, as the vector may have spare capacity, but row
    // arenas are short lived so this is the better tradeoff.
    //
    // The `RefCell` provides interior mutability for the list of owned byte vectors.
    inner: RefCell<Vec<Vec<u8>>>,
}
724
725// DatumList and DatumDict defined here rather than near Datum because we need private access to the unsafe data field
726
/// A sequence of `Datum`s, borrowed in serialized form.
///
/// Equality and hashing are derived and therefore operate on the serialized
/// bytes. Ordering is implemented by hand and compares the decoded datums.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumList<'a> {
    /// Points at the serialized datums
    data: &'a [u8],
}
733
734impl<'a> Debug for DatumList<'a> {
735    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
736        f.debug_list().entries(self.iter()).finish()
737    }
738}
739
740impl Ord for DatumList<'_> {
741    fn cmp(&self, other: &DatumList) -> Ordering {
742        self.iter().cmp(other.iter())
743    }
744}
745
746impl PartialOrd for DatumList<'_> {
747    fn partial_cmp(&self, other: &DatumList) -> Option<Ordering> {
748        Some(self.cmp(other))
749    }
750}
751
/// A mapping from string keys to `Datum`s, borrowed in serialized form.
///
/// All comparison traits, including ordering, are derived and therefore
/// operate on the serialized bytes.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct DatumMap<'a> {
    /// Points at the serialized datums, which should be sorted in key order
    data: &'a [u8],
}
758
/// Represents a single `Datum`, appropriate to be nested inside other
/// `Datum`s.
///
/// The datum is kept in serialized form and decoded on demand via
/// `DatumNested::datum`. Equality and hashing are derived and operate on the
/// serialized bytes; ordering is implemented by hand on the decoded datum.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumNested<'a> {
    /// The serialized bytes of the datum, as consumed by `read_datum`.
    val: &'a [u8],
}
765
766impl<'a> std::fmt::Display for DatumNested<'a> {
767    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
768        std::fmt::Display::fmt(&self.datum(), f)
769    }
770}
771
772impl<'a> std::fmt::Debug for DatumNested<'a> {
773    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
774        f.debug_struct("DatumNested")
775            .field("val", &self.datum())
776            .finish()
777    }
778}
779
780impl<'a> DatumNested<'a> {
781    // Figure out which bytes `read_datum` returns (e.g. including the tag),
782    // and then store a reference to those bytes, so we can "replay" this same
783    // call later on without storing the datum itself.
784    pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
785        let prev = *data;
786        let _ = unsafe { read_datum(data) };
787        DatumNested {
788            val: &prev[..(prev.len() - data.len())],
789        }
790    }
791
792    /// Returns the datum `self` contains.
793    pub fn datum(&self) -> Datum<'a> {
794        let mut temp = self.val;
795        unsafe { read_datum(&mut temp) }
796    }
797}
798
799impl<'a> Ord for DatumNested<'a> {
800    fn cmp(&self, other: &Self) -> Ordering {
801        self.datum().cmp(&other.datum())
802    }
803}
804
805impl<'a> PartialOrd for DatumNested<'a> {
806    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
807        Some(self.cmp(other))
808    }
809}
810
// Identifies how a datum is encoded in a row. The enum is `#[repr(u8)]`, so each tag is a
// single byte.
//
// Prefer adding new tags to the end of the enum. Certain behavior, like row ordering and EXPLAIN
// PHYSICAL PLAN, rely on the ordering of this enum. Neither of these are breaking changes, but
// it's annoying when they change.
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
enum Tag {
    Null,
    False,
    True,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt32,
    Float32,
    Float64,
    Date,
    Time,
    Timestamp,
    TimestampTz,
    Interval,
    // The `Tiny`/`Short`/`Long`/`Huge` suffixes select the width of the length prefix that
    // follows the tag: one, two, four, or eight bytes respectively.
    BytesTiny,
    BytesShort,
    BytesLong,
    BytesHuge,
    StringTiny,
    StringShort,
    StringLong,
    StringHuge,
    Uuid,
    Array,
    ListTiny,
    ListShort,
    ListLong,
    ListHuge,
    Dict,
    JsonNull,
    Dummy,
    Numeric,
    UInt16,
    UInt64,
    MzTimestamp,
    Range,
    MzAclItem,
    AclItem,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestamp,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestampTz,
    // The next several tags are for variable-length signed integer encoding.
    // The basic idea is that `NonNegativeIntN_K` is used to encode a datum of type
    // IntN whose actual value is positive or zero and fits in K bits, and similarly for
    // NegativeIntN_K with negative values.
    //
    // The order of these tags matters, because we want to be able to choose the
    // tag for a given datum quickly, with arithmetic, rather than slowly, with a
    // stack of `if` statements.
    //
    // Separate tags for non-negative and negative numbers are used to avoid having to
    // waste one bit in the actual data space to encode the sign.
    NonNegativeInt16_0, // i.e., 0
    NonNegativeInt16_8,
    NonNegativeInt16_16,

    NonNegativeInt32_0,
    NonNegativeInt32_8,
    NonNegativeInt32_16,
    NonNegativeInt32_24,
    NonNegativeInt32_32,

    NonNegativeInt64_0,
    NonNegativeInt64_8,
    NonNegativeInt64_16,
    NonNegativeInt64_24,
    NonNegativeInt64_32,
    NonNegativeInt64_40,
    NonNegativeInt64_48,
    NonNegativeInt64_56,
    NonNegativeInt64_64,

    NegativeInt16_0, // i.e., -1
    NegativeInt16_8,
    NegativeInt16_16,

    NegativeInt32_0,
    NegativeInt32_8,
    NegativeInt32_16,
    NegativeInt32_24,
    NegativeInt32_32,

    NegativeInt64_0,
    NegativeInt64_8,
    NegativeInt64_16,
    NegativeInt64_24,
    NegativeInt64_32,
    NegativeInt64_40,
    NegativeInt64_48,
    NegativeInt64_56,
    NegativeInt64_64,

    // These are like the ones above, but for unsigned types. The
    // situation is slightly simpler as we don't have negatives.
    UInt8_0, // i.e., 0
    UInt8_8,

    UInt16_0,
    UInt16_8,
    UInt16_16,

    UInt32_0,
    UInt32_8,
    UInt32_16,
    UInt32_24,
    UInt32_32,

    UInt64_0,
    UInt64_8,
    UInt64_16,
    UInt64_24,
    UInt64_32,
    UInt64_40,
    UInt64_48,
    UInt64_56,
    UInt64_64,
}
940
941impl Tag {
942    fn actual_int_length(self) -> Option<usize> {
943        use Tag::*;
944        let val = match self {
945            NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
946            | UInt32_0 | UInt64_0 => 0,
947            NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
948            | UInt32_8 | UInt64_8 => 1,
949            NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
950            | UInt32_16 | UInt64_16 => 2,
951            NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
952            NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
953            NonNegativeInt64_40 | UInt64_40 => 5,
954            NonNegativeInt64_48 | UInt64_48 => 6,
955            NonNegativeInt64_56 | UInt64_56 => 7,
956            NonNegativeInt64_64 | UInt64_64 => 8,
957            NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
958            NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
959            NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
960            NegativeInt32_24 | NegativeInt64_24 => 3,
961            NegativeInt32_32 | NegativeInt64_32 => 4,
962            NegativeInt64_40 => 5,
963            NegativeInt64_48 => 6,
964            NegativeInt64_56 => 7,
965            NegativeInt64_64 => 8,
966
967            _ => return None,
968        };
969        Some(val)
970    }
971}
972
973// --------------------------------------------------------------------------------
974// reading data
975
976/// Read a byte slice starting at byte `offset`.
977///
978/// Updates `offset` to point to the first byte after the end of the read region.
979fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
980    let len = u64::from_le_bytes(read_byte_array(data));
981    let len = usize::cast_from(len);
982    let (bytes, next) = data.split_at(len);
983    *data = next;
984    bytes
985}
986
987/// Read a data whose length is encoded in the row before its contents.
988///
989/// Updates `offset` to point to the first byte after the end of the read region.
990///
991/// # Safety
992///
993/// This function is safe if the datum's length and contents were previously written by `push_lengthed_bytes`,
994/// and it was only written with a `String` tag if it was indeed UTF-8.
995unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
996    let len = match tag {
997        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
998        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
999            usize::from(u16::from_le_bytes(read_byte_array(data)))
1000        }
1001        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1002            usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
1003        }
1004        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1005            usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
1006        }
1007        _ => unreachable!(),
1008    };
1009    let (bytes, next) = data.split_at(len);
1010    *data = next;
1011    match tag {
1012        Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
1013        Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
1014            Datum::String(str::from_utf8_unchecked(bytes))
1015        }
1016        Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
1017            Datum::List(DatumList { data: bytes })
1018        }
1019        _ => unreachable!(),
1020    }
1021}
1022
/// Pops the first byte off `data` and returns it.
///
/// # Panics
///
/// Panics if `data` is empty.
fn read_byte(data: &mut &[u8]) -> u8 {
    let (first, rest) = (data[0], &data[1..]);
    *data = rest;
    first
}
1028
/// Reads `length` bytes from the front of `data` and widens them to an array
/// of `N` bytes.
///
/// The bytes read occupy the first `length` positions of the result; the
/// remaining `N - length` positions (the most significant bytes of a
/// little-endian value) are set to `FILL`. On return, `data` has been advanced
/// past the bytes that were read.
///
/// # Panics
///
/// Panics unless both of the following hold:
///   * `length <= N`
///   * `length <= data.len()`
fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    let (head, tail) = data.split_at(length);
    let mut widened = [FILL; N];
    widened[..length].copy_from_slice(head);
    *data = tail;
    widened
}
1046/// Read `length` bytes from `data` at `offset`, updating the
1047/// latter. Extend the resulting buffer to a negative `N`-byte
1048/// twos complement integer by filling the remaining bits with 1.
1049///
1050/// SAFETY:
1051///   * length <= N
1052///   * offset + length <= data.len()
1053fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
1054    read_byte_array_sign_extending::<N, 255>(data, length)
1055}
1056
1057/// Read `length` bytes from `data` at `offset`, updating the
1058/// latter. Extend the resulting buffer to a positive or zero `N`-byte
1059/// twos complement integer by filling the remaining bits with 0.
1060///
1061/// SAFETY:
1062///   * length <= N
1063///   * offset + length <= data.len()
1064fn read_byte_array_extending_nonnegative<const N: usize>(
1065    data: &mut &[u8],
1066    length: usize,
1067) -> [u8; N] {
1068    read_byte_array_sign_extending::<N, 0>(data, length)
1069}
1070
1071pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1072    let (prev, next) = data.split_first_chunk().unwrap();
1073    *data = next;
1074    *prev
1075}
1076
1077pub(super) fn read_date(data: &mut &[u8]) -> Date {
1078    let days = i32::from_le_bytes(read_byte_array(data));
1079    Date::from_pg_epoch(days).expect("unexpected date")
1080}
1081
1082pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1083    let year = i32::from_le_bytes(read_byte_array(data));
1084    let ordinal = u32::from_le_bytes(read_byte_array(data));
1085    NaiveDate::from_yo_opt(year, ordinal).unwrap()
1086}
1087
1088pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1089    let secs = u32::from_le_bytes(read_byte_array(data));
1090    let nanos = u32::from_le_bytes(read_byte_array(data));
1091    NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1092}
1093
1094/// Read a datum starting at byte `offset`.
1095///
1096/// Updates `offset` to point to the first byte after the end of the read region.
1097///
1098/// # Safety
1099///
1100/// This function is safe if a `Datum` was previously written at this offset by `push_datum`.
1101/// Otherwise it could return invalid values, which is Undefined Behavior.
1102pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
1103    let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
1104    match tag {
1105        Tag::Null => Datum::Null,
1106        Tag::False => Datum::False,
1107        Tag::True => Datum::True,
1108        Tag::UInt8_0 | Tag::UInt8_8 => {
1109            let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
1110                data,
1111                tag.actual_int_length()
1112                    .expect("returns a value for variable-length-encoded integer tags"),
1113            ));
1114            Datum::UInt8(i)
1115        }
1116        Tag::Int16 => {
1117            let i = i16::from_le_bytes(read_byte_array(data));
1118            Datum::Int16(i)
1119        }
1120        Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
1121            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
1122            // and `data` is big enough because it was encoded validly. These assumptions
1123            // are checked in debug asserts.
1124            let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
1125                data,
1126                tag.actual_int_length()
1127                    .expect("returns a value for variable-length-encoded integer tags"),
1128            ));
1129            Datum::Int16(i)
1130        }
1131        Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
1132            let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
1133                data,
1134                tag.actual_int_length()
1135                    .expect("returns a value for variable-length-encoded integer tags"),
1136            ));
1137            Datum::UInt16(i)
1138        }
1139        Tag::Int32 => {
1140            let i = i32::from_le_bytes(read_byte_array(data));
1141            Datum::Int32(i)
1142        }
1143        Tag::NonNegativeInt32_0
1144        | Tag::NonNegativeInt32_32
1145        | Tag::NonNegativeInt32_8
1146        | Tag::NonNegativeInt32_16
1147        | Tag::NonNegativeInt32_24 => {
1148            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
1149            // and `data` is big enough because it was encoded validly. These assumptions
1150            // are checked in debug asserts.
1151            let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
1152                data,
1153                tag.actual_int_length()
1154                    .expect("returns a value for variable-length-encoded integer tags"),
1155            ));
1156            Datum::Int32(i)
1157        }
1158        Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
1159            let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
1160                data,
1161                tag.actual_int_length()
1162                    .expect("returns a value for variable-length-encoded integer tags"),
1163            ));
1164            Datum::UInt32(i)
1165        }
1166        Tag::Int64 => {
1167            let i = i64::from_le_bytes(read_byte_array(data));
1168            Datum::Int64(i)
1169        }
1170        Tag::NonNegativeInt64_0
1171        | Tag::NonNegativeInt64_64
1172        | Tag::NonNegativeInt64_8
1173        | Tag::NonNegativeInt64_16
1174        | Tag::NonNegativeInt64_24
1175        | Tag::NonNegativeInt64_32
1176        | Tag::NonNegativeInt64_40
1177        | Tag::NonNegativeInt64_48
1178        | Tag::NonNegativeInt64_56 => {
1179            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
1180            // and `data` is big enough because it was encoded validly. These assumptions
1181            // are checked in debug asserts.
1182
1183            let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
1184                data,
1185                tag.actual_int_length()
1186                    .expect("returns a value for variable-length-encoded integer tags"),
1187            ));
1188            Datum::Int64(i)
1189        }
1190        Tag::UInt64_0
1191        | Tag::UInt64_8
1192        | Tag::UInt64_16
1193        | Tag::UInt64_24
1194        | Tag::UInt64_32
1195        | Tag::UInt64_40
1196        | Tag::UInt64_48
1197        | Tag::UInt64_56
1198        | Tag::UInt64_64 => {
1199            let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
1200                data,
1201                tag.actual_int_length()
1202                    .expect("returns a value for variable-length-encoded integer tags"),
1203            ));
1204            Datum::UInt64(i)
1205        }
1206        Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
1207            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
1208            // and `data` is big enough because it was encoded validly. These assumptions
1209            // are checked in debug asserts.
1210            let i = i16::from_le_bytes(read_byte_array_extending_negative(
1211                data,
1212                tag.actual_int_length()
1213                    .expect("returns a value for variable-length-encoded integer tags"),
1214            ));
1215            Datum::Int16(i)
1216        }
1217        Tag::NegativeInt32_0
1218        | Tag::NegativeInt32_32
1219        | Tag::NegativeInt32_8
1220        | Tag::NegativeInt32_16
1221        | Tag::NegativeInt32_24 => {
1222            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
1223            // and `data` is big enough because it was encoded validly. These assumptions
1224            // are checked in debug asserts.
1225            let i = i32::from_le_bytes(read_byte_array_extending_negative(
1226                data,
1227                tag.actual_int_length()
1228                    .expect("returns a value for variable-length-encoded integer tags"),
1229            ));
1230            Datum::Int32(i)
1231        }
1232        Tag::NegativeInt64_0
1233        | Tag::NegativeInt64_64
1234        | Tag::NegativeInt64_8
1235        | Tag::NegativeInt64_16
1236        | Tag::NegativeInt64_24
1237        | Tag::NegativeInt64_32
1238        | Tag::NegativeInt64_40
1239        | Tag::NegativeInt64_48
1240        | Tag::NegativeInt64_56 => {
1241            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
1242            // and `data` is big enough because the row was encoded validly. These assumptions
1243            // are checked in debug asserts.
1244            let i = i64::from_le_bytes(read_byte_array_extending_negative(
1245                data,
1246                tag.actual_int_length()
1247                    .expect("returns a value for variable-length-encoded integer tags"),
1248            ));
1249            Datum::Int64(i)
1250        }
1251
1252        Tag::UInt8 => {
1253            let i = u8::from_le_bytes(read_byte_array(data));
1254            Datum::UInt8(i)
1255        }
1256        Tag::UInt16 => {
1257            let i = u16::from_le_bytes(read_byte_array(data));
1258            Datum::UInt16(i)
1259        }
1260        Tag::UInt32 => {
1261            let i = u32::from_le_bytes(read_byte_array(data));
1262            Datum::UInt32(i)
1263        }
1264        Tag::UInt64 => {
1265            let i = u64::from_le_bytes(read_byte_array(data));
1266            Datum::UInt64(i)
1267        }
1268        Tag::Float32 => {
1269            let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
1270            Datum::Float32(OrderedFloat::from(f))
1271        }
1272        Tag::Float64 => {
1273            let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
1274            Datum::Float64(OrderedFloat::from(f))
1275        }
1276        Tag::Date => Datum::Date(read_date(data)),
1277        Tag::Time => Datum::Time(read_time(data)),
1278        Tag::CheapTimestamp => {
1279            let ts = i64::from_le_bytes(read_byte_array(data));
1280            let secs = ts.div_euclid(1_000_000_000);
1281            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1282            let ndt = DateTime::from_timestamp(secs, nsecs)
1283                .expect("We only write round-trippable timestamps")
1284                .naive_utc();
1285            Datum::Timestamp(
1286                CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
1287            )
1288        }
1289        Tag::CheapTimestampTz => {
1290            let ts = i64::from_le_bytes(read_byte_array(data));
1291            let secs = ts.div_euclid(1_000_000_000);
1292            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1293            let dt = DateTime::from_timestamp(secs, nsecs)
1294                .expect("We only write round-trippable timestamps");
1295            Datum::TimestampTz(
1296                CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
1297            )
1298        }
1299        Tag::Timestamp => {
1300            let date = read_naive_date(data);
1301            let time = read_time(data);
1302            Datum::Timestamp(
1303                CheckedTimestamp::from_timestamplike(date.and_time(time))
1304                    .expect("unexpected timestamp"),
1305            )
1306        }
1307        Tag::TimestampTz => {
1308            let date = read_naive_date(data);
1309            let time = read_time(data);
1310            Datum::TimestampTz(
1311                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
1312                    date.and_time(time),
1313                    Utc,
1314                ))
1315                .expect("unexpected timestamptz"),
1316            )
1317        }
1318        Tag::Interval => {
1319            let months = i32::from_le_bytes(read_byte_array(data));
1320            let days = i32::from_le_bytes(read_byte_array(data));
1321            let micros = i64::from_le_bytes(read_byte_array(data));
1322            Datum::Interval(Interval {
1323                months,
1324                days,
1325                micros,
1326            })
1327        }
1328        Tag::BytesTiny
1329        | Tag::BytesShort
1330        | Tag::BytesLong
1331        | Tag::BytesHuge
1332        | Tag::StringTiny
1333        | Tag::StringShort
1334        | Tag::StringLong
1335        | Tag::StringHuge
1336        | Tag::ListTiny
1337        | Tag::ListShort
1338        | Tag::ListLong
1339        | Tag::ListHuge => read_lengthed_datum(data, tag),
1340        Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
1341        Tag::Array => {
1342            // See the comment in `Row::push_array` for details on the encoding
1343            // of arrays.
1344            let ndims = read_byte(data);
1345            let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
1346            let (dims, next) = data.split_at(dims_size);
1347            *data = next;
1348            let bytes = read_untagged_bytes(data);
1349            Datum::Array(Array {
1350                dims: ArrayDimensions { data: dims },
1351                elements: DatumList { data: bytes },
1352            })
1353        }
1354        Tag::Dict => {
1355            let bytes = read_untagged_bytes(data);
1356            Datum::Map(DatumMap { data: bytes })
1357        }
1358        Tag::JsonNull => Datum::JsonNull,
1359        Tag::Dummy => Datum::Dummy,
1360        Tag::Numeric => {
1361            let digits = read_byte(data).into();
1362            let exponent = i8::reinterpret_cast(read_byte(data));
1363            let bits = read_byte(data);
1364
1365            let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
1366            let lsu_u8_len = lsu_u16_len * 2;
1367            let (lsu_u8, next) = data.split_at(lsu_u8_len);
1368            *data = next;
1369
1370            // TODO: if we refactor the decimal library to accept the owned
1371            // array as a parameter to `from_raw_parts` below, we could likely
1372            // avoid a copy because it is exactly the value we want
1373            let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
1374            for (i, c) in lsu_u8.chunks(2).enumerate() {
1375                lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
1376            }
1377
1378            let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
1379            Datum::from(d)
1380        }
1381        Tag::MzTimestamp => {
1382            let t = Timestamp::decode(read_byte_array(data));
1383            Datum::MzTimestamp(t)
1384        }
1385        Tag::Range => {
1386            // See notes on `push_range_with` for details about encoding.
1387            let flag_byte = read_byte(data);
1388            let flags = range::InternalFlags::from_bits(flag_byte)
1389                .expect("range flags must be encoded validly");
1390
1391            if flags.contains(range::InternalFlags::EMPTY) {
1392                assert!(
1393                    flags == range::InternalFlags::EMPTY,
1394                    "empty ranges contain only RANGE_EMPTY flag"
1395                );
1396
1397                return Datum::Range(Range { inner: None });
1398            }
1399
1400            let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
1401                None
1402            } else {
1403                Some(DatumNested::extract(data))
1404            };
1405
1406            let lower = RangeBound {
1407                inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
1408                bound: lower_bound,
1409            };
1410
1411            let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
1412                None
1413            } else {
1414                Some(DatumNested::extract(data))
1415            };
1416
1417            let upper = RangeBound {
1418                inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
1419                bound: upper_bound,
1420            };
1421
1422            Datum::Range(Range {
1423                inner: Some(RangeInner { lower, upper }),
1424            })
1425        }
1426        Tag::MzAclItem => {
1427            const N: usize = MzAclItem::binary_size();
1428            let mz_acl_item =
1429                MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
1430            Datum::MzAclItem(mz_acl_item)
1431        }
1432        Tag::AclItem => {
1433            const N: usize = AclItem::binary_size();
1434            let acl_item =
1435                AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
1436            Datum::AclItem(acl_item)
1437        }
1438    }
1439}
1440
1441// --------------------------------------------------------------------------------
1442// writing data
1443
1444fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1445where
1446    D: Vector<u8>,
1447{
1448    let len = u64::cast_from(bytes.len());
1449    data.extend_from_slice(&len.to_le_bytes());
1450    data.extend_from_slice(bytes);
1451}
1452
1453fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1454where
1455    D: Vector<u8>,
1456{
1457    match tag {
1458        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1459            let len = bytes.len().to_le_bytes();
1460            data.push(len[0]);
1461        }
1462        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1463            let len = bytes.len().to_le_bytes();
1464            data.extend_from_slice(&len[0..2]);
1465        }
1466        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1467            let len = bytes.len().to_le_bytes();
1468            data.extend_from_slice(&len[0..4]);
1469        }
1470        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1471            let len = bytes.len().to_le_bytes();
1472            data.extend_from_slice(&len);
1473        }
1474        _ => unreachable!(),
1475    }
1476    data.extend_from_slice(bytes);
1477}
1478
1479pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1480    i32::to_le_bytes(date.pg_epoch_days())
1481}
1482
1483fn push_date<D>(data: &mut D, date: Date)
1484where
1485    D: Vector<u8>,
1486{
1487    data.extend_from_slice(&date_to_array(date));
1488}
1489
1490pub(super) fn naive_date_to_arrays(
1491    date: NaiveDate,
1492) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1493    (
1494        i32::to_le_bytes(date.year()),
1495        u32::to_le_bytes(date.ordinal()),
1496    )
1497}
1498
1499fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1500where
1501    D: Vector<u8>,
1502{
1503    let (ds1, ds2) = naive_date_to_arrays(date);
1504    data.extend_from_slice(&ds1);
1505    data.extend_from_slice(&ds2);
1506}
1507
1508pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1509    (
1510        u32::to_le_bytes(time.num_seconds_from_midnight()),
1511        u32::to_le_bytes(time.nanosecond()),
1512    )
1513}
1514
1515fn push_time<D>(data: &mut D, time: NaiveTime)
1516where
1517    D: Vector<u8>,
1518{
1519    let (ts1, ts2) = time_to_arrays(time);
1520    data.extend_from_slice(&ts1);
1521    data.extend_from_slice(&ts2);
1522}
1523
1524/// Returns an i64 representing a `NaiveDateTime`, if
1525/// said i64 can be round-tripped back to a `NaiveDateTime`.
1526///
1527/// The only exotic NDTs for which this can't happen are those that
1528/// are hundreds of years in the future or past, or those that
1529/// represent a leap second. (Note that Materialize does not support
1530/// leap seconds, but this module does).
1531// This function is inspired by `NaiveDateTime::timestamp_nanos`,
1532// with extra checking.
1533fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1534    let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1535    if subsec_nanos >= 1_000_000_000 {
1536        return None;
1537    }
1538    let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1539    as_ns.checked_add(i64::from(subsec_nanos))
1540}
1541
// Returns the number of low-order bytes of `i` that must be kept so that the
// value can be rebuilt by filling the dropped high bytes with copies of the
// sign (all ones for negative values, all zeros otherwise). Both `0` and `-1`
// need no bytes at all.
//
// This function is extremely hot, so we use `as` rather than paying for
// `try_into` followed by `unwrap`. `leading_ones` and `leading_zeros` never
// return more than 64, so the narrowing conversion cannot lose information.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_signed<T>(i: T) -> u8
where
    T: Into<i64>,
{
    let value: i64 = i.into();

    // Count the run of bits at the top that merely repeat the sign; whatever
    // remains below that run is what has to fit into n*8 bits.
    let redundant_bits = if value.is_negative() {
        value.leading_ones()
    } else {
        value.leading_zeros()
    };
    let significant_bits = 64 - redundant_bits as u8;

    // Round up to whole bytes.
    (significant_bits + 7) / 8
}
1566
// Returns the number of low-order bytes of `i` that must be kept so that the
// value can be rebuilt by filling the dropped high bytes with zeros. Zero
// needs no bytes at all.
//
// In principle `min_bytes_signed` could serve here too if it took
// `T: Into<i128>` instead of `Into<i64>`, but LLVM doesn't seem smart enough
// to realize that such a function is the same as this one, and generates
// worse code for it.
//
// `as` is used because this is hot and `leading_zeros` never returns more
// than 64, so the narrowing conversion cannot lose information.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_unsigned<T>(i: T) -> u8
where
    T: Into<u64>,
{
    let value: u64 = i.into();

    // Everything below the run of leading zeros has to be stored.
    let significant_bits = 64 - value.leading_zeros() as u8;

    // Round up to whole bytes.
    (significant_bits + 7) / 8
}
1586
// Exclusive upper bounds on the lengths whose prefix fits in one, two, and
// four bytes respectively; they select between the `Tiny`, `Short`, `Long`,
// and `Huge` tag variants.
const TINY: usize = 1 << u8::BITS;
const SHORT: usize = 1 << u16::BITS;
const LONG: usize = 1 << u32::BITS;
1590
1591fn push_datum<D>(data: &mut D, datum: Datum)
1592where
1593    D: Vector<u8>,
1594{
1595    match datum {
1596        Datum::Null => data.push(Tag::Null.into()),
1597        Datum::False => data.push(Tag::False.into()),
1598        Datum::True => data.push(Tag::True.into()),
1599        Datum::Int16(i) => {
1600            let mbs = min_bytes_signed(i);
1601            let tag = u8::from(if i.is_negative() {
1602                Tag::NegativeInt16_0
1603            } else {
1604                Tag::NonNegativeInt16_0
1605            }) + mbs;
1606
1607            data.push(tag);
1608            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1609        }
1610        Datum::Int32(i) => {
1611            let mbs = min_bytes_signed(i);
1612            let tag = u8::from(if i.is_negative() {
1613                Tag::NegativeInt32_0
1614            } else {
1615                Tag::NonNegativeInt32_0
1616            }) + mbs;
1617
1618            data.push(tag);
1619            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1620        }
1621        Datum::Int64(i) => {
1622            let mbs = min_bytes_signed(i);
1623            let tag = u8::from(if i.is_negative() {
1624                Tag::NegativeInt64_0
1625            } else {
1626                Tag::NonNegativeInt64_0
1627            }) + mbs;
1628
1629            data.push(tag);
1630            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1631        }
1632        Datum::UInt8(i) => {
1633            let mbu = min_bytes_unsigned(i);
1634            let tag = u8::from(Tag::UInt8_0) + mbu;
1635            data.push(tag);
1636            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1637        }
1638        Datum::UInt16(i) => {
1639            let mbu = min_bytes_unsigned(i);
1640            let tag = u8::from(Tag::UInt16_0) + mbu;
1641            data.push(tag);
1642            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1643        }
1644        Datum::UInt32(i) => {
1645            let mbu = min_bytes_unsigned(i);
1646            let tag = u8::from(Tag::UInt32_0) + mbu;
1647            data.push(tag);
1648            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1649        }
1650        Datum::UInt64(i) => {
1651            let mbu = min_bytes_unsigned(i);
1652            let tag = u8::from(Tag::UInt64_0) + mbu;
1653            data.push(tag);
1654            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1655        }
1656        Datum::Float32(f) => {
1657            data.push(Tag::Float32.into());
1658            data.extend_from_slice(&f.to_bits().to_le_bytes());
1659        }
1660        Datum::Float64(f) => {
1661            data.push(Tag::Float64.into());
1662            data.extend_from_slice(&f.to_bits().to_le_bytes());
1663        }
1664        Datum::Date(d) => {
1665            data.push(Tag::Date.into());
1666            push_date(data, d);
1667        }
1668        Datum::Time(t) => {
1669            data.push(Tag::Time.into());
1670            push_time(data, t);
1671        }
1672        Datum::Timestamp(t) => {
1673            let datetime = t.to_naive();
1674            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1675                data.push(Tag::CheapTimestamp.into());
1676                data.extend_from_slice(&nanos.to_le_bytes());
1677            } else {
1678                data.push(Tag::Timestamp.into());
1679                push_naive_date(data, datetime.date());
1680                push_time(data, datetime.time());
1681            }
1682        }
1683        Datum::TimestampTz(t) => {
1684            let datetime = t.to_naive();
1685            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1686                data.push(Tag::CheapTimestampTz.into());
1687                data.extend_from_slice(&nanos.to_le_bytes());
1688            } else {
1689                data.push(Tag::TimestampTz.into());
1690                push_naive_date(data, datetime.date());
1691                push_time(data, datetime.time());
1692            }
1693        }
1694        Datum::Interval(i) => {
1695            data.push(Tag::Interval.into());
1696            data.extend_from_slice(&i.months.to_le_bytes());
1697            data.extend_from_slice(&i.days.to_le_bytes());
1698            data.extend_from_slice(&i.micros.to_le_bytes());
1699        }
1700        Datum::Bytes(bytes) => {
1701            let tag = match bytes.len() {
1702                0..TINY => Tag::BytesTiny,
1703                TINY..SHORT => Tag::BytesShort,
1704                SHORT..LONG => Tag::BytesLong,
1705                _ => Tag::BytesHuge,
1706            };
1707            data.push(tag.into());
1708            push_lengthed_bytes(data, bytes, tag);
1709        }
1710        Datum::String(string) => {
1711            let tag = match string.len() {
1712                0..TINY => Tag::StringTiny,
1713                TINY..SHORT => Tag::StringShort,
1714                SHORT..LONG => Tag::StringLong,
1715                _ => Tag::StringHuge,
1716            };
1717            data.push(tag.into());
1718            push_lengthed_bytes(data, string.as_bytes(), tag);
1719        }
1720        Datum::List(list) => {
1721            let tag = match list.data.len() {
1722                0..TINY => Tag::ListTiny,
1723                TINY..SHORT => Tag::ListShort,
1724                SHORT..LONG => Tag::ListLong,
1725                _ => Tag::ListHuge,
1726            };
1727            data.push(tag.into());
1728            push_lengthed_bytes(data, list.data, tag);
1729        }
1730        Datum::Uuid(u) => {
1731            data.push(Tag::Uuid.into());
1732            data.extend_from_slice(u.as_bytes());
1733        }
1734        Datum::Array(array) => {
1735            // See the comment in `Row::push_array` for details on the encoding
1736            // of arrays.
1737            data.push(Tag::Array.into());
1738            data.push(array.dims.ndims());
1739            data.extend_from_slice(array.dims.data);
1740            push_untagged_bytes(data, array.elements.data);
1741        }
1742        Datum::Map(dict) => {
1743            data.push(Tag::Dict.into());
1744            push_untagged_bytes(data, dict.data);
1745        }
1746        Datum::JsonNull => data.push(Tag::JsonNull.into()),
1747        Datum::MzTimestamp(t) => {
1748            data.push(Tag::MzTimestamp.into());
1749            data.extend_from_slice(&t.encode());
1750        }
1751        Datum::Dummy => data.push(Tag::Dummy.into()),
1752        Datum::Numeric(mut n) => {
1753            // Pseudo-canonical representation of decimal values with
1754            // insignificant zeroes trimmed. This compresses the number further
1755            // than `Numeric::trim` by removing all zeroes, and not only those in
1756            // the fractional component.
1757            numeric::cx_datum().reduce(&mut n.0);
1758            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
1759            data.push(Tag::Numeric.into());
1760            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
1761            data.push(
1762                i8::try_from(exponent)
1763                    .expect("exponent to fit within i8; should not exceed +/- 39")
1764                    .to_le_bytes()[0],
1765            );
1766            data.push(bits);
1767
1768            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];
1769
1770            // Little endian machines can take the lsu directly from u16 to u8.
1771            if cfg!(target_endian = "little") {
1772                // SAFETY: `lsu` (returned by `coefficient_units()`) is a `&[u16]`, so
1773                // each element can safely be transmuted into two `u8`s.
1774                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
1775                // The `u8` aligned version of the `lsu` should have twice as many
1776                // elements as we expect for the `u16` version.
1777                soft_assert_no_log!(
1778                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
1779                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
1780                    Numeric::digits_to_lsu_elements_len(digits) * 2,
1781                    lsu_bytes.len()
1782                );
1783                // There should be no unaligned elements in the prefix or suffix.
1784                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
1785                data.extend_from_slice(lsu_bytes);
1786            } else {
1787                for u in lsu {
1788                    data.extend_from_slice(&u.to_le_bytes());
1789                }
1790            }
1791        }
1792        Datum::Range(range) => {
1793            // See notes on `push_range_with` for details about encoding.
1794            data.push(Tag::Range.into());
1795            data.push(range.internal_flag_bits());
1796
1797            if let Some(RangeInner { lower, upper }) = range.inner {
1798                for bound in [lower.bound, upper.bound] {
1799                    if let Some(bound) = bound {
1800                        match bound.datum() {
1801                            Datum::Null => panic!("cannot push Datum::Null into range"),
1802                            d => push_datum::<D>(data, d),
1803                        }
1804                    }
1805                }
1806            }
1807        }
1808        Datum::MzAclItem(mz_acl_item) => {
1809            data.push(Tag::MzAclItem.into());
1810            data.extend_from_slice(&mz_acl_item.encode_binary());
1811        }
1812        Datum::AclItem(acl_item) => {
1813            data.push(Tag::AclItem.into());
1814            data.extend_from_slice(&acl_item.encode_binary());
1815        }
1816    }
1817}
1818
1819/// Return the number of bytes these Datums would use if packed as a Row.
1820pub fn row_size<'a, I>(a: I) -> usize
1821where
1822    I: IntoIterator<Item = Datum<'a>>,
1823{
1824    // Using datums_size instead of a.data().len() here is safer because it will
1825    // return the size of the datums if they were packed into a Row. Although
1826    // a.data().len() happens to give the correct answer (and is faster), data()
1827    // is documented as for debugging only.
1828    let sz = datums_size::<_, _>(a);
1829    let size_of_row = std::mem::size_of::<Row>();
1830    // The Row struct attempts to inline data until it can't fit in the
1831    // preallocated size. Otherwise it spills to heap, and uses the Row to point
1832    // to that.
1833    if sz > Row::SIZE {
1834        sz + size_of_row
1835    } else {
1836        size_of_row
1837    }
1838}
1839
1840/// Number of bytes required by the datum.
1841/// This is used to optimistically pre-allocate buffers for packing rows.
1842pub fn datum_size(datum: &Datum) -> usize {
1843    match datum {
1844        Datum::Null => 1,
1845        Datum::False => 1,
1846        Datum::True => 1,
1847        Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
1848        Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
1849        Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
1850        Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1851        Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1852        Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1853        Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1854        Datum::Float32(_) => 1 + size_of::<f32>(),
1855        Datum::Float64(_) => 1 + size_of::<f64>(),
1856        Datum::Date(_) => 1 + size_of::<i32>(),
1857        Datum::Time(_) => 1 + 8,
1858        Datum::Timestamp(t) => {
1859            1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
1860                8
1861            } else {
1862                16
1863            }
1864        }
1865        Datum::TimestampTz(t) => {
1866            1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
1867                8
1868            } else {
1869                16
1870            }
1871        }
1872        Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
1873        Datum::Bytes(bytes) => {
1874            // We use a variable length representation of slice length.
1875            let bytes_for_length = match bytes.len() {
1876                0..TINY => 1,
1877                TINY..SHORT => 2,
1878                SHORT..LONG => 4,
1879                _ => 8,
1880            };
1881            1 + bytes_for_length + bytes.len()
1882        }
1883        Datum::String(string) => {
1884            // We use a variable length representation of slice length.
1885            let bytes_for_length = match string.len() {
1886                0..TINY => 1,
1887                TINY..SHORT => 2,
1888                SHORT..LONG => 4,
1889                _ => 8,
1890            };
1891            1 + bytes_for_length + string.len()
1892        }
1893        Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
1894        Datum::Array(array) => {
1895            1 + size_of::<u8>()
1896                + array.dims.data.len()
1897                + size_of::<u64>()
1898                + array.elements.data.len()
1899        }
1900        Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
1901        Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
1902        Datum::JsonNull => 1,
1903        Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
1904        Datum::Dummy => 1,
1905        Datum::Numeric(d) => {
1906            let mut d = d.0.clone();
1907            // Values must be reduced to determine appropriate number of
1908            // coefficient units.
1909            numeric::cx_datum().reduce(&mut d);
1910            // 4 = 1 bit each for tag, digits, exponent, bits
1911            4 + (d.coefficient_units().len() * 2)
1912        }
1913        Datum::Range(Range { inner }) => {
1914            // Tag + flags
1915            2 + match inner {
1916                None => 0,
1917                Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
1918                    .iter()
1919                    .map(|bound| match bound {
1920                        None => 0,
1921                        Some(bound) => bound.val.len(),
1922                    })
1923                    .sum(),
1924            }
1925        }
1926        Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
1927        Datum::AclItem(_) => 1 + AclItem::binary_size(),
1928    }
1929}
1930
1931/// Number of bytes required by a sequence of datums.
1932///
1933/// This method can be used to right-size the allocation for a `Row`
1934/// before calling [`RowPacker::extend`].
1935pub fn datums_size<'a, I, D>(iter: I) -> usize
1936where
1937    I: IntoIterator<Item = D>,
1938    D: Borrow<Datum<'a>>,
1939{
1940    iter.into_iter().map(|d| datum_size(d.borrow())).sum()
1941}
1942
1943/// Number of bytes required by a list of datums. This computes the size that would be required if
1944/// the given datums were packed into a list.
1945///
1946/// This is used to optimistically pre-allocate buffers for packing rows.
1947pub fn datum_list_size<'a, I, D>(iter: I) -> usize
1948where
1949    I: IntoIterator<Item = D>,
1950    D: Borrow<Datum<'a>>,
1951{
1952    1 + size_of::<u64>() + datums_size(iter)
1953}
1954
1955impl RowPacker<'_> {
    /// Constructs a row packer that will pack additional datums into the
    /// provided row.
    ///
    /// The packer only wraps the mutable reference: constructing it does not
    /// modify `row`, so whatever `row` already contains stays in place.
    ///
    /// This function is intentionally somewhat inconvenient to call. You
    /// usually want to call [`Row::packer`] instead to start packing from
    /// scratch.
    pub fn for_existing_row(row: &mut Row) -> RowPacker<'_> {
        RowPacker { row }
    }
1965
1966    /// Extend an existing `Row` with a `Datum`.
1967    #[inline]
1968    pub fn push<'a, D>(&mut self, datum: D)
1969    where
1970        D: Borrow<Datum<'a>>,
1971    {
1972        push_datum(&mut self.row.data, *datum.borrow());
1973    }
1974
1975    /// Extend an existing `Row` with additional `Datum`s.
1976    #[inline]
1977    pub fn extend<'a, I, D>(&mut self, iter: I)
1978    where
1979        I: IntoIterator<Item = D>,
1980        D: Borrow<Datum<'a>>,
1981    {
1982        for datum in iter {
1983            push_datum(&mut self.row.data, *datum.borrow())
1984        }
1985    }
1986
1987    /// Extend an existing `Row` with additional `Datum`s.
1988    ///
1989    /// In the case the iterator produces an error, the pushing of
1990    /// datums in terminated and the error returned. The `Row` will
1991    /// be incomplete, but it will be safe to read datums from it.
1992    #[inline]
1993    pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
1994    where
1995        I: IntoIterator<Item = Result<D, E>>,
1996        D: Borrow<Datum<'a>>,
1997    {
1998        for datum in iter {
1999            push_datum(&mut self.row.data, *datum?.borrow());
2000        }
2001        Ok(())
2002    }
2003
2004    /// Appends the datums of an entire `Row`.
2005    pub fn extend_by_row(&mut self, row: &Row) {
2006        self.row.data.extend_from_slice(row.data.as_slice());
2007    }
2008
    /// Appends the slice of data representing an entire `Row`. The data is not validated.
    ///
    /// The bytes of `data` are copied verbatim onto the end of the underlying
    /// row's encoding; nothing is inspected or re-encoded.
    ///
    /// # Safety
    ///
    /// The requirements from [`Row::from_bytes_unchecked`] apply here, too:
    /// This method relies on `data` being an appropriate row encoding, and can
    /// result in unsafety if this is not the case.
    #[inline]
    pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
        self.row.data.extend_from_slice(data)
    }
2020
2021    /// Pushes a [`DatumList`] that is built from a closure.
2022    ///
2023    /// The supplied closure will be invoked once with a `Row` that can be used
2024    /// to populate the list. It is valid to call any method on the
2025    /// [`RowPacker`] except for [`RowPacker::clear`], [`RowPacker::truncate`],
2026    /// or [`RowPacker::truncate_datums`].
2027    ///
2028    /// Returns the value returned by the closure, if any.
2029    ///
2030    /// ```
2031    /// # use mz_repr::{Row, Datum};
2032    /// let mut row = Row::default();
2033    /// row.packer().push_list_with(|row| {
2034    ///     row.push(Datum::String("age"));
2035    ///     row.push(Datum::Int64(42));
2036    /// });
2037    /// assert_eq!(
2038    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
2039    ///     vec![Datum::String("age"), Datum::Int64(42)],
2040    /// );
2041    /// ```
2042    #[inline]
2043    pub fn push_list_with<F, R>(&mut self, f: F) -> R
2044    where
2045        F: FnOnce(&mut RowPacker) -> R,
2046    {
2047        // First, assume that the list will fit in 255 bytes, and thus the length will fit in
2048        // 1 byte. If not, we'll fix it up later.
2049        let start = self.row.data.len();
2050        self.row.data.push(Tag::ListTiny.into());
2051        // Write a dummy len, will fix it up later.
2052        self.row.data.push(0);
2053
2054        let out = f(self);
2055
2056        // The `- 1 - 1` is for the tag and the len.
2057        let len = self.row.data.len() - start - 1 - 1;
2058        // We now know the real len.
2059        if len < TINY {
2060            // If the len fits in 1 byte, we just need to fix up the len.
2061            self.row.data[start + 1] = len.to_le_bytes()[0];
2062        } else {
2063            // Note: We move this code path into its own function, so that the common case can be
2064            // inlined.
2065            long_list(&mut self.row.data, start, len);
2066        }
2067
2068        /// 1. Fix up the tag.
2069        /// 2. Move the actual data a bit (for which we also need to make room at the end).
2070        /// 3. Fix up the len.
2071        /// `data`: The row's backing data.
2072        /// `start`: where `push_list_with` started writing in `data`.
2073        /// `len`: the length of the data, excluding the tag and the length.
2074        #[cold]
2075        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
2076            // `len_len`: the length of the length. (Possible values are: 2, 4, 8. 1 is handled
2077            // elsewhere.) The other parameters are the same as for `long_list`.
2078            let long_list_inner = |data: &mut CompactBytes, len_len| {
2079                // We'll need memory for the new, bigger length, so make the `CompactBytes` bigger.
2080                // The `- 1` is because the old length was 1 byte.
2081                const ZEROS: [u8; 8] = [0; 8];
2082                data.extend_from_slice(&ZEROS[0..len_len - 1]);
2083                // Move the data to the end of the `CompactBytes`, to make space for the new length.
2084                // Originally, it started after the 1-byte tag and the 1-byte length, now it will
2085                // start after the 1-byte tag and the len_len-byte length.
2086                //
2087                // Note that this is the only operation in `long_list` whose cost is proportional
2088                // to `len`. Since `len` is at least 256 here, the other operations' cost are
2089                // negligible. `copy_within` is a memmove, which is probably a fair bit faster per
2090                // Datum than a Datum encoding in the `f` closure.
2091                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
2092                // Write the new length.
2093                data[start + 1..start + 1 + len_len]
2094                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
2095            };
2096            match len {
2097                0..TINY => {
2098                    unreachable!()
2099                }
2100                TINY..SHORT => {
2101                    data[start] = Tag::ListShort.into();
2102                    long_list_inner(data, 2);
2103                }
2104                SHORT..LONG => {
2105                    data[start] = Tag::ListLong.into();
2106                    long_list_inner(data, 4);
2107                }
2108                _ => {
2109                    data[start] = Tag::ListHuge.into();
2110                    long_list_inner(data, 8);
2111                }
2112            };
2113        }
2114
2115        out
2116    }
2117
2118    /// Pushes a [`DatumMap`] that is built from a closure.
2119    ///
2120    /// The supplied closure will be invoked once with a `Row` that can be used
2121    /// to populate the dict.
2122    ///
2123    /// The closure **must** alternate pushing string keys and arbitrary values,
2124    /// otherwise reading the dict will cause a panic.
2125    ///
2126    /// The closure **must** push keys in ascending order, otherwise equality
2127    /// checks on the resulting `Row` may be wrong and reading the dict IN DEBUG
2128    /// MODE will cause a panic.
2129    ///
2130    /// The closure **must not** call [`RowPacker::clear`],
2131    /// [`RowPacker::truncate`], or [`RowPacker::truncate_datums`].
2132    ///
2133    /// # Example
2134    ///
2135    /// ```
2136    /// # use mz_repr::{Row, Datum};
2137    /// let mut row = Row::default();
2138    /// row.packer().push_dict_with(|row| {
2139    ///
2140    ///     // key
2141    ///     row.push(Datum::String("age"));
2142    ///     // value
2143    ///     row.push(Datum::Int64(42));
2144    ///
2145    ///     // key
2146    ///     row.push(Datum::String("name"));
2147    ///     // value
2148    ///     row.push(Datum::String("bob"));
2149    /// });
2150    /// assert_eq!(
2151    ///     row.unpack_first().unwrap_map().iter().collect::<Vec<_>>(),
2152    ///     vec![("age", Datum::Int64(42)), ("name", Datum::String("bob"))]
2153    /// );
2154    /// ```
2155    pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2156    where
2157        F: FnOnce(&mut RowPacker) -> R,
2158    {
2159        self.row.data.push(Tag::Dict.into());
2160        let start = self.row.data.len();
2161        // write a dummy len, will fix it up later
2162        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2163
2164        let res = f(self);
2165
2166        let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2167        // fix up the len
2168        self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2169
2170        res
2171    }
2172
2173    /// Convenience function to construct an array from an iter of `Datum`s.
2174    ///
2175    /// Returns an error if the number of elements in `iter` does not match
2176    /// the cardinality of the array as described by `dims`, or if the
2177    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2178    /// occurs, the packer's state will be unchanged.
2179    pub fn try_push_array<'a, I, D>(
2180        &mut self,
2181        dims: &[ArrayDimension],
2182        iter: I,
2183    ) -> Result<(), InvalidArrayError>
2184    where
2185        I: IntoIterator<Item = D>,
2186        D: Borrow<Datum<'a>>,
2187    {
2188        // SAFETY: The function returns the exact number of elements pushed into the array.
2189        unsafe {
2190            self.push_array_with_unchecked(dims, |packer| {
2191                let mut nelements = 0;
2192                for datum in iter {
2193                    packer.push(datum);
2194                    nelements += 1;
2195                }
2196                Ok::<_, InvalidArrayError>(nelements)
2197            })
2198        }
2199    }
2200
2201    /// Convenience function to construct an array from a function. The function must return the
2202    /// number of elements it pushed into the array. It is undefined behavior if the function returns
2203    /// a number different to the number of elements it pushed.
2204    ///
2205    /// Returns an error if the number of elements pushed by `f` does not match
2206    /// the cardinality of the array as described by `dims`, or if the
2207    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`], or if `f` errors. If an error
2208    /// occurs, the packer's state will be unchanged.
2209    pub unsafe fn push_array_with_unchecked<F, E>(
2210        &mut self,
2211        dims: &[ArrayDimension],
2212        f: F,
2213    ) -> Result<(), E>
2214    where
2215        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
2216        E: From<InvalidArrayError>,
2217    {
2218        // Arrays are encoded as follows.
2219        //
2220        // u8    ndims
2221        // u64   dim_0 lower bound
2222        // u64   dim_0 length
2223        // ...
2224        // u64   dim_n lower bound
2225        // u64   dim_n length
2226        // u64   element data size in bytes
2227        // u8    element data, where elements are encoded in row-major order
2228
2229        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
2230            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
2231        }
2232
2233        let start = self.row.data.len();
2234        self.row.data.push(Tag::Array.into());
2235
2236        // Write dimension information.
2237        self.row
2238            .data
2239            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
2240        for dim in dims {
2241            self.row
2242                .data
2243                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2244            self.row
2245                .data
2246                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2247        }
2248
2249        // Write elements.
2250        let off = self.row.data.len();
2251        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2252        let nelements = match f(self) {
2253            Ok(nelements) => nelements,
2254            Err(e) => {
2255                self.row.data.truncate(start);
2256                return Err(e);
2257            }
2258        };
2259        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2260        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2261
2262        // Check that the number of elements written matches the dimension
2263        // information.
2264        let cardinality = match dims {
2265            [] => 0,
2266            dims => dims.iter().map(|d| d.length).product(),
2267        };
2268        if nelements != cardinality {
2269            self.row.data.truncate(start);
2270            return Err(InvalidArrayError::WrongCardinality {
2271                actual: nelements,
2272                expected: cardinality,
2273            }
2274            .into());
2275        }
2276
2277        Ok(())
2278    }
2279
2280    /// Pushes an [`Array`] that is built from a closure.
2281    ///
2282    /// __WARNING__: This is fairly "sharp" tool that is easy to get wrong. You
2283    /// should prefer [`RowPacker::try_push_array`] when possible.
2284    ///
2285    /// Returns an error if the number of elements pushed does not match
2286    /// the cardinality of the array as described by `dims`, or if the
2287    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2288    /// occurs, the packer's state will be unchanged.
2289    pub fn push_array_with_row_major<F, I>(
2290        &mut self,
2291        dims: I,
2292        f: F,
2293    ) -> Result<(), InvalidArrayError>
2294    where
2295        I: IntoIterator<Item = ArrayDimension>,
2296        F: FnOnce(&mut RowPacker) -> usize,
2297    {
2298        let start = self.row.data.len();
2299        self.row.data.push(Tag::Array.into());
2300
2301        // Write dummy dimension length for now, we'll fix it up.
2302        let dims_start = self.row.data.len();
2303        self.row.data.push(42);
2304
2305        let mut num_dims: u8 = 0;
2306        let mut cardinality: usize = 1;
2307        for dim in dims {
2308            num_dims += 1;
2309            cardinality *= dim.length;
2310
2311            self.row
2312                .data
2313                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2314            self.row
2315                .data
2316                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2317        }
2318
2319        if num_dims > MAX_ARRAY_DIMENSIONS {
2320            // Reset the packer state so we don't have invalid data.
2321            self.row.data.truncate(start);
2322            return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2323        }
2324        // Fix up our dimension length.
2325        self.row.data[dims_start..dims_start + size_of::<u8>()]
2326            .copy_from_slice(&num_dims.to_le_bytes());
2327
2328        // Write elements.
2329        let off = self.row.data.len();
2330        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2331
2332        let nelements = f(self);
2333
2334        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2335        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2336
2337        // Check that the number of elements written matches the dimension
2338        // information.
2339        let cardinality = match num_dims {
2340            0 => 0,
2341            _ => cardinality,
2342        };
2343        if nelements != cardinality {
2344            self.row.data.truncate(start);
2345            return Err(InvalidArrayError::WrongCardinality {
2346                actual: nelements,
2347                expected: cardinality,
2348            });
2349        }
2350
2351        Ok(())
2352    }
2353
2354    /// Convenience function to push a `DatumList` from an iter of `Datum`s
2355    ///
2356    /// See [`RowPacker::push_dict_with`] if you need to be able to handle errors
2357    pub fn push_list<'a, I, D>(&mut self, iter: I)
2358    where
2359        I: IntoIterator<Item = D>,
2360        D: Borrow<Datum<'a>>,
2361    {
2362        self.push_list_with(|packer| {
2363            for elem in iter {
2364                packer.push(*elem.borrow())
2365            }
2366        });
2367    }
2368
2369    /// Convenience function to push a `DatumMap` from an iter of `(&str, Datum)` pairs
2370    pub fn push_dict<'a, I, D>(&mut self, iter: I)
2371    where
2372        I: IntoIterator<Item = (&'a str, D)>,
2373        D: Borrow<Datum<'a>>,
2374    {
2375        self.push_dict_with(|packer| {
2376            for (k, v) in iter {
2377                packer.push(Datum::String(k));
2378                packer.push(*v.borrow())
2379            }
2380        })
2381    }
2382
2383    /// Pushes a `Datum::Range` derived from the `Range<Datum<'a>`.
2384    ///
2385    /// # Panics
2386    /// - If lower and upper express finite values and they are datums of
2387    ///   different types.
2388    /// - If lower or upper express finite values and are equal to
2389    ///   `Datum::Null`. To handle `Datum::Null` properly, use
2390    ///   [`RangeBound::new`].
2391    ///
2392    /// # Notes
2393    /// - This function canonicalizes the range before pushing it to the row.
2394    /// - Prefer this function over `push_range_with` because of its
2395    ///   canonicaliztion.
2396    /// - Prefer creating [`RangeBound`]s using [`RangeBound::new`], which
2397    ///   handles `Datum::Null` in a SQL-friendly way.
2398    pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
2399        range.canonicalize()?;
2400        match range.inner {
2401            None => {
2402                self.row.data.push(Tag::Range.into());
2403                // Untagged bytes only contains the `RANGE_EMPTY` flag value.
2404                self.row.data.push(range::InternalFlags::EMPTY.bits());
2405                Ok(())
2406            }
2407            Some(inner) => self.push_range_with(
2408                RangeLowerBound {
2409                    inclusive: inner.lower.inclusive,
2410                    bound: inner
2411                        .lower
2412                        .bound
2413                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2414                },
2415                RangeUpperBound {
2416                    inclusive: inner.upper.inclusive,
2417                    bound: inner
2418                        .upper
2419                        .bound
2420                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2421                },
2422            ),
2423        }
2424    }
2425
2426    /// Pushes a `DatumRange` built from the specified arguments.
2427    ///
2428    /// # Warning
2429    /// Unlike `push_range`, `push_range_with` _does not_ canonicalize its
2430    /// inputs. Consequentially, this means it's possible to generate ranges
2431    /// that will not reflect the proper ordering and equality.
2432    ///
2433    /// # Panics
2434    /// - If lower or upper expresses a finite value and does not push exactly
2435    ///   one value into the `RowPacker`.
2436    /// - If lower and upper express finite values and they are datums of
2437    ///   different types.
2438    /// - If lower or upper express finite values and push `Datum::Null`.
2439    ///
2440    /// # Notes
2441    /// - Prefer `push_range_with` over this function. This function should be
2442    ///   used only when you are not pushing `Datum`s to the inner row.
2443    /// - Range encoding is `[<flag bytes>,<lower>?,<upper>?]`, where `lower`
2444    ///   and `upper` are optional, contingent on the flag value expressing an
2445    ///   empty range (where neither will be present) or infinite bounds (where
2446    ///   each infinite bound will be absent).
2447    /// - To push an emtpy range, use `push_range` using `Range { inner: None }`.
2448    pub fn push_range_with<L, U, E>(
2449        &mut self,
2450        lower: RangeLowerBound<L>,
2451        upper: RangeUpperBound<U>,
2452    ) -> Result<(), E>
2453    where
2454        L: FnOnce(&mut RowPacker) -> Result<(), E>,
2455        U: FnOnce(&mut RowPacker) -> Result<(), E>,
2456        E: From<InvalidRangeError>,
2457    {
2458        let start = self.row.data.len();
2459        self.row.data.push(Tag::Range.into());
2460
2461        let mut flags = range::InternalFlags::empty();
2462
2463        flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
2464        flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
2465        flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
2466        flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);
2467
2468        let mut expected_datums = 0;
2469
2470        self.row.data.push(flags.bits());
2471
2472        let datum_check = self.row.data.len();
2473
2474        if let Some(value) = lower.bound {
2475            let start = self.row.data.len();
2476            value(self)?;
2477            assert!(
2478                start < self.row.data.len(),
2479                "finite values must each push exactly one value; expected 1 but got 0"
2480            );
2481            expected_datums += 1;
2482        }
2483
2484        if let Some(value) = upper.bound {
2485            let start = self.row.data.len();
2486            value(self)?;
2487            assert!(
2488                start < self.row.data.len(),
2489                "finite values must each push exactly one value; expected 1 but got 0"
2490            );
2491            expected_datums += 1;
2492        }
2493
2494        // Validate the invariants that 0, 1, or 2 elements were pushed, none are Null,
2495        // and if two are pushed then the second is not less than the first. Panic in
2496        // some cases and error in others.
2497        let mut actual_datums = 0;
2498        let mut seen = None;
2499        let mut dataz = &self.row.data[datum_check..];
2500        while !dataz.is_empty() {
2501            let d = unsafe { read_datum(&mut dataz) };
2502            assert!(d != Datum::Null, "cannot push Datum::Null into range");
2503
2504            match seen {
2505                None => seen = Some(d),
2506                Some(seen) => {
2507                    let seen_kind = DatumKind::from(seen);
2508                    let d_kind = DatumKind::from(d);
2509                    assert!(
2510                        seen_kind == d_kind,
2511                        "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
2512                    );
2513
2514                    if seen > d {
2515                        self.row.data.truncate(start);
2516                        return Err(InvalidRangeError::MisorderedRangeBounds.into());
2517                    }
2518                }
2519            }
2520            actual_datums += 1;
2521        }
2522
2523        assert!(
2524            actual_datums == expected_datums,
2525            "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
2526        );
2527
2528        Ok(())
2529    }
2530
2531    /// Clears the contents of the packer without de-allocating its backing memory.
2532    pub fn clear(&mut self) {
2533        self.row.data.clear();
2534    }
2535
2536    /// Truncates the underlying storage to the specified byte position.
2537    ///
2538    /// # Safety
2539    ///
2540    /// `pos` MUST specify a byte offset that lies on a datum boundary.
2541    /// If `pos` specifies a byte offset that is *within* a datum, the row
2542    /// packer will produce an invalid row, the unpacking of which may
2543    /// trigger undefined behavior!
2544    ///
2545    /// To find the byte offset of a datum boundary, inspect the packer's
2546    /// byte length by calling `packer.data().len()` after pushing the desired
2547    /// number of datums onto the packer.
2548    pub unsafe fn truncate(&mut self, pos: usize) {
2549        self.row.data.truncate(pos)
2550    }
2551
2552    /// Truncates the underlying row to contain at most the first `n` datums.
2553    pub fn truncate_datums(&mut self, n: usize) {
2554        let prev_len = self.row.data.len();
2555        let mut iter = self.row.iter();
2556        for _ in iter.by_ref().take(n) {}
2557        let next_len = iter.data.len();
2558        // SAFETY: iterator offsets always lie on a datum boundary.
2559        unsafe { self.truncate(prev_len - next_len) }
2560    }
2561
2562    /// Returns the total amount of bytes used by the underlying row.
2563    pub fn byte_len(&self) -> usize {
2564        self.row.byte_len()
2565    }
2566}
2567
2568impl<'a> IntoIterator for &'a Row {
2569    type Item = Datum<'a>;
2570    type IntoIter = DatumListIter<'a>;
2571    fn into_iter(self) -> DatumListIter<'a> {
2572        self.iter()
2573    }
2574}
2575
2576impl fmt::Debug for Row {
2577    /// Debug representation using the internal datums
2578    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2579        f.write_str("Row{")?;
2580        f.debug_list().entries(self.iter()).finish()?;
2581        f.write_str("}")
2582    }
2583}
2584
2585impl fmt::Display for Row {
2586    /// Display representation using the internal datums
2587    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2588        f.write_str("(")?;
2589        for (i, datum) in self.iter().enumerate() {
2590            if i != 0 {
2591                f.write_str(", ")?;
2592            }
2593            write!(f, "{}", datum)?;
2594        }
2595        f.write_str(")")
2596    }
2597}
2598
2599impl<'a> DatumList<'a> {
2600    pub fn empty() -> DatumList<'static> {
2601        DatumList { data: &[] }
2602    }
2603
2604    pub fn iter(&self) -> DatumListIter<'a> {
2605        DatumListIter { data: self.data }
2606    }
2607
2608    /// For debugging only
2609    pub fn data(&self) -> &'a [u8] {
2610        self.data
2611    }
2612}
2613
2614impl<'a> IntoIterator for &'a DatumList<'a> {
2615    type Item = Datum<'a>;
2616    type IntoIter = DatumListIter<'a>;
2617    fn into_iter(self) -> DatumListIter<'a> {
2618        self.iter()
2619    }
2620}
2621
2622impl<'a> Iterator for DatumListIter<'a> {
2623    type Item = Datum<'a>;
2624    fn next(&mut self) -> Option<Self::Item> {
2625        if self.data.is_empty() {
2626            None
2627        } else {
2628            Some(unsafe { read_datum(&mut self.data) })
2629        }
2630    }
2631}
2632
2633impl<'a> DatumMap<'a> {
2634    pub fn empty() -> DatumMap<'static> {
2635        DatumMap { data: &[] }
2636    }
2637
2638    pub fn iter(&self) -> DatumDictIter<'a> {
2639        DatumDictIter {
2640            data: self.data,
2641            prev_key: None,
2642        }
2643    }
2644
2645    /// For debugging only
2646    pub fn data(&self) -> &'a [u8] {
2647        self.data
2648    }
2649}
2650
2651impl<'a> Debug for DatumMap<'a> {
2652    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2653        f.debug_map().entries(self.iter()).finish()
2654    }
2655}
2656
2657impl<'a> IntoIterator for &'a DatumMap<'a> {
2658    type Item = (&'a str, Datum<'a>);
2659    type IntoIter = DatumDictIter<'a>;
2660    fn into_iter(self) -> DatumDictIter<'a> {
2661        self.iter()
2662    }
2663}
2664
2665impl<'a> Iterator for DatumDictIter<'a> {
2666    type Item = (&'a str, Datum<'a>);
2667    fn next(&mut self) -> Option<Self::Item> {
2668        if self.data.is_empty() {
2669            None
2670        } else {
2671            let key_tag =
2672                Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
2673            assert!(
2674                key_tag == Tag::StringTiny
2675                    || key_tag == Tag::StringShort
2676                    || key_tag == Tag::StringLong
2677                    || key_tag == Tag::StringHuge,
2678                "Dict keys must be strings, got {:?}",
2679                key_tag
2680            );
2681            let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
2682            let val = unsafe { read_datum(&mut self.data) };
2683
2684            // if in debug mode, sanity check keys
2685            if cfg!(debug_assertions) {
2686                if let Some(prev_key) = self.prev_key {
2687                    debug_assert!(
2688                        prev_key < key,
2689                        "Dict keys must be unique and given in ascending order: {} came before {}",
2690                        prev_key,
2691                        key
2692                    );
2693                }
2694                self.prev_key = Some(key);
2695            }
2696
2697            Some((key, val))
2698        }
2699    }
2700}
2701
2702impl RowArena {
2703    pub fn new() -> Self {
2704        RowArena {
2705            inner: RefCell::new(vec![]),
2706        }
2707    }
2708
2709    /// Creates a `RowArena` with a hint of how many rows will be created in the arena, to avoid
2710    /// reallocations of its internal vector.
2711    pub fn with_capacity(capacity: usize) -> Self {
2712        RowArena {
2713            inner: RefCell::new(Vec::with_capacity(capacity)),
2714        }
2715    }
2716
2717    /// Does a `reserve` on the underlying `Vec`. Call this when you expect `additional` more datums
2718    /// to be created in this arena.
2719    pub fn reserve(&self, additional: usize) {
2720        self.inner.borrow_mut().reserve(additional);
2721    }
2722
2723    /// Take ownership of `bytes` for the lifetime of the arena.
2724    #[allow(clippy::transmute_ptr_to_ptr)]
2725    pub fn push_bytes<'a>(&'a self, bytes: Vec<u8>) -> &'a [u8] {
2726        let mut inner = self.inner.borrow_mut();
2727        inner.push(bytes);
2728        let owned_bytes = &inner[inner.len() - 1];
2729        unsafe {
2730            // This is safe because:
2731            //   * We only ever append to self.inner, so the byte vector
2732            //     will live as long as the arena.
2733            //   * We return a reference to the byte vector's contents, so it's
2734            //     okay if self.inner reallocates and moves the byte
2735            //     vector.
2736            //   * We don't allow access to the byte vector itself, so it will
2737            //     never reallocate.
2738            transmute::<&[u8], &'a [u8]>(owned_bytes)
2739        }
2740    }
2741
2742    /// Take ownership of `string` for the lifetime of the arena.
2743    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
2744        let owned_bytes = self.push_bytes(string.into_bytes());
2745        unsafe {
2746            // This is safe because we know it was a `String` just before.
2747            std::str::from_utf8_unchecked(owned_bytes)
2748        }
2749    }
2750
2751    /// Take ownership of `row` for the lifetime of the arena, returning a
2752    /// reference to the first datum in the row.
2753    ///
2754    /// If we had an owned datum type, this method would be much clearer, and
2755    /// would be called `push_owned_datum`.
2756    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
2757        let mut inner = self.inner.borrow_mut();
2758        inner.push(row.data.into_vec());
2759        unsafe {
2760            // This is safe because:
2761            //   * We only ever append to self.inner, so the row data will live
2762            //     as long as the arena.
2763            //   * We force the row data into its own heap allocation--
2764            //     importantly, we do NOT store the SmallVec, which might be
2765            //     storing data inline--so it's okay if self.inner reallocates
2766            //     and moves the row.
2767            //   * We don't allow access to the byte vector itself, so it will
2768            //     never reallocate.
2769            let datum = read_datum(&mut &inner[inner.len() - 1][..]);
2770            transmute::<Datum<'_>, Datum<'a>>(datum)
2771        }
2772    }
2773
2774    /// Equivalent to `push_unary_row` but returns a `DatumNested` rather than a
2775    /// `Datum`.
2776    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
2777        let mut inner = self.inner.borrow_mut();
2778        inner.push(row.data.into_vec());
2779        unsafe {
2780            // This is safe because:
2781            //   * We only ever append to self.inner, so the row data will live
2782            //     as long as the arena.
2783            //   * We force the row data into its own heap allocation--
2784            //     importantly, we do NOT store the SmallVec, which might be
2785            //     storing data inline--so it's okay if self.inner reallocates
2786            //     and moves the row.
2787            //   * We don't allow access to the byte vector itself, so it will
2788            //     never reallocate.
2789            let nested = DatumNested::extract(&mut &inner[inner.len() - 1][..]);
2790            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
2791        }
2792    }
2793
2794    /// Convenience function to make a new `Row` containing a single datum, and
2795    /// take ownership of it for the lifetime of the arena
2796    ///
2797    /// ```
2798    /// # use mz_repr::{RowArena, Datum};
2799    /// let arena = RowArena::new();
2800    /// let datum = arena.make_datum(|packer| {
2801    ///   packer.push_list(&[Datum::String("hello"), Datum::String("world")]);
2802    /// });
2803    /// assert_eq!(datum.unwrap_list().iter().collect::<Vec<_>>(), vec![Datum::String("hello"), Datum::String("world")]);
2804    /// ```
2805    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
2806    where
2807        F: FnOnce(&mut RowPacker),
2808    {
2809        let mut row = Row::default();
2810        f(&mut row.packer());
2811        self.push_unary_row(row)
2812    }
2813
2814    /// Convenience function identical to `make_datum` but instead returns a
2815    /// `DatumNested`.
2816    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
2817    where
2818        F: FnOnce(&mut RowPacker),
2819    {
2820        let mut row = Row::default();
2821        f(&mut row.packer());
2822        self.push_unary_row_datum_nested(row)
2823    }
2824
2825    /// Like [`RowArena::make_datum`], but the provided closure can return an error.
2826    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
2827    where
2828        F: FnOnce(&mut RowPacker) -> Result<(), E>,
2829    {
2830        let mut row = Row::default();
2831        f(&mut row.packer())?;
2832        Ok(self.push_unary_row(row))
2833    }
2834
2835    /// Clear the contents of the arena.
2836    pub fn clear(&mut self) {
2837        self.inner.borrow_mut().clear();
2838    }
2839}
2840
2841impl Default for RowArena {
2842    fn default() -> RowArena {
2843        RowArena::new()
2844    }
2845}
2846
2847/// A thread-local row, which can be borrowed and returned.
2848/// # Example
2849///
2850/// Use this type instead of creating a new row:
2851/// ```
2852/// use mz_repr::SharedRow;
2853///
2854/// let mut row_builder = SharedRow::get();
2855/// ```
2856///
2857/// This allows us to reuse an existing row allocation instead of creating a new one or retaining
2858/// an allocation locally. Additionally, we can observe the size of the local row in a central
2859/// place and potentially reallocate to reduce memory needs.
2860///
2861/// # Panic
2862///
2863/// [`SharedRow::get`] panics when trying to obtain multiple references to the shared row.
2864#[derive(Debug)]
2865pub struct SharedRow(Row);
2866
2867impl SharedRow {
2868    thread_local! {
2869        /// A thread-local slot containing a shared Row that can be temporarily used by a function.
2870        /// There can be at most one active user of this Row, which is tracked by the state of the
2871        /// `Option<_>` wrapper. When it is `Some(..)`, the row is available for using. When it
2872        /// is `None`, it is not, and the constructor will panic if a thread attempts to use it.
2873        static SHARED_ROW: Cell<Option<Row>> = const { Cell::new(Some(Row::empty())) }
2874    }
2875
2876    /// Get the shared row.
2877    ///
2878    /// The row's contents are cleared before returning it.
2879    ///
2880    /// # Panic
2881    ///
2882    /// Panics when the row is already borrowed elsewhere.
2883    pub fn get() -> Self {
2884        let mut row = Self::SHARED_ROW
2885            .take()
2886            .expect("attempted to borrow already borrowed SharedRow");
2887        // Clear row
2888        row.packer();
2889        Self(row)
2890    }
2891
2892    /// Gets the shared row and uses it to pack `iter`.
2893    pub fn pack<'a, I, D>(iter: I) -> Row
2894    where
2895        I: IntoIterator<Item = D>,
2896        D: Borrow<Datum<'a>>,
2897    {
2898        let mut row_builder = Self::get();
2899        let mut row_packer = row_builder.packer();
2900        row_packer.extend(iter);
2901        row_builder.clone()
2902    }
2903}
2904
2905impl std::ops::Deref for SharedRow {
2906    type Target = Row;
2907
2908    fn deref(&self) -> &Self::Target {
2909        &self.0
2910    }
2911}
2912
2913impl std::ops::DerefMut for SharedRow {
2914    fn deref_mut(&mut self) -> &mut Self::Target {
2915        &mut self.0
2916    }
2917}
2918
2919impl Drop for SharedRow {
2920    fn drop(&mut self) {
2921        // Take the Row allocation from this instance and put it back in the thread local slot for
2922        // the next user. The Row in `self` is replaced with an empty Row which does not allocate.
2923        Self::SHARED_ROW.set(Some(std::mem::take(&mut self.0)))
2924    }
2925}
2926
2927#[cfg(test)]
2928mod tests {
2929    use chrono::{DateTime, NaiveDate};
2930    use mz_ore::{assert_err, assert_none};
2931
2932    use crate::ScalarType;
2933
2934    use super::*;
2935
2936    #[mz_ore::test]
2937    fn test_assumptions() {
2938        assert_eq!(size_of::<Tag>(), 1);
2939        #[cfg(target_endian = "big")]
2940        {
2941            // if you want to run this on a big-endian cpu, we'll need big-endian versions of the serialization code
2942            assert!(false);
2943        }
2944    }
2945
2946    #[mz_ore::test]
2947    fn miri_test_arena() {
2948        let arena = RowArena::new();
2949
2950        assert_eq!(arena.push_string("".to_owned()), "");
2951        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");
2952
2953        let empty: &[u8] = &[];
2954        assert_eq!(arena.push_bytes(vec![]), empty);
2955        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);
2956
2957        let mut row = Row::default();
2958        let mut packer = row.packer();
2959        packer.push_dict_with(|row| {
2960            row.push(Datum::String("a"));
2961            row.push_list_with(|row| {
2962                row.push(Datum::String("one"));
2963                row.push(Datum::String("two"));
2964                row.push(Datum::String("three"));
2965            });
2966            row.push(Datum::String("b"));
2967            row.push(Datum::String("c"));
2968        });
2969        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
2970    }
2971
2972    #[mz_ore::test]
2973    fn miri_test_round_trip() {
2974        fn round_trip(datums: Vec<Datum>) {
2975            let row = Row::pack(datums.clone());
2976
2977            // When run under miri this catches undefined bytes written to data
2978            // eg by calling push_copy! on a type which contains undefined padding values
2979            println!("{:?}", row.data());
2980
2981            let datums2 = row.iter().collect::<Vec<_>>();
2982            let datums3 = row.unpack();
2983            assert_eq!(datums, datums2);
2984            assert_eq!(datums, datums3);
2985        }
2986
2987        round_trip(vec![]);
2988        round_trip(
2989            ScalarType::enumerate()
2990                .iter()
2991                .flat_map(|r#type| r#type.interesting_datums())
2992                .collect(),
2993        );
2994        round_trip(vec![
2995            Datum::Null,
2996            Datum::Null,
2997            Datum::False,
2998            Datum::True,
2999            Datum::Int16(-21),
3000            Datum::Int32(-42),
3001            Datum::Int64(-2_147_483_648 - 42),
3002            Datum::UInt8(0),
3003            Datum::UInt8(1),
3004            Datum::UInt16(0),
3005            Datum::UInt16(1),
3006            Datum::UInt16(1 << 8),
3007            Datum::UInt32(0),
3008            Datum::UInt32(1),
3009            Datum::UInt32(1 << 8),
3010            Datum::UInt32(1 << 16),
3011            Datum::UInt32(1 << 24),
3012            Datum::UInt64(0),
3013            Datum::UInt64(1),
3014            Datum::UInt64(1 << 8),
3015            Datum::UInt64(1 << 16),
3016            Datum::UInt64(1 << 24),
3017            Datum::UInt64(1 << 32),
3018            Datum::UInt64(1 << 40),
3019            Datum::UInt64(1 << 48),
3020            Datum::UInt64(1 << 56),
3021            Datum::Float32(OrderedFloat::from(-42.12)),
3022            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
3023            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
3024            Datum::Timestamp(
3025                CheckedTimestamp::from_timestamplike(
3026                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
3027                        .unwrap()
3028                        .and_hms_opt(14, 32, 11)
3029                        .unwrap(),
3030                )
3031                .unwrap(),
3032            ),
3033            Datum::TimestampTz(
3034                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
3035                    .unwrap(),
3036            ),
3037            Datum::Interval(Interval {
3038                months: 312,
3039                ..Default::default()
3040            }),
3041            Datum::Interval(Interval::new(0, 0, 1_012_312)),
3042            Datum::Bytes(&[]),
3043            Datum::Bytes(&[0, 2, 1, 255]),
3044            Datum::String(""),
3045            Datum::String("العَرَبِيَّة"),
3046        ]);
3047    }
3048
3049    #[mz_ore::test]
3050    fn test_array() {
3051        // Construct an array using `Row::push_array` and verify that it unpacks
3052        // correctly.
3053        const DIM: ArrayDimension = ArrayDimension {
3054            lower_bound: 2,
3055            length: 2,
3056        };
3057        let mut row = Row::default();
3058        let mut packer = row.packer();
3059        packer
3060            .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3061            .unwrap();
3062        let arr1 = row.unpack_first().unwrap_array();
3063        assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3064        assert_eq!(
3065            arr1.elements().into_iter().collect::<Vec<_>>(),
3066            vec![Datum::Int32(1), Datum::Int32(2)]
3067        );
3068
3069        // Pack a previously-constructed `Datum::Array` and verify that it
3070        // unpacks correctly.
3071        let row = Row::pack_slice(&[Datum::Array(arr1)]);
3072        let arr2 = row.unpack_first().unwrap_array();
3073        assert_eq!(arr1, arr2);
3074    }
3075
3076    #[mz_ore::test]
3077    fn test_multidimensional_array() {
3078        let datums = vec![
3079            Datum::Int32(1),
3080            Datum::Int32(2),
3081            Datum::Int32(3),
3082            Datum::Int32(4),
3083            Datum::Int32(5),
3084            Datum::Int32(6),
3085            Datum::Int32(7),
3086            Datum::Int32(8),
3087        ];
3088
3089        let mut row = Row::default();
3090        let mut packer = row.packer();
3091        packer
3092            .try_push_array(
3093                &[
3094                    ArrayDimension {
3095                        lower_bound: 1,
3096                        length: 1,
3097                    },
3098                    ArrayDimension {
3099                        lower_bound: 1,
3100                        length: 4,
3101                    },
3102                    ArrayDimension {
3103                        lower_bound: 1,
3104                        length: 2,
3105                    },
3106                ],
3107                &datums,
3108            )
3109            .unwrap();
3110        let array = row.unpack_first().unwrap_array();
3111        assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3112    }
3113
3114    #[mz_ore::test]
3115    fn test_array_max_dimensions() {
3116        let mut row = Row::default();
3117        let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3118
3119        // An array with one too many dimensions should be rejected.
3120        let res = row.packer().try_push_array(
3121            &vec![
3122                ArrayDimension {
3123                    lower_bound: 1,
3124                    length: 1
3125                };
3126                max_dims + 1
3127            ],
3128            vec![Datum::Int32(4)],
3129        );
3130        assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3131        assert!(row.data.is_empty());
3132
3133        // An array with exactly the maximum allowable dimensions should be
3134        // accepted.
3135        row.packer()
3136            .try_push_array(
3137                &vec![
3138                    ArrayDimension {
3139                        lower_bound: 1,
3140                        length: 1
3141                    };
3142                    max_dims
3143                ],
3144                vec![Datum::Int32(4)],
3145            )
3146            .unwrap();
3147    }
3148
3149    #[mz_ore::test]
3150    fn test_array_wrong_cardinality() {
3151        let mut row = Row::default();
3152        let res = row.packer().try_push_array(
3153            &[
3154                ArrayDimension {
3155                    lower_bound: 1,
3156                    length: 2,
3157                },
3158                ArrayDimension {
3159                    lower_bound: 1,
3160                    length: 3,
3161                },
3162            ],
3163            vec![Datum::Int32(1), Datum::Int32(2)],
3164        );
3165        assert_eq!(
3166            res,
3167            Err(InvalidArrayError::WrongCardinality {
3168                actual: 2,
3169                expected: 6,
3170            })
3171        );
3172        assert!(row.data.is_empty());
3173    }
3174
3175    #[mz_ore::test]
3176    fn test_nesting() {
3177        let mut row = Row::default();
3178        row.packer().push_dict_with(|row| {
3179            row.push(Datum::String("favourites"));
3180            row.push_list_with(|row| {
3181                row.push(Datum::String("ice cream"));
3182                row.push(Datum::String("oreos"));
3183                row.push(Datum::String("cheesecake"));
3184            });
3185            row.push(Datum::String("name"));
3186            row.push(Datum::String("bob"));
3187        });
3188
3189        let mut iter = row.unpack_first().unwrap_map().iter();
3190
3191        let (k, v) = iter.next().unwrap();
3192        assert_eq!(k, "favourites");
3193        assert_eq!(
3194            v.unwrap_list().iter().collect::<Vec<_>>(),
3195            vec![
3196                Datum::String("ice cream"),
3197                Datum::String("oreos"),
3198                Datum::String("cheesecake"),
3199            ]
3200        );
3201
3202        let (k, v) = iter.next().unwrap();
3203        assert_eq!(k, "name");
3204        assert_eq!(v, Datum::String("bob"));
3205    }
3206
3207    #[mz_ore::test]
3208    fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3209        let pack = |ok| {
3210            let mut row = Row::default();
3211            row.packer().push_dict_with(|row| {
3212                if ok {
3213                    row.push(Datum::String("key"));
3214                    row.push(Datum::Int32(42));
3215                    Ok(7)
3216                } else {
3217                    Err("fail")
3218                }
3219            })?;
3220            Ok(row)
3221        };
3222
3223        assert_eq!(pack(false), Err("fail"));
3224
3225        let row = pack(true)?;
3226        let mut dict = row.unpack_first().unwrap_map().iter();
3227        assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3228        assert_eq!(dict.next(), None);
3229
3230        Ok(())
3231    }
3232
3233    #[mz_ore::test]
3234    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
3235    fn test_datum_sizes() {
3236        let arena = RowArena::new();
3237
3238        // Test the claims about various datum sizes.
3239        let values_of_interest = vec![
3240            Datum::Null,
3241            Datum::False,
3242            Datum::Int16(0),
3243            Datum::Int32(0),
3244            Datum::Int64(0),
3245            Datum::UInt8(0),
3246            Datum::UInt8(1),
3247            Datum::UInt16(0),
3248            Datum::UInt16(1),
3249            Datum::UInt16(1 << 8),
3250            Datum::UInt32(0),
3251            Datum::UInt32(1),
3252            Datum::UInt32(1 << 8),
3253            Datum::UInt32(1 << 16),
3254            Datum::UInt32(1 << 24),
3255            Datum::UInt64(0),
3256            Datum::UInt64(1),
3257            Datum::UInt64(1 << 8),
3258            Datum::UInt64(1 << 16),
3259            Datum::UInt64(1 << 24),
3260            Datum::UInt64(1 << 32),
3261            Datum::UInt64(1 << 40),
3262            Datum::UInt64(1 << 48),
3263            Datum::UInt64(1 << 56),
3264            Datum::Float32(OrderedFloat(0.0)),
3265            Datum::Float64(OrderedFloat(0.0)),
3266            Datum::from(numeric::Numeric::from(0)),
3267            Datum::from(numeric::Numeric::from(1000)),
3268            Datum::from(numeric::Numeric::from(9999)),
3269            Datum::Date(
3270                NaiveDate::from_ymd_opt(1, 1, 1)
3271                    .unwrap()
3272                    .try_into()
3273                    .unwrap(),
3274            ),
3275            Datum::Timestamp(
3276                CheckedTimestamp::from_timestamplike(
3277                    DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
3278                )
3279                .unwrap(),
3280            ),
3281            Datum::TimestampTz(
3282                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
3283                    .unwrap(),
3284            ),
3285            Datum::Interval(Interval::default()),
3286            Datum::Bytes(&[]),
3287            Datum::String(""),
3288            Datum::JsonNull,
3289            Datum::Range(Range { inner: None }),
3290            arena.make_datum(|packer| {
3291                packer
3292                    .push_range(Range::new(Some((
3293                        RangeLowerBound::new(Datum::Int32(-1), true),
3294                        RangeUpperBound::new(Datum::Int32(1), true),
3295                    ))))
3296                    .unwrap();
3297            }),
3298        ];
3299        for value in values_of_interest {
3300            if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
3301                panic!("Disparity in claimed size for {:?}", value);
3302            }
3303        }
3304    }
3305
3306    #[mz_ore::test]
3307    fn test_range_errors() {
3308        fn test_range_errors_inner<'a>(
3309            datums: Vec<Vec<Datum<'a>>>,
3310        ) -> Result<(), InvalidRangeError> {
3311            let mut row = Row::default();
3312            let row_len = row.byte_len();
3313            let mut packer = row.packer();
3314            let r = packer.push_range_with(
3315                RangeLowerBound {
3316                    inclusive: true,
3317                    bound: Some(|row: &mut RowPacker| {
3318                        for d in &datums[0] {
3319                            row.push(d);
3320                        }
3321                        Ok(())
3322                    }),
3323                },
3324                RangeUpperBound {
3325                    inclusive: true,
3326                    bound: Some(|row: &mut RowPacker| {
3327                        for d in &datums[1] {
3328                            row.push(d);
3329                        }
3330                        Ok(())
3331                    }),
3332                },
3333            );
3334
3335            assert_eq!(row_len, row.byte_len());
3336
3337            r
3338        }
3339
3340        for panicking_case in [
3341            vec![vec![Datum::Int32(1)], vec![]],
3342            vec![
3343                vec![Datum::Int32(1), Datum::Int32(2)],
3344                vec![Datum::Int32(3)],
3345            ],
3346            vec![
3347                vec![Datum::Int32(1)],
3348                vec![Datum::Int32(2), Datum::Int32(3)],
3349            ],
3350            vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
3351            vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
3352            vec![vec![Datum::Null], vec![Datum::Int32(2)]],
3353            vec![vec![Datum::Int32(1)], vec![Datum::Null]],
3354        ] {
3355            #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
3356            let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
3357            assert_err!(result);
3358        }
3359
3360        let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
3361        assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
3362    }
3363
3364    /// Lists have a variable-length encoding for their lengths. We test each case here.
3365    #[mz_ore::test]
3366    #[cfg_attr(miri, ignore)] // slow
3367    fn test_list_encoding() {
3368        fn test_list_encoding_inner(len: usize) {
3369            let list_elem = |i: usize| {
3370                if i % 2 == 0 {
3371                    Datum::False
3372                } else {
3373                    Datum::True
3374                }
3375            };
3376            let mut row = Row::default();
3377            {
3378                // Push some stuff.
3379                let mut packer = row.packer();
3380                packer.push(Datum::String("start"));
3381                packer.push_list_with(|packer| {
3382                    for i in 0..len {
3383                        packer.push(list_elem(i));
3384                    }
3385                });
3386                packer.push(Datum::String("end"));
3387            }
3388            // Check that we read back exactly what we pushed.
3389            let mut row_it = row.iter();
3390            assert_eq!(row_it.next().unwrap(), Datum::String("start"));
3391            match row_it.next().unwrap() {
3392                Datum::List(list) => {
3393                    let mut list_it = list.iter();
3394                    for i in 0..len {
3395                        assert_eq!(list_it.next().unwrap(), list_elem(i));
3396                    }
3397                    assert_none!(list_it.next());
3398                }
3399                _ => panic!("expected Datum::List"),
3400            }
3401            assert_eq!(row_it.next().unwrap(), Datum::String("end"));
3402            assert_none!(row_it.next());
3403        }
3404
3405        test_list_encoding_inner(0);
3406        test_list_encoding_inner(1);
3407        test_list_encoding_inner(10);
3408        test_list_encoding_inner(TINY - 1); // tiny
3409        test_list_encoding_inner(TINY + 1); // short
3410        test_list_encoding_inner(SHORT + 1); // long
3411
3412        // The biggest one takes 40 s on my laptop, probably not worth it.
3413        //test_list_encoding_inner(LONG + 1); // huge
3414    }
3415}