// mz_repr/row.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Borrow;
11use std::cell::{Cell, RefCell};
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::hash::{Hash, Hasher};
16use std::mem::{size_of, transmute};
17use std::ops::Deref;
18use std::str;
19
20use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
21use compact_bytes::CompactBytes;
22use mz_ore::cast::{CastFrom, ReinterpretCast};
23use mz_ore::soft_assert_no_log;
24use mz_ore::vec::Vector;
25use mz_persist_types::Codec64;
26use num_enum::{IntoPrimitive, TryFromPrimitive};
27use ordered_float::OrderedFloat;
28use proptest::prelude::*;
29use proptest::strategy::{BoxedStrategy, Strategy};
30use serde::{Deserialize, Serialize};
31use uuid::Uuid;
32
33use crate::adt::array::{
34    Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
35};
36use crate::adt::date::Date;
37use crate::adt::interval::Interval;
38use crate::adt::mz_acl_item::{AclItem, MzAclItem};
39use crate::adt::numeric;
40use crate::adt::numeric::Numeric;
41use crate::adt::range::{
42    self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
43};
44use crate::adt::timestamp::CheckedTimestamp;
45use crate::scalar::{DatumKind, arb_datum};
46use crate::{Datum, RelationDesc, Timestamp};
47
48pub(crate) mod encode;
49pub mod iter;
50
51include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
52
/// A packed representation for `Datum`s.
///
/// `Datum` is easy to work with but very space inefficient. A `Datum::Int32(42)`
/// is laid out in memory like this:
///
///   tag: 3
///   padding: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
///   data: 0 0 0 42
///   padding: 0 0 0 0 0 0 0 0 0 0 0 0
///
/// For a total of 32 bytes! The second set of padding is needed in case we were
/// to write a 16-byte datum into this location. The first set of padding is
/// needed to align that hypothetical decimal to a 16 bytes boundary.
///
/// A `Row` stores zero or more `Datum`s without any padding. We avoid the need
/// for the first set of padding by only providing access to the `Datum`s via
/// calls to `ptr::read_unaligned`, which on modern x86 is barely penalized. We
/// avoid the need for the second set of padding by not providing mutable access
/// to the `Datum`. Instead, `Row` is append-only.
///
/// A `Row` can be built from a collection of `Datum`s using `Row::pack`, but it
/// is more efficient to use `Row::pack_slice` so that a right-sized allocation
/// can be created. If that is not possible, consider using the row buffer
/// pattern: allocate one row, pack into it, and then call [`Row::clone`] to
/// receive a copy of that row, leaving behind the original allocation to pack
/// future rows.
///
/// Creating a row via [`Row::pack_slice`]:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// assert_eq!(row.unpack(), vec![Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)])
/// ```
///
/// `Row`s can be unpacked by iterating over them:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// assert_eq!(row.iter().nth(1).unwrap(), Datum::Int32(1));
/// ```
///
/// If you want random access to the `Datum`s in a `Row`, use `Row::unpack` to create a `Vec<Datum>`
/// ```
/// # use mz_repr::{Row, Datum};
/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
/// let datums = row.unpack();
/// assert_eq!(datums[1], Datum::Int32(1));
/// ```
///
/// # Performance
///
/// Rows are dynamically sized, but up to a fixed size their data is stored in-line.
/// It is best to re-use a `Row` across multiple `Row` creation calls, as this
/// avoids the allocations involved in `Row::new()`.
#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
    // The packed bytes encoding zero or more datums. `CompactBytes` stores
    // short encodings inline and spills longer ones to the heap.
    data: CompactBytes,
}
113
114impl Row {
115    const SIZE: usize = CompactBytes::MAX_INLINE;
116
117    /// A variant of `Row::from_proto` that allows for reuse of internal allocs
118    /// and validates the decoding against a provided [`RelationDesc`].
119    pub fn decode_from_proto(
120        &mut self,
121        proto: &ProtoRow,
122        desc: &RelationDesc,
123    ) -> Result<(), String> {
124        let mut packer = self.packer();
125        for (col_idx, _, _) in desc.iter_all() {
126            let d = match proto.datums.get(col_idx.to_raw()) {
127                Some(x) => x,
128                None => {
129                    packer.push(Datum::Null);
130                    continue;
131                }
132            };
133            packer.try_push_proto(d)?;
134        }
135
136        Ok(())
137    }
138
139    /// Allocate an empty `Row` with a pre-allocated capacity.
140    #[inline]
141    pub fn with_capacity(cap: usize) -> Self {
142        Self {
143            data: CompactBytes::with_capacity(cap),
144        }
145    }
146
147    /// Create an empty `Row`.
148    #[inline]
149    pub const fn empty() -> Self {
150        Self {
151            data: CompactBytes::empty(),
152        }
153    }
154
155    /// Creates a new row from supplied bytes.
156    ///
157    /// # Safety
158    ///
159    /// This method relies on `data` being an appropriate row encoding, and can
160    /// result in unsafety if this is not the case.
161    pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
162        Row {
163            data: CompactBytes::new(data),
164        }
165    }
166
167    /// Constructs a [`RowPacker`] that will pack datums into this row's
168    /// allocation.
169    ///
170    /// This method clears the existing contents of the row, but retains the
171    /// allocation.
172    pub fn packer(&mut self) -> RowPacker<'_> {
173        self.clear();
174        RowPacker { row: self }
175    }
176
177    /// Take some `Datum`s and pack them into a `Row`.
178    ///
179    /// This method builds a `Row` by repeatedly increasing the backing
180    /// allocation. If the contents of the iterator are known ahead of
181    /// time, consider [`Row::with_capacity`] to right-size the allocation
182    /// first, and then [`RowPacker::extend`] to populate it with `Datum`s.
183    /// This avoids the repeated allocation resizing and copying.
184    pub fn pack<'a, I, D>(iter: I) -> Row
185    where
186        I: IntoIterator<Item = D>,
187        D: Borrow<Datum<'a>>,
188    {
189        let mut row = Row::default();
190        row.packer().extend(iter);
191        row
192    }
193
194    /// Use `self` to pack `iter`, and then clone the result.
195    ///
196    /// This is a convenience method meant to reduce boilerplate around row
197    /// formation.
198    pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
199    where
200        I: IntoIterator<Item = D>,
201        D: Borrow<Datum<'a>>,
202    {
203        self.packer().extend(iter);
204        self.clone()
205    }
206
207    /// Like [`Row::pack`], but the provided iterator is allowed to produce an
208    /// error, in which case the packing operation is aborted and the error
209    /// returned.
210    pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
211    where
212        I: IntoIterator<Item = Result<D, E>>,
213        D: Borrow<Datum<'a>>,
214    {
215        let mut row = Row::default();
216        row.packer().try_extend(iter)?;
217        Ok(row)
218    }
219
220    /// Pack a slice of `Datum`s into a `Row`.
221    ///
222    /// This method has the advantage over `pack` that it can determine the required
223    /// allocation before packing the elements, ensuring only one allocation and no
224    /// redundant copies required.
225    pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
226        // Pre-allocate the needed number of bytes.
227        let mut row = Row::with_capacity(datums_size(slice.iter()));
228        row.packer().extend(slice.iter());
229        row
230    }
231
232    /// Returns the total amount of bytes used by this row.
233    pub fn byte_len(&self) -> usize {
234        let heap_size = if self.data.spilled() {
235            self.data.len()
236        } else {
237            0
238        };
239        let inline_size = std::mem::size_of::<Self>();
240        inline_size.saturating_add(heap_size)
241    }
242
243    /// The length of the encoded row in bytes. Does not include the size of the `Row` struct itself.
244    pub fn data_len(&self) -> usize {
245        self.data.len()
246    }
247
248    /// Returns the total capacity in bytes used by this row.
249    pub fn byte_capacity(&self) -> usize {
250        self.data.capacity()
251    }
252
253    /// Extracts a Row slice containing the entire [`Row`].
254    #[inline]
255    pub fn as_row_ref(&self) -> &RowRef {
256        // SAFETY: `Row` contains valid row data, by construction.
257        unsafe { RowRef::from_slice(self.data.as_slice()) }
258    }
259
260    /// Clear the contents of the [`Row`], leaving any allocation in place.
261    #[inline]
262    fn clear(&mut self) {
263        self.data.clear();
264    }
265}
266
impl Borrow<RowRef> for Row {
    /// Borrows the packed bytes as a [`RowRef`], enabling e.g. keyed lookups
    /// of `Row` collections by `&RowRef`.
    #[inline]
    fn borrow(&self) -> &RowRef {
        self.as_row_ref()
    }
}

impl AsRef<RowRef> for Row {
    #[inline]
    fn as_ref(&self) -> &RowRef {
        self.as_row_ref()
    }
}

impl Deref for Row {
    type Target = RowRef;

    // Deref lets all `RowRef` methods be called directly on a `Row`.
    #[inline]
    fn deref(&self) -> &Self::Target {
        self.as_row_ref()
    }
}

// Nothing depends on Row being exactly 24, we just want to add visibility to the size.
static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
292
293impl Clone for Row {
294    fn clone(&self) -> Self {
295        Row {
296            data: self.data.clone(),
297        }
298    }
299
300    fn clone_from(&mut self, source: &Self) {
301        self.data.clone_from(&source.data);
302    }
303}
304
305// Row's `Hash` implementation defers to `RowRef` to ensure they hash equivalently.
306impl std::hash::Hash for Row {
307    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
308        self.as_row_ref().hash(state)
309    }
310}
311
312impl Arbitrary for Row {
313    type Parameters = prop::collection::SizeRange;
314    type Strategy = BoxedStrategy<Row>;
315
316    fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
317        prop::collection::vec(arb_datum(true), size)
318            .prop_map(|items| {
319                let mut row = Row::default();
320                let mut packer = row.packer();
321                for item in items.iter() {
322                    let datum: Datum<'_> = item.into();
323                    packer.push(datum);
324                }
325                row
326            })
327            .boxed()
328    }
329}
330
331impl PartialOrd for Row {
332    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
333        Some(self.cmp(other))
334    }
335}
336
337impl Ord for Row {
338    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
339        self.as_ref().cmp(other.as_ref())
340    }
341}
342
#[allow(missing_debug_implementations)]
mod columnation {
    use columnation::{Columnation, Region};
    use mz_ore::region::LgAllocRegion;

    use crate::Row;

    /// Region allocation for `Row` data.
    ///
    /// Content bytes are stored in stable contiguous memory locations,
    /// and then a `Row` referencing them is falsified.
    pub struct RowStack {
        // Backing region that owns the copied row bytes.
        region: LgAllocRegion<u8>,
    }

    impl RowStack {
        // Cap on region allocation size: 2 MiB.
        const LIMIT: usize = 2 << 20;
    }

    // Implement `Default` manually to specify a region allocation limit.
    impl Default for RowStack {
        fn default() -> Self {
            Self {
                // Limit the region size to 2MiB.
                region: LgAllocRegion::with_limit(Self::LIMIT),
            }
        }
    }

    impl Columnation for Row {
        type InnerRegion = RowStack;
    }

    impl Region for RowStack {
        type Item = Row;
        #[inline]
        fn clear(&mut self) {
            self.region.clear();
        }
        // Copies a row into the region. Inline rows are cheaply cloned;
        // spilled rows have their bytes copied into the region and a `Row`
        // pointing at those bytes is fabricated.
        //
        // NOTE(review): the fabricated `CompactBytes` claims the *source*
        // row's capacity, but only `len` bytes were copied into the region.
        // This appears to rely on region-backed rows never being grown or
        // deallocated through the usual path — confirm against `CompactBytes`
        // and `Region` contracts.
        #[inline(always)]
        unsafe fn copy(&mut self, item: &Row) -> Row {
            if item.data.spilled() {
                let bytes = self.region.copy_slice(&item.data[..]);
                Row {
                    data: compact_bytes::CompactBytes::from_raw_parts(
                        bytes.as_mut_ptr(),
                        item.data.len(),
                        item.data.capacity(),
                    ),
                }
            } else {
                item.clone()
            }
        }

        // Reserve space for the spilled bytes of `items`, clamped to `LIMIT`
        // (inline rows need no region space).
        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            let size = items
                .filter(|row| row.data.spilled())
                .map(|row| row.data.len())
                .sum();
            let size = std::cmp::min(size, Self::LIMIT);
            self.region.reserve(size);
        }

        // Reserve space proportional to the lengths of other regions,
        // clamped to `LIMIT`.
        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            let size = regions.map(|r| r.region.len()).sum();
            let size = std::cmp::min(size, Self::LIMIT);
            self.region.reserve(size);
        }

        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.region.heap_size(callback)
        }
    }
}
426
mod columnar {
    use columnar::common::PushIndexAs;
    use columnar::{
        AsBytes, Borrow, Clear, Columnar, Container, FromBytes, HeapSize, Index, IndexAs, Len, Push,
    };
    use mz_ore::cast::CastFrom;
    use std::ops::Range;

    use crate::{Row, RowRef};

    /// Columnar storage for `Row`s: all row bytes are concatenated into
    /// `values`, and `bounds[i]` records the end offset of row `i` (its start
    /// is `bounds[i - 1]`, or 0 for the first row).
    #[derive(
        Copy,
        Clone,
        Debug,
        Default,
        PartialEq,
        serde::Serialize,
        serde::Deserialize
    )]
    pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
        /// Bounds container; provides indexed access to offsets.
        bounds: BC,
        /// Values container; provides slice access to bytes.
        values: VC,
    }

    impl Columnar for Row {
        // Overwrite `self` with the borrowed row's bytes, reusing the
        // existing allocation.
        #[inline(always)]
        fn copy_from(&mut self, other: columnar::Ref<'_, Self>) {
            self.clear();
            self.data.extend_from_slice(other.data());
        }
        #[inline(always)]
        fn into_owned(other: columnar::Ref<'_, Self>) -> Self {
            other.to_owned()
        }
        type Container = Rows;
        // Reborrowing a `&RowRef` at a shorter lifetime is the identity.
        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
        where
            Self: 'a,
        {
            thing
        }
    }

    impl<BC: PushIndexAs<u64>> Borrow for Rows<BC, Vec<u8>> {
        type Ref<'a> = &'a RowRef;
        type Borrowed<'a>
            = Rows<BC::Borrowed<'a>, &'a [u8]>
        where
            Self: 'a;
        // Borrow both constituent containers.
        #[inline(always)]
        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
            Rows {
                bounds: self.bounds.borrow(),
                values: self.values.borrow(),
            }
        }
        #[inline(always)]
        fn reborrow<'c, 'a: 'c>(item: Self::Borrowed<'a>) -> Self::Borrowed<'c>
        where
            Self: 'a,
        {
            Rows {
                bounds: BC::reborrow(item.bounds),
                values: item.values,
            }
        }

        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
        where
            Self: 'a,
        {
            item
        }
    }

    impl<BC: PushIndexAs<u64>> Container for Rows<BC, Vec<u8>> {
        // Appends rows `range` of `other` to `self`, copying their bytes in
        // one go and rebasing their bounds onto `self.values`' length.
        fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
            if !range.is_empty() {
                // Imported bounds will be relative to this starting offset.
                let values_len: u64 = self.values.len().try_into().expect("must fit");

                // Push all bytes that we can, all at once.
                let other_lower = if range.start == 0 {
                    0
                } else {
                    other.bounds.index_as(range.start - 1)
                };
                let other_upper = other.bounds.index_as(range.end - 1);
                self.values.extend_from_self(
                    other.values,
                    usize::try_from(other_lower).expect("must fit")
                        ..usize::try_from(other_upper).expect("must fit"),
                );

                // Each bound needs to be shifted by `values_len - other_lower`.
                if values_len == other_lower {
                    // No shift needed; bulk-copy the bounds verbatim.
                    self.bounds.extend_from_self(other.bounds, range);
                } else {
                    for index in range {
                        let shifted = other.bounds.index_as(index) - other_lower + values_len;
                        self.bounds.push(&shifted)
                    }
                }
            }
        }
        fn reserve_for<'a, I>(&mut self, selves: I)
        where
            Self: 'a,
            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
        {
            self.bounds.reserve_for(selves.clone().map(|r| r.bounds));
            self.values.reserve_for(selves.map(|r| r.values));
        }
    }

    impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
        // Serialize bounds first, then values.
        #[inline(always)]
        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
            columnar::chain(self.bounds.as_bytes(), self.values.as_bytes())
        }
    }
    impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
        // Deserialize in the same order `as_bytes` emits: bounds, then values.
        #[inline(always)]
        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
            Self {
                bounds: FromBytes::from_bytes(bytes),
                values: FromBytes::from_bytes(bytes),
            }
        }
    }

    impl<BC: Len, VC> Len for Rows<BC, VC> {
        // One bound per row, so the bounds length is the row count.
        #[inline(always)]
        fn len(&self) -> usize {
            self.bounds.len()
        }
    }

    // Indexed access for the borrowed container form. (The owned-container
    // impl below duplicates this logic for `&Rows<BC, Vec<u8>>`.)
    impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
        type Ref = &'a RowRef;
        #[inline(always)]
        fn get(&self, index: usize) -> Self::Ref {
            // Row `index` spans `bounds[index - 1]..bounds[index]`, with an
            // implicit leading bound of 0.
            let lower = if index == 0 {
                0
            } else {
                self.bounds.index_as(index - 1)
            };
            let upper = self.bounds.index_as(index);
            let lower = usize::cast_from(lower);
            let upper = usize::cast_from(upper);
            // SAFETY: self.values contains only valid row data, and self.bounds delimits only ranges
            // that correspond to the original rows.
            unsafe { RowRef::from_slice(&self.values[lower..upper]) }
        }
    }
    // Indexed access for references to the owned container form.
    impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
        type Ref = &'a RowRef;
        #[inline(always)]
        fn get(&self, index: usize) -> Self::Ref {
            let lower = if index == 0 {
                0
            } else {
                self.bounds.index_as(index - 1)
            };
            let upper = self.bounds.index_as(index);
            let lower = usize::cast_from(lower);
            let upper = usize::cast_from(upper);
            // SAFETY: self.values contains only valid row data, and self.bounds delimits only ranges
            // that correspond to the original rows.
            unsafe { RowRef::from_slice(&self.values[lower..upper]) }
        }
    }

    impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
        // Append the row's bytes, then record the new end offset.
        #[inline(always)]
        fn push(&mut self, item: &Row) {
            self.values.extend_from_slice(item.data.as_slice());
            self.bounds.push(u64::cast_from(self.values.len()));
        }
    }
    impl<BC: for<'a> Push<&'a u64>> Push<&RowRef> for Rows<BC> {
        #[inline(always)]
        fn push(&mut self, item: &RowRef) {
            self.values.extend_from_slice(item.data());
            self.bounds.push(&u64::cast_from(self.values.len()));
        }
    }
    impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
        #[inline(always)]
        fn clear(&mut self) {
            self.bounds.clear();
            self.values.clear();
        }
    }
    impl<BC: HeapSize, VC: HeapSize> HeapSize for Rows<BC, VC> {
        // Sum (length, capacity) over both constituent containers.
        #[inline(always)]
        fn heap_size(&self) -> (usize, usize) {
            let (l0, c0) = self.bounds.heap_size();
            let (l1, c1) = self.values.heap_size();
            (l0 + l1, c0 + c1)
        }
    }
}
633
/// A contiguous slice of bytes that are row data.
///
/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
// `repr(transparent)` guarantees the same layout as `[u8]`, which
// `RowRef::from_slice` relies on for its pointer cast.
#[derive(PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct RowRef([u8]);
640
641impl RowRef {
642    /// Create a [`RowRef`] from a slice of data.
643    ///
644    /// # Safety
645    ///
646    /// We do not check that the provided slice is valid [`Row`] data; the caller is required to
647    /// ensure this.
648    pub unsafe fn from_slice(row: &[u8]) -> &RowRef {
649        #[allow(clippy::as_conversions)]
650        let ptr = row as *const [u8] as *const RowRef;
651        // SAFETY: We know `ptr` is non-null and aligned because it came from a &[u8].
652        unsafe { &*ptr }
653    }
654
655    /// Unpack `self` into a `Vec<Datum>` for efficient random access.
656    pub fn unpack(&self) -> Vec<Datum<'_>> {
657        // It's usually cheaper to unpack twice to figure out the right length than it is to grow the vec as we go
658        let len = self.iter().count();
659        let mut vec = Vec::with_capacity(len);
660        vec.extend(self.iter());
661        vec
662    }
663
664    /// Return the first [`Datum`] in `self`
665    ///
666    /// Panics if the [`RowRef`] is empty.
667    pub fn unpack_first(&self) -> Datum<'_> {
668        self.iter().next().unwrap()
669    }
670
671    /// Iterate the [`Datum`] elements of the [`RowRef`].
672    pub fn iter(&self) -> DatumListIter<'_> {
673        DatumListIter { data: &self.0 }
674    }
675
676    /// Return the byte length of this [`RowRef`].
677    pub fn byte_len(&self) -> usize {
678        self.0.len()
679    }
680
681    /// For debugging only.
682    pub fn data(&self) -> &[u8] {
683        &self.0
684    }
685
686    /// True iff there is no data in this [`RowRef`].
687    pub fn is_empty(&self) -> bool {
688        self.0.is_empty()
689    }
690}
691
impl ToOwned for RowRef {
    type Owned = Row;

    // Copies the referenced bytes into an owned `Row`.
    fn to_owned(&self) -> Self::Owned {
        // SAFETY: RowRef has the invariant that the wrapped data must be a valid Row encoding.
        unsafe { Row::from_bytes_unchecked(&self.0) }
    }
}

impl<'a> IntoIterator for &'a RowRef {
    type Item = Datum<'a>;
    type IntoIter = DatumListIter<'a>;

    // Iterates the datums encoded in this row slice; equivalent to `iter()`.
    fn into_iter(self) -> DatumListIter<'a> {
        DatumListIter { data: &self.0 }
    }
}
709
710/// These implementations order first by length, and then by slice contents.
711/// This allows many comparisons to complete without dereferencing memory.
712/// Warning: These order by the u8 array representation, and NOT by Datum::cmp.
713impl PartialOrd for RowRef {
714    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
715        Some(self.cmp(other))
716    }
717}
718
719impl Ord for RowRef {
720    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
721        match self.0.len().cmp(&other.0.len()) {
722            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
723            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
724            std::cmp::Ordering::Equal => self.0.cmp(&other.0),
725        }
726    }
727}
728
729impl fmt::Debug for RowRef {
730    /// Debug representation using the internal datums
731    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
732        f.write_str("RowRef{")?;
733        f.debug_list().entries(self.into_iter()).finish()?;
734        f.write_str("}")
735    }
736}
737
/// Packs datums into a [`Row`].
///
/// Creating a `RowPacker` via [`Row::packer`] starts a packing operation on the
/// row. A packing operation always starts from scratch: the existing contents
/// of the underlying row are cleared.
///
/// To complete a packing operation, drop the `RowPacker`.
#[derive(Debug)]
pub struct RowPacker<'a> {
    // The row being appended to; `Row::packer` clears it before handing it over.
    row: &'a mut Row,
}

/// Iterator over the `Datum`s encoded in a slice of row data.
#[derive(Debug, Clone)]
pub struct DatumListIter<'a> {
    // The not-yet-decoded remainder of the row bytes.
    data: &'a [u8],
}

/// Iterator over the key/value pairs encoded in a row's dict data.
#[derive(Debug, Clone)]
pub struct DatumDictIter<'a> {
    // The not-yet-decoded remainder of the dict bytes.
    data: &'a [u8],
    // The previously yielded key — presumably used to check that keys arrive
    // in ascending order; confirm in the `Iterator` impl.
    prev_key: Option<&'a str>,
}

/// `RowArena` is used to hold on to temporary `Row`s for functions like `eval` that need to create complex `Datum`s but don't have a `Row` to put them in yet.
#[derive(Debug)]
pub struct RowArena {
    // Semantically, this field would be better represented by a `Vec<Box<[u8]>>`,
    // as once the arena takes ownership of a byte vector the vector is never
    // modified. But `RowArena::push_bytes` takes ownership of a `Vec<u8>`, so
    // storing that `Vec<u8>` directly avoids an allocation. The cost is
    // additional memory use, as the vector may have spare capacity, but row
    // arenas are short lived so this is the better tradeoff.
    inner: RefCell<Vec<Vec<u8>>>,
}
772
// DatumList and DatumDict defined here rather than near Datum because we need private access to the unsafe data field

/// A sequence of Datums
#[derive(Clone, Copy)]
pub struct DatumList<'a> {
    /// Points at the serialized datums
    data: &'a [u8],
}

impl<'a> Debug for DatumList<'a> {
    // Renders the list by decoding and formatting each element.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_list().entries(self.iter()).finish()
    }
}

impl<'a> PartialEq for DatumList<'a> {
    // Element-wise equality over the decoded datums, not the raw bytes.
    #[inline(always)]
    fn eq(&self, other: &DatumList<'a>) -> bool {
        self.iter().eq(other.iter())
    }
}

impl<'a> Eq for DatumList<'a> {}

impl<'a> Hash for DatumList<'a> {
    // Hashes each decoded datum in order (no length prefix is mixed in).
    #[inline(always)]
    fn hash<H: Hasher>(&self, state: &mut H) {
        for d in self.iter() {
            d.hash(state);
        }
    }
}

impl Ord for DatumList<'_> {
    // Lexicographic comparison over the decoded datums.
    #[inline(always)]
    fn cmp(&self, other: &DatumList) -> Ordering {
        self.iter().cmp(other.iter())
    }
}

impl PartialOrd for DatumList<'_> {
    #[inline(always)]
    fn partial_cmp(&self, other: &DatumList) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
819
/// A mapping from string keys to Datums
#[derive(Clone, Copy)]
pub struct DatumMap<'a> {
    /// Points at the serialized datums, which should be sorted in key order
    data: &'a [u8],
}

impl<'a> PartialEq for DatumMap<'a> {
    // Pair-wise equality over the decoded (key, value) entries; relies on
    // both maps being serialized in sorted key order.
    #[inline(always)]
    fn eq(&self, other: &DatumMap<'a>) -> bool {
        self.iter().eq(other.iter())
    }
}

impl<'a> Eq for DatumMap<'a> {}

impl<'a> Hash for DatumMap<'a> {
    // Hashes each key and value in iteration (i.e. key) order.
    #[inline(always)]
    fn hash<H: Hasher>(&self, state: &mut H) {
        for (k, v) in self.iter() {
            k.hash(state);
            v.hash(state);
        }
    }
}

impl<'a> Ord for DatumMap<'a> {
    // Lexicographic comparison over the decoded (key, value) entries.
    #[inline(always)]
    fn cmp(&self, other: &DatumMap<'a>) -> Ordering {
        self.iter().cmp(other.iter())
    }
}

impl<'a> PartialOrd for DatumMap<'a> {
    #[inline(always)]
    fn partial_cmp(&self, other: &DatumMap<'a>) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
859
/// Represents a single `Datum`, appropriate to be nested inside other
/// `Datum`s.
// Note: the derived `Eq`/`Hash` operate on the raw encoded bytes, while
// `Ord` below compares the decoded datums.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumNested<'a> {
    // The encoded bytes of exactly one datum, including its tag.
    val: &'a [u8],
}

impl<'a> std::fmt::Display for DatumNested<'a> {
    // Displays the decoded datum.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        std::fmt::Display::fmt(&self.datum(), f)
    }
}

impl<'a> std::fmt::Debug for DatumNested<'a> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DatumNested")
            .field("val", &self.datum())
            .finish()
    }
}

impl<'a> DatumNested<'a> {
    // Figure out which bytes `read_datum` returns (e.g. including the tag),
    // and then store a reference to those bytes, so we can "replay" this same
    // call later on without storing the datum itself.
    //
    // Advances `data` past the consumed datum as a side effect.
    pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
        let prev = *data;
        // SAFETY: assumes `data` begins with a valid datum encoding — the
        // caller must uphold this, as at other `read_datum` call sites.
        let _ = unsafe { read_datum(data) };
        DatumNested {
            // Keep exactly the bytes that `read_datum` consumed.
            val: &prev[..(prev.len() - data.len())],
        }
    }

    /// Returns the datum `self` contains.
    pub fn datum(&self) -> Datum<'a> {
        // Decode from a copy of the slice so `self.val` is left untouched.
        let mut temp = self.val;
        unsafe { read_datum(&mut temp) }
    }
}

impl<'a> Ord for DatumNested<'a> {
    // Orders by the decoded datums, not by the encoded byte representation.
    fn cmp(&self, other: &Self) -> Ordering {
        self.datum().cmp(&other.datum())
    }
}

impl<'a> PartialOrd for DatumNested<'a> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
911
// Prefer adding new tags to the end of the enum. Certain behavior, like row ordering and EXPLAIN
// PHYSICAL PLAN, rely on the ordering of this enum. Neither of these are breaking changes, but
// it's annoying when they change.
/// The one-byte tag that prefixes every datum in a packed row and determines
/// how the bytes that follow it are to be interpreted.
///
/// Discriminants are implicit and contiguous (`#[repr(u8)]`), so each tag's
/// byte value is determined purely by its position in this list.
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
enum Tag {
    Null,
    False,
    True,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt32,
    Float32,
    Float64,
    Date,
    Time,
    Timestamp,
    TimestampTz,
    Interval,
    // The `Tiny`/`Short`/`Long`/`Huge` variants select how many bytes the
    // length prefix occupies (1, 2, 4, or 8); see `push_lengthed_bytes`.
    BytesTiny,
    BytesShort,
    BytesLong,
    BytesHuge,
    StringTiny,
    StringShort,
    StringLong,
    StringHuge,
    Uuid,
    Array,
    ListTiny,
    ListShort,
    ListLong,
    ListHuge,
    Dict,
    JsonNull,
    Dummy,
    Numeric,
    UInt16,
    UInt64,
    MzTimestamp,
    Range,
    MzAclItem,
    AclItem,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestamp,
    // Everything except leap seconds and times beyond the range of
    // i64 nanoseconds. (Note that Materialize does not support leap
    // seconds, but this module does).
    CheapTimestampTz,
    // The next several tags are for variable-length signed integer encoding.
    // The basic idea is that `NonNegativeIntN_K` is used to encode a datum of type
    // IntN whose actual value is positive or zero and fits in K bits, and similarly for
    // NegativeIntN_K with negative values.
    //
    // The order of these tags matters, because we want to be able to choose the
    // tag for a given datum quickly, with arithmetic, rather than slowly, with a
    // stack of `if` statements.
    //
    // Separate tags for non-negative and negative numbers are used to avoid having to
    // waste one bit in the actual data space to encode the sign.
    NonNegativeInt16_0, // i.e., 0
    NonNegativeInt16_8,
    NonNegativeInt16_16,

    NonNegativeInt32_0,
    NonNegativeInt32_8,
    NonNegativeInt32_16,
    NonNegativeInt32_24,
    NonNegativeInt32_32,

    NonNegativeInt64_0,
    NonNegativeInt64_8,
    NonNegativeInt64_16,
    NonNegativeInt64_24,
    NonNegativeInt64_32,
    NonNegativeInt64_40,
    NonNegativeInt64_48,
    NonNegativeInt64_56,
    NonNegativeInt64_64,

    NegativeInt16_0, // i.e., -1
    NegativeInt16_8,
    NegativeInt16_16,

    NegativeInt32_0,
    NegativeInt32_8,
    NegativeInt32_16,
    NegativeInt32_24,
    NegativeInt32_32,

    NegativeInt64_0,
    NegativeInt64_8,
    NegativeInt64_16,
    NegativeInt64_24,
    NegativeInt64_32,
    NegativeInt64_40,
    NegativeInt64_48,
    NegativeInt64_56,
    NegativeInt64_64,

    // These are like the ones above, but for unsigned types. The
    // situation is slightly simpler as we don't have negatives.
    UInt8_0, // i.e., 0
    UInt8_8,

    UInt16_0,
    UInt16_8,
    UInt16_16,

    UInt32_0,
    UInt32_8,
    UInt32_16,
    UInt32_24,
    UInt32_32,

    UInt64_0,
    UInt64_8,
    UInt64_16,
    UInt64_24,
    UInt64_32,
    UInt64_40,
    UInt64_48,
    UInt64_56,
    UInt64_64,
}
1041
1042impl Tag {
1043    fn actual_int_length(self) -> Option<usize> {
1044        use Tag::*;
1045        let val = match self {
1046            NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
1047            | UInt32_0 | UInt64_0 => 0,
1048            NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
1049            | UInt32_8 | UInt64_8 => 1,
1050            NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
1051            | UInt32_16 | UInt64_16 => 2,
1052            NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
1053            NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
1054            NonNegativeInt64_40 | UInt64_40 => 5,
1055            NonNegativeInt64_48 | UInt64_48 => 6,
1056            NonNegativeInt64_56 | UInt64_56 => 7,
1057            NonNegativeInt64_64 | UInt64_64 => 8,
1058            NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
1059            NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
1060            NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
1061            NegativeInt32_24 | NegativeInt64_24 => 3,
1062            NegativeInt32_32 | NegativeInt64_32 => 4,
1063            NegativeInt64_40 => 5,
1064            NegativeInt64_48 => 6,
1065            NegativeInt64_56 => 7,
1066            NegativeInt64_64 => 8,
1067
1068            _ => return None,
1069        };
1070        Some(val)
1071    }
1072}
1073
1074// --------------------------------------------------------------------------------
1075// reading data
1076
1077/// Read a byte slice starting at byte `offset`.
1078///
1079/// Updates `offset` to point to the first byte after the end of the read region.
1080fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
1081    let len = u64::from_le_bytes(read_byte_array(data));
1082    let len = usize::cast_from(len);
1083    let (bytes, next) = data.split_at(len);
1084    *data = next;
1085    bytes
1086}
1087
/// Read a datum whose length is encoded in the row before its contents.
///
/// The tag selects how wide the length prefix is (1, 2, 4, or 8 bytes,
/// little-endian) and which datum variant the payload represents. Advances
/// `data` to point just past the bytes that were read.
///
/// # Safety
///
/// This function is safe if the datum's length and contents were previously written by `push_lengthed_bytes`,
/// and it was only written with a `String` tag if it was indeed UTF-8.
unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
    let len = match tag {
        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
            usize::from(u16::from_le_bytes(read_byte_array(data)))
        }
        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
            usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
        }
        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
            usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
        }
        // Callers only pass the tags matched above.
        _ => unreachable!(),
    };
    let (bytes, next) = data.split_at(len);
    *data = next;
    match tag {
        Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
        Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
            // Skipping UTF-8 validation is the unsafe part; the caller
            // guarantees string payloads were valid UTF-8 when written.
            Datum::String(str::from_utf8_unchecked(bytes))
        }
        Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
            Datum::List(DatumList { data: bytes })
        }
        _ => unreachable!(),
    }
}
1123
/// Pop a single byte off the front of `data`, advancing it by one.
///
/// Panics if `data` is empty.
fn read_byte(data: &mut &[u8]) -> u8 {
    let (&head, tail) = data.split_first().unwrap();
    *data = tail;
    head
}
1129
/// Read `length` bytes from the front of `data`, advancing it past the
/// bytes read. The result is widened to an `N`-byte array by placing `FILL`
/// in the `N - length` most significant bytes.
///
/// Panics if `length > N` or `length > data.len()`.
fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    let (head, tail) = data.split_at(length);
    let mut widened = [FILL; N];
    widened[..length].copy_from_slice(head);
    *data = tail;
    widened
}
/// Read `length` bytes from the front of `data`, advancing it past the
/// bytes read. Extend the resulting buffer to a negative `N`-byte
/// twos complement integer by filling the remaining bits with 1.
///
/// Panics (in `read_byte_array_sign_extending`) if:
///   * length > N
///   * length > data.len()
fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
    read_byte_array_sign_extending::<N, 255>(data, length)
}
1157
/// Read `length` bytes from the front of `data`, advancing it past the
/// bytes read. Extend the resulting buffer to a positive or zero `N`-byte
/// twos complement integer by filling the remaining bits with 0.
///
/// Panics (in `read_byte_array_sign_extending`) if:
///   * length > N
///   * length > data.len()
fn read_byte_array_extending_nonnegative<const N: usize>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    read_byte_array_sign_extending::<N, 0>(data, length)
}
1171
1172pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1173    let (prev, next) = data.split_first_chunk().unwrap();
1174    *data = next;
1175    *prev
1176}
1177
1178pub(super) fn read_date(data: &mut &[u8]) -> Date {
1179    let days = i32::from_le_bytes(read_byte_array(data));
1180    Date::from_pg_epoch(days).expect("unexpected date")
1181}
1182
1183pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1184    let year = i32::from_le_bytes(read_byte_array(data));
1185    let ordinal = u32::from_le_bytes(read_byte_array(data));
1186    NaiveDate::from_yo_opt(year, ordinal).unwrap()
1187}
1188
1189pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1190    let secs = u32::from_le_bytes(read_byte_array(data));
1191    let nanos = u32::from_le_bytes(read_byte_array(data));
1192    NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1193}
1194
/// Read a datum from the front of `data`.
///
/// Advances `data` to point to the first byte after the end of the read region.
///
/// # Safety
///
/// This function is safe if a `Datum` was previously written at this offset by `push_datum`.
/// Otherwise it could return invalid values, which is Undefined Behavior.
pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
    // Dispatch on the leading tag byte; each arm consumes exactly the bytes
    // the corresponding encoder wrote.
    let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
    match tag {
        Tag::Null => Datum::Null,
        Tag::False => Datum::False,
        Tag::True => Datum::True,
        Tag::UInt8_0 | Tag::UInt8_8 => {
            let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt8(i)
        }
        Tag::Int16 => {
            let i = i16::from_le_bytes(read_byte_array(data));
            Datum::Int16(i)
        }
        Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
            // and `data` is big enough because it was encoded validly. These assumptions
            // are checked in debug asserts.
            let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int16(i)
        }
        Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
            let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt16(i)
        }
        Tag::Int32 => {
            let i = i32::from_le_bytes(read_byte_array(data));
            Datum::Int32(i)
        }
        Tag::NonNegativeInt32_0
        | Tag::NonNegativeInt32_32
        | Tag::NonNegativeInt32_8
        | Tag::NonNegativeInt32_16
        | Tag::NonNegativeInt32_24 => {
            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
            // and `data` is big enough because it was encoded validly. These assumptions
            // are checked in debug asserts.
            let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int32(i)
        }
        Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
            let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt32(i)
        }
        Tag::Int64 => {
            let i = i64::from_le_bytes(read_byte_array(data));
            Datum::Int64(i)
        }
        Tag::NonNegativeInt64_0
        | Tag::NonNegativeInt64_64
        | Tag::NonNegativeInt64_8
        | Tag::NonNegativeInt64_16
        | Tag::NonNegativeInt64_24
        | Tag::NonNegativeInt64_32
        | Tag::NonNegativeInt64_40
        | Tag::NonNegativeInt64_48
        | Tag::NonNegativeInt64_56 => {
            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
            // and `data` is big enough because it was encoded validly. These assumptions
            // are checked in debug asserts.

            let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int64(i)
        }
        Tag::UInt64_0
        | Tag::UInt64_8
        | Tag::UInt64_16
        | Tag::UInt64_24
        | Tag::UInt64_32
        | Tag::UInt64_40
        | Tag::UInt64_48
        | Tag::UInt64_56
        | Tag::UInt64_64 => {
            let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt64(i)
        }
        Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
            // and `data` is big enough because it was encoded validly. These assumptions
            // are checked in debug asserts.
            let i = i16::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int16(i)
        }
        Tag::NegativeInt32_0
        | Tag::NegativeInt32_32
        | Tag::NegativeInt32_8
        | Tag::NegativeInt32_16
        | Tag::NegativeInt32_24 => {
            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
            // and `data` is big enough because it was encoded validly. These assumptions
            // are checked in debug asserts.
            let i = i32::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int32(i)
        }
        Tag::NegativeInt64_0
        | Tag::NegativeInt64_64
        | Tag::NegativeInt64_8
        | Tag::NegativeInt64_16
        | Tag::NegativeInt64_24
        | Tag::NegativeInt64_32
        | Tag::NegativeInt64_40
        | Tag::NegativeInt64_48
        | Tag::NegativeInt64_56 => {
            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
            // and `data` is big enough because the row was encoded validly. These assumptions
            // are checked in debug asserts.
            let i = i64::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int64(i)
        }

        Tag::UInt8 => {
            let i = u8::from_le_bytes(read_byte_array(data));
            Datum::UInt8(i)
        }
        Tag::UInt16 => {
            let i = u16::from_le_bytes(read_byte_array(data));
            Datum::UInt16(i)
        }
        Tag::UInt32 => {
            let i = u32::from_le_bytes(read_byte_array(data));
            Datum::UInt32(i)
        }
        Tag::UInt64 => {
            let i = u64::from_le_bytes(read_byte_array(data));
            Datum::UInt64(i)
        }
        Tag::Float32 => {
            let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
            Datum::Float32(OrderedFloat::from(f))
        }
        Tag::Float64 => {
            let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
            Datum::Float64(OrderedFloat::from(f))
        }
        Tag::Date => Datum::Date(read_date(data)),
        Tag::Time => Datum::Time(read_time(data)),
        Tag::CheapTimestamp => {
            // Stored as a single i64 of nanoseconds since the Unix epoch.
            let ts = i64::from_le_bytes(read_byte_array(data));
            let secs = ts.div_euclid(1_000_000_000);
            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
            let ndt = DateTime::from_timestamp(secs, nsecs)
                .expect("We only write round-trippable timestamps")
                .naive_utc();
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
            )
        }
        Tag::CheapTimestampTz => {
            // Stored as a single i64 of nanoseconds since the Unix epoch.
            let ts = i64::from_le_bytes(read_byte_array(data));
            let secs = ts.div_euclid(1_000_000_000);
            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
            let dt = DateTime::from_timestamp(secs, nsecs)
                .expect("We only write round-trippable timestamps");
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
            )
        }
        Tag::Timestamp => {
            let date = read_naive_date(data);
            let time = read_time(data);
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(date.and_time(time))
                    .expect("unexpected timestamp"),
            )
        }
        Tag::TimestampTz => {
            let date = read_naive_date(data);
            let time = read_time(data);
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
                    date.and_time(time),
                    Utc,
                ))
                .expect("unexpected timestamptz"),
            )
        }
        Tag::Interval => {
            let months = i32::from_le_bytes(read_byte_array(data));
            let days = i32::from_le_bytes(read_byte_array(data));
            let micros = i64::from_le_bytes(read_byte_array(data));
            Datum::Interval(Interval {
                months,
                days,
                micros,
            })
        }
        Tag::BytesTiny
        | Tag::BytesShort
        | Tag::BytesLong
        | Tag::BytesHuge
        | Tag::StringTiny
        | Tag::StringShort
        | Tag::StringLong
        | Tag::StringHuge
        | Tag::ListTiny
        | Tag::ListShort
        | Tag::ListLong
        | Tag::ListHuge => read_lengthed_datum(data, tag),
        Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
        Tag::Array => {
            // See the comment in `Row::push_array` for details on the encoding
            // of arrays.
            let ndims = read_byte(data);
            // Each dimension occupies two u64s (lower bound and length).
            let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
            let (dims, next) = data.split_at(dims_size);
            *data = next;
            let bytes = read_untagged_bytes(data);
            Datum::Array(Array {
                dims: ArrayDimensions { data: dims },
                elements: DatumList { data: bytes },
            })
        }
        Tag::Dict => {
            let bytes = read_untagged_bytes(data);
            Datum::Map(DatumMap { data: bytes })
        }
        Tag::JsonNull => Datum::JsonNull,
        Tag::Dummy => Datum::Dummy,
        Tag::Numeric => {
            let digits = read_byte(data).into();
            let exponent = i8::reinterpret_cast(read_byte(data));
            let bits = read_byte(data);

            // The least-significant-unit array is stored as pairs of
            // little-endian bytes, one pair per u16 element.
            let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
            let lsu_u8_len = lsu_u16_len * 2;
            let (lsu_u8, next) = data.split_at(lsu_u8_len);
            *data = next;

            // TODO: if we refactor the decimal library to accept the owned
            // array as a parameter to `from_raw_parts` below, we could likely
            // avoid a copy because it is exactly the value we want
            let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
            for (i, c) in lsu_u8.chunks(2).enumerate() {
                lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
            }

            let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
            Datum::from(d)
        }
        Tag::MzTimestamp => {
            let t = Timestamp::decode(read_byte_array(data));
            Datum::MzTimestamp(t)
        }
        Tag::Range => {
            // See notes on `push_range_with` for details about encoding.
            let flag_byte = read_byte(data);
            let flags = range::InternalFlags::from_bits(flag_byte)
                .expect("range flags must be encoded validly");

            if flags.contains(range::InternalFlags::EMPTY) {
                assert!(
                    flags == range::InternalFlags::EMPTY,
                    "empty ranges contain only RANGE_EMPTY flag"
                );

                return Datum::Range(Range { inner: None });
            }

            // A finite bound is followed by its encoded datum; an infinite
            // bound stores no bytes at all.
            let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
                None
            } else {
                Some(DatumNested::extract(data))
            };

            let lower = RangeBound {
                inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
                bound: lower_bound,
            };

            let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
                None
            } else {
                Some(DatumNested::extract(data))
            };

            let upper = RangeBound {
                inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
                bound: upper_bound,
            };

            Datum::Range(Range {
                inner: Some(RangeInner { lower, upper }),
            })
        }
        Tag::MzAclItem => {
            const N: usize = MzAclItem::binary_size();
            let mz_acl_item =
                MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
            Datum::MzAclItem(mz_acl_item)
        }
        Tag::AclItem => {
            const N: usize = AclItem::binary_size();
            let acl_item =
                AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
            Datum::AclItem(acl_item)
        }
    }
}
1541
1542// --------------------------------------------------------------------------------
1543// writing data
1544
1545fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1546where
1547    D: Vector<u8>,
1548{
1549    let len = u64::cast_from(bytes.len());
1550    data.extend_from_slice(&len.to_le_bytes());
1551    data.extend_from_slice(bytes);
1552}
1553
1554fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1555where
1556    D: Vector<u8>,
1557{
1558    match tag {
1559        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1560            let len = bytes.len().to_le_bytes();
1561            data.push(len[0]);
1562        }
1563        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1564            let len = bytes.len().to_le_bytes();
1565            data.extend_from_slice(&len[0..2]);
1566        }
1567        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1568            let len = bytes.len().to_le_bytes();
1569            data.extend_from_slice(&len[0..4]);
1570        }
1571        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1572            let len = bytes.len().to_le_bytes();
1573            data.extend_from_slice(&len);
1574        }
1575        _ => unreachable!(),
1576    }
1577    data.extend_from_slice(bytes);
1578}
1579
1580pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1581    i32::to_le_bytes(date.pg_epoch_days())
1582}
1583
1584fn push_date<D>(data: &mut D, date: Date)
1585where
1586    D: Vector<u8>,
1587{
1588    data.extend_from_slice(&date_to_array(date));
1589}
1590
1591pub(super) fn naive_date_to_arrays(
1592    date: NaiveDate,
1593) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1594    (
1595        i32::to_le_bytes(date.year()),
1596        u32::to_le_bytes(date.ordinal()),
1597    )
1598}
1599
1600fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1601where
1602    D: Vector<u8>,
1603{
1604    let (ds1, ds2) = naive_date_to_arrays(date);
1605    data.extend_from_slice(&ds1);
1606    data.extend_from_slice(&ds2);
1607}
1608
1609pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1610    (
1611        u32::to_le_bytes(time.num_seconds_from_midnight()),
1612        u32::to_le_bytes(time.nanosecond()),
1613    )
1614}
1615
1616fn push_time<D>(data: &mut D, time: NaiveTime)
1617where
1618    D: Vector<u8>,
1619{
1620    let (ts1, ts2) = time_to_arrays(time);
1621    data.extend_from_slice(&ts1);
1622    data.extend_from_slice(&ts2);
1623}
1624
1625/// Returns an i64 representing a `NaiveDateTime`, if
1626/// said i64 can be round-tripped back to a `NaiveDateTime`.
1627///
1628/// The only exotic NDTs for which this can't happen are those that
1629/// are hundreds of years in the future or past, or those that
1630/// represent a leap second. (Note that Materialize does not support
1631/// leap seconds, but this module does).
1632// This function is inspired by `NaiveDateTime::timestamp_nanos`,
1633// with extra checking.
1634fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1635    let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1636    if subsec_nanos >= 1_000_000_000 {
1637        return None;
1638    }
1639    let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1640    as_ns.checked_add(i64::from(subsec_nanos))
1641}
1642
// This function is extremely hot, so
// we just use `as` to avoid the overhead of
// `try_into` followed by `unwrap`.
// `leading_ones` and `leading_zeros`
// can never return values greater than 64, so the conversion is safe.
/// Returns the minimum number of bytes (0..=8) needed to represent `i`
/// when the unused high bytes will be reconstructed by sign extension
/// (1-fill for negative values, 0-fill otherwise).
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_signed<T>(i: T) -> u8
where
    T: Into<i64>,
{
    let i: i64 = i.into();

    // To fit in n bytes, we require that
    // everything but the leading sign bits fits in n*8
    // bits.
    let n_sign_bits = if i.is_negative() {
        i.leading_ones() as u8
    } else {
        i.leading_zeros() as u8
    };

    // Round the count of significant bits up to whole bytes.
    (64 - n_sign_bits).div_ceil(8)
}
1667
// In principle we could just use `min_bytes_signed`, rather than
// having a separate function here, as long as we made that one take
// `T: Into<i128>` instead of 64. But LLVM doesn't seem smart enough
// to realize that that function is the same as the current version,
// and generates worse code.
//
// Justification for `as` is the same as in `min_bytes_signed`.
/// Returns the minimum number of bytes (0..=8) needed to represent the
/// unsigned value `i`; unused high bytes are reconstructed by 0-fill.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_unsigned<T>(i: T) -> u8
where
    T: Into<u64>,
{
    let i: u64 = i.into();

    let n_sign_bits = i.leading_zeros() as u8;

    // Round the count of significant bits up to whole bytes.
    (64 - n_sign_bits).div_ceil(8)
}
1687
/// Exclusive upper bound on lengths encodable with a 1-byte prefix (`*Tiny` tags).
const TINY: usize = 1 << 8;
/// Exclusive upper bound on lengths encodable with a 2-byte prefix (`*Short` tags).
const SHORT: usize = 1 << 16;
/// Exclusive upper bound on lengths encodable with a 4-byte prefix (`*Long` tags).
const LONG: usize = 1 << 32;
1691
1692fn push_datum<D>(data: &mut D, datum: Datum)
1693where
1694    D: Vector<u8>,
1695{
1696    match datum {
1697        Datum::Null => data.push(Tag::Null.into()),
1698        Datum::False => data.push(Tag::False.into()),
1699        Datum::True => data.push(Tag::True.into()),
1700        Datum::Int16(i) => {
1701            let mbs = min_bytes_signed(i);
1702            let tag = u8::from(if i.is_negative() {
1703                Tag::NegativeInt16_0
1704            } else {
1705                Tag::NonNegativeInt16_0
1706            }) + mbs;
1707
1708            data.push(tag);
1709            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1710        }
1711        Datum::Int32(i) => {
1712            let mbs = min_bytes_signed(i);
1713            let tag = u8::from(if i.is_negative() {
1714                Tag::NegativeInt32_0
1715            } else {
1716                Tag::NonNegativeInt32_0
1717            }) + mbs;
1718
1719            data.push(tag);
1720            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1721        }
1722        Datum::Int64(i) => {
1723            let mbs = min_bytes_signed(i);
1724            let tag = u8::from(if i.is_negative() {
1725                Tag::NegativeInt64_0
1726            } else {
1727                Tag::NonNegativeInt64_0
1728            }) + mbs;
1729
1730            data.push(tag);
1731            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1732        }
1733        Datum::UInt8(i) => {
1734            let mbu = min_bytes_unsigned(i);
1735            let tag = u8::from(Tag::UInt8_0) + mbu;
1736            data.push(tag);
1737            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1738        }
1739        Datum::UInt16(i) => {
1740            let mbu = min_bytes_unsigned(i);
1741            let tag = u8::from(Tag::UInt16_0) + mbu;
1742            data.push(tag);
1743            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1744        }
1745        Datum::UInt32(i) => {
1746            let mbu = min_bytes_unsigned(i);
1747            let tag = u8::from(Tag::UInt32_0) + mbu;
1748            data.push(tag);
1749            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1750        }
1751        Datum::UInt64(i) => {
1752            let mbu = min_bytes_unsigned(i);
1753            let tag = u8::from(Tag::UInt64_0) + mbu;
1754            data.push(tag);
1755            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1756        }
1757        Datum::Float32(f) => {
1758            data.push(Tag::Float32.into());
1759            data.extend_from_slice(&f.to_bits().to_le_bytes());
1760        }
1761        Datum::Float64(f) => {
1762            data.push(Tag::Float64.into());
1763            data.extend_from_slice(&f.to_bits().to_le_bytes());
1764        }
1765        Datum::Date(d) => {
1766            data.push(Tag::Date.into());
1767            push_date(data, d);
1768        }
1769        Datum::Time(t) => {
1770            data.push(Tag::Time.into());
1771            push_time(data, t);
1772        }
1773        Datum::Timestamp(t) => {
1774            let datetime = t.to_naive();
1775            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1776                data.push(Tag::CheapTimestamp.into());
1777                data.extend_from_slice(&nanos.to_le_bytes());
1778            } else {
1779                data.push(Tag::Timestamp.into());
1780                push_naive_date(data, datetime.date());
1781                push_time(data, datetime.time());
1782            }
1783        }
1784        Datum::TimestampTz(t) => {
1785            let datetime = t.to_naive();
1786            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1787                data.push(Tag::CheapTimestampTz.into());
1788                data.extend_from_slice(&nanos.to_le_bytes());
1789            } else {
1790                data.push(Tag::TimestampTz.into());
1791                push_naive_date(data, datetime.date());
1792                push_time(data, datetime.time());
1793            }
1794        }
1795        Datum::Interval(i) => {
1796            data.push(Tag::Interval.into());
1797            data.extend_from_slice(&i.months.to_le_bytes());
1798            data.extend_from_slice(&i.days.to_le_bytes());
1799            data.extend_from_slice(&i.micros.to_le_bytes());
1800        }
1801        Datum::Bytes(bytes) => {
1802            let tag = match bytes.len() {
1803                0..TINY => Tag::BytesTiny,
1804                TINY..SHORT => Tag::BytesShort,
1805                SHORT..LONG => Tag::BytesLong,
1806                _ => Tag::BytesHuge,
1807            };
1808            data.push(tag.into());
1809            push_lengthed_bytes(data, bytes, tag);
1810        }
1811        Datum::String(string) => {
1812            let tag = match string.len() {
1813                0..TINY => Tag::StringTiny,
1814                TINY..SHORT => Tag::StringShort,
1815                SHORT..LONG => Tag::StringLong,
1816                _ => Tag::StringHuge,
1817            };
1818            data.push(tag.into());
1819            push_lengthed_bytes(data, string.as_bytes(), tag);
1820        }
1821        Datum::List(list) => {
1822            let tag = match list.data.len() {
1823                0..TINY => Tag::ListTiny,
1824                TINY..SHORT => Tag::ListShort,
1825                SHORT..LONG => Tag::ListLong,
1826                _ => Tag::ListHuge,
1827            };
1828            data.push(tag.into());
1829            push_lengthed_bytes(data, list.data, tag);
1830        }
1831        Datum::Uuid(u) => {
1832            data.push(Tag::Uuid.into());
1833            data.extend_from_slice(u.as_bytes());
1834        }
1835        Datum::Array(array) => {
1836            // See the comment in `Row::push_array` for details on the encoding
1837            // of arrays.
1838            data.push(Tag::Array.into());
1839            data.push(array.dims.ndims());
1840            data.extend_from_slice(array.dims.data);
1841            push_untagged_bytes(data, array.elements.data);
1842        }
1843        Datum::Map(dict) => {
1844            data.push(Tag::Dict.into());
1845            push_untagged_bytes(data, dict.data);
1846        }
1847        Datum::JsonNull => data.push(Tag::JsonNull.into()),
1848        Datum::MzTimestamp(t) => {
1849            data.push(Tag::MzTimestamp.into());
1850            data.extend_from_slice(&t.encode());
1851        }
1852        Datum::Dummy => data.push(Tag::Dummy.into()),
1853        Datum::Numeric(mut n) => {
1854            // Pseudo-canonical representation of decimal values with
1855            // insignificant zeroes trimmed. This compresses the number further
1856            // than `Numeric::trim` by removing all zeroes, and not only those in
1857            // the fractional component.
1858            numeric::cx_datum().reduce(&mut n.0);
1859            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
1860            data.push(Tag::Numeric.into());
1861            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
1862            data.push(
1863                i8::try_from(exponent)
1864                    .expect("exponent to fit within i8; should not exceed +/- 39")
1865                    .to_le_bytes()[0],
1866            );
1867            data.push(bits);
1868
1869            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];
1870
1871            // Little endian machines can take the lsu directly from u16 to u8.
1872            if cfg!(target_endian = "little") {
1873                // SAFETY: `lsu` (returned by `coefficient_units()`) is a `&[u16]`, so
1874                // each element can safely be transmuted into two `u8`s.
1875                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
1876                // The `u8` aligned version of the `lsu` should have twice as many
1877                // elements as we expect for the `u16` version.
1878                soft_assert_no_log!(
1879                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
1880                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
1881                    Numeric::digits_to_lsu_elements_len(digits) * 2,
1882                    lsu_bytes.len()
1883                );
1884                // There should be no unaligned elements in the prefix or suffix.
1885                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
1886                data.extend_from_slice(lsu_bytes);
1887            } else {
1888                for u in lsu {
1889                    data.extend_from_slice(&u.to_le_bytes());
1890                }
1891            }
1892        }
1893        Datum::Range(range) => {
1894            // See notes on `push_range_with` for details about encoding.
1895            data.push(Tag::Range.into());
1896            data.push(range.internal_flag_bits());
1897
1898            if let Some(RangeInner { lower, upper }) = range.inner {
1899                for bound in [lower.bound, upper.bound] {
1900                    if let Some(bound) = bound {
1901                        match bound.datum() {
1902                            Datum::Null => panic!("cannot push Datum::Null into range"),
1903                            d => push_datum::<D>(data, d),
1904                        }
1905                    }
1906                }
1907            }
1908        }
1909        Datum::MzAclItem(mz_acl_item) => {
1910            data.push(Tag::MzAclItem.into());
1911            data.extend_from_slice(&mz_acl_item.encode_binary());
1912        }
1913        Datum::AclItem(acl_item) => {
1914            data.push(Tag::AclItem.into());
1915            data.extend_from_slice(&acl_item.encode_binary());
1916        }
1917    }
1918}
1919
1920/// Return the number of bytes these Datums would use if packed as a Row.
1921pub fn row_size<'a, I>(a: I) -> usize
1922where
1923    I: IntoIterator<Item = Datum<'a>>,
1924{
1925    // Using datums_size instead of a.data().len() here is safer because it will
1926    // return the size of the datums if they were packed into a Row. Although
1927    // a.data().len() happens to give the correct answer (and is faster), data()
1928    // is documented as for debugging only.
1929    let sz = datums_size::<_, _>(a);
1930    let size_of_row = std::mem::size_of::<Row>();
1931    // The Row struct attempts to inline data until it can't fit in the
1932    // preallocated size. Otherwise it spills to heap, and uses the Row to point
1933    // to that.
1934    if sz > Row::SIZE {
1935        sz + size_of_row
1936    } else {
1937        size_of_row
1938    }
1939}
1940
1941/// Number of bytes required by the datum.
1942/// This is used to optimistically pre-allocate buffers for packing rows.
1943pub fn datum_size(datum: &Datum) -> usize {
1944    match datum {
1945        Datum::Null => 1,
1946        Datum::False => 1,
1947        Datum::True => 1,
1948        Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
1949        Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
1950        Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
1951        Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1952        Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1953        Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1954        Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1955        Datum::Float32(_) => 1 + size_of::<f32>(),
1956        Datum::Float64(_) => 1 + size_of::<f64>(),
1957        Datum::Date(_) => 1 + size_of::<i32>(),
1958        Datum::Time(_) => 1 + 8,
1959        Datum::Timestamp(t) => {
1960            1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
1961                8
1962            } else {
1963                16
1964            }
1965        }
1966        Datum::TimestampTz(t) => {
1967            1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
1968                8
1969            } else {
1970                16
1971            }
1972        }
1973        Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
1974        Datum::Bytes(bytes) => {
1975            // We use a variable length representation of slice length.
1976            let bytes_for_length = match bytes.len() {
1977                0..TINY => 1,
1978                TINY..SHORT => 2,
1979                SHORT..LONG => 4,
1980                _ => 8,
1981            };
1982            1 + bytes_for_length + bytes.len()
1983        }
1984        Datum::String(string) => {
1985            // We use a variable length representation of slice length.
1986            let bytes_for_length = match string.len() {
1987                0..TINY => 1,
1988                TINY..SHORT => 2,
1989                SHORT..LONG => 4,
1990                _ => 8,
1991            };
1992            1 + bytes_for_length + string.len()
1993        }
1994        Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
1995        Datum::Array(array) => {
1996            1 + size_of::<u8>()
1997                + array.dims.data.len()
1998                + size_of::<u64>()
1999                + array.elements.data.len()
2000        }
2001        Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
2002        Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
2003        Datum::JsonNull => 1,
2004        Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
2005        Datum::Dummy => 1,
2006        Datum::Numeric(d) => {
2007            let mut d = d.0.clone();
2008            // Values must be reduced to determine appropriate number of
2009            // coefficient units.
2010            numeric::cx_datum().reduce(&mut d);
2011            // 4 = 1 bit each for tag, digits, exponent, bits
2012            4 + (d.coefficient_units().len() * 2)
2013        }
2014        Datum::Range(Range { inner }) => {
2015            // Tag + flags
2016            2 + match inner {
2017                None => 0,
2018                Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
2019                    .iter()
2020                    .map(|bound| match bound {
2021                        None => 0,
2022                        Some(bound) => bound.val.len(),
2023                    })
2024                    .sum(),
2025            }
2026        }
2027        Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
2028        Datum::AclItem(_) => 1 + AclItem::binary_size(),
2029    }
2030}
2031
2032/// Number of bytes required by a sequence of datums.
2033///
2034/// This method can be used to right-size the allocation for a `Row`
2035/// before calling [`RowPacker::extend`].
2036pub fn datums_size<'a, I, D>(iter: I) -> usize
2037where
2038    I: IntoIterator<Item = D>,
2039    D: Borrow<Datum<'a>>,
2040{
2041    iter.into_iter().map(|d| datum_size(d.borrow())).sum()
2042}
2043
2044/// Number of bytes required by a list of datums. This computes the size that would be required if
2045/// the given datums were packed into a list.
2046///
2047/// This is used to optimistically pre-allocate buffers for packing rows.
2048pub fn datum_list_size<'a, I, D>(iter: I) -> usize
2049where
2050    I: IntoIterator<Item = D>,
2051    D: Borrow<Datum<'a>>,
2052{
2053    1 + size_of::<u64>() + datums_size(iter)
2054}
2055
2056impl RowPacker<'_> {
    /// Constructs a row packer that will pack additional datums into the
    /// provided row.
    ///
    /// The packer appends to whatever bytes `row` already contains; it does
    /// not clear them.
    ///
    /// This function is intentionally somewhat inconvenient to call. You
    /// usually want to call [`Row::packer`] instead to start packing from
    /// scratch.
    pub fn for_existing_row(row: &mut Row) -> RowPacker<'_> {
        RowPacker { row }
    }
2066
2067    /// Extend an existing `Row` with a `Datum`.
2068    #[inline]
2069    pub fn push<'a, D>(&mut self, datum: D)
2070    where
2071        D: Borrow<Datum<'a>>,
2072    {
2073        push_datum(&mut self.row.data, *datum.borrow());
2074    }
2075
2076    /// Extend an existing `Row` with additional `Datum`s.
2077    #[inline]
2078    pub fn extend<'a, I, D>(&mut self, iter: I)
2079    where
2080        I: IntoIterator<Item = D>,
2081        D: Borrow<Datum<'a>>,
2082    {
2083        for datum in iter {
2084            push_datum(&mut self.row.data, *datum.borrow())
2085        }
2086    }
2087
2088    /// Extend an existing `Row` with additional `Datum`s.
2089    ///
2090    /// In the case the iterator produces an error, the pushing of
2091    /// datums in terminated and the error returned. The `Row` will
2092    /// be incomplete, but it will be safe to read datums from it.
2093    #[inline]
2094    pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
2095    where
2096        I: IntoIterator<Item = Result<D, E>>,
2097        D: Borrow<Datum<'a>>,
2098    {
2099        for datum in iter {
2100            push_datum(&mut self.row.data, *datum?.borrow());
2101        }
2102        Ok(())
2103    }
2104
2105    /// Appends the datums of an entire `Row`.
2106    pub fn extend_by_row(&mut self, row: &Row) {
2107        self.row.data.extend_from_slice(row.data.as_slice());
2108    }
2109
    /// Appends the slice of data representing an entire `Row`. The data is not validated.
    ///
    /// # Safety
    ///
    /// The requirements from [`Row::from_bytes_unchecked`] apply here, too:
    /// This method relies on `data` being an appropriate row encoding, and can
    /// result in unsafety if this is not the case.
    #[inline]
    pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
        // No validation: the caller guarantees `data` is a valid row encoding.
        self.row.data.extend_from_slice(data)
    }
2121
    /// Pushes a [`DatumList`] that is built from a closure.
    ///
    /// The supplied closure will be invoked once with a `Row` that can be used
    /// to populate the list. It is valid to call any method on the
    /// [`RowPacker`] except for [`RowPacker::clear`], [`RowPacker::truncate`],
    /// or [`RowPacker::truncate_datums`].
    ///
    /// Returns the value returned by the closure, if any.
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::default();
    /// row.packer().push_list_with(|row| {
    ///     row.push(Datum::String("age"));
    ///     row.push(Datum::Int64(42));
    /// });
    /// assert_eq!(
    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
    ///     vec![Datum::String("age"), Datum::Int64(42)],
    /// );
    /// ```
    #[inline]
    pub fn push_list_with<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&mut RowPacker) -> R,
    {
        // First, assume that the list will fit in 255 bytes, and thus the length will fit in
        // 1 byte. If not, we'll fix it up later.
        let start = self.row.data.len();
        self.row.data.push(Tag::ListTiny.into());
        // Write a dummy len, will fix it up later.
        self.row.data.push(0);

        // Let the closure append the list's elements directly after the
        // provisional 2-byte (tag + len) header.
        let out = f(self);

        // The `- 1 - 1` is for the tag and the len.
        let len = self.row.data.len() - start - 1 - 1;
        // We now know the real len.
        if len < TINY {
            // If the len fits in 1 byte, we just need to fix up the len.
            self.row.data[start + 1] = len.to_le_bytes()[0];
        } else {
            // Note: We move this code path into its own function, so that the common case can be
            // inlined.
            long_list(&mut self.row.data, start, len);
        }

        /// 1. Fix up the tag.
        /// 2. Move the actual data a bit (for which we also need to make room at the end).
        /// 3. Fix up the len.
        /// `data`: The row's backing data.
        /// `start`: where `push_list_with` started writing in `data`.
        /// `len`: the length of the data, excluding the tag and the length.
        #[cold]
        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
            // `len_len`: the length of the length. (Possible values are: 2, 4, 8. 1 is handled
            // elsewhere.) The other parameters are the same as for `long_list`.
            let long_list_inner = |data: &mut CompactBytes, len_len| {
                // We'll need memory for the new, bigger length, so make the `CompactBytes` bigger.
                // The `- 1` is because the old length was 1 byte.
                const ZEROS: [u8; 8] = [0; 8];
                data.extend_from_slice(&ZEROS[0..len_len - 1]);
                // Move the data to the end of the `CompactBytes`, to make space for the new length.
                // Originally, it started after the 1-byte tag and the 1-byte length, now it will
                // start after the 1-byte tag and the len_len-byte length.
                //
                // Note that this is the only operation in `long_list` whose cost is proportional
                // to `len`. Since `len` is at least 256 here, the other operations' cost are
                // negligible. `copy_within` is a memmove, which is probably a fair bit faster per
                // Datum than a Datum encoding in the `f` closure.
                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
                // Write the new length.
                data[start + 1..start + 1 + len_len]
                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
            };
            // Select the widest-enough tag/length pair for `len`.
            match len {
                0..TINY => {
                    unreachable!()
                }
                TINY..SHORT => {
                    data[start] = Tag::ListShort.into();
                    long_list_inner(data, 2);
                }
                SHORT..LONG => {
                    data[start] = Tag::ListLong.into();
                    long_list_inner(data, 4);
                }
                _ => {
                    data[start] = Tag::ListHuge.into();
                    long_list_inner(data, 8);
                }
            };
        }

        out
    }
2218
2219    /// Pushes a [`DatumMap`] that is built from a closure.
2220    ///
2221    /// The supplied closure will be invoked once with a `Row` that can be used
2222    /// to populate the dict.
2223    ///
2224    /// The closure **must** alternate pushing string keys and arbitrary values,
2225    /// otherwise reading the dict will cause a panic.
2226    ///
2227    /// The closure **must** push keys in ascending order, otherwise equality
2228    /// checks on the resulting `Row` may be wrong and reading the dict IN DEBUG
2229    /// MODE will cause a panic.
2230    ///
2231    /// The closure **must not** call [`RowPacker::clear`],
2232    /// [`RowPacker::truncate`], or [`RowPacker::truncate_datums`].
2233    ///
2234    /// # Example
2235    ///
2236    /// ```
2237    /// # use mz_repr::{Row, Datum};
2238    /// let mut row = Row::default();
2239    /// row.packer().push_dict_with(|row| {
2240    ///
2241    ///     // key
2242    ///     row.push(Datum::String("age"));
2243    ///     // value
2244    ///     row.push(Datum::Int64(42));
2245    ///
2246    ///     // key
2247    ///     row.push(Datum::String("name"));
2248    ///     // value
2249    ///     row.push(Datum::String("bob"));
2250    /// });
2251    /// assert_eq!(
2252    ///     row.unpack_first().unwrap_map().iter().collect::<Vec<_>>(),
2253    ///     vec![("age", Datum::Int64(42)), ("name", Datum::String("bob"))]
2254    /// );
2255    /// ```
2256    pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2257    where
2258        F: FnOnce(&mut RowPacker) -> R,
2259    {
2260        self.row.data.push(Tag::Dict.into());
2261        let start = self.row.data.len();
2262        // write a dummy len, will fix it up later
2263        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2264
2265        let res = f(self);
2266
2267        let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2268        // fix up the len
2269        self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2270
2271        res
2272    }
2273
    /// Like [`RowPacker::push_dict_with`], but accepts a fallible closure.
    ///
    /// Note that even when `f` returns `Err`, the dict header and any datums
    /// pushed before the error remain in the row; nothing is rolled back.
    pub fn try_push_dict_with<F, E>(&mut self, f: F) -> Result<(), E>
    where
        F: FnOnce(&mut RowPacker) -> Result<(), E>,
    {
        self.push_dict_with(f)
    }
2281
2282    /// Convenience function to construct an array from an iter of `Datum`s.
2283    ///
2284    /// Returns an error if the number of elements in `iter` does not match
2285    /// the cardinality of the array as described by `dims`, or if the
2286    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2287    /// occurs, the packer's state will be unchanged.
2288    pub fn try_push_array<'a, I, D>(
2289        &mut self,
2290        dims: &[ArrayDimension],
2291        iter: I,
2292    ) -> Result<(), InvalidArrayError>
2293    where
2294        I: IntoIterator<Item = D>,
2295        D: Borrow<Datum<'a>>,
2296    {
2297        // SAFETY: The function returns the exact number of elements pushed into the array.
2298        unsafe {
2299            self.push_array_with_unchecked(dims, |packer| {
2300                let mut nelements = 0;
2301                for datum in iter {
2302                    packer.push(datum);
2303                    nelements += 1;
2304                }
2305                Ok::<_, InvalidArrayError>(nelements)
2306            })
2307        }
2308    }
2309
    /// Convenience function to construct an array from a function. The function must return the
    /// number of elements it pushed into the array. It is undefined behavior if the function returns
    /// a number different to the number of elements it pushed.
    ///
    /// Returns an error if the number of elements pushed by `f` does not match
    /// the cardinality of the array as described by `dims`, or if the
    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`], or if `f` errors. If an error
    /// occurs, the packer's state will be unchanged.
    ///
    /// # Safety
    ///
    /// The caller must ensure `f` returns exactly the number of elements it
    /// pushed into the packer; that count is trusted when validating the
    /// array's cardinality and is not re-derived from the written bytes.
    pub unsafe fn push_array_with_unchecked<F, E>(
        &mut self,
        dims: &[ArrayDimension],
        f: F,
    ) -> Result<(), E>
    where
        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
        E: From<InvalidArrayError>,
    {
        // Arrays are encoded as follows.
        //
        // u8    ndims
        // u64   dim_0 lower bound
        // u64   dim_0 length
        // ...
        // u64   dim_n lower bound
        // u64   dim_n length
        // u64   element data size in bytes
        // u8    element data, where elements are encoded in row-major order

        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
        }

        // Remember where this array started so the packer can roll back on error.
        let start = self.row.data.len();
        self.row.data.push(Tag::Array.into());

        // Write dimension information.
        self.row
            .data
            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
        for dim in dims {
            self.row
                .data
                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
            self.row
                .data
                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
        }

        // Write elements.
        let off = self.row.data.len();
        // Dummy element-data length; patched below once `f` has run.
        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
        let nelements = match f(self) {
            Ok(nelements) => nelements,
            Err(e) => {
                // Roll back everything written for this array.
                self.row.data.truncate(start);
                return Err(e);
            }
        };
        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());

        // Check that the number of elements written matches the dimension
        // information.
        let cardinality = match dims {
            [] => 0,
            dims => dims.iter().map(|d| d.length).product(),
        };
        if nelements != cardinality {
            self.row.data.truncate(start);
            return Err(InvalidArrayError::WrongCardinality {
                actual: nelements,
                expected: cardinality,
            }
            .into());
        }

        Ok(())
    }
2388
2389    /// Pushes an [`Array`] that is built from a closure.
2390    ///
2391    /// __WARNING__: This is fairly "sharp" tool that is easy to get wrong. You
2392    /// should prefer [`RowPacker::try_push_array`] when possible.
2393    ///
2394    /// Returns an error if the number of elements pushed does not match
2395    /// the cardinality of the array as described by `dims`, or if the
2396    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2397    /// occurs, the packer's state will be unchanged.
2398    pub fn push_array_with_row_major<F, I>(
2399        &mut self,
2400        dims: I,
2401        f: F,
2402    ) -> Result<(), InvalidArrayError>
2403    where
2404        I: IntoIterator<Item = ArrayDimension>,
2405        F: FnOnce(&mut RowPacker) -> usize,
2406    {
2407        let start = self.row.data.len();
2408        self.row.data.push(Tag::Array.into());
2409
2410        // Write dummy dimension length for now, we'll fix it up.
2411        let dims_start = self.row.data.len();
2412        self.row.data.push(42);
2413
2414        let mut num_dims: u8 = 0;
2415        let mut cardinality: usize = 1;
2416        for dim in dims {
2417            num_dims += 1;
2418            cardinality *= dim.length;
2419
2420            self.row
2421                .data
2422                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2423            self.row
2424                .data
2425                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2426        }
2427
2428        if num_dims > MAX_ARRAY_DIMENSIONS {
2429            // Reset the packer state so we don't have invalid data.
2430            self.row.data.truncate(start);
2431            return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2432        }
2433        // Fix up our dimension length.
2434        self.row.data[dims_start..dims_start + size_of::<u8>()]
2435            .copy_from_slice(&num_dims.to_le_bytes());
2436
2437        // Write elements.
2438        let off = self.row.data.len();
2439        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2440
2441        let nelements = f(self);
2442
2443        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2444        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2445
2446        // Check that the number of elements written matches the dimension
2447        // information.
2448        let cardinality = match num_dims {
2449            0 => 0,
2450            _ => cardinality,
2451        };
2452        if nelements != cardinality {
2453            self.row.data.truncate(start);
2454            return Err(InvalidArrayError::WrongCardinality {
2455                actual: nelements,
2456                expected: cardinality,
2457            });
2458        }
2459
2460        Ok(())
2461    }
2462
2463    /// Convenience function to push a `DatumList` from an iter of `Datum`s
2464    ///
2465    /// See [`RowPacker::push_dict_with`] if you need to be able to handle errors
2466    pub fn push_list<'a, I, D>(&mut self, iter: I)
2467    where
2468        I: IntoIterator<Item = D>,
2469        D: Borrow<Datum<'a>>,
2470    {
2471        self.push_list_with(|packer| {
2472            for elem in iter {
2473                packer.push(*elem.borrow())
2474            }
2475        });
2476    }
2477
2478    /// Convenience function to push a `DatumMap` from an iter of `(&str, Datum)` pairs
2479    pub fn push_dict<'a, I, D>(&mut self, iter: I)
2480    where
2481        I: IntoIterator<Item = (&'a str, D)>,
2482        D: Borrow<Datum<'a>>,
2483    {
2484        self.push_dict_with(|packer| {
2485            for (k, v) in iter {
2486                packer.push(Datum::String(k));
2487                packer.push(*v.borrow())
2488            }
2489        })
2490    }
2491
    /// Pushes a `Datum::Range` derived from the `Range<Datum<'a>>`.
    ///
    /// # Panics
    /// - If lower and upper express finite values and they are datums of
    ///   different types.
    /// - If lower or upper express finite values and are equal to
    ///   `Datum::Null`. To handle `Datum::Null` properly, use
    ///   [`RangeBound::new`].
    ///
    /// # Notes
    /// - This function canonicalizes the range before pushing it to the row.
    /// - Prefer this function over `push_range_with` because of its
    ///   canonicalization.
    /// - Prefer creating [`RangeBound`]s using [`RangeBound::new`], which
    ///   handles `Datum::Null` in a SQL-friendly way.
    pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
        range.canonicalize()?;
        match range.inner {
            None => {
                // Empty range: only the tag and the `RANGE_EMPTY` flag byte
                // are written; no bound data follows.
                self.row.data.push(Tag::Range.into());
                // Untagged bytes only contains the `RANGE_EMPTY` flag value.
                self.row.data.push(range::InternalFlags::EMPTY.bits());
                Ok(())
            }
            // Non-empty range: delegate to `push_range_with`, wrapping each
            // finite bound in a closure that pushes its datum.
            Some(inner) => self.push_range_with(
                RangeLowerBound {
                    inclusive: inner.lower.inclusive,
                    bound: inner
                        .lower
                        .bound
                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
                },
                RangeUpperBound {
                    inclusive: inner.upper.inclusive,
                    bound: inner
                        .upper
                        .bound
                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
                },
            ),
        }
    }
2534
2535    /// Pushes a `DatumRange` built from the specified arguments.
2536    ///
2537    /// # Warning
2538    /// Unlike `push_range`, `push_range_with` _does not_ canonicalize its
2539    /// inputs. Consequentially, this means it's possible to generate ranges
2540    /// that will not reflect the proper ordering and equality.
2541    ///
2542    /// # Panics
2543    /// - If lower or upper expresses a finite value and does not push exactly
2544    ///   one value into the `RowPacker`.
2545    /// - If lower and upper express finite values and they are datums of
2546    ///   different types.
2547    /// - If lower or upper express finite values and push `Datum::Null`.
2548    ///
2549    /// # Notes
    /// - Prefer `push_range` over this function. This function should be
2551    ///   used only when you are not pushing `Datum`s to the inner row.
2552    /// - Range encoding is `[<flag bytes>,<lower>?,<upper>?]`, where `lower`
2553    ///   and `upper` are optional, contingent on the flag value expressing an
2554    ///   empty range (where neither will be present) or infinite bounds (where
2555    ///   each infinite bound will be absent).
    /// - To push an empty range, use `push_range` using `Range { inner: None }`.
    pub fn push_range_with<L, U, E>(
        &mut self,
        lower: RangeLowerBound<L>,
        upper: RangeUpperBound<U>,
    ) -> Result<(), E>
    where
        L: FnOnce(&mut RowPacker) -> Result<(), E>,
        U: FnOnce(&mut RowPacker) -> Result<(), E>,
        E: From<InvalidRangeError>,
    {
        // Remember the offset of the range's tag byte so the whole range can
        // be rolled back if the bounds prove invalid.
        let start = self.row.data.len();
        self.row.data.push(Tag::Range.into());

        let mut flags = range::InternalFlags::empty();

        flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
        flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
        flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
        flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);

        let mut expected_datums = 0;

        self.row.data.push(flags.bits());

        // Bytes written past this offset come from the bound closures; they
        // are re-read below to validate what the caller actually pushed.
        let datum_check = self.row.data.len();

        if let Some(value) = lower.bound {
            // The closure must advance the row by at least one byte...
            let start = self.row.data.len();
            value(self)?;
            assert!(
                start < self.row.data.len(),
                "finite values must each push exactly one value; expected 1 but got 0"
            );
            expected_datums += 1;
        }

        if let Some(value) = upper.bound {
            // ...and the datum-count check below catches pushing too many.
            let start = self.row.data.len();
            value(self)?;
            assert!(
                start < self.row.data.len(),
                "finite values must each push exactly one value; expected 1 but got 0"
            );
            expected_datums += 1;
        }

        // Validate the invariants that 0, 1, or 2 elements were pushed, none are Null,
        // and if two are pushed then the second is not less than the first. Panic in
        // some cases and error in others.
        let mut actual_datums = 0;
        let mut seen = None;
        let mut dataz = &self.row.data[datum_check..];
        while !dataz.is_empty() {
            // SAFETY(review): assumes the bound closures wrote well-formed
            // datums via this packer's push methods, per this fn's contract.
            let d = unsafe { read_datum(&mut dataz) };
            assert!(d != Datum::Null, "cannot push Datum::Null into range");

            match seen {
                None => seen = Some(d),
                Some(seen) => {
                    let seen_kind = DatumKind::from(seen);
                    let d_kind = DatumKind::from(d);
                    assert!(
                        seen_kind == d_kind,
                        "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
                    );

                    // Misordered bounds are an error, not a panic: undo the
                    // partially written range before returning.
                    if seen > d {
                        self.row.data.truncate(start);
                        return Err(InvalidRangeError::MisorderedRangeBounds.into());
                    }
                }
            }
            actual_datums += 1;
        }

        assert!(
            actual_datums == expected_datums,
            "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
        );

        Ok(())
    }
2639
2640    /// Clears the contents of the packer without de-allocating its backing memory.
2641    pub fn clear(&mut self) {
2642        self.row.data.clear();
2643    }
2644
    /// Truncates the underlying storage to the specified byte position.
    /// Bytes before `pos` are left untouched.
    ///
    /// # Safety
    ///
    /// `pos` MUST specify a byte offset that lies on a datum boundary.
    /// If `pos` specifies a byte offset that is *within* a datum, the row
    /// packer will produce an invalid row, the unpacking of which may
    /// trigger undefined behavior!
    ///
    /// To find the byte offset of a datum boundary, inspect the packer's
    /// byte length by calling `packer.data().len()` after pushing the desired
    /// number of datums onto the packer.
    pub unsafe fn truncate(&mut self, pos: usize) {
        self.row.data.truncate(pos)
    }
2660
2661    /// Truncates the underlying row to contain at most the first `n` datums.
2662    pub fn truncate_datums(&mut self, n: usize) {
2663        let prev_len = self.row.data.len();
2664        let mut iter = self.row.iter();
2665        for _ in iter.by_ref().take(n) {}
2666        let next_len = iter.data.len();
2667        // SAFETY: iterator offsets always lie on a datum boundary.
2668        unsafe { self.truncate(prev_len - next_len) }
2669    }
2670
    /// Returns the total amount of bytes used by the underlying row.
    pub fn byte_len(&self) -> usize {
        // Pure delegation to the row being packed.
        self.row.byte_len()
    }
2675}
2676
impl<'a> IntoIterator for &'a Row {
    type Item = Datum<'a>;
    type IntoIter = DatumListIter<'a>;
    /// Iterates over the datums in the row; equivalent to [`Row::iter`].
    fn into_iter(self) -> DatumListIter<'a> {
        self.iter()
    }
}
2684
2685impl fmt::Debug for Row {
2686    /// Debug representation using the internal datums
2687    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2688        f.write_str("Row{")?;
2689        f.debug_list().entries(self.iter()).finish()?;
2690        f.write_str("}")
2691    }
2692}
2693
2694impl fmt::Display for Row {
2695    /// Display representation using the internal datums
2696    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2697        f.write_str("(")?;
2698        for (i, datum) in self.iter().enumerate() {
2699            if i != 0 {
2700                f.write_str(", ")?;
2701            }
2702            write!(f, "{}", datum)?;
2703        }
2704        f.write_str(")")
2705    }
2706}
2707
impl<'a> DatumList<'a> {
    /// Returns a `DatumList` containing no datums.
    pub fn empty() -> DatumList<'static> {
        DatumList { data: &[] }
    }

    /// Returns an iterator over the datums in the list.
    pub fn iter(&self) -> DatumListIter<'a> {
        DatumListIter { data: self.data }
    }

    /// For debugging only: exposes the list's raw packed bytes.
    pub fn data(&self) -> &'a [u8] {
        self.data
    }
}
2722
impl<'a> IntoIterator for DatumList<'a> {
    type Item = Datum<'a>;
    type IntoIter = DatumListIter<'a>;
    /// Iterates over the datums in the list; equivalent to [`DatumList::iter`].
    fn into_iter(self) -> DatumListIter<'a> {
        self.iter()
    }
}
2730
2731impl<'a> Iterator for DatumListIter<'a> {
2732    type Item = Datum<'a>;
2733    fn next(&mut self) -> Option<Self::Item> {
2734        if self.data.is_empty() {
2735            None
2736        } else {
2737            Some(unsafe { read_datum(&mut self.data) })
2738        }
2739    }
2740}
2741
impl<'a> DatumMap<'a> {
    /// Returns a `DatumMap` containing no entries.
    pub fn empty() -> DatumMap<'static> {
        DatumMap { data: &[] }
    }

    /// Returns an iterator over the map's key-value pairs, in ascending key
    /// order (the order they are packed in).
    pub fn iter(&self) -> DatumDictIter<'a> {
        DatumDictIter {
            data: self.data,
            prev_key: None,
        }
    }

    /// For debugging only: exposes the map's raw packed bytes.
    pub fn data(&self) -> &'a [u8] {
        self.data
    }
}
2759
impl<'a> Debug for DatumMap<'a> {
    /// Formats the map via the standard map formatter over its entries.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_map().entries(self.iter()).finish()
    }
}
2765
impl<'a> IntoIterator for &'a DatumMap<'a> {
    type Item = (&'a str, Datum<'a>);
    type IntoIter = DatumDictIter<'a>;
    /// Iterates over key-value pairs; equivalent to [`DatumMap::iter`].
    fn into_iter(self) -> DatumDictIter<'a> {
        self.iter()
    }
}
2773
impl<'a> Iterator for DatumDictIter<'a> {
    type Item = (&'a str, Datum<'a>);
    fn next(&mut self) -> Option<Self::Item> {
        if self.data.is_empty() {
            None
        } else {
            // Each entry is encoded as a string key followed by its value.
            // Consume the key's tag byte first and verify it is a string tag.
            let key_tag =
                Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
            assert!(
                key_tag == Tag::StringTiny
                    || key_tag == Tag::StringShort
                    || key_tag == Tag::StringLong
                    || key_tag == Tag::StringHuge,
                "Dict keys must be strings, got {:?}",
                key_tag
            );
            // SAFETY(review): assumes the underlying bytes were produced by
            // the packer's dict methods and so form valid datums.
            let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
            let val = unsafe { read_datum(&mut self.data) };

            // if in debug mode, sanity check keys: they must be unique and
            // strictly ascending, which the packer is expected to guarantee.
            if cfg!(debug_assertions) {
                if let Some(prev_key) = self.prev_key {
                    debug_assert!(
                        prev_key < key,
                        "Dict keys must be unique and given in ascending order: {} came before {}",
                        prev_key,
                        key
                    );
                }
                self.prev_key = Some(key);
            }

            Some((key, val))
        }
    }
}
2810
impl RowArena {
    /// Creates a new, empty arena.
    pub fn new() -> Self {
        RowArena {
            inner: RefCell::new(vec![]),
        }
    }

    /// Creates a `RowArena` with a hint of how many rows will be created in the arena, to avoid
    /// reallocations of its internal vector.
    pub fn with_capacity(capacity: usize) -> Self {
        RowArena {
            inner: RefCell::new(Vec::with_capacity(capacity)),
        }
    }

    /// Does a `reserve` on the underlying `Vec`. Call this when you expect `additional` more datums
    /// to be created in this arena.
    pub fn reserve(&self, additional: usize) {
        self.inner.borrow_mut().reserve(additional);
    }

    /// Take ownership of `bytes` for the lifetime of the arena.
    #[allow(clippy::transmute_ptr_to_ptr)]
    pub fn push_bytes<'a>(&'a self, bytes: Vec<u8>) -> &'a [u8] {
        let mut inner = self.inner.borrow_mut();
        inner.push(bytes);
        let owned_bytes = &inner[inner.len() - 1];
        unsafe {
            // This is safe because:
            //   * We only ever append to self.inner, so the byte vector
            //     will live as long as the arena.
            //   * We return a reference to the byte vector's contents, so it's
            //     okay if self.inner reallocates and moves the byte
            //     vector.
            //   * We don't allow access to the byte vector itself, so it will
            //     never reallocate.
            transmute::<&[u8], &'a [u8]>(owned_bytes)
        }
    }

    /// Take ownership of `string` for the lifetime of the arena.
    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
        let owned_bytes = self.push_bytes(string.into_bytes());
        unsafe {
            // This is safe because we know it was a `String` just before.
            std::str::from_utf8_unchecked(owned_bytes)
        }
    }

    /// Take ownership of `row` for the lifetime of the arena, returning a
    /// reference to the first datum in the row.
    ///
    /// If we had an owned datum type, this method would be much clearer, and
    /// would be called `push_owned_datum`.
    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
        let mut inner = self.inner.borrow_mut();
        inner.push(row.data.into_vec());
        unsafe {
            // This is safe because:
            //   * We only ever append to self.inner, so the row data will live
            //     as long as the arena.
            //   * We force the row data into its own heap allocation--
            //     importantly, we do NOT store the SmallVec, which might be
            //     storing data inline--so it's okay if self.inner reallocates
            //     and moves the row.
            //   * We don't allow access to the byte vector itself, so it will
            //     never reallocate.
            let datum = read_datum(&mut &inner[inner.len() - 1][..]);
            transmute::<Datum<'_>, Datum<'a>>(datum)
        }
    }

    /// Equivalent to `push_unary_row` but returns a `DatumNested` rather than a
    /// `Datum`.
    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
        let mut inner = self.inner.borrow_mut();
        inner.push(row.data.into_vec());
        unsafe {
            // This is safe because:
            //   * We only ever append to self.inner, so the row data will live
            //     as long as the arena.
            //   * We force the row data into its own heap allocation--
            //     importantly, we do NOT store the SmallVec, which might be
            //     storing data inline--so it's okay if self.inner reallocates
            //     and moves the row.
            //   * We don't allow access to the byte vector itself, so it will
            //     never reallocate.
            let nested = DatumNested::extract(&mut &inner[inner.len() - 1][..]);
            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
        }
    }

    /// Convenience function to make a new `Row` containing a single datum, and
    /// take ownership of it for the lifetime of the arena
    ///
    /// ```
    /// # use mz_repr::{RowArena, Datum};
    /// let arena = RowArena::new();
    /// let datum = arena.make_datum(|packer| {
    ///   packer.push_list(&[Datum::String("hello"), Datum::String("world")]);
    /// });
    /// assert_eq!(datum.unwrap_list().iter().collect::<Vec<_>>(), vec![Datum::String("hello"), Datum::String("world")]);
    /// ```
    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
    where
        F: FnOnce(&mut RowPacker),
    {
        let mut row = Row::default();
        f(&mut row.packer());
        self.push_unary_row(row)
    }

    /// Convenience function identical to `make_datum` but instead returns a
    /// `DatumNested`.
    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
    where
        F: FnOnce(&mut RowPacker),
    {
        let mut row = Row::default();
        f(&mut row.packer());
        self.push_unary_row_datum_nested(row)
    }

    /// Like [`RowArena::make_datum`], but the provided closure can return an error.
    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
    where
        F: FnOnce(&mut RowPacker) -> Result<(), E>,
    {
        let mut row = Row::default();
        f(&mut row.packer())?;
        Ok(self.push_unary_row(row))
    }

    /// Clear the contents of the arena.
    ///
    /// Taking `&mut self` guarantees no references previously handed out by
    /// the arena (which borrow `self`) are still alive.
    pub fn clear(&mut self) {
        self.inner.borrow_mut().clear();
    }
}
2949
2950impl Default for RowArena {
2951    fn default() -> RowArena {
2952        RowArena::new()
2953    }
2954}
2955
/// A thread-local row, which can be borrowed and returned.
/// # Example
///
/// Use this type instead of creating a new row:
/// ```
/// use mz_repr::SharedRow;
///
/// let mut row_builder = SharedRow::get();
/// ```
///
/// This allows us to reuse an existing row allocation instead of creating a new one or retaining
/// an allocation locally. Additionally, we can observe the size of the local row in a central
/// place and potentially reallocate to reduce memory needs.
///
/// The row is returned to the thread-local slot automatically when this
/// handle is dropped.
///
/// # Panic
///
/// [`SharedRow::get`] panics when trying to obtain multiple references to the shared row.
#[derive(Debug)]
pub struct SharedRow(Row);
2975
impl SharedRow {
    thread_local! {
        /// A thread-local slot containing a shared Row that can be temporarily used by a function.
        /// There can be at most one active user of this Row, which is tracked by the state of the
        /// `Option<_>` wrapper. When it is `Some(..)`, the row is available for using. When it
        /// is `None`, it is not, and the constructor will panic if a thread attempts to use it.
        static SHARED_ROW: Cell<Option<Row>> = const { Cell::new(Some(Row::empty())) }
    }

    /// Get the shared row.
    ///
    /// The row's contents are cleared before returning it.
    ///
    /// # Panic
    ///
    /// Panics when the row is already borrowed elsewhere.
    pub fn get() -> Self {
        // `take` leaves `None` in the slot, marking the row as borrowed.
        let mut row = Self::SHARED_ROW
            .take()
            .expect("attempted to borrow already borrowed SharedRow");
        // Clear row: obtaining a packer resets the row's contents.
        row.packer();
        Self(row)
    }

    /// Gets the shared row and uses it to pack `iter`.
    pub fn pack<'a, I, D>(iter: I) -> Row
    where
        I: IntoIterator<Item = D>,
        D: Borrow<Datum<'a>>,
    {
        let mut row_builder = Self::get();
        let mut row_packer = row_builder.packer();
        row_packer.extend(iter);
        // Clone out the packed row; the shared allocation itself goes back to
        // the thread-local slot when `row_builder` is dropped.
        row_builder.clone()
    }
}
3013
impl std::ops::Deref for SharedRow {
    type Target = Row;

    /// Borrows the wrapped row immutably.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
3021
impl std::ops::DerefMut for SharedRow {
    /// Borrows the wrapped row mutably, e.g. to obtain its packer.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
3027
3028impl Drop for SharedRow {
3029    fn drop(&mut self) {
3030        // Take the Row allocation from this instance and put it back in the thread local slot for
3031        // the next user. The Row in `self` is replaced with an empty Row which does not allocate.
3032        Self::SHARED_ROW.set(Some(std::mem::take(&mut self.0)))
3033    }
3034}
3035
3036#[cfg(test)]
3037mod tests {
3038    use std::cmp::Ordering;
3039    use std::collections::hash_map::DefaultHasher;
3040    use std::hash::{Hash, Hasher};
3041
3042    use chrono::{DateTime, NaiveDate};
3043    use itertools::Itertools;
3044    use mz_ore::{assert_err, assert_none};
3045    use ordered_float::OrderedFloat;
3046
3047    use crate::SqlScalarType;
3048
3049    use super::*;
3050
3051    fn hash<T: Hash>(t: &T) -> u64 {
3052        let mut hasher = DefaultHasher::new();
3053        t.hash(&mut hasher);
3054        hasher.finish()
3055    }
3056
    #[mz_ore::test]
    fn test_assumptions() {
        // The tag must occupy exactly one byte, since the encoding prepends
        // it to every packed datum.
        assert_eq!(size_of::<Tag>(), 1);
        #[cfg(target_endian = "big")]
        {
            // if you want to run this on a big-endian cpu, we'll need big-endian versions of the serialization code
            assert!(false);
        }
    }
3066
    #[mz_ore::test]
    fn miri_test_arena() {
        let arena = RowArena::new();

        // Strings and byte slices must round-trip through the arena unchanged.
        assert_eq!(arena.push_string("".to_owned()), "");
        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");

        let empty: &[u8] = &[];
        assert_eq!(arena.push_bytes(vec![]), empty);
        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);

        // A dict with a nested list exercises the arena's row-ownership path.
        let mut row = Row::default();
        let mut packer = row.packer();
        packer.push_dict_with(|row| {
            row.push(Datum::String("a"));
            row.push_list_with(|row| {
                row.push(Datum::String("one"));
                row.push(Datum::String("two"));
                row.push(Datum::String("three"));
            });
            row.push(Datum::String("b"));
            row.push(Datum::String("c"));
        });
        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
    }
3092
    #[mz_ore::test]
    fn miri_test_round_trip() {
        // Packs `datums` into a row and asserts that both unpacking paths
        // (iteration and `unpack`) return the original datums.
        fn round_trip(datums: Vec<Datum>) {
            let row = Row::pack(datums.clone());

            // When run under miri this catches undefined bytes written to data
            // eg by calling push_copy! on a type which contains undefined padding values
            println!("{:?}", row.data());

            let datums2 = row.iter().collect::<Vec<_>>();
            let datums3 = row.unpack();
            assert_eq!(datums, datums2);
            assert_eq!(datums, datums3);
        }

        // The empty row must round-trip.
        round_trip(vec![]);
        // Every scalar type's "interesting" datums must round-trip.
        round_trip(
            SqlScalarType::enumerate()
                .iter()
                .flat_map(|r#type| r#type.interesting_datums())
                .collect(),
        );
        // A hand-picked assortment covering each datum variant; the shifted
        // integer values presumably exercise each byte-width of the packed
        // integer encoding — confirm against the `Tag` definitions.
        round_trip(vec![
            Datum::Null,
            Datum::Null,
            Datum::False,
            Datum::True,
            Datum::Int16(-21),
            Datum::Int32(-42),
            Datum::Int64(-2_147_483_648 - 42),
            Datum::UInt8(0),
            Datum::UInt8(1),
            Datum::UInt16(0),
            Datum::UInt16(1),
            Datum::UInt16(1 << 8),
            Datum::UInt32(0),
            Datum::UInt32(1),
            Datum::UInt32(1 << 8),
            Datum::UInt32(1 << 16),
            Datum::UInt32(1 << 24),
            Datum::UInt64(0),
            Datum::UInt64(1),
            Datum::UInt64(1 << 8),
            Datum::UInt64(1 << 16),
            Datum::UInt64(1 << 24),
            Datum::UInt64(1 << 32),
            Datum::UInt64(1 << 40),
            Datum::UInt64(1 << 48),
            Datum::UInt64(1 << 56),
            Datum::Float32(OrderedFloat::from(-42.12)),
            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(
                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
                        .unwrap()
                        .and_hms_opt(14, 32, 11)
                        .unwrap(),
                )
                .unwrap(),
            ),
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
                    .unwrap(),
            ),
            Datum::Interval(Interval {
                months: 312,
                ..Default::default()
            }),
            Datum::Interval(Interval::new(0, 0, 1_012_312)),
            Datum::Bytes(&[]),
            Datum::Bytes(&[0, 2, 1, 255]),
            Datum::String(""),
            Datum::String("العَرَبِيَّة"),
        ]);
    }
3169
    #[mz_ore::test]
    fn test_array() {
        // Construct an array using `Row::push_array` and verify that it unpacks
        // correctly. Note the non-default lower bound of 2, which must be
        // preserved through packing.
        const DIM: ArrayDimension = ArrayDimension {
            lower_bound: 2,
            length: 2,
        };
        let mut row = Row::default();
        let mut packer = row.packer();
        packer
            .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
            .unwrap();
        let arr1 = row.unpack_first().unwrap_array();
        assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
        assert_eq!(
            arr1.elements().into_iter().collect::<Vec<_>>(),
            vec![Datum::Int32(1), Datum::Int32(2)]
        );

        // Pack a previously-constructed `Datum::Array` and verify that it
        // unpacks correctly.
        let row = Row::pack_slice(&[Datum::Array(arr1)]);
        let arr2 = row.unpack_first().unwrap_array();
        assert_eq!(arr1, arr2);
    }
3196
3197    #[mz_ore::test]
3198    fn test_multidimensional_array() {
3199        let datums = vec![
3200            Datum::Int32(1),
3201            Datum::Int32(2),
3202            Datum::Int32(3),
3203            Datum::Int32(4),
3204            Datum::Int32(5),
3205            Datum::Int32(6),
3206            Datum::Int32(7),
3207            Datum::Int32(8),
3208        ];
3209
3210        let mut row = Row::default();
3211        let mut packer = row.packer();
3212        packer
3213            .try_push_array(
3214                &[
3215                    ArrayDimension {
3216                        lower_bound: 1,
3217                        length: 1,
3218                    },
3219                    ArrayDimension {
3220                        lower_bound: 1,
3221                        length: 4,
3222                    },
3223                    ArrayDimension {
3224                        lower_bound: 1,
3225                        length: 2,
3226                    },
3227                ],
3228                &datums,
3229            )
3230            .unwrap();
3231        let array = row.unpack_first().unwrap_array();
3232        assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3233    }
3234
    #[mz_ore::test]
    fn test_array_max_dimensions() {
        let mut row = Row::default();
        let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);

        // An array with one too many dimensions should be rejected.
        let res = row.packer().try_push_array(
            &vec![
                ArrayDimension {
                    lower_bound: 1,
                    length: 1
                };
                max_dims + 1
            ],
            vec![Datum::Int32(4)],
        );
        assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
        // A failed push must leave the row untouched.
        assert!(row.data.is_empty());

        // An array with exactly the maximum allowable dimensions should be
        // accepted.
        row.packer()
            .try_push_array(
                &vec![
                    ArrayDimension {
                        lower_bound: 1,
                        length: 1
                    };
                    max_dims
                ],
                vec![Datum::Int32(4)],
            )
            .unwrap();
    }
3269
    #[mz_ore::test]
    fn test_array_wrong_cardinality() {
        let mut row = Row::default();
        // 2 x 3 dimensions require 6 elements, but only 2 are supplied.
        let res = row.packer().try_push_array(
            &[
                ArrayDimension {
                    lower_bound: 1,
                    length: 2,
                },
                ArrayDimension {
                    lower_bound: 1,
                    length: 3,
                },
            ],
            vec![Datum::Int32(1), Datum::Int32(2)],
        );
        assert_eq!(
            res,
            Err(InvalidArrayError::WrongCardinality {
                actual: 2,
                expected: 6,
            })
        );
        // The failed push must leave the row untouched.
        assert!(row.data.is_empty());
    }
3295
    #[mz_ore::test]
    fn test_nesting() {
        let mut row = Row::default();
        // Dict keys are pushed in ascending order ("favourites" < "name"),
        // as required by the dict encoding.
        row.packer().push_dict_with(|row| {
            row.push(Datum::String("favourites"));
            row.push_list_with(|row| {
                row.push(Datum::String("ice cream"));
                row.push(Datum::String("oreos"));
                row.push(Datum::String("cheesecake"));
            });
            row.push(Datum::String("name"));
            row.push(Datum::String("bob"));
        });

        let mut iter = row.unpack_first().unwrap_map().iter();

        // The nested list must come back intact under its key.
        let (k, v) = iter.next().unwrap();
        assert_eq!(k, "favourites");
        assert_eq!(
            v.unwrap_list().iter().collect::<Vec<_>>(),
            vec![
                Datum::String("ice cream"),
                Datum::String("oreos"),
                Datum::String("cheesecake"),
            ]
        );

        let (k, v) = iter.next().unwrap();
        assert_eq!(k, "name");
        assert_eq!(v, Datum::String("bob"));
    }
3327
    #[mz_ore::test]
    fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
        // Packs a one-entry dict, or fails partway through when `ok` is false.
        let pack = |ok| {
            let mut row = Row::default();
            row.packer().push_dict_with(|row| {
                if ok {
                    row.push(Datum::String("key"));
                    row.push(Datum::Int32(42));
                    Ok(7)
                } else {
                    Err("fail")
                }
            })?;
            Ok(row)
        };

        // The closure's error must propagate out of `push_dict_with`.
        assert_eq!(pack(false), Err("fail"));

        // On success the entry must be readable back out of the dict.
        let row = pack(true)?;
        let mut dict = row.unpack_first().unwrap_map().iter();
        assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
        assert_eq!(dict.next(), None);

        Ok(())
    }
3353
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    fn test_datum_sizes() {
        let arena = RowArena::new();

        // Test the claims about various datum sizes: `datum_size` must agree
        // with the actual packed length for each representative value.
        let values_of_interest = vec![
            Datum::Null,
            Datum::False,
            Datum::Int16(0),
            Datum::Int32(0),
            Datum::Int64(0),
            Datum::UInt8(0),
            Datum::UInt8(1),
            Datum::UInt16(0),
            Datum::UInt16(1),
            Datum::UInt16(1 << 8),
            Datum::UInt32(0),
            Datum::UInt32(1),
            Datum::UInt32(1 << 8),
            Datum::UInt32(1 << 16),
            Datum::UInt32(1 << 24),
            Datum::UInt64(0),
            Datum::UInt64(1),
            Datum::UInt64(1 << 8),
            Datum::UInt64(1 << 16),
            Datum::UInt64(1 << 24),
            Datum::UInt64(1 << 32),
            Datum::UInt64(1 << 40),
            Datum::UInt64(1 << 48),
            Datum::UInt64(1 << 56),
            Datum::Float32(OrderedFloat(0.0)),
            Datum::Float64(OrderedFloat(0.0)),
            Datum::from(numeric::Numeric::from(0)),
            Datum::from(numeric::Numeric::from(1000)),
            Datum::from(numeric::Numeric::from(9999)),
            Datum::Date(
                NaiveDate::from_ymd_opt(1, 1, 1)
                    .unwrap()
                    .try_into()
                    .unwrap(),
            ),
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(
                    DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
                )
                .unwrap(),
            ),
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
                    .unwrap(),
            ),
            Datum::Interval(Interval::default()),
            Datum::Bytes(&[]),
            Datum::String(""),
            Datum::JsonNull,
            Datum::Range(Range { inner: None }),
            arena.make_datum(|packer| {
                packer
                    .push_range(Range::new(Some((
                        RangeLowerBound::new(Datum::Int32(-1), true),
                        RangeUpperBound::new(Datum::Int32(1), true),
                    ))))
                    .unwrap();
            }),
        ];
        for value in values_of_interest {
            if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
                panic!("Disparity in claimed size for {:?}", value);
            }
        }
    }
3426
    #[mz_ore::test]
    fn test_range_errors() {
        // Attempts to pack a range whose lower bound is built from
        // `datums[0]` and whose upper bound is built from `datums[1]`,
        // returning the result of the push. Also asserts that the row's byte
        // length is unchanged afterward, i.e. a failed/panicking push must
        // not leave partially written data behind.
        fn test_range_errors_inner<'a>(
            datums: Vec<Vec<Datum<'a>>>,
        ) -> Result<(), InvalidRangeError> {
            let mut row = Row::default();
            // Byte length before the attempted push; compared below to verify
            // the rollback behavior of `push_range_with`.
            let row_len = row.byte_len();
            let mut packer = row.packer();
            let r = packer.push_range_with(
                RangeLowerBound {
                    inclusive: true,
                    bound: Some(|row: &mut RowPacker| {
                        for d in &datums[0] {
                            row.push(d);
                        }
                        Ok(())
                    }),
                },
                RangeUpperBound {
                    inclusive: true,
                    bound: Some(|row: &mut RowPacker| {
                        for d in &datums[1] {
                            row.push(d);
                        }
                        Ok(())
                    }),
                },
            );

            // Every case exercised by this test errors or panics, so the row
            // must look exactly as it did before the push.
            assert_eq!(row_len, row.byte_len());

            r
        }

        // Each of these bound combinations is expected to panic inside
        // `push_range_with` (wrong number of bound datums, mismatched bound
        // types, or null bounds).
        for panicking_case in [
            vec![vec![Datum::Int32(1)], vec![]],
            vec![
                vec![Datum::Int32(1), Datum::Int32(2)],
                vec![Datum::Int32(3)],
            ],
            vec![
                vec![Datum::Int32(1)],
                vec![Datum::Int32(2), Datum::Int32(3)],
            ],
            vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
            vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
            vec![vec![Datum::Null], vec![Datum::Int32(2)]],
            vec![vec![Datum::Int32(1)], vec![Datum::Null]],
        ] {
            #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
            let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
            assert_err!(result);
        }

        // Misordered bounds (lower > upper) are a recoverable error rather
        // than a panic.
        let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
        assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
    }
3484
3485    /// Lists have a variable-length encoding for their lengths. We test each case here.
3486    #[mz_ore::test]
3487    #[cfg_attr(miri, ignore)] // slow
3488    fn test_list_encoding() {
3489        fn test_list_encoding_inner(len: usize) {
3490            let list_elem = |i: usize| {
3491                if i % 2 == 0 {
3492                    Datum::False
3493                } else {
3494                    Datum::True
3495                }
3496            };
3497            let mut row = Row::default();
3498            {
3499                // Push some stuff.
3500                let mut packer = row.packer();
3501                packer.push(Datum::String("start"));
3502                packer.push_list_with(|packer| {
3503                    for i in 0..len {
3504                        packer.push(list_elem(i));
3505                    }
3506                });
3507                packer.push(Datum::String("end"));
3508            }
3509            // Check that we read back exactly what we pushed.
3510            let mut row_it = row.iter();
3511            assert_eq!(row_it.next().unwrap(), Datum::String("start"));
3512            match row_it.next().unwrap() {
3513                Datum::List(list) => {
3514                    let mut list_it = list.iter();
3515                    for i in 0..len {
3516                        assert_eq!(list_it.next().unwrap(), list_elem(i));
3517                    }
3518                    assert_none!(list_it.next());
3519                }
3520                _ => panic!("expected Datum::List"),
3521            }
3522            assert_eq!(row_it.next().unwrap(), Datum::String("end"));
3523            assert_none!(row_it.next());
3524        }
3525
3526        test_list_encoding_inner(0);
3527        test_list_encoding_inner(1);
3528        test_list_encoding_inner(10);
3529        test_list_encoding_inner(TINY - 1); // tiny
3530        test_list_encoding_inner(TINY + 1); // short
3531        test_list_encoding_inner(SHORT + 1); // long
3532
3533        // The biggest one takes 40 s on my laptop, probably not worth it.
3534        //test_list_encoding_inner(LONG + 1); // huge
3535    }
3536
    /// Demonstrates that `DatumList`'s `Eq` and `Ord` implementations are consistent.
    ///
    /// A list containing -0.0 and one containing +0.0 have different byte
    /// representations (IEEE 754 distinguishes the two zeros), so a purely
    /// bytewise `Eq` used to report them as unequal. Now that equality goes
    /// through `Datum::cmp`, `Eq` reports them equal, which matches what `Ord`
    /// computes datum-by-datum via `iter().cmp(other.iter())`.
    #[mz_ore::test]
    fn test_datum_list_eq_ord_consistency() {
        // Build list containing +0.0
        let mut row_pos = Row::default();
        row_pos.packer().push_list_with(|p| {
            p.push(Datum::Float64(OrderedFloat::from(0.0)));
        });
        let list_pos = row_pos.unpack_first().unwrap_list();

        // Build list containing -0.0 (distinct bit pattern from +0.0)
        let mut row_neg = Row::default();
        row_neg.packer().push_list_with(|p| {
            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
        });
        let list_neg = row_neg.unpack_first().unwrap_list();

        // Despite the differing byte encodings, Eq compares datum-by-datum,
        // so the two lists are equal. (A purely bytewise Eq once got this
        // wrong, hence the regression test.)
        assert_eq!(
            list_pos, list_neg,
            "Eq should see different encodings as equal"
        );

        // Ord is datum-by-datum: -0.0 and +0.0 compare equal as Datums
        assert_eq!(
            list_pos.cmp(&list_neg),
            Ordering::Equal,
            "Ord (datum-by-datum) should see -0.0 and +0.0 as equal"
        );
    }
3572
    /// Demonstrates that `DatumMap`'s `Eq` is semantic: maps with equal keys and
    /// semantically equal values compare equal even when the values have
    /// different byte encodings (e.g. -0.0 vs +0.0).
    #[mz_ore::test]
    fn test_datum_map_eq_bytewise_consistency() {
        // Build map {"k": +0.0}
        let mut row_pos = Row::default();
        row_pos.packer().push_dict_with(|p| {
            p.push(Datum::String("k"));
            p.push(Datum::Float64(OrderedFloat::from(0.0)));
        });
        let map_pos = row_pos.unpack_first().unwrap_map();

        // Build map {"k": -0.0}
        let mut row_neg = Row::default();
        row_neg.packer().push_dict_with(|p| {
            p.push(Datum::String("k"));
            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
        });
        let map_neg = row_neg.unpack_first().unwrap_map();

        // Same keys and semantically equal values: Eq compares entry by
        // entry, so the maps are equal despite the differing encodings.
        assert_eq!(
            map_pos, map_neg,
            "DatumMap Eq is semantic; -0.0 and +0.0 have different encodings but are equal"
        );
        // Verify they have the same logical content, entry by entry.
        let entries_pos: Vec<_> = map_pos.iter().collect();
        let entries_neg: Vec<_> = map_neg.iter().collect();
        assert_eq!(entries_pos.len(), entries_neg.len());
        for ((k1, v1), (k2, v2)) in entries_pos.iter().zip_eq(entries_neg.iter()) {
            assert_eq!(k1, k2);
            assert_eq!(
                v1, v2,
                "Datum-level comparison treats -0.0 and +0.0 as equal"
            );
        }
    }
3610
3611    /// Hash must agree with Eq: equal lists must have the same hash.
3612    #[mz_ore::test]
3613    fn test_datum_list_hash_consistency() {
3614        // Equal lists (including -0.0 vs +0.0) must hash the same
3615        let mut row_pos = Row::default();
3616        row_pos.packer().push_list_with(|p| {
3617            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3618        });
3619        let list_pos = row_pos.unpack_first().unwrap_list();
3620
3621        let mut row_neg = Row::default();
3622        row_neg.packer().push_list_with(|p| {
3623            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3624        });
3625        let list_neg = row_neg.unpack_first().unwrap_list();
3626
3627        assert_eq!(list_pos, list_neg);
3628        assert_eq!(
3629            hash(&list_pos),
3630            hash(&list_neg),
3631            "equal lists must have same hash"
3632        );
3633
3634        // Unequal lists should have different hashes (with asymptotic probability 1)
3635        let mut row_a = Row::default();
3636        row_a.packer().push_list_with(|p| {
3637            p.push(Datum::Int32(1));
3638            p.push(Datum::Int32(2));
3639        });
3640        let list_a = row_a.unpack_first().unwrap_list();
3641
3642        let mut row_b = Row::default();
3643        row_b.packer().push_list_with(|p| {
3644            p.push(Datum::Int32(1));
3645            p.push(Datum::Int32(3));
3646        });
3647        let list_b = row_b.unpack_first().unwrap_list();
3648
3649        assert_ne!(list_a, list_b);
3650        assert_ne!(
3651            hash(&list_a),
3652            hash(&list_b),
3653            "unequal lists must have different hashes"
3654        );
3655    }
3656
3657    /// Ord/PartialOrd for DatumList: less, equal, greater.
3658    #[mz_ore::test]
3659    fn test_datum_list_ordering() {
3660        let mut row_12 = Row::default();
3661        row_12.packer().push_list_with(|p| {
3662            p.push(Datum::Int32(1));
3663            p.push(Datum::Int32(2));
3664        });
3665        let list_12 = row_12.unpack_first().unwrap_list();
3666
3667        let mut row_13 = Row::default();
3668        row_13.packer().push_list_with(|p| {
3669            p.push(Datum::Int32(1));
3670            p.push(Datum::Int32(3));
3671        });
3672        let list_13 = row_13.unpack_first().unwrap_list();
3673
3674        let mut row_123 = Row::default();
3675        row_123.packer().push_list_with(|p| {
3676            p.push(Datum::Int32(1));
3677            p.push(Datum::Int32(2));
3678            p.push(Datum::Int32(3));
3679        });
3680        let list_123 = row_123.unpack_first().unwrap_list();
3681
3682        // [1, 2] < [1, 3] due to the second element being different
3683        assert_eq!(list_12.cmp(&list_13), Ordering::Less);
3684        assert_eq!(list_13.cmp(&list_12), Ordering::Greater);
3685        assert_eq!(list_12.cmp(&list_12), Ordering::Equal);
3686        // shorter prefix compares less
3687        assert_eq!(list_12.cmp(&list_123), Ordering::Less);
3688    }
3689
3690    /// Hash must agree with Eq: equal maps must have the same hash.
3691    #[mz_ore::test]
3692    fn test_datum_map_hash_consistency() {
3693        let mut row_pos = Row::default();
3694        row_pos.packer().push_dict_with(|p| {
3695            p.push(Datum::String("x"));
3696            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3697        });
3698        let map_pos = row_pos.unpack_first().unwrap_map();
3699
3700        let mut row_neg = Row::default();
3701        row_neg.packer().push_dict_with(|p| {
3702            p.push(Datum::String("x"));
3703            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3704        });
3705        let map_neg = row_neg.unpack_first().unwrap_map();
3706
3707        assert_eq!(map_pos, map_neg);
3708        assert_eq!(
3709            hash(&map_pos),
3710            hash(&map_neg),
3711            "equal maps must have same hash"
3712        );
3713
3714        let mut row_a = Row::default();
3715        row_a.packer().push_dict_with(|p| {
3716            p.push(Datum::String("a"));
3717            p.push(Datum::Int32(1));
3718        });
3719        let map_a = row_a.unpack_first().unwrap_map();
3720
3721        let mut row_b = Row::default();
3722        row_b.packer().push_dict_with(|p| {
3723            p.push(Datum::String("a"));
3724            p.push(Datum::Int32(2));
3725        });
3726        let map_b = row_b.unpack_first().unwrap_map();
3727
3728        assert_ne!(map_a, map_b);
3729        assert_ne!(
3730            hash(&map_a),
3731            hash(&map_b),
3732            "unequal maps must have different hashes"
3733        );
3734    }
3735
3736    /// Ord/PartialOrd for DatumMap: less, equal, greater (by key then value).
3737    #[mz_ore::test]
3738    fn test_datum_map_ordering() {
3739        let mut row_a1 = Row::default();
3740        row_a1.packer().push_dict_with(|p| {
3741            p.push(Datum::String("a"));
3742            p.push(Datum::Int32(1));
3743        });
3744        let map_a1 = row_a1.unpack_first().unwrap_map();
3745
3746        let mut row_a2 = Row::default();
3747        row_a2.packer().push_dict_with(|p| {
3748            p.push(Datum::String("a"));
3749            p.push(Datum::Int32(2));
3750        });
3751        let map_a2 = row_a2.unpack_first().unwrap_map();
3752
3753        let mut row_b1 = Row::default();
3754        row_b1.packer().push_dict_with(|p| {
3755            p.push(Datum::String("b"));
3756            p.push(Datum::Int32(1));
3757        });
3758        let map_b1 = row_b1.unpack_first().unwrap_map();
3759
3760        assert_eq!(map_a1.cmp(&map_a2), Ordering::Less);
3761        assert_eq!(map_a2.cmp(&map_a1), Ordering::Greater);
3762        assert_eq!(map_a1.cmp(&map_a1), Ordering::Equal);
3763        assert_eq!(map_a1.cmp(&map_b1), Ordering::Less); // "a" < "b"
3764    }
3765
3766    /// Datum puts Null last in the enum so that nulls sort last (PostgreSQL default).
3767    /// This ordering is used when comparing DatumList/DatumMap (e.g. jsonb_agg tiebreaker).
3768    #[mz_ore::test]
3769    fn test_datum_list_and_map_null_sorts_last() {
3770        // DatumList: [1] < [null] so non-null sorts before null
3771        let mut row_list_1 = Row::default();
3772        row_list_1
3773            .packer()
3774            .push_list_with(|p| p.push(Datum::Int32(1)));
3775        let list_1 = row_list_1.unpack_first().unwrap_list();
3776
3777        let mut row_list_null = Row::default();
3778        row_list_null
3779            .packer()
3780            .push_list_with(|p| p.push(Datum::Null));
3781        let list_null = row_list_null.unpack_first().unwrap_list();
3782
3783        assert_eq!(list_1.cmp(&list_null), Ordering::Less);
3784        assert_eq!(list_null.cmp(&list_1), Ordering::Greater);
3785
3786        // DatumMap: {"k": 1} < {"k": null} so non-null sorts before null (same as jsonb_agg)
3787        let mut row_map_1 = Row::default();
3788        row_map_1.packer().push_dict_with(|p| {
3789            p.push(Datum::String("k"));
3790            p.push(Datum::Int32(1));
3791        });
3792        let map_1 = row_map_1.unpack_first().unwrap_map();
3793
3794        let mut row_map_null = Row::default();
3795        row_map_null.packer().push_dict_with(|p| {
3796            p.push(Datum::String("k"));
3797            p.push(Datum::Null);
3798        });
3799        let map_null = row_map_null.unpack_first().unwrap_map();
3800
3801        assert_eq!(map_1.cmp(&map_null), Ordering::Less);
3802        assert_eq!(map_null.cmp(&map_1), Ordering::Greater);
3803    }
3804}