mz_repr/row.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Borrow;
11use std::cell::{Cell, RefCell};
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::hash::{Hash, Hasher};
16use std::mem::{size_of, transmute};
17use std::ops::Deref;
18use std::str;
19
20use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
21use compact_bytes::CompactBytes;
22use mz_ore::cast::{CastFrom, ReinterpretCast};
23use mz_ore::soft_assert_no_log;
24use mz_ore::vec::Vector;
25use mz_persist_types::Codec64;
26use num_enum::{IntoPrimitive, TryFromPrimitive};
27use ordered_float::OrderedFloat;
28use proptest::prelude::*;
29use proptest::strategy::{BoxedStrategy, Strategy};
30use serde::{Deserialize, Serialize};
31use uuid::Uuid;
32
33use crate::adt::array::{
34    Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
35};
36use crate::adt::date::Date;
37use crate::adt::interval::Interval;
38use crate::adt::mz_acl_item::{AclItem, MzAclItem};
39use crate::adt::numeric;
40use crate::adt::numeric::Numeric;
41use crate::adt::range::{
42    self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
43};
44use crate::adt::timestamp::CheckedTimestamp;
45use crate::scalar::{DatumKind, arb_datum};
46use crate::{Datum, RelationDesc, Timestamp};
47
48pub(crate) mod encode;
49pub mod iter;
50
51include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
52
53/// A packed representation for `Datum`s.
54///
55/// `Datum` is easy to work with but very space inefficient. A `Datum::Int32(42)`
56/// is laid out in memory like this:
57///
58///   tag: 3
59///   padding: 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
60///   data: 0 0 0 42
61///   padding: 0 0 0 0 0 0 0 0 0 0 0 0
62///
63/// For a total of 32 bytes! The second set of padding is needed in case we were
64/// to write a 16-byte datum into this location. The first set of padding is
65/// needed to align that hypothetical decimal to a 16-byte boundary.
66///
67/// A `Row` stores zero or more `Datum`s without any padding. We avoid the need
68/// for the first set of padding by only providing access to the `Datum`s via
69/// calls to `ptr::read_unaligned`, which on modern x86 is barely penalized. We
70/// avoid the need for the second set of padding by not providing mutable access
71/// to the `Datum`. Instead, `Row` is append-only.
72///
73/// A `Row` can be built from a collection of `Datum`s using `Row::pack`, but it
74/// is more efficient to use `Row::pack_slice` so that a right-sized allocation
75/// can be created. If that is not possible, consider using the row buffer
76/// pattern: allocate one row, pack into it, and then call [`Row::clone`] to
77/// receive a copy of that row, leaving behind the original allocation to pack
78/// future rows.
79///
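/// A minimal sketch of that buffer pattern:
///
/// ```
/// # use mz_repr::{Row, Datum};
/// let mut buf = Row::default();
/// buf.packer().push(Datum::Int32(42));
/// let row = buf.clone();
/// assert_eq!(row.unpack_first(), Datum::Int32(42));
/// ```
///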
80/// Creating a row via [`Row::pack_slice`]:
81///
82/// ```
83/// # use mz_repr::{Row, Datum};
84/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
85/// assert_eq!(row.unpack(), vec![Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)])
86/// ```
87///
88/// `Row`s can be unpacked by iterating over them:
89///
90/// ```
91/// # use mz_repr::{Row, Datum};
92/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
93/// assert_eq!(row.iter().nth(1).unwrap(), Datum::Int32(1));
94/// ```
95///
96/// If you want random access to the `Datum`s in a `Row`, use `Row::unpack` to create a `Vec<Datum>`
97/// ```
98/// # use mz_repr::{Row, Datum};
99/// let row = Row::pack_slice(&[Datum::Int32(0), Datum::Int32(1), Datum::Int32(2)]);
100/// let datums = row.unpack();
101/// assert_eq!(datums[1], Datum::Int32(1));
102/// ```
103///
104/// # Performance
105///
106/// Rows are dynamically sized, but up to a fixed size their data is stored in-line.
107/// It is best to re-use a `Row` across multiple `Row` creation calls, as this
108/// avoids the allocations involved in `Row::new()`.
109#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
110pub struct Row {
111    data: CompactBytes,
112}
113
114impl Row {
115    const SIZE: usize = CompactBytes::MAX_INLINE;
116
117    /// A variant of `Row::from_proto` that allows for reuse of internal allocs
118    /// and validates the decoding against a provided [`RelationDesc`].
119    pub fn decode_from_proto(
120        &mut self,
121        proto: &ProtoRow,
122        desc: &RelationDesc,
123    ) -> Result<(), String> {
124        let mut packer = self.packer();
125        for (col_idx, _, _) in desc.iter_all() {
126            let d = match proto.datums.get(col_idx.to_raw()) {
127                Some(x) => x,
128                None => {
129                    packer.push(Datum::Null);
130                    continue;
131                }
132            };
133            packer.try_push_proto(d)?;
134        }
135
136        Ok(())
137    }
138
139    /// Allocate an empty `Row` with a pre-allocated capacity.
140    #[inline]
141    pub fn with_capacity(cap: usize) -> Self {
142        Self {
143            data: CompactBytes::with_capacity(cap),
144        }
145    }
146
147    /// Create an empty `Row`.
148    #[inline]
149    pub const fn empty() -> Self {
150        Self {
151            data: CompactBytes::empty(),
152        }
153    }
154
155    /// Creates a new row from supplied bytes.
156    ///
157    /// # Safety
158    ///
159    /// This method relies on `data` being an appropriate row encoding, and can
160    /// result in unsafety if this is not the case.
161    pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
162        Row {
163            data: CompactBytes::new(data),
164        }
165    }
166
167    /// Constructs a [`RowPacker`] that will pack datums into this row's
168    /// allocation.
169    ///
170    /// This method clears the existing contents of the row, but retains the
171    /// allocation.
172    pub fn packer(&mut self) -> RowPacker<'_> {
173        self.clear();
174        RowPacker { row: self }
175    }
176
177    /// Take some `Datum`s and pack them into a `Row`.
178    ///
179    /// This method builds a `Row` by repeatedly increasing the backing
180    /// allocation. If the contents of the iterator are known ahead of
181    /// time, consider [`Row::with_capacity`] to right-size the allocation
182    /// first, and then [`RowPacker::extend`] to populate it with `Datum`s.
183    /// This avoids the repeated allocation resizing and copying.
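    ///
    /// A sketch of that right-sizing approach (the capacity below is
    /// illustrative, not computed from the datums):
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let datums = [Datum::Int32(7), Datum::Int64(8)];
    /// let mut row = Row::with_capacity(64);
    /// row.packer().extend(datums.iter());
    /// assert_eq!(row.unpack_first(), Datum::Int32(7));
    /// ```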
184    pub fn pack<'a, I, D>(iter: I) -> Row
185    where
186        I: IntoIterator<Item = D>,
187        D: Borrow<Datum<'a>>,
188    {
189        let mut row = Row::default();
190        row.packer().extend(iter);
191        row
192    }
193
194    /// Use `self` to pack `iter`, and then clone the result.
195    ///
196    /// This is a convenience method meant to reduce boilerplate around row
197    /// formation.
198    pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
199    where
200        I: IntoIterator<Item = D>,
201        D: Borrow<Datum<'a>>,
202    {
203        self.packer().extend(iter);
204        self.clone()
205    }
206
207    /// Like [`Row::pack`], but the provided iterator is allowed to produce an
208    /// error, in which case the packing operation is aborted and the error
209    /// returned.
210    pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
211    where
212        I: IntoIterator<Item = Result<D, E>>,
213        D: Borrow<Datum<'a>>,
214    {
215        let mut row = Row::default();
216        row.packer().try_extend(iter)?;
217        Ok(row)
218    }
219
220    /// Pack a slice of `Datum`s into a `Row`.
221    ///
222    /// This method has the advantage over `pack` that it can determine the required
223    /// allocation before packing the elements, ensuring only one allocation and no
224    /// redundant copies are required.
225    pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
226        // Pre-allocate the needed number of bytes.
227        let mut row = Row::with_capacity(datums_size(slice.iter()));
228        row.packer().extend(slice.iter());
229        row
230    }
231
232    /// Returns the total number of bytes used by this row.
233    pub fn byte_len(&self) -> usize {
234        let heap_size = if self.data.spilled() {
235            self.data.len()
236        } else {
237            0
238        };
239        let inline_size = std::mem::size_of::<Self>();
240        inline_size.saturating_add(heap_size)
241    }
242
243    /// The length of the encoded row in bytes. Does not include the size of the `Row` struct itself.
244    pub fn data_len(&self) -> usize {
245        self.data.len()
246    }
247
248    /// Returns the total capacity in bytes used by this row.
249    pub fn byte_capacity(&self) -> usize {
250        self.data.capacity()
251    }
252
253    /// Extracts a Row slice containing the entire [`Row`].
254    #[inline]
255    pub fn as_row_ref(&self) -> &RowRef {
256        RowRef::from_slice(self.data.as_slice())
257    }
258
259    /// Clear the contents of the [`Row`], leaving any allocation in place.
260    #[inline]
261    fn clear(&mut self) {
262        self.data.clear();
263    }
264}
265
266impl Borrow<RowRef> for Row {
267    #[inline]
268    fn borrow(&self) -> &RowRef {
269        self.as_row_ref()
270    }
271}
272
273impl AsRef<RowRef> for Row {
274    #[inline]
275    fn as_ref(&self) -> &RowRef {
276        self.as_row_ref()
277    }
278}
279
280impl Deref for Row {
281    type Target = RowRef;
282
283    #[inline]
284    fn deref(&self) -> &Self::Target {
285        self.as_row_ref()
286    }
287}
288
289// Nothing depends on Row being exactly 24 bytes; we just want visibility into its size.
290static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
291
292impl Clone for Row {
293    fn clone(&self) -> Self {
294        Row {
295            data: self.data.clone(),
296        }
297    }
298
299    fn clone_from(&mut self, source: &Self) {
300        self.data.clone_from(&source.data);
301    }
302}
303
304// Row's `Hash` implementation defers to `RowRef` to ensure they hash equivalently.
305impl std::hash::Hash for Row {
306    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
307        self.as_row_ref().hash(state)
308    }
309}
310
311impl Arbitrary for Row {
312    type Parameters = prop::collection::SizeRange;
313    type Strategy = BoxedStrategy<Row>;
314
315    fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
316        prop::collection::vec(arb_datum(true), size)
317            .prop_map(|items| {
318                let mut row = Row::default();
319                let mut packer = row.packer();
320                for item in items.iter() {
321                    let datum: Datum<'_> = item.into();
322                    packer.push(datum);
323                }
324                row
325            })
326            .boxed()
327    }
328}
329
330impl PartialOrd for Row {
331    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
332        Some(self.cmp(other))
333    }
334}
335
336impl Ord for Row {
337    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
338        self.as_ref().cmp(other.as_ref())
339    }
340}
341
342#[allow(missing_debug_implementations)]
343mod columnation {
344    use columnation::{Columnation, Region};
345    use mz_ore::region::LgAllocRegion;
346
347    use crate::Row;
348
349    /// Region allocation for `Row` data.
350    ///
351    /// Content bytes are stored in stable contiguous memory locations,
352    /// and then a `Row` referencing them is fabricated.
353    pub struct RowStack {
354        region: LgAllocRegion<u8>,
355    }
356
357    impl RowStack {
358        const LIMIT: usize = 2 << 20;
359    }
360
361    // Implement `Default` manually to specify a region allocation limit.
362    impl Default for RowStack {
363        fn default() -> Self {
364            Self {
365                // Limit the region size to 2MiB.
366                region: LgAllocRegion::with_limit(Self::LIMIT),
367            }
368        }
369    }
370
371    impl Columnation for Row {
372        type InnerRegion = RowStack;
373    }
374
375    impl Region for RowStack {
376        type Item = Row;
377        #[inline]
378        fn clear(&mut self) {
379            self.region.clear();
380        }
381        #[inline(always)]
382        unsafe fn copy(&mut self, item: &Row) -> Row {
383            if item.data.spilled() {
384                let bytes = self.region.copy_slice(&item.data[..]);
385                Row {
386                    data: compact_bytes::CompactBytes::from_raw_parts(
387                        bytes.as_mut_ptr(),
388                        item.data.len(),
389                        item.data.capacity(),
390                    ),
391                }
392            } else {
393                item.clone()
394            }
395        }
396
397        fn reserve_items<'a, I>(&mut self, items: I)
398        where
399            Self: 'a,
400            I: Iterator<Item = &'a Self::Item> + Clone,
401        {
402            let size = items
403                .filter(|row| row.data.spilled())
404                .map(|row| row.data.len())
405                .sum();
406            let size = std::cmp::min(size, Self::LIMIT);
407            self.region.reserve(size);
408        }
409
410        fn reserve_regions<'a, I>(&mut self, regions: I)
411        where
412            Self: 'a,
413            I: Iterator<Item = &'a Self> + Clone,
414        {
415            let size = regions.map(|r| r.region.len()).sum();
416            let size = std::cmp::min(size, Self::LIMIT);
417            self.region.reserve(size);
418        }
419
420        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
421            self.region.heap_size(callback)
422        }
423    }
424}
425
426mod columnar {
427    use columnar::common::PushIndexAs;
428    use columnar::{
429        AsBytes, Borrow, Clear, Columnar, Container, FromBytes, HeapSize, Index, IndexAs, Len, Push,
430    };
431    use mz_ore::cast::CastFrom;
432    use std::ops::Range;
433
434    use crate::{Row, RowRef};
435
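    /// Columnar storage for `Row`s: the bytes of all rows are stored back-to-back
    /// in `values`, and `bounds[i]` holds the cumulative end offset of row `i`'s
    /// bytes, as maintained by the `Push` and `Index` impls below.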
436    #[derive(Copy, Clone, Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
437    pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
438        /// Bounds container; provides indexed access to offsets.
439        pub bounds: BC,
440        /// Values container; provides slice access to bytes.
441        pub values: VC,
442    }
443
444    impl Columnar for Row {
445        #[inline(always)]
446        fn copy_from(&mut self, other: columnar::Ref<'_, Self>) {
447            self.clear();
448            self.data.extend_from_slice(other.data());
449        }
450        #[inline(always)]
451        fn into_owned(other: columnar::Ref<'_, Self>) -> Self {
452            other.to_owned()
453        }
454        type Container = Rows;
455        #[inline(always)]
456        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
457        where
458            Self: 'a,
459        {
460            thing
461        }
462    }
463
464    impl<BC: PushIndexAs<u64>> Borrow for Rows<BC, Vec<u8>> {
465        type Ref<'a> = &'a RowRef;
466        type Borrowed<'a>
467            = Rows<BC::Borrowed<'a>, &'a [u8]>
468        where
469            Self: 'a;
470        #[inline(always)]
471        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
472            Rows {
473                bounds: self.bounds.borrow(),
474                values: self.values.borrow(),
475            }
476        }
477        #[inline(always)]
478        fn reborrow<'c, 'a: 'c>(item: Self::Borrowed<'a>) -> Self::Borrowed<'c>
479        where
480            Self: 'a,
481        {
482            Rows {
483                bounds: BC::reborrow(item.bounds),
484                values: item.values,
485            }
486        }
487
488        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
489        where
490            Self: 'a,
491        {
492            item
493        }
494    }
495
496    impl<BC: PushIndexAs<u64>> Container for Rows<BC, Vec<u8>> {
497        fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
498            if !range.is_empty() {
499                // Imported bounds will be relative to this starting offset.
500                let values_len: u64 = self.values.len().try_into().expect("must fit");
501
502                // Push all bytes that we can, all at once.
503                let other_lower = if range.start == 0 {
504                    0
505                } else {
506                    other.bounds.index_as(range.start - 1)
507                };
508                let other_upper = other.bounds.index_as(range.end - 1);
509                self.values.extend_from_self(
510                    other.values,
511                    usize::try_from(other_lower).expect("must fit")
512                        ..usize::try_from(other_upper).expect("must fit"),
513                );
514
515                // Each bound needs to be shifted by `values_len - other_lower`.
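                // (For example, with values_len = 10 and other_lower = 4, a bound of 7 becomes 13.)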
516                if values_len == other_lower {
517                    self.bounds.extend_from_self(other.bounds, range);
518                } else {
519                    for index in range {
520                        let shifted = other.bounds.index_as(index) - other_lower + values_len;
521                        self.bounds.push(&shifted)
522                    }
523                }
524            }
525        }
526        fn reserve_for<'a, I>(&mut self, selves: I)
527        where
528            Self: 'a,
529            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
530        {
531            self.bounds.reserve_for(selves.clone().map(|r| r.bounds));
532            self.values.reserve_for(selves.map(|r| r.values));
533        }
534    }
535
536    impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
537        #[inline(always)]
538        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
539            columnar::chain(self.bounds.as_bytes(), self.values.as_bytes())
540        }
541    }
542    impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
543        #[inline(always)]
544        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
545            Self {
546                bounds: FromBytes::from_bytes(bytes),
547                values: FromBytes::from_bytes(bytes),
548            }
549        }
550    }
551
552    impl<BC: Len, VC> Len for Rows<BC, VC> {
553        #[inline(always)]
554        fn len(&self) -> usize {
555            self.bounds.len()
556        }
557    }
558
559    impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
560        type Ref = &'a RowRef;
561        #[inline(always)]
562        fn get(&self, index: usize) -> Self::Ref {
563            let lower = if index == 0 {
564                0
565            } else {
566                self.bounds.index_as(index - 1)
567            };
568            let upper = self.bounds.index_as(index);
569            let lower = usize::cast_from(lower);
570            let upper = usize::cast_from(upper);
571            RowRef::from_slice(&self.values[lower..upper])
572        }
573    }
574    impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
575        type Ref = &'a RowRef;
576        #[inline(always)]
577        fn get(&self, index: usize) -> Self::Ref {
578            let lower = if index == 0 {
579                0
580            } else {
581                self.bounds.index_as(index - 1)
582            };
583            let upper = self.bounds.index_as(index);
584            let lower = usize::cast_from(lower);
585            let upper = usize::cast_from(upper);
586            RowRef::from_slice(&self.values[lower..upper])
587        }
588    }
589
590    impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
591        #[inline(always)]
592        fn push(&mut self, item: &Row) {
593            self.values.extend_from_slice(item.data.as_slice());
594            self.bounds.push(u64::cast_from(self.values.len()));
595        }
596    }
597    impl<BC: for<'a> Push<&'a u64>> Push<&RowRef> for Rows<BC> {
598        #[inline(always)]
599        fn push(&mut self, item: &RowRef) {
600            self.values.extend_from_slice(item.data());
601            self.bounds.push(&u64::cast_from(self.values.len()));
602        }
603    }
604    impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
605        #[inline(always)]
606        fn clear(&mut self) {
607            self.bounds.clear();
608            self.values.clear();
609        }
610    }
611    impl<BC: HeapSize, VC: HeapSize> HeapSize for Rows<BC, VC> {
612        #[inline(always)]
613        fn heap_size(&self) -> (usize, usize) {
614            let (l0, c0) = self.bounds.heap_size();
615            let (l1, c1) = self.values.heap_size();
616            (l0 + l1, c0 + c1)
617        }
618    }
619}
620
621/// A contiguous slice of bytes that are row data.
622///
623/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
624#[derive(PartialEq, Eq, Hash)]
625#[repr(transparent)]
626pub struct RowRef([u8]);
627
628impl RowRef {
629    /// Create a [`RowRef`] from a slice of data.
630    ///
631    /// We do not check that the provided slice is valid [`Row`] data; reads will panic
632    /// if the data is invalid.
633    pub fn from_slice(row: &[u8]) -> &RowRef {
634        #[allow(clippy::as_conversions)]
635        let ptr = row as *const [u8] as *const RowRef;
636        // SAFETY: We know `ptr` is non-null and aligned because it came from a &[u8].
637        unsafe { &*ptr }
638    }
639
640    /// Unpack `self` into a `Vec<Datum>` for efficient random access.
641    pub fn unpack(&self) -> Vec<Datum<'_>> {
642        // It's usually cheaper to unpack twice to figure out the right length than it is to grow the vec as we go
643        let len = self.iter().count();
644        let mut vec = Vec::with_capacity(len);
645        vec.extend(self.iter());
646        vec
647    }
648
649    /// Return the first [`Datum`] in `self`
650    ///
651    /// Panics if the [`RowRef`] is empty.
652    pub fn unpack_first(&self) -> Datum<'_> {
653        self.iter().next().unwrap()
654    }
655
656    /// Iterate the [`Datum`] elements of the [`RowRef`].
657    pub fn iter(&self) -> DatumListIter<'_> {
658        DatumListIter { data: &self.0 }
659    }
660
661    /// Return the byte length of this [`RowRef`].
662    pub fn byte_len(&self) -> usize {
663        self.0.len()
664    }
665
666    /// For debugging only.
667    pub fn data(&self) -> &[u8] {
668        &self.0
669    }
670
671    /// True iff there is no data in this [`RowRef`].
672    pub fn is_empty(&self) -> bool {
673        self.0.is_empty()
674    }
675}
676
677impl ToOwned for RowRef {
678    type Owned = Row;
679
680    fn to_owned(&self) -> Self::Owned {
681        // SAFETY: RowRef has the invariant that the wrapped data must be a valid Row encoding.
682        unsafe { Row::from_bytes_unchecked(&self.0) }
683    }
684}
685
686impl<'a> IntoIterator for &'a RowRef {
687    type Item = Datum<'a>;
688    type IntoIter = DatumListIter<'a>;
689
690    fn into_iter(self) -> DatumListIter<'a> {
691        DatumListIter { data: &self.0 }
692    }
693}
694
695/// These implementations order first by length, and then by slice contents.
696/// This allows many comparisons to complete without dereferencing memory.
697/// Warning: These order by the u8 array representation, and NOT by Datum::cmp.
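///
/// For example, the row holding `Datum::String("z")` encodes to fewer bytes than
/// the row holding `Datum::String("aa")`, so it sorts first here even though
/// `"aa" < "z"` under `Datum::cmp`.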
698impl PartialOrd for RowRef {
699    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
700        Some(self.cmp(other))
701    }
702}
703
704impl Ord for RowRef {
705    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
706        match self.0.len().cmp(&other.0.len()) {
707            std::cmp::Ordering::Less => std::cmp::Ordering::Less,
708            std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
709            std::cmp::Ordering::Equal => self.0.cmp(&other.0),
710        }
711    }
712}
713
714impl fmt::Debug for RowRef {
715    /// Debug representation using the internal datums
716    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
717        f.write_str("RowRef{")?;
718        f.debug_list().entries(self.into_iter()).finish()?;
719        f.write_str("}")
720    }
721}
722
723/// Packs datums into a [`Row`].
724///
725/// Creating a `RowPacker` via [`Row::packer`] starts a packing operation on the
726/// row. A packing operation always starts from scratch: the existing contents
727/// of the underlying row are cleared.
728///
729/// To complete a packing operation, drop the `RowPacker`.
730#[derive(Debug)]
731pub struct RowPacker<'a> {
732    row: &'a mut Row,
733}
734
735#[derive(Debug, Clone)]
736pub struct DatumListIter<'a> {
737    data: &'a [u8],
738}
739
740#[derive(Debug, Clone)]
741pub struct DatumDictIter<'a> {
742    data: &'a [u8],
743    prev_key: Option<&'a str>,
744}
745
746/// `RowArena` is used to hold on to temporary `Row`s for functions like `eval` that need to create complex `Datum`s but don't have a `Row` to put them in yet.
747#[derive(Debug)]
748pub struct RowArena {
749    // Semantically, this field would be better represented by a `Vec<Box<[u8]>>`,
750    // as once the arena takes ownership of a byte vector the vector is never
751    // modified. But `RowArena::push_bytes` takes ownership of a `Vec<u8>`, so
752    // storing that `Vec<u8>` directly avoids an allocation. The cost is
753    // additional memory use, as the vector may have spare capacity, but row
754    // arenas are short lived so this is the better tradeoff.
755    inner: RefCell<Vec<Vec<u8>>>,
756}
757
758// DatumList and DatumMap are defined here rather than near Datum because we need private access to the unsafe data field
759
760/// A sequence of Datums
761#[derive(Clone, Copy)]
762pub struct DatumList<'a> {
763    /// Points at the serialized datums
764    data: &'a [u8],
765}
766
767impl<'a> Debug for DatumList<'a> {
768    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
769        f.debug_list().entries(self.iter()).finish()
770    }
771}
772
773impl<'a> PartialEq for DatumList<'a> {
774    #[inline(always)]
775    fn eq(&self, other: &DatumList<'a>) -> bool {
776        self.iter().eq(other.iter())
777    }
778}
779
780impl<'a> Eq for DatumList<'a> {}
781
782impl<'a> Hash for DatumList<'a> {
783    #[inline(always)]
784    fn hash<H: Hasher>(&self, state: &mut H) {
785        for d in self.iter() {
786            d.hash(state);
787        }
788    }
789}
790
791impl Ord for DatumList<'_> {
792    #[inline(always)]
793    fn cmp(&self, other: &DatumList) -> Ordering {
794        self.iter().cmp(other.iter())
795    }
796}
797
798impl PartialOrd for DatumList<'_> {
799    #[inline(always)]
800    fn partial_cmp(&self, other: &DatumList) -> Option<Ordering> {
801        Some(self.cmp(other))
802    }
803}
804
805/// A mapping from string keys to Datums
806#[derive(Clone, Copy)]
807pub struct DatumMap<'a> {
808    /// Points at the serialized datums, which should be sorted in key order
809    data: &'a [u8],
810}
811
812impl<'a> PartialEq for DatumMap<'a> {
813    #[inline(always)]
814    fn eq(&self, other: &DatumMap<'a>) -> bool {
815        self.iter().eq(other.iter())
816    }
817}
818
819impl<'a> Eq for DatumMap<'a> {}
820
821impl<'a> Hash for DatumMap<'a> {
822    #[inline(always)]
823    fn hash<H: Hasher>(&self, state: &mut H) {
824        for (k, v) in self.iter() {
825            k.hash(state);
826            v.hash(state);
827        }
828    }
829}
830
831impl<'a> Ord for DatumMap<'a> {
832    #[inline(always)]
833    fn cmp(&self, other: &DatumMap<'a>) -> Ordering {
834        self.iter().cmp(other.iter())
835    }
836}
837
838impl<'a> PartialOrd for DatumMap<'a> {
839    #[inline(always)]
840    fn partial_cmp(&self, other: &DatumMap<'a>) -> Option<Ordering> {
841        Some(self.cmp(other))
842    }
843}
844
845/// Represents a single `Datum`, appropriate to be nested inside other
846/// `Datum`s.
847#[derive(Clone, Copy, Eq, PartialEq, Hash)]
848pub struct DatumNested<'a> {
849    val: &'a [u8],
850}
851
852impl<'a> std::fmt::Display for DatumNested<'a> {
853    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
854        std::fmt::Display::fmt(&self.datum(), f)
855    }
856}
857
858impl<'a> std::fmt::Debug for DatumNested<'a> {
859    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
860        f.debug_struct("DatumNested")
861            .field("val", &self.datum())
862            .finish()
863    }
864}
865
866impl<'a> DatumNested<'a> {
867    // Figure out which bytes `read_datum` consumes (including the tag),
868    // and then store a reference to those bytes, so we can "replay" this same
869    // call later on without storing the datum itself.
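    // (For example, if `data` was 10 bytes long and `read_datum` consumed 3 of
    // them, we keep `prev[..3]`.)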
870    pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
871        let prev = *data;
872        let _ = unsafe { read_datum(data) };
873        DatumNested {
874            val: &prev[..(prev.len() - data.len())],
875        }
876    }
877
878    /// Returns the datum `self` contains.
879    pub fn datum(&self) -> Datum<'a> {
880        let mut temp = self.val;
881        unsafe { read_datum(&mut temp) }
882    }
883}
884
885impl<'a> Ord for DatumNested<'a> {
886    fn cmp(&self, other: &Self) -> Ordering {
887        self.datum().cmp(&other.datum())
888    }
889}
890
891impl<'a> PartialOrd for DatumNested<'a> {
892    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
893        Some(self.cmp(other))
894    }
895}
896
897// Prefer adding new tags to the end of the enum. Certain behaviors, like row ordering and EXPLAIN
898// PHYSICAL PLAN, rely on the ordering of this enum. Neither of these is a breaking change, but
899// it's annoying when they change.
900#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
901#[repr(u8)]
902enum Tag {
903    Null,
904    False,
905    True,
906    Int16,
907    Int32,
908    Int64,
909    UInt8,
910    UInt32,
911    Float32,
912    Float64,
913    Date,
914    Time,
915    Timestamp,
916    TimestampTz,
917    Interval,
918    BytesTiny,
919    BytesShort,
920    BytesLong,
921    BytesHuge,
922    StringTiny,
923    StringShort,
924    StringLong,
925    StringHuge,
926    Uuid,
927    Array,
928    ListTiny,
929    ListShort,
930    ListLong,
931    ListHuge,
932    Dict,
933    JsonNull,
934    Dummy,
935    Numeric,
936    UInt16,
937    UInt64,
938    MzTimestamp,
939    Range,
940    MzAclItem,
941    AclItem,
942    // Everything except leap seconds and times beyond the range of
943    // i64 nanoseconds. (Note that Materialize does not support leap
944    // seconds, but this module does).
945    CheapTimestamp,
946    // Everything except leap seconds and times beyond the range of
947    // i64 nanoseconds. (Note that Materialize does not support leap
948    // seconds, but this module does).
949    CheapTimestampTz,
950    // The next several tags are for variable-length signed integer encoding.
951    // The basic idea is that `NonNegativeIntN_K` is used to encode a datum of type
952    // IntN whose actual value is positive or zero and fits in K bits, and similarly for
953    // NegativeIntN_K with negative values.
954    //
955    // The order of these tags matters, because we want to be able to choose the
956    // tag for a given datum quickly, with arithmetic, rather than slowly, with a
957    // stack of `if` statements.
958    //
959    // Separate tags for non-negative and negative numbers are used to avoid having to
960    // waste one bit in the actual data space to encode the sign.
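    //
    // For example, `Datum::Int32(42)` fits in one data byte, so it is written as
    // the tag `NonNegativeInt32_8` followed by the single byte 42, while
    // `Datum::Int32(-1)` is written as the bare tag `NegativeInt32_0` with no
    // data bytes at all.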
961    NonNegativeInt16_0, // i.e., 0
962    NonNegativeInt16_8,
963    NonNegativeInt16_16,
964
965    NonNegativeInt32_0,
966    NonNegativeInt32_8,
967    NonNegativeInt32_16,
968    NonNegativeInt32_24,
969    NonNegativeInt32_32,
970
971    NonNegativeInt64_0,
972    NonNegativeInt64_8,
973    NonNegativeInt64_16,
974    NonNegativeInt64_24,
975    NonNegativeInt64_32,
976    NonNegativeInt64_40,
977    NonNegativeInt64_48,
978    NonNegativeInt64_56,
979    NonNegativeInt64_64,
980
981    NegativeInt16_0, // i.e., -1
982    NegativeInt16_8,
983    NegativeInt16_16,
984
985    NegativeInt32_0,
986    NegativeInt32_8,
987    NegativeInt32_16,
988    NegativeInt32_24,
989    NegativeInt32_32,
990
991    NegativeInt64_0,
992    NegativeInt64_8,
993    NegativeInt64_16,
994    NegativeInt64_24,
995    NegativeInt64_32,
996    NegativeInt64_40,
997    NegativeInt64_48,
998    NegativeInt64_56,
999    NegativeInt64_64,
1000
1001    // These are like the ones above, but for unsigned types. The
1002    // situation is slightly simpler as we don't have negatives.
1003    UInt8_0, // i.e., 0
1004    UInt8_8,
1005
1006    UInt16_0,
1007    UInt16_8,
1008    UInt16_16,
1009
1010    UInt32_0,
1011    UInt32_8,
1012    UInt32_16,
1013    UInt32_24,
1014    UInt32_32,
1015
1016    UInt64_0,
1017    UInt64_8,
1018    UInt64_16,
1019    UInt64_24,
1020    UInt64_32,
1021    UInt64_40,
1022    UInt64_48,
1023    UInt64_56,
1024    UInt64_64,
1025}
1026
1027impl Tag {
1028    fn actual_int_length(self) -> Option<usize> {
1029        use Tag::*;
1030        let val = match self {
1031            NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
1032            | UInt32_0 | UInt64_0 => 0,
1033            NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
1034            | UInt32_8 | UInt64_8 => 1,
1035            NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
1036            | UInt32_16 | UInt64_16 => 2,
1037            NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
1038            NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
1039            NonNegativeInt64_40 | UInt64_40 => 5,
1040            NonNegativeInt64_48 | UInt64_48 => 6,
1041            NonNegativeInt64_56 | UInt64_56 => 7,
1042            NonNegativeInt64_64 | UInt64_64 => 8,
1043            NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
1044            NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
1045            NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
1046            NegativeInt32_24 | NegativeInt64_24 => 3,
1047            NegativeInt32_32 | NegativeInt64_32 => 4,
1048            NegativeInt64_40 => 5,
1049            NegativeInt64_48 => 6,
1050            NegativeInt64_56 => 7,
1051            NegativeInt64_64 => 8,
1052
1053            _ => return None,
1054        };
1055        Some(val)
1056    }
1057}
1058
1059// --------------------------------------------------------------------------------
1060// reading data
1061
1062/// Read a byte slice whose length is encoded in `data` before its contents.
1063///
1064/// Advances `data` to point at the first byte after the end of the read region.
1065fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
1066    let len = u64::from_le_bytes(read_byte_array(data));
1067    let len = usize::cast_from(len);
1068    let (bytes, next) = data.split_at(len);
1069    *data = next;
1070    bytes
1071}
1072
1073/// Read a datum whose length is encoded in the row before its contents.
1074///
1075/// Advances `data` to point at the first byte after the end of the read region.
1076///
1077/// # Safety
1078///
1079/// This function is safe if the datum's length and contents were previously written by `push_lengthed_bytes`,
1080/// and it was only written with a `String` tag if it was indeed UTF-8.
1081unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
1082    let len = match tag {
1083        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
1084        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1085            usize::from(u16::from_le_bytes(read_byte_array(data)))
1086        }
1087        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1088            usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
1089        }
1090        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1091            usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
1092        }
1093        _ => unreachable!(),
1094    };
1095    let (bytes, next) = data.split_at(len);
1096    *data = next;
1097    match tag {
1098        Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
1099        Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
1100            Datum::String(str::from_utf8_unchecked(bytes))
1101        }
1102        Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
1103            Datum::List(DatumList { data: bytes })
1104        }
1105        _ => unreachable!(),
1106    }
1107}
1108
1109fn read_byte(data: &mut &[u8]) -> u8 {
1110    let byte = data[0];
1111    *data = &data[1..];
1112    byte
1113}
1114
1115/// Read `length` bytes from the front of `data`, advancing `data` past the
1116/// bytes read. Extend the resulting buffer to an array of `N` bytes by
1117/// inserting `FILL` in the k most significant bytes, where k = N - length.
1118///
1119/// SAFETY:
1120///   * length <= N
1121///   * length <= data.len()
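///
/// For example, with `N = 2`, reading the single byte `0xFF` yields
/// `[0xFF, 0x00]` (i.e. 255) when `FILL = 0` and `[0xFF, 0xFF]` (i.e. -1 in
/// two's complement) when `FILL = 255`.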
1122fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
1123    data: &mut &[u8],
1124    length: usize,
1125) -> [u8; N] {
1126    let mut raw = [FILL; N];
1127    let (prev, next) = data.split_at(length);
1128    (raw[..prev.len()]).copy_from_slice(prev);
1129    *data = next;
1130    raw
1131}
1132/// Read `length` bytes from the front of `data`, advancing `data` past the
1133/// bytes read. Extend the resulting buffer to a negative `N`-byte
1134/// two's complement integer by filling the remaining bits with 1.
1135///
1136/// SAFETY:
1137///   * length <= N
1138///   * length <= data.len()
1139fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
1140    read_byte_array_sign_extending::<N, 255>(data, length)
1141}
1142
1143/// Read `length` bytes from the front of `data`, advancing `data` past the
1144/// bytes read. Extend the resulting buffer to a positive or zero `N`-byte
1145/// two's complement integer by filling the remaining bits with 0.
1146///
1147/// SAFETY:
1148///   * length <= N
1149///   * length <= data.len()
1150fn read_byte_array_extending_nonnegative<const N: usize>(
1151    data: &mut &[u8],
1152    length: usize,
1153) -> [u8; N] {
1154    read_byte_array_sign_extending::<N, 0>(data, length)
1155}
1156
1157pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1158    let (prev, next) = data.split_first_chunk().unwrap();
1159    *data = next;
1160    *prev
1161}
1162
1163pub(super) fn read_date(data: &mut &[u8]) -> Date {
1164    let days = i32::from_le_bytes(read_byte_array(data));
1165    Date::from_pg_epoch(days).expect("unexpected date")
1166}
1167
1168pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1169    let year = i32::from_le_bytes(read_byte_array(data));
1170    let ordinal = u32::from_le_bytes(read_byte_array(data));
1171    NaiveDate::from_yo_opt(year, ordinal).unwrap()
1172}
1173
1174pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1175    let secs = u32::from_le_bytes(read_byte_array(data));
1176    let nanos = u32::from_le_bytes(read_byte_array(data));
1177    NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1178}
1179
1180/// Read a datum from the front of `data`.
1181///
1182/// Advances `data` to point at the first byte after the end of the read region.
1183///
1184/// # Safety
1185///
1186/// This function is safe if a `Datum` was previously written at the start of `data` by `push_datum`.
1187/// Otherwise it could return invalid values, which is Undefined Behavior.
1188pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
1189    let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
1190    match tag {
1191        Tag::Null => Datum::Null,
1192        Tag::False => Datum::False,
1193        Tag::True => Datum::True,
1194        Tag::UInt8_0 | Tag::UInt8_8 => {
1195            let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
1196                data,
1197                tag.actual_int_length()
1198                    .expect("returns a value for variable-length-encoded integer tags"),
1199            ));
1200            Datum::UInt8(i)
1201        }
1202        Tag::Int16 => {
1203            let i = i16::from_le_bytes(read_byte_array(data));
1204            Datum::Int16(i)
1205        }
1206        Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
1207            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
1208            // and `data` is big enough because it was encoded validly. These assumptions
1209            // are checked in debug asserts.
1210            let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
1211                data,
1212                tag.actual_int_length()
1213                    .expect("returns a value for variable-length-encoded integer tags"),
1214            ));
1215            Datum::Int16(i)
1216        }
1217        Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
1218            let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
1219                data,
1220                tag.actual_int_length()
1221                    .expect("returns a value for variable-length-encoded integer tags"),
1222            ));
1223            Datum::UInt16(i)
1224        }
1225        Tag::Int32 => {
1226            let i = i32::from_le_bytes(read_byte_array(data));
1227            Datum::Int32(i)
1228        }
1229        Tag::NonNegativeInt32_0
1230        | Tag::NonNegativeInt32_32
1231        | Tag::NonNegativeInt32_8
1232        | Tag::NonNegativeInt32_16
1233        | Tag::NonNegativeInt32_24 => {
1234            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
1235            // and `data` is big enough because it was encoded validly. These assumptions
1236            // are checked in debug asserts.
1237            let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
1238                data,
1239                tag.actual_int_length()
1240                    .expect("returns a value for variable-length-encoded integer tags"),
1241            ));
1242            Datum::Int32(i)
1243        }
1244        Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
1245            let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
1246                data,
1247                tag.actual_int_length()
1248                    .expect("returns a value for variable-length-encoded integer tags"),
1249            ));
1250            Datum::UInt32(i)
1251        }
1252        Tag::Int64 => {
1253            let i = i64::from_le_bytes(read_byte_array(data));
1254            Datum::Int64(i)
1255        }
1256        Tag::NonNegativeInt64_0
1257        | Tag::NonNegativeInt64_64
1258        | Tag::NonNegativeInt64_8
1259        | Tag::NonNegativeInt64_16
1260        | Tag::NonNegativeInt64_24
1261        | Tag::NonNegativeInt64_32
1262        | Tag::NonNegativeInt64_40
1263        | Tag::NonNegativeInt64_48
1264        | Tag::NonNegativeInt64_56 => {
1265            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
1266            // and `data` is big enough because it was encoded validly. These assumptions
1267            // are checked in debug asserts.
1268
1269            let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
1270                data,
1271                tag.actual_int_length()
1272                    .expect("returns a value for variable-length-encoded integer tags"),
1273            ));
1274            Datum::Int64(i)
1275        }
1276        Tag::UInt64_0
1277        | Tag::UInt64_8
1278        | Tag::UInt64_16
1279        | Tag::UInt64_24
1280        | Tag::UInt64_32
1281        | Tag::UInt64_40
1282        | Tag::UInt64_48
1283        | Tag::UInt64_56
1284        | Tag::UInt64_64 => {
1285            let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
1286                data,
1287                tag.actual_int_length()
1288                    .expect("returns a value for variable-length-encoded integer tags"),
1289            ));
1290            Datum::UInt64(i)
1291        }
1292        Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
1293            // SAFETY:`tag.actual_int_length()` is <= 16 for these tags,
1294            // and `data` is big enough because it was encoded validly. These assumptions
1295            // are checked in debug asserts.
1296            let i = i16::from_le_bytes(read_byte_array_extending_negative(
1297                data,
1298                tag.actual_int_length()
1299                    .expect("returns a value for variable-length-encoded integer tags"),
1300            ));
1301            Datum::Int16(i)
1302        }
1303        Tag::NegativeInt32_0
1304        | Tag::NegativeInt32_32
1305        | Tag::NegativeInt32_8
1306        | Tag::NegativeInt32_16
1307        | Tag::NegativeInt32_24 => {
1308            // SAFETY:`tag.actual_int_length()` is <= 32 for these tags,
1309            // and `data` is big enough because it was encoded validly. These assumptions
1310            // are checked in debug asserts.
1311            let i = i32::from_le_bytes(read_byte_array_extending_negative(
1312                data,
1313                tag.actual_int_length()
1314                    .expect("returns a value for variable-length-encoded integer tags"),
1315            ));
1316            Datum::Int32(i)
1317        }
1318        Tag::NegativeInt64_0
1319        | Tag::NegativeInt64_64
1320        | Tag::NegativeInt64_8
1321        | Tag::NegativeInt64_16
1322        | Tag::NegativeInt64_24
1323        | Tag::NegativeInt64_32
1324        | Tag::NegativeInt64_40
1325        | Tag::NegativeInt64_48
1326        | Tag::NegativeInt64_56 => {
1327            // SAFETY:`tag.actual_int_length()` is <= 64 for these tags,
1328            // and `data` is big enough because the row was encoded validly. These assumptions
1329            // are checked in debug asserts.
1330            let i = i64::from_le_bytes(read_byte_array_extending_negative(
1331                data,
1332                tag.actual_int_length()
1333                    .expect("returns a value for variable-length-encoded integer tags"),
1334            ));
1335            Datum::Int64(i)
1336        }
1337
1338        Tag::UInt8 => {
1339            let i = u8::from_le_bytes(read_byte_array(data));
1340            Datum::UInt8(i)
1341        }
1342        Tag::UInt16 => {
1343            let i = u16::from_le_bytes(read_byte_array(data));
1344            Datum::UInt16(i)
1345        }
1346        Tag::UInt32 => {
1347            let i = u32::from_le_bytes(read_byte_array(data));
1348            Datum::UInt32(i)
1349        }
1350        Tag::UInt64 => {
1351            let i = u64::from_le_bytes(read_byte_array(data));
1352            Datum::UInt64(i)
1353        }
1354        Tag::Float32 => {
1355            let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
1356            Datum::Float32(OrderedFloat::from(f))
1357        }
1358        Tag::Float64 => {
1359            let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
1360            Datum::Float64(OrderedFloat::from(f))
1361        }
1362        Tag::Date => Datum::Date(read_date(data)),
1363        Tag::Time => Datum::Time(read_time(data)),
1364        Tag::CheapTimestamp => {
1365            let ts = i64::from_le_bytes(read_byte_array(data));
1366            let secs = ts.div_euclid(1_000_000_000);
1367            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1368            let ndt = DateTime::from_timestamp(secs, nsecs)
1369                .expect("We only write round-trippable timestamps")
1370                .naive_utc();
1371            Datum::Timestamp(
1372                CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
1373            )
1374        }
1375        Tag::CheapTimestampTz => {
1376            let ts = i64::from_le_bytes(read_byte_array(data));
1377            let secs = ts.div_euclid(1_000_000_000);
1378            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1379            let dt = DateTime::from_timestamp(secs, nsecs)
1380                .expect("We only write round-trippable timestamps");
1381            Datum::TimestampTz(
1382                CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
1383            )
1384        }
1385        Tag::Timestamp => {
1386            let date = read_naive_date(data);
1387            let time = read_time(data);
1388            Datum::Timestamp(
1389                CheckedTimestamp::from_timestamplike(date.and_time(time))
1390                    .expect("unexpected timestamp"),
1391            )
1392        }
1393        Tag::TimestampTz => {
1394            let date = read_naive_date(data);
1395            let time = read_time(data);
1396            Datum::TimestampTz(
1397                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
1398                    date.and_time(time),
1399                    Utc,
1400                ))
1401                .expect("unexpected timestamptz"),
1402            )
1403        }
1404        Tag::Interval => {
1405            let months = i32::from_le_bytes(read_byte_array(data));
1406            let days = i32::from_le_bytes(read_byte_array(data));
1407            let micros = i64::from_le_bytes(read_byte_array(data));
1408            Datum::Interval(Interval {
1409                months,
1410                days,
1411                micros,
1412            })
1413        }
1414        Tag::BytesTiny
1415        | Tag::BytesShort
1416        | Tag::BytesLong
1417        | Tag::BytesHuge
1418        | Tag::StringTiny
1419        | Tag::StringShort
1420        | Tag::StringLong
1421        | Tag::StringHuge
1422        | Tag::ListTiny
1423        | Tag::ListShort
1424        | Tag::ListLong
1425        | Tag::ListHuge => read_lengthed_datum(data, tag),
1426        Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
1427        Tag::Array => {
1428            // See the comment in `Row::push_array` for details on the encoding
1429            // of arrays.
1430            let ndims = read_byte(data);
1431            let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
1432            let (dims, next) = data.split_at(dims_size);
1433            *data = next;
1434            let bytes = read_untagged_bytes(data);
1435            Datum::Array(Array {
1436                dims: ArrayDimensions { data: dims },
1437                elements: DatumList { data: bytes },
1438            })
1439        }
1440        Tag::Dict => {
1441            let bytes = read_untagged_bytes(data);
1442            Datum::Map(DatumMap { data: bytes })
1443        }
1444        Tag::JsonNull => Datum::JsonNull,
1445        Tag::Dummy => Datum::Dummy,
1446        Tag::Numeric => {
1447            let digits = read_byte(data).into();
1448            let exponent = i8::reinterpret_cast(read_byte(data));
1449            let bits = read_byte(data);
1450
1451            let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
1452            let lsu_u8_len = lsu_u16_len * 2;
1453            let (lsu_u8, next) = data.split_at(lsu_u8_len);
1454            *data = next;
1455
1456            // TODO: if we refactor the decimal library to accept the owned
1457            // array as a parameter to `from_raw_parts` below, we could likely
1458            // avoid a copy because it is exactly the value we want
1459            let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
1460            for (i, c) in lsu_u8.chunks(2).enumerate() {
1461                lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
1462            }
1463
1464            let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
1465            Datum::from(d)
1466        }
1467        Tag::MzTimestamp => {
1468            let t = Timestamp::decode(read_byte_array(data));
1469            Datum::MzTimestamp(t)
1470        }
1471        Tag::Range => {
1472            // See notes on `push_range_with` for details about encoding.
1473            let flag_byte = read_byte(data);
1474            let flags = range::InternalFlags::from_bits(flag_byte)
1475                .expect("range flags must be encoded validly");
1476
1477            if flags.contains(range::InternalFlags::EMPTY) {
1478                assert!(
1479                    flags == range::InternalFlags::EMPTY,
1480                    "empty ranges contain only RANGE_EMPTY flag"
1481                );
1482
1483                return Datum::Range(Range { inner: None });
1484            }
1485
1486            let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
1487                None
1488            } else {
1489                Some(DatumNested::extract(data))
1490            };
1491
1492            let lower = RangeBound {
1493                inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
1494                bound: lower_bound,
1495            };
1496
1497            let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
1498                None
1499            } else {
1500                Some(DatumNested::extract(data))
1501            };
1502
1503            let upper = RangeBound {
1504                inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
1505                bound: upper_bound,
1506            };
1507
1508            Datum::Range(Range {
1509                inner: Some(RangeInner { lower, upper }),
1510            })
1511        }
1512        Tag::MzAclItem => {
1513            const N: usize = MzAclItem::binary_size();
1514            let mz_acl_item =
1515                MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
1516            Datum::MzAclItem(mz_acl_item)
1517        }
1518        Tag::AclItem => {
1519            const N: usize = AclItem::binary_size();
1520            let acl_item =
1521                AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
1522            Datum::AclItem(acl_item)
1523        }
1524    }
1525}
1526
1527// --------------------------------------------------------------------------------
1528// writing data
1529
1530fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1531where
1532    D: Vector<u8>,
1533{
1534    let len = u64::cast_from(bytes.len());
1535    data.extend_from_slice(&len.to_le_bytes());
1536    data.extend_from_slice(bytes);
1537}
1538
1539fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1540where
1541    D: Vector<u8>,
1542{
1543    match tag {
1544        Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1545            let len = bytes.len().to_le_bytes();
1546            data.push(len[0]);
1547        }
1548        Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1549            let len = bytes.len().to_le_bytes();
1550            data.extend_from_slice(&len[0..2]);
1551        }
1552        Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1553            let len = bytes.len().to_le_bytes();
1554            data.extend_from_slice(&len[0..4]);
1555        }
1556        Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1557            let len = bytes.len().to_le_bytes();
1558            data.extend_from_slice(&len);
1559        }
1560        _ => unreachable!(),
1561    }
1562    data.extend_from_slice(bytes);
1563}
1564
1565pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1566    i32::to_le_bytes(date.pg_epoch_days())
1567}
1568
1569fn push_date<D>(data: &mut D, date: Date)
1570where
1571    D: Vector<u8>,
1572{
1573    data.extend_from_slice(&date_to_array(date));
1574}
1575
1576pub(super) fn naive_date_to_arrays(
1577    date: NaiveDate,
1578) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1579    (
1580        i32::to_le_bytes(date.year()),
1581        u32::to_le_bytes(date.ordinal()),
1582    )
1583}
1584
1585fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1586where
1587    D: Vector<u8>,
1588{
1589    let (ds1, ds2) = naive_date_to_arrays(date);
1590    data.extend_from_slice(&ds1);
1591    data.extend_from_slice(&ds2);
1592}
1593
1594pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1595    (
1596        u32::to_le_bytes(time.num_seconds_from_midnight()),
1597        u32::to_le_bytes(time.nanosecond()),
1598    )
1599}
1600
1601fn push_time<D>(data: &mut D, time: NaiveTime)
1602where
1603    D: Vector<u8>,
1604{
1605    let (ts1, ts2) = time_to_arrays(time);
1606    data.extend_from_slice(&ts1);
1607    data.extend_from_slice(&ts2);
1608}
1609
1610/// Returns an i64 representing a `NaiveDateTime`, if
1611/// said i64 can be round-tripped back to a `NaiveDateTime`.
1612///
1613/// The only exotic NDTs for which this can't happen are those that
1614/// are hundreds of years in the future or past, or those that
1615/// represent a leap second. (Note that Materialize does not support
1616/// leap seconds, but this module does).
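/// As a rough worked bound for the far-future/far-past case: `i64::MAX`
/// nanoseconds is about 9.22e18 / (1e9 * 86_400 * 365.25) ≈ 292 years, so only
/// datetimes roughly 292 or more years from the Unix epoch fail to round-trip.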
1617// This function is inspired by `NaiveDateTime::timestamp_nanos`,
1618// with extra checking.
1619fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1620    let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1621    if subsec_nanos >= 1_000_000_000 {
1622        return None;
1623    }
1624    let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1625    as_ns.checked_add(i64::from(subsec_nanos))
1626}
1627
1628// This function is extremely hot, so
1629// we just use `as` to avoid the overhead of
1630// `try_into` followed by `unwrap`.
1631// `leading_ones` and `leading_zeros`
1632// can never return values greater than 64, so the conversion is safe.
1633#[inline(always)]
1634#[allow(clippy::as_conversions)]
1635fn min_bytes_signed<T>(i: T) -> u8
1636where
1637    T: Into<i64>,
1638{
1639    let i: i64 = i.into();
1640
1641    // To fit in n bytes, we require that
1642    // everything but the leading sign bits fits in n*8
1643    // bits.
1644    let n_sign_bits = if i.is_negative() {
1645        i.leading_ones() as u8
1646    } else {
1647        i.leading_zeros() as u8
1648    };
1649
1650    (64 - n_sign_bits + 7) / 8
1651}
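// A few illustrative values of the formula above: `min_bytes_signed(0i64)` and
// `min_bytes_signed(-1i64)` are both 0 (the tag alone carries the sign), while
// `min_bytes_signed(42i64)` is 1 and `min_bytes_signed(i64::MIN)` is 8.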
1652
1653// In principle we could just use `min_bytes_signed`, rather than
1654// having a separate function here, as long as we made that one take
1655// `T: Into<i128>` instead of `Into<i64>`. But LLVM doesn't seem smart enough
1656// to realize that that function is the same as the current version,
1657// and generates worse code.
1658//
1659// Justification for `as` is the same as in `min_bytes_signed`.
1660#[inline(always)]
1661#[allow(clippy::as_conversions)]
1662fn min_bytes_unsigned<T>(i: T) -> u8
1663where
1664    T: Into<u64>,
1665{
1666    let i: u64 = i.into();
1667
1668    let n_sign_bits = i.leading_zeros() as u8;
1669
1670    (64 - n_sign_bits + 7) / 8
1671}
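// Likewise for unsigned values: `min_bytes_unsigned(0u64)` is 0,
// `min_bytes_unsigned(255u64)` is 1, `min_bytes_unsigned(256u64)` is 2, and
// `min_bytes_unsigned(u64::MAX)` is 8.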
1672
1673const TINY: usize = 1 << 8;
1674const SHORT: usize = 1 << 16;
1675const LONG: usize = 1 << 32;
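// These cutoffs select the length-prefix width for bytes, strings, and lists:
// payloads shorter than TINY get a 1-byte length (e.g. `Tag::StringTiny`), those
// shorter than SHORT a 2-byte length, those shorter than LONG a 4-byte length,
// and anything larger an 8-byte length (see `push_lengthed_bytes` above).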
1676
1677fn push_datum<D>(data: &mut D, datum: Datum)
1678where
1679    D: Vector<u8>,
1680{
1681    match datum {
1682        Datum::Null => data.push(Tag::Null.into()),
1683        Datum::False => data.push(Tag::False.into()),
1684        Datum::True => data.push(Tag::True.into()),
1685        Datum::Int16(i) => {
1686            let mbs = min_bytes_signed(i);
1687            let tag = u8::from(if i.is_negative() {
1688                Tag::NegativeInt16_0
1689            } else {
1690                Tag::NonNegativeInt16_0
1691            }) + mbs;
1692
1693            data.push(tag);
1694            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1695        }
1696        Datum::Int32(i) => {
1697            let mbs = min_bytes_signed(i);
1698            let tag = u8::from(if i.is_negative() {
1699                Tag::NegativeInt32_0
1700            } else {
1701                Tag::NonNegativeInt32_0
1702            }) + mbs;
1703
1704            data.push(tag);
1705            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1706        }
1707        Datum::Int64(i) => {
1708            let mbs = min_bytes_signed(i);
1709            let tag = u8::from(if i.is_negative() {
1710                Tag::NegativeInt64_0
1711            } else {
1712                Tag::NonNegativeInt64_0
1713            }) + mbs;
1714
1715            data.push(tag);
1716            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1717        }
1718        Datum::UInt8(i) => {
1719            let mbu = min_bytes_unsigned(i);
1720            let tag = u8::from(Tag::UInt8_0) + mbu;
1721            data.push(tag);
1722            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1723        }
1724        Datum::UInt16(i) => {
1725            let mbu = min_bytes_unsigned(i);
1726            let tag = u8::from(Tag::UInt16_0) + mbu;
1727            data.push(tag);
1728            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1729        }
1730        Datum::UInt32(i) => {
1731            let mbu = min_bytes_unsigned(i);
1732            let tag = u8::from(Tag::UInt32_0) + mbu;
1733            data.push(tag);
1734            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1735        }
1736        Datum::UInt64(i) => {
1737            let mbu = min_bytes_unsigned(i);
1738            let tag = u8::from(Tag::UInt64_0) + mbu;
1739            data.push(tag);
1740            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1741        }
1742        Datum::Float32(f) => {
1743            data.push(Tag::Float32.into());
1744            data.extend_from_slice(&f.to_bits().to_le_bytes());
1745        }
1746        Datum::Float64(f) => {
1747            data.push(Tag::Float64.into());
1748            data.extend_from_slice(&f.to_bits().to_le_bytes());
1749        }
1750        Datum::Date(d) => {
1751            data.push(Tag::Date.into());
1752            push_date(data, d);
1753        }
1754        Datum::Time(t) => {
1755            data.push(Tag::Time.into());
1756            push_time(data, t);
1757        }
1758        Datum::Timestamp(t) => {
1759            let datetime = t.to_naive();
1760            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1761                data.push(Tag::CheapTimestamp.into());
1762                data.extend_from_slice(&nanos.to_le_bytes());
1763            } else {
1764                data.push(Tag::Timestamp.into());
1765                push_naive_date(data, datetime.date());
1766                push_time(data, datetime.time());
1767            }
1768        }
1769        Datum::TimestampTz(t) => {
1770            let datetime = t.to_naive();
1771            if let Some(nanos) = checked_timestamp_nanos(datetime) {
1772                data.push(Tag::CheapTimestampTz.into());
1773                data.extend_from_slice(&nanos.to_le_bytes());
1774            } else {
1775                data.push(Tag::TimestampTz.into());
1776                push_naive_date(data, datetime.date());
1777                push_time(data, datetime.time());
1778            }
1779        }
1780        Datum::Interval(i) => {
1781            data.push(Tag::Interval.into());
1782            data.extend_from_slice(&i.months.to_le_bytes());
1783            data.extend_from_slice(&i.days.to_le_bytes());
1784            data.extend_from_slice(&i.micros.to_le_bytes());
1785        }
1786        Datum::Bytes(bytes) => {
1787            let tag = match bytes.len() {
1788                0..TINY => Tag::BytesTiny,
1789                TINY..SHORT => Tag::BytesShort,
1790                SHORT..LONG => Tag::BytesLong,
1791                _ => Tag::BytesHuge,
1792            };
1793            data.push(tag.into());
1794            push_lengthed_bytes(data, bytes, tag);
1795        }
1796        Datum::String(string) => {
1797            let tag = match string.len() {
1798                0..TINY => Tag::StringTiny,
1799                TINY..SHORT => Tag::StringShort,
1800                SHORT..LONG => Tag::StringLong,
1801                _ => Tag::StringHuge,
1802            };
1803            data.push(tag.into());
1804            push_lengthed_bytes(data, string.as_bytes(), tag);
1805        }
1806        Datum::List(list) => {
1807            let tag = match list.data.len() {
1808                0..TINY => Tag::ListTiny,
1809                TINY..SHORT => Tag::ListShort,
1810                SHORT..LONG => Tag::ListLong,
1811                _ => Tag::ListHuge,
1812            };
1813            data.push(tag.into());
1814            push_lengthed_bytes(data, list.data, tag);
1815        }
1816        Datum::Uuid(u) => {
1817            data.push(Tag::Uuid.into());
1818            data.extend_from_slice(u.as_bytes());
1819        }
1820        Datum::Array(array) => {
1821            // See the comment in `RowPacker::push_array_with_unchecked` for
1822            // details on the encoding of arrays.
1823            data.push(Tag::Array.into());
1824            data.push(array.dims.ndims());
1825            data.extend_from_slice(array.dims.data);
1826            push_untagged_bytes(data, array.elements.data);
1827        }
1828        Datum::Map(dict) => {
1829            data.push(Tag::Dict.into());
1830            push_untagged_bytes(data, dict.data);
1831        }
1832        Datum::JsonNull => data.push(Tag::JsonNull.into()),
1833        Datum::MzTimestamp(t) => {
1834            data.push(Tag::MzTimestamp.into());
1835            data.extend_from_slice(&t.encode());
1836        }
1837        Datum::Dummy => data.push(Tag::Dummy.into()),
1838        Datum::Numeric(mut n) => {
1839            // Pseudo-canonical representation of decimal values with
1840            // insignificant zeroes trimmed. This compresses the number further
1841            // than `Numeric::trim` by removing all trailing zeroes, not only
1842            // those in the fractional component.
1843            numeric::cx_datum().reduce(&mut n.0);
1844            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
1845            data.push(Tag::Numeric.into());
1846            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
1847            data.push(
1848                i8::try_from(exponent)
1849                    .expect("exponent to fit within i8; should not exceed +/- 39")
1850                    .to_le_bytes()[0],
1851            );
1852            data.push(bits);
1853
1854            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];
1855
1856            // Little endian machines can take the lsu directly from u16 to u8.
1857            if cfg!(target_endian = "little") {
1858                // SAFETY: `lsu` (from `to_raw_parts()` above) is a `&[u16]`, so
1859                // each element can safely be transmuted into two `u8`s.
1860                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
1861                // The `u8` aligned version of the `lsu` should have twice as many
1862                // elements as we expect for the `u16` version.
1863                soft_assert_no_log!(
1864                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
1865                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
1866                    Numeric::digits_to_lsu_elements_len(digits) * 2,
1867                    lsu_bytes.len()
1868                );
1869                // There should be no unaligned elements in the prefix or suffix.
1870                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
1871                data.extend_from_slice(lsu_bytes);
1872            } else {
1873                for u in lsu {
1874                    data.extend_from_slice(&u.to_le_bytes());
1875                }
1876            }
1877        }
1878        Datum::Range(range) => {
1879            // See notes on `push_range_with` for details about encoding.
1880            data.push(Tag::Range.into());
1881            data.push(range.internal_flag_bits());
1882
1883            if let Some(RangeInner { lower, upper }) = range.inner {
1884                for bound in [lower.bound, upper.bound] {
1885                    if let Some(bound) = bound {
1886                        match bound.datum() {
1887                            Datum::Null => panic!("cannot push Datum::Null into range"),
1888                            d => push_datum::<D>(data, d),
1889                        }
1890                    }
1891                }
1892            }
1893        }
1894        Datum::MzAclItem(mz_acl_item) => {
1895            data.push(Tag::MzAclItem.into());
1896            data.extend_from_slice(&mz_acl_item.encode_binary());
1897        }
1898        Datum::AclItem(acl_item) => {
1899            data.push(Tag::AclItem.into());
1900            data.extend_from_slice(&acl_item.encode_binary());
1901        }
1902    }
1903}
1904
1905/// Return the number of bytes these Datums would use if packed as a Row.
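///
/// A minimal sketch, assuming `row_size` is re-exported at the crate root
/// alongside `Row` and `Datum`:
///
/// ```
/// # use mz_repr::{Datum, Row, row_size};
/// let datums = [Datum::Int32(1), Datum::String("hello")];
/// // A packed row costs at least `size_of::<Row>()`, plus any spilled data.
/// assert!(row_size(datums.iter().copied()) >= std::mem::size_of::<Row>());
/// ```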
1906pub fn row_size<'a, I>(a: I) -> usize
1907where
1908    I: IntoIterator<Item = Datum<'a>>,
1909{
1910    // Using datums_size instead of a.data().len() here is safer because it will
1911    // return the size of the datums if they were packed into a Row. Although
1912    // a.data().len() happens to give the correct answer (and is faster), data()
1913    // is documented as for debugging only.
1914    let sz = datums_size::<_, _>(a);
1915    let size_of_row = std::mem::size_of::<Row>();
1916    // The Row struct attempts to inline data until it can't fit in the
1917    // preallocated size. Otherwise it spills to heap, and uses the Row to point
1918    // to that.
1919    if sz > Row::SIZE {
1920        sz + size_of_row
1921    } else {
1922        size_of_row
1923    }
1924}
1925
1926/// Number of bytes required by the datum.
1927/// This is used to optimistically pre-allocate buffers for packing rows.
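///
/// For example, assuming `datum_size` is re-exported at the crate root:
///
/// ```
/// # use mz_repr::{Datum, datum_size};
/// assert_eq!(datum_size(&Datum::Null), 1); // tag only
/// assert_eq!(datum_size(&Datum::Int32(42)), 2); // tag plus one significant byte
/// ```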
1928pub fn datum_size(datum: &Datum) -> usize {
1929    match datum {
1930        Datum::Null => 1,
1931        Datum::False => 1,
1932        Datum::True => 1,
1933        Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
1934        Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
1935        Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
1936        Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1937        Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1938        Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1939        Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1940        Datum::Float32(_) => 1 + size_of::<f32>(),
1941        Datum::Float64(_) => 1 + size_of::<f64>(),
1942        Datum::Date(_) => 1 + size_of::<i32>(),
1943        Datum::Time(_) => 1 + 8,
1944        Datum::Timestamp(t) => {
1945            1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
1946                8
1947            } else {
1948                16
1949            }
1950        }
1951        Datum::TimestampTz(t) => {
1952            1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
1953                8
1954            } else {
1955                16
1956            }
1957        }
1958        Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
1959        Datum::Bytes(bytes) => {
1960            // We use a variable length representation of slice length.
1961            let bytes_for_length = match bytes.len() {
1962                0..TINY => 1,
1963                TINY..SHORT => 2,
1964                SHORT..LONG => 4,
1965                _ => 8,
1966            };
1967            1 + bytes_for_length + bytes.len()
1968        }
1969        Datum::String(string) => {
1970            // We use a variable length representation of slice length.
1971            let bytes_for_length = match string.len() {
1972                0..TINY => 1,
1973                TINY..SHORT => 2,
1974                SHORT..LONG => 4,
1975                _ => 8,
1976            };
1977            1 + bytes_for_length + string.len()
1978        }
1979        Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
1980        Datum::Array(array) => {
1981            1 + size_of::<u8>()
1982                + array.dims.data.len()
1983                + size_of::<u64>()
1984                + array.elements.data.len()
1985        }
1986        Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
1987        Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
1988        Datum::JsonNull => 1,
1989        Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
1990        Datum::Dummy => 1,
1991        Datum::Numeric(d) => {
1992            let mut d = d.0.clone();
1993            // Values must be reduced to determine appropriate number of
1994            // coefficient units.
1995            numeric::cx_datum().reduce(&mut d);
1996            // 4 = 1 byte each for tag, digits, exponent, bits
1997            4 + (d.coefficient_units().len() * 2)
1998        }
1999        Datum::Range(Range { inner }) => {
2000            // Tag + flags
2001            2 + match inner {
2002                None => 0,
2003                Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
2004                    .iter()
2005                    .map(|bound| match bound {
2006                        None => 0,
2007                        Some(bound) => bound.val.len(),
2008                    })
2009                    .sum(),
2010            }
2011        }
2012        Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
2013        Datum::AclItem(_) => 1 + AclItem::binary_size(),
2014    }
2015}
2016
2017/// Number of bytes required by a sequence of datums.
2018///
2019/// This method can be used to right-size the allocation for a `Row`
2020/// before calling [`RowPacker::extend`].
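///
/// A small sketch, assuming `datums_size` is re-exported at the crate root:
///
/// ```
/// # use mz_repr::{Datum, datums_size};
/// let datums = [Datum::Null, Datum::Int32(42)];
/// assert_eq!(datums_size(datums.iter()), 1 + 2);
/// ```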
2021pub fn datums_size<'a, I, D>(iter: I) -> usize
2022where
2023    I: IntoIterator<Item = D>,
2024    D: Borrow<Datum<'a>>,
2025{
2026    iter.into_iter().map(|d| datum_size(d.borrow())).sum()
2027}
2028
2029/// Number of bytes required by a list of datums. This computes the size that would be required if
2030/// the given datums were packed into a list.
2031///
2032/// This is used to optimistically pre-allocate buffers for packing rows.
2033pub fn datum_list_size<'a, I, D>(iter: I) -> usize
2034where
2035    I: IntoIterator<Item = D>,
2036    D: Borrow<Datum<'a>>,
2037{
2038    1 + size_of::<u64>() + datums_size(iter)
2039}
2040
2041impl RowPacker<'_> {
2042    /// Constructs a row packer that will pack additional datums into the
2043    /// provided row.
2044    ///
2045    /// This function is intentionally somewhat inconvenient to call. You
2046    /// usually want to call [`Row::packer`] instead to start packing from
2047    /// scratch.
2048    pub fn for_existing_row(row: &mut Row) -> RowPacker<'_> {
2049        RowPacker { row }
2050    }
2051
2052    /// Extend an existing `Row` with a `Datum`.
2053    #[inline]
2054    pub fn push<'a, D>(&mut self, datum: D)
2055    where
2056        D: Borrow<Datum<'a>>,
2057    {
2058        push_datum(&mut self.row.data, *datum.borrow());
2059    }
2060
2061    /// Extend an existing `Row` with additional `Datum`s.
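    ///
    /// For example:
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::default();
    /// row.packer().extend([Datum::Int32(1), Datum::Int32(2)]);
    /// assert_eq!(row.unpack(), vec![Datum::Int32(1), Datum::Int32(2)]);
    /// ```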
2062    #[inline]
2063    pub fn extend<'a, I, D>(&mut self, iter: I)
2064    where
2065        I: IntoIterator<Item = D>,
2066        D: Borrow<Datum<'a>>,
2067    {
2068        for datum in iter {
2069            push_datum(&mut self.row.data, *datum.borrow())
2070        }
2071    }
2072
2073    /// Extend an existing `Row` with additional `Datum`s.
2074    ///
2075    /// If the iterator produces an error, the pushing of datums is
2076    /// terminated and the error is returned. The `Row` will be
2077    /// incomplete, but it will still be safe to read datums from it.
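    ///
    /// For example:
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::default();
    /// let result: Result<(), &str> = row.packer().try_extend([
    ///     Ok(Datum::Int32(1)),
    ///     Err("oops"),
    ///     Ok(Datum::Int32(2)),
    /// ]);
    /// assert!(result.is_err());
    /// // Only the datums before the error were pushed.
    /// assert_eq!(row.unpack(), vec![Datum::Int32(1)]);
    /// ```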
2078    #[inline]
2079    pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
2080    where
2081        I: IntoIterator<Item = Result<D, E>>,
2082        D: Borrow<Datum<'a>>,
2083    {
2084        for datum in iter {
2085            push_datum(&mut self.row.data, *datum?.borrow());
2086        }
2087        Ok(())
2088    }
2089
2090    /// Appends the datums of an entire `Row`.
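    ///
    /// For example:
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let other = Row::pack_slice(&[Datum::Int32(1), Datum::Int32(2)]);
    /// let mut row = Row::default();
    /// let mut packer = row.packer();
    /// packer.push(Datum::String("a"));
    /// packer.extend_by_row(&other);
    /// assert_eq!(
    ///     row.unpack(),
    ///     vec![Datum::String("a"), Datum::Int32(1), Datum::Int32(2)],
    /// );
    /// ```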
2091    pub fn extend_by_row(&mut self, row: &Row) {
2092        self.row.data.extend_from_slice(row.data.as_slice());
2093    }
2094
2095    /// Appends the slice of data representing an entire `Row`. The data is not validated.
2096    ///
2097    /// # Safety
2098    ///
2099    /// The requirements from [`Row::from_bytes_unchecked`] apply here, too:
2100    /// This method relies on `data` being an appropriate row encoding, and can
2101    /// result in unsafety if this is not the case.
2102    #[inline]
2103    pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
2104        self.row.data.extend_from_slice(data)
2105    }
2106
2107    /// Pushes a [`DatumList`] that is built from a closure.
2108    ///
2109    /// The supplied closure will be invoked once with a `Row` that can be used
2110    /// to populate the list. It is valid to call any method on the
2111    /// [`RowPacker`] except for [`RowPacker::clear`], [`RowPacker::truncate`],
2112    /// or [`RowPacker::truncate_datums`].
2113    ///
2114    /// Returns the value returned by the closure, if any.
2115    ///
2116    /// ```
2117    /// # use mz_repr::{Row, Datum};
2118    /// let mut row = Row::default();
2119    /// row.packer().push_list_with(|row| {
2120    ///     row.push(Datum::String("age"));
2121    ///     row.push(Datum::Int64(42));
2122    /// });
2123    /// assert_eq!(
2124    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
2125    ///     vec![Datum::String("age"), Datum::Int64(42)],
2126    /// );
2127    /// ```
2128    #[inline]
2129    pub fn push_list_with<F, R>(&mut self, f: F) -> R
2130    where
2131        F: FnOnce(&mut RowPacker) -> R,
2132    {
2133        // First, assume that the list will fit in 255 bytes, and thus the length will fit in
2134        // 1 byte. If not, we'll fix it up later.
2135        let start = self.row.data.len();
2136        self.row.data.push(Tag::ListTiny.into());
2137        // Write a dummy len, will fix it up later.
2138        self.row.data.push(0);
2139
2140        let out = f(self);
2141
2142        // The `- 1 - 1` is for the tag and the len.
2143        let len = self.row.data.len() - start - 1 - 1;
2144        // We now know the real len.
2145        if len < TINY {
2146            // If the len fits in 1 byte, we just need to fix up the len.
2147            self.row.data[start + 1] = len.to_le_bytes()[0];
2148        } else {
2149            // Note: We move this code path into its own function, so that the common case can be
2150            // inlined.
2151            long_list(&mut self.row.data, start, len);
2152        }
2153
2154        /// 1. Fix up the tag.
2155        /// 2. Move the actual data a bit (for which we also need to make room at the end).
2156        /// 3. Fix up the len.
2157        /// `data`: The row's backing data.
2158        /// `start`: where `push_list_with` started writing in `data`.
2159        /// `len`: the length of the data, excluding the tag and the length.
2160        #[cold]
2161        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
2162            // `len_len`: the length of the length. (Possible values are: 2, 4, 8. 1 is handled
2163            // elsewhere.) The other parameters are the same as for `long_list`.
2164            let long_list_inner = |data: &mut CompactBytes, len_len| {
2165                // We'll need memory for the new, bigger length, so make the `CompactBytes` bigger.
2166                // The `- 1` is because the old length was 1 byte.
2167                const ZEROS: [u8; 8] = [0; 8];
2168                data.extend_from_slice(&ZEROS[0..len_len - 1]);
2169                // Move the data to the end of the `CompactBytes`, to make space for the new length.
2170                // Originally, it started after the 1-byte tag and the 1-byte length, now it will
2171                // start after the 1-byte tag and the len_len-byte length.
2172                //
2173                // Note that this is the only operation in `long_list` whose cost is proportional
2174                // to `len`. Since `len` is at least 256 here, the other operations' cost are
2175                // negligible. `copy_within` is a memmove, which is probably a fair bit faster per
2176                // Datum than a Datum encoding in the `f` closure.
2177                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
2178                // Write the new length.
2179                data[start + 1..start + 1 + len_len]
2180                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
2181            };
2182            match len {
2183                0..TINY => {
2184                    unreachable!()
2185                }
2186                TINY..SHORT => {
2187                    data[start] = Tag::ListShort.into();
2188                    long_list_inner(data, 2);
2189                }
2190                SHORT..LONG => {
2191                    data[start] = Tag::ListLong.into();
2192                    long_list_inner(data, 4);
2193                }
2194                _ => {
2195                    data[start] = Tag::ListHuge.into();
2196                    long_list_inner(data, 8);
2197                }
2198            };
2199        }
2200
2201        out
2202    }
2203
2204    /// Pushes a [`DatumMap`] that is built from a closure.
2205    ///
2206    /// The supplied closure will be invoked once with a `Row` that can be used
2207    /// to populate the dict.
2208    ///
2209    /// The closure **must** alternate pushing string keys and arbitrary values,
2210    /// otherwise reading the dict will cause a panic.
2211    ///
2212    /// The closure **must** push keys in ascending order, otherwise equality
2213    /// checks on the resulting `Row` may be wrong and reading the dict IN DEBUG
2214    /// MODE will cause a panic.
2215    ///
2216    /// The closure **must not** call [`RowPacker::clear`],
2217    /// [`RowPacker::truncate`], or [`RowPacker::truncate_datums`].
2218    ///
2219    /// # Example
2220    ///
2221    /// ```
2222    /// # use mz_repr::{Row, Datum};
2223    /// let mut row = Row::default();
2224    /// row.packer().push_dict_with(|row| {
2225    ///
2226    ///     // key
2227    ///     row.push(Datum::String("age"));
2228    ///     // value
2229    ///     row.push(Datum::Int64(42));
2230    ///
2231    ///     // key
2232    ///     row.push(Datum::String("name"));
2233    ///     // value
2234    ///     row.push(Datum::String("bob"));
2235    /// });
2236    /// assert_eq!(
2237    ///     row.unpack_first().unwrap_map().iter().collect::<Vec<_>>(),
2238    ///     vec![("age", Datum::Int64(42)), ("name", Datum::String("bob"))]
2239    /// );
2240    /// ```
2241    pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2242    where
2243        F: FnOnce(&mut RowPacker) -> R,
2244    {
2245        self.row.data.push(Tag::Dict.into());
2246        let start = self.row.data.len();
2247        // write a dummy len, will fix it up later
2248        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2249
2250        let res = f(self);
2251
2252        let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2253        // fix up the len
2254        self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2255
2256        res
2257    }
2258
2259    /// Convenience function to construct an array from an iter of `Datum`s.
2260    ///
2261    /// Returns an error if the number of elements in `iter` does not match
2262    /// the cardinality of the array as described by `dims`, or if the
2263    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2264    /// occurs, the packer's state will be unchanged.
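    ///
    /// A sketch of packing a one-dimensional, two-element array, assuming
    /// `ArrayDimension` is importable from `mz_repr::adt::array` with public
    /// `lower_bound` and `length` fields:
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// # use mz_repr::adt::array::ArrayDimension;
    /// let mut row = Row::default();
    /// row.packer()
    ///     .try_push_array(
    ///         &[ArrayDimension { lower_bound: 1, length: 2 }],
    ///         [Datum::Int32(1), Datum::Int32(2)],
    ///     )
    ///     .expect("cardinality matches the dimensions");
    /// assert_eq!(row.iter().count(), 1);
    /// ```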
2265    pub fn try_push_array<'a, I, D>(
2266        &mut self,
2267        dims: &[ArrayDimension],
2268        iter: I,
2269    ) -> Result<(), InvalidArrayError>
2270    where
2271        I: IntoIterator<Item = D>,
2272        D: Borrow<Datum<'a>>,
2273    {
2274        // SAFETY: The function returns the exact number of elements pushed into the array.
2275        unsafe {
2276            self.push_array_with_unchecked(dims, |packer| {
2277                let mut nelements = 0;
2278                for datum in iter {
2279                    packer.push(datum);
2280                    nelements += 1;
2281                }
2282                Ok::<_, InvalidArrayError>(nelements)
2283            })
2284        }
2285    }
2286
2287    /// Convenience function to construct an array from a function. The function must return the
2288    /// number of elements it pushed into the array. It is undefined behavior if the function returns
2289    /// a number different from the number of elements it pushed.
2290    ///
2291    /// Returns an error if the number of elements pushed by `f` does not match
2292    /// the cardinality of the array as described by `dims`, or if the
2293    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`], or if `f` errors. If an error
2294    /// occurs, the packer's state will be unchanged.
2295    pub unsafe fn push_array_with_unchecked<F, E>(
2296        &mut self,
2297        dims: &[ArrayDimension],
2298        f: F,
2299    ) -> Result<(), E>
2300    where
2301        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
2302        E: From<InvalidArrayError>,
2303    {
2304        // Arrays are encoded as follows.
2305        //
2306        // u8    ndims
2307        // u64   dim_0 lower bound
2308        // u64   dim_0 length
2309        // ...
2310        // u64   dim_n lower bound
2311        // u64   dim_n length
2312        // u64   element data size in bytes
2313        // u8    element data, where elements are encoded in row-major order
2314
2315        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
2316            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
2317        }
2318
2319        let start = self.row.data.len();
2320        self.row.data.push(Tag::Array.into());
2321
2322        // Write dimension information.
2323        self.row
2324            .data
2325            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
2326        for dim in dims {
2327            self.row
2328                .data
2329                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2330            self.row
2331                .data
2332                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2333        }
2334
2335        // Write elements.
2336        let off = self.row.data.len();
2337        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2338        let nelements = match f(self) {
2339            Ok(nelements) => nelements,
2340            Err(e) => {
2341                self.row.data.truncate(start);
2342                return Err(e);
2343            }
2344        };
2345        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2346        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2347
2348        // Check that the number of elements written matches the dimension
2349        // information.
2350        let cardinality = match dims {
2351            [] => 0,
2352            dims => dims.iter().map(|d| d.length).product(),
2353        };
2354        if nelements != cardinality {
2355            self.row.data.truncate(start);
2356            return Err(InvalidArrayError::WrongCardinality {
2357                actual: nelements,
2358                expected: cardinality,
2359            }
2360            .into());
2361        }
2362
2363        Ok(())
2364    }
2365
2366    /// Pushes an [`Array`] that is built from a closure.
2367    ///
2368    /// __WARNING__: This is fairly "sharp" tool that is easy to get wrong. You
2369    /// should prefer [`RowPacker::try_push_array`] when possible.
2370    ///
2371    /// Returns an error if the number of elements pushed does not match
2372    /// the cardinality of the array as described by `dims`, or if the
2373    /// number of dimensions exceeds [`MAX_ARRAY_DIMENSIONS`]. If an error
2374    /// occurs, the packer's state will be unchanged.
2375    pub fn push_array_with_row_major<F, I>(
2376        &mut self,
2377        dims: I,
2378        f: F,
2379    ) -> Result<(), InvalidArrayError>
2380    where
2381        I: IntoIterator<Item = ArrayDimension>,
2382        F: FnOnce(&mut RowPacker) -> usize,
2383    {
2384        let start = self.row.data.len();
2385        self.row.data.push(Tag::Array.into());
2386
2387        // Write dummy dimension length for now, we'll fix it up.
2388        let dims_start = self.row.data.len();
2389        self.row.data.push(42);
2390
2391        let mut num_dims: u8 = 0;
2392        let mut cardinality: usize = 1;
2393        for dim in dims {
2394            num_dims += 1;
2395            cardinality *= dim.length;
2396
2397            self.row
2398                .data
2399                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2400            self.row
2401                .data
2402                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2403        }
2404
2405        if num_dims > MAX_ARRAY_DIMENSIONS {
2406            // Reset the packer state so we don't have invalid data.
2407            self.row.data.truncate(start);
2408            return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2409        }
2410        // Fix up our dimension length.
2411        self.row.data[dims_start..dims_start + size_of::<u8>()]
2412            .copy_from_slice(&num_dims.to_le_bytes());
2413
2414        // Write elements.
2415        let off = self.row.data.len();
2416        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2417
2418        let nelements = f(self);
2419
2420        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2421        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2422
2423        // Check that the number of elements written matches the dimension
2424        // information.
2425        let cardinality = match num_dims {
2426            0 => 0,
2427            _ => cardinality,
2428        };
2429        if nelements != cardinality {
2430            self.row.data.truncate(start);
2431            return Err(InvalidArrayError::WrongCardinality {
2432                actual: nelements,
2433                expected: cardinality,
2434            });
2435        }
2436
2437        Ok(())
2438    }
2439
2440    /// Convenience function to push a `DatumList` from an iter of `Datum`s.
2441    ///
2442    /// See [`RowPacker::push_list_with`] if you need to be able to handle errors.
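    ///
    /// For example:
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::default();
    /// row.packer().push_list(&[Datum::Int32(1), Datum::Int32(2)]);
    /// assert_eq!(
    ///     row.unpack_first().unwrap_list().iter().collect::<Vec<_>>(),
    ///     vec![Datum::Int32(1), Datum::Int32(2)],
    /// );
    /// ```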
2443    pub fn push_list<'a, I, D>(&mut self, iter: I)
2444    where
2445        I: IntoIterator<Item = D>,
2446        D: Borrow<Datum<'a>>,
2447    {
2448        self.push_list_with(|packer| {
2449            for elem in iter {
2450                packer.push(*elem.borrow())
2451            }
2452        });
2453    }
2454
2455    /// Convenience function to push a `DatumMap` from an iter of `(&str, Datum)` pairs
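    ///
    /// Keys must be unique and pushed in ascending order (see
    /// [`RowPacker::push_dict_with`]). For example:
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::default();
    /// row.packer().push_dict([("a", Datum::Int64(1)), ("b", Datum::Int64(2))]);
    /// assert_eq!(
    ///     row.unpack_first().unwrap_map().iter().collect::<Vec<_>>(),
    ///     vec![("a", Datum::Int64(1)), ("b", Datum::Int64(2))],
    /// );
    /// ```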
2456    pub fn push_dict<'a, I, D>(&mut self, iter: I)
2457    where
2458        I: IntoIterator<Item = (&'a str, D)>,
2459        D: Borrow<Datum<'a>>,
2460    {
2461        self.push_dict_with(|packer| {
2462            for (k, v) in iter {
2463                packer.push(Datum::String(k));
2464                packer.push(*v.borrow())
2465            }
2466        })
2467    }
2468
2469    /// Pushes a `Datum::Range` derived from the provided `Range<Datum<'a>>`.
2470    ///
2471    /// # Panics
2472    /// - If lower and upper express finite values and they are datums of
2473    ///   different types.
2474    /// - If lower or upper express finite values and are equal to
2475    ///   `Datum::Null`. To handle `Datum::Null` properly, use
2476    ///   [`RangeBound::new`].
2477    ///
2478    /// # Notes
2479    /// - This function canonicalizes the range before pushing it to the row.
2480    /// - Prefer this function over `push_range_with` because of its
2481    ///   canonicalization.
2482    /// - Prefer creating [`RangeBound`]s using [`RangeBound::new`], which
2483    ///   handles `Datum::Null` in a SQL-friendly way.
2484    pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
2485        range.canonicalize()?;
2486        match range.inner {
2487            None => {
2488                self.row.data.push(Tag::Range.into());
2489                // Besides the tag, the only byte written is the `RANGE_EMPTY` flag value.
2490                self.row.data.push(range::InternalFlags::EMPTY.bits());
2491                Ok(())
2492            }
2493            Some(inner) => self.push_range_with(
2494                RangeLowerBound {
2495                    inclusive: inner.lower.inclusive,
2496                    bound: inner
2497                        .lower
2498                        .bound
2499                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2500                },
2501                RangeUpperBound {
2502                    inclusive: inner.upper.inclusive,
2503                    bound: inner
2504                        .upper
2505                        .bound
2506                        .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2507                },
2508            ),
2509        }
2510    }
2511
2512    /// Pushes a `DatumRange` built from the specified arguments.
2513    ///
2514    /// # Warning
2515    /// Unlike `push_range`, `push_range_with` _does not_ canonicalize its
2516    /// inputs. Consequently, it is possible to generate ranges
2517    /// that will not reflect the proper ordering and equality.
2518    ///
2519    /// # Panics
2520    /// - If lower or upper expresses a finite value and does not push exactly
2521    ///   one value into the `RowPacker`.
2522    /// - If lower and upper express finite values and they are datums of
2523    ///   different types.
2524    /// - If lower or upper express finite values and push `Datum::Null`.
2525    ///
2526    /// # Notes
2527    /// - Prefer `push_range` over this function. This function should be
2528    ///   used only when you are not pushing `Datum`s to the inner row.
2529    /// - Range encoding is `[<flag bytes>,<lower>?,<upper>?]`, where `lower`
2530    ///   and `upper` are optional, contingent on the flag value expressing an
2531    ///   empty range (where neither will be present) or infinite bounds (where
2532    ///   each infinite bound will be absent).
2533    /// - To push an empty range, use `push_range` with `Range { inner: None }`.
2534    pub fn push_range_with<L, U, E>(
2535        &mut self,
2536        lower: RangeLowerBound<L>,
2537        upper: RangeUpperBound<U>,
2538    ) -> Result<(), E>
2539    where
2540        L: FnOnce(&mut RowPacker) -> Result<(), E>,
2541        U: FnOnce(&mut RowPacker) -> Result<(), E>,
2542        E: From<InvalidRangeError>,
2543    {
2544        let start = self.row.data.len();
2545        self.row.data.push(Tag::Range.into());
2546
2547        let mut flags = range::InternalFlags::empty();
2548
2549        flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
2550        flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
2551        flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
2552        flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);
2553
2554        let mut expected_datums = 0;
2555
2556        self.row.data.push(flags.bits());
2557
2558        let datum_check = self.row.data.len();
2559
2560        if let Some(value) = lower.bound {
2561            let start = self.row.data.len();
2562            value(self)?;
2563            assert!(
2564                start < self.row.data.len(),
2565                "finite values must each push exactly one value; expected 1 but got 0"
2566            );
2567            expected_datums += 1;
2568        }
2569
2570        if let Some(value) = upper.bound {
2571            let start = self.row.data.len();
2572            value(self)?;
2573            assert!(
2574                start < self.row.data.len(),
2575                "finite values must each push exactly one value; expected 1 but got 0"
2576            );
2577            expected_datums += 1;
2578        }
2579
2580        // Validate the invariants that 0, 1, or 2 elements were pushed, none are Null,
2581        // and if two are pushed then the second is not less than the first. Panic in
2582        // some cases and error in others.
2583        let mut actual_datums = 0;
2584        let mut seen = None;
2585        let mut dataz = &self.row.data[datum_check..];
2586        while !dataz.is_empty() {
2587            let d = unsafe { read_datum(&mut dataz) };
2588            assert!(d != Datum::Null, "cannot push Datum::Null into range");
2589
2590            match seen {
2591                None => seen = Some(d),
2592                Some(seen) => {
2593                    let seen_kind = DatumKind::from(seen);
2594                    let d_kind = DatumKind::from(d);
2595                    assert!(
2596                        seen_kind == d_kind,
2597                        "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
2598                    );
2599
2600                    if seen > d {
2601                        self.row.data.truncate(start);
2602                        return Err(InvalidRangeError::MisorderedRangeBounds.into());
2603                    }
2604                }
2605            }
2606            actual_datums += 1;
2607        }
2608
2609        assert!(
2610            actual_datums == expected_datums,
2611            "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
2612        );
2613
2614        Ok(())
2615    }
2616
2617    /// Clears the contents of the packer without de-allocating its backing memory.
2618    pub fn clear(&mut self) {
2619        self.row.data.clear();
2620    }
2621
2622    /// Truncates the underlying storage to the specified byte position.
2623    ///
2624    /// # Safety
2625    ///
2626    /// `pos` MUST specify a byte offset that lies on a datum boundary.
2627    /// If `pos` specifies a byte offset that is *within* a datum, the row
2628    /// packer will produce an invalid row, the unpacking of which may
2629    /// trigger undefined behavior!
2630    ///
2631    /// To find the byte offset of a datum boundary, inspect the packer's
2632    /// byte length by calling `packer.data().len()` after pushing the desired
2633    /// number of datums onto the packer.
2634    pub unsafe fn truncate(&mut self, pos: usize) {
2635        self.row.data.truncate(pos)
2636    }
2637
2638    /// Truncates the underlying row to contain at most the first `n` datums.
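    ///
    /// For example:
    ///
    /// ```
    /// # use mz_repr::{Row, Datum};
    /// let mut row = Row::default();
    /// let mut packer = row.packer();
    /// packer.extend([Datum::Int32(1), Datum::Int32(2), Datum::Int32(3)]);
    /// packer.truncate_datums(2);
    /// assert_eq!(row.unpack(), vec![Datum::Int32(1), Datum::Int32(2)]);
    /// ```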
2639    pub fn truncate_datums(&mut self, n: usize) {
2640        let prev_len = self.row.data.len();
2641        let mut iter = self.row.iter();
2642        for _ in iter.by_ref().take(n) {}
2643        let next_len = iter.data.len();
2644        // SAFETY: iterator offsets always lie on a datum boundary.
2645        unsafe { self.truncate(prev_len - next_len) }
2646    }
2647
2648    /// Returns the total amount of bytes used by the underlying row.
2649    pub fn byte_len(&self) -> usize {
2650        self.row.byte_len()
2651    }
2652}
2653
2654impl<'a> IntoIterator for &'a Row {
2655    type Item = Datum<'a>;
2656    type IntoIter = DatumListIter<'a>;
2657    fn into_iter(self) -> DatumListIter<'a> {
2658        self.iter()
2659    }
2660}
2661
2662impl fmt::Debug for Row {
2663    /// Debug representation using the internal datums
2664    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2665        f.write_str("Row{")?;
2666        f.debug_list().entries(self.iter()).finish()?;
2667        f.write_str("}")
2668    }
2669}
2670
2671impl fmt::Display for Row {
2672    /// Display representation using the internal datums
2673    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2674        f.write_str("(")?;
2675        for (i, datum) in self.iter().enumerate() {
2676            if i != 0 {
2677                f.write_str(", ")?;
2678            }
2679            write!(f, "{}", datum)?;
2680        }
2681        f.write_str(")")
2682    }
2683}
2684
2685impl<'a> DatumList<'a> {
2686    pub fn empty() -> DatumList<'static> {
2687        DatumList { data: &[] }
2688    }
2689
2690    pub fn iter(&self) -> DatumListIter<'a> {
2691        DatumListIter { data: self.data }
2692    }
2693
2694    /// For debugging only
2695    pub fn data(&self) -> &'a [u8] {
2696        self.data
2697    }
2698}
2699
2700impl<'a> IntoIterator for DatumList<'a> {
2701    type Item = Datum<'a>;
2702    type IntoIter = DatumListIter<'a>;
2703    fn into_iter(self) -> DatumListIter<'a> {
2704        self.iter()
2705    }
2706}
2707
2708impl<'a> Iterator for DatumListIter<'a> {
2709    type Item = Datum<'a>;
2710    fn next(&mut self) -> Option<Self::Item> {
2711        if self.data.is_empty() {
2712            None
2713        } else {
2714            Some(unsafe { read_datum(&mut self.data) })
2715        }
2716    }
2717}
2718
2719impl<'a> DatumMap<'a> {
2720    pub fn empty() -> DatumMap<'static> {
2721        DatumMap { data: &[] }
2722    }
2723
2724    pub fn iter(&self) -> DatumDictIter<'a> {
2725        DatumDictIter {
2726            data: self.data,
2727            prev_key: None,
2728        }
2729    }
2730
2731    /// For debugging only
2732    pub fn data(&self) -> &'a [u8] {
2733        self.data
2734    }
2735}
2736
2737impl<'a> Debug for DatumMap<'a> {
2738    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2739        f.debug_map().entries(self.iter()).finish()
2740    }
2741}
2742
2743impl<'a> IntoIterator for &'a DatumMap<'a> {
2744    type Item = (&'a str, Datum<'a>);
2745    type IntoIter = DatumDictIter<'a>;
2746    fn into_iter(self) -> DatumDictIter<'a> {
2747        self.iter()
2748    }
2749}
2750
2751impl<'a> Iterator for DatumDictIter<'a> {
2752    type Item = (&'a str, Datum<'a>);
2753    fn next(&mut self) -> Option<Self::Item> {
2754        if self.data.is_empty() {
2755            None
2756        } else {
2757            let key_tag =
2758                Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
2759            assert!(
2760                key_tag == Tag::StringTiny
2761                    || key_tag == Tag::StringShort
2762                    || key_tag == Tag::StringLong
2763                    || key_tag == Tag::StringHuge,
2764                "Dict keys must be strings, got {:?}",
2765                key_tag
2766            );
2767            let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
2768            let val = unsafe { read_datum(&mut self.data) };
2769
2770            // if in debug mode, sanity check keys
2771            if cfg!(debug_assertions) {
2772                if let Some(prev_key) = self.prev_key {
2773                    debug_assert!(
2774                        prev_key < key,
2775                        "Dict keys must be unique and given in ascending order: {} came before {}",
2776                        prev_key,
2777                        key
2778                    );
2779                }
2780                self.prev_key = Some(key);
2781            }
2782
2783            Some((key, val))
2784        }
2785    }
2786}
2787
2788impl RowArena {
2789    pub fn new() -> Self {
2790        RowArena {
2791            inner: RefCell::new(vec![]),
2792        }
2793    }
2794
2795    /// Creates a `RowArena` with a hint of how many rows will be created in the arena, to avoid
2796    /// reallocations of its internal vector.
2797    pub fn with_capacity(capacity: usize) -> Self {
2798        RowArena {
2799            inner: RefCell::new(Vec::with_capacity(capacity)),
2800        }
2801    }
2802
2803    /// Does a `reserve` on the underlying `Vec`. Call this when you expect `additional` more datums
2804    /// to be created in this arena.
2805    pub fn reserve(&self, additional: usize) {
2806        self.inner.borrow_mut().reserve(additional);
2807    }
2808
2809    /// Take ownership of `bytes` for the lifetime of the arena.
2810    #[allow(clippy::transmute_ptr_to_ptr)]
2811    pub fn push_bytes<'a>(&'a self, bytes: Vec<u8>) -> &'a [u8] {
2812        let mut inner = self.inner.borrow_mut();
2813        inner.push(bytes);
2814        let owned_bytes = &inner[inner.len() - 1];
2815        unsafe {
2816            // This is safe because:
2817            //   * We only ever append to self.inner, so the byte vector
2818            //     will live as long as the arena.
2819            //   * We return a reference to the byte vector's contents, so it's
2820            //     okay if self.inner reallocates and moves the byte
2821            //     vector.
2822            //   * We don't allow access to the byte vector itself, so it will
2823            //     never reallocate.
2824            transmute::<&[u8], &'a [u8]>(owned_bytes)
2825        }
2826    }
2827
2828    /// Take ownership of `string` for the lifetime of the arena.
2829    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
2830        let owned_bytes = self.push_bytes(string.into_bytes());
2831        unsafe {
2832            // This is safe because we know it was a `String` just before.
2833            std::str::from_utf8_unchecked(owned_bytes)
2834        }
2835    }
2836
2837    /// Take ownership of `row` for the lifetime of the arena, returning a
2838    /// reference to the first datum in the row.
2839    ///
2840    /// If we had an owned datum type, this method would be much clearer, and
2841    /// would be called `push_owned_datum`.
2842    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
2843        let mut inner = self.inner.borrow_mut();
2844        inner.push(row.data.into_vec());
2845        unsafe {
2846            // This is safe because:
2847            //   * We only ever append to self.inner, so the row data will live
2848            //     as long as the arena.
2849            //   * We force the row data into its own heap allocation--
2850            //     importantly, we do NOT store the `CompactBytes`, which might be
2851            //     storing data inline--so it's okay if self.inner reallocates
2852            //     and moves the row.
2853            //   * We don't allow access to the byte vector itself, so it will
2854            //     never reallocate.
2855            let datum = read_datum(&mut &inner[inner.len() - 1][..]);
2856            transmute::<Datum<'_>, Datum<'a>>(datum)
2857        }
2858    }
2859
2860    /// Equivalent to `push_unary_row` but returns a `DatumNested` rather than a
2861    /// `Datum`.
2862    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
2863        let mut inner = self.inner.borrow_mut();
2864        inner.push(row.data.into_vec());
2865        unsafe {
2866            // This is safe because:
2867            //   * We only ever append to self.inner, so the row data will live
2868            //     as long as the arena.
2869            //   * We force the row data into its own heap allocation--
2870            //     importantly, we do NOT store the `CompactBytes`, which might be
2871            //     storing data inline--so it's okay if self.inner reallocates
2872            //     and moves the row.
2873            //   * We don't allow access to the byte vector itself, so it will
2874            //     never reallocate.
2875            let nested = DatumNested::extract(&mut &inner[inner.len() - 1][..]);
2876            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
2877        }
2878    }
2879
2880    /// Convenience function to make a new `Row` containing a single datum, and
2881    /// take ownership of it for the lifetime of the arena
2882    ///
2883    /// ```
2884    /// # use mz_repr::{RowArena, Datum};
2885    /// let arena = RowArena::new();
2886    /// let datum = arena.make_datum(|packer| {
2887    ///   packer.push_list(&[Datum::String("hello"), Datum::String("world")]);
2888    /// });
2889    /// assert_eq!(datum.unwrap_list().iter().collect::<Vec<_>>(), vec![Datum::String("hello"), Datum::String("world")]);
2890    /// ```
2891    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
2892    where
2893        F: FnOnce(&mut RowPacker),
2894    {
2895        let mut row = Row::default();
2896        f(&mut row.packer());
2897        self.push_unary_row(row)
2898    }
2899
2900    /// Convenience function identical to `make_datum` but instead returns a
2901    /// `DatumNested`.
2902    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
2903    where
2904        F: FnOnce(&mut RowPacker),
2905    {
2906        let mut row = Row::default();
2907        f(&mut row.packer());
2908        self.push_unary_row_datum_nested(row)
2909    }
2910
2911    /// Like [`RowArena::make_datum`], but the provided closure can return an error.
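    ///
    /// For example:
    ///
    /// ```
    /// # use mz_repr::{Datum, RowArena};
    /// let arena = RowArena::new();
    /// let datum = arena
    ///     .try_make_datum(|packer| {
    ///         packer.push(Datum::Int64(1));
    ///         Ok::<_, String>(())
    ///     })
    ///     .unwrap();
    /// assert_eq!(datum, Datum::Int64(1));
    /// ```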
2912    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
2913    where
2914        F: FnOnce(&mut RowPacker) -> Result<(), E>,
2915    {
2916        let mut row = Row::default();
2917        f(&mut row.packer())?;
2918        Ok(self.push_unary_row(row))
2919    }
2920
2921    /// Clear the contents of the arena.
2922    pub fn clear(&mut self) {
2923        self.inner.borrow_mut().clear();
2924    }
2925}
2926
2927impl Default for RowArena {
2928    fn default() -> RowArena {
2929        RowArena::new()
2930    }
2931}
2932
2933/// A thread-local row, which can be borrowed and returned.
2934/// # Example
2935///
2936/// Use this type instead of creating a new row:
2937/// ```
2938/// use mz_repr::SharedRow;
2939///
2940/// let mut row_builder = SharedRow::get();
2941/// ```
2942///
2943/// This allows us to reuse an existing row allocation instead of creating a new one or retaining
2944/// an allocation locally. Additionally, we can observe the size of the local row in a central
2945/// place and potentially reallocate to reduce memory needs.
2946///
2947/// # Panic
2948///
2949/// [`SharedRow::get`] panics when trying to obtain multiple references to the shared row.
2950#[derive(Debug)]
2951pub struct SharedRow(Row);
2952
2953impl SharedRow {
2954    thread_local! {
2955        /// A thread-local slot containing a shared Row that can be temporarily used by a function.
2956        /// There can be at most one active user of this Row, which is tracked by the state of the
2957        /// `Option<_>` wrapper. When it is `Some(..)`, the row is available for use. When it
2958        /// is `None`, it is not, and the constructor will panic if a thread attempts to use it.
2959        static SHARED_ROW: Cell<Option<Row>> = const { Cell::new(Some(Row::empty())) }
2960    }
2961
2962    /// Get the shared row.
2963    ///
2964    /// The row's contents are cleared before returning it.
2965    ///
2966    /// # Panic
2967    ///
2968    /// Panics when the row is already borrowed elsewhere.
2969    pub fn get() -> Self {
2970        let mut row = Self::SHARED_ROW
2971            .take()
2972            .expect("attempted to borrow already borrowed SharedRow");
2973        // Clear row
2974        row.packer();
2975        Self(row)
2976    }
2977
2978    /// Gets the shared row and uses it to pack `iter`.
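    ///
    /// For example:
    ///
    /// ```
    /// # use mz_repr::{Datum, SharedRow};
    /// let row = SharedRow::pack([Datum::Int32(1), Datum::Int32(2)]);
    /// assert_eq!(row.unpack(), vec![Datum::Int32(1), Datum::Int32(2)]);
    /// ```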
2979    pub fn pack<'a, I, D>(iter: I) -> Row
2980    where
2981        I: IntoIterator<Item = D>,
2982        D: Borrow<Datum<'a>>,
2983    {
2984        let mut row_builder = Self::get();
2985        let mut row_packer = row_builder.packer();
2986        row_packer.extend(iter);
2987        row_builder.clone()
2988    }
2989}
2990
2991impl std::ops::Deref for SharedRow {
2992    type Target = Row;
2993
2994    fn deref(&self) -> &Self::Target {
2995        &self.0
2996    }
2997}
2998
2999impl std::ops::DerefMut for SharedRow {
3000    fn deref_mut(&mut self) -> &mut Self::Target {
3001        &mut self.0
3002    }
3003}
3004
3005impl Drop for SharedRow {
3006    fn drop(&mut self) {
3007        // Take the Row allocation from this instance and put it back in the thread local slot for
3008        // the next user. The Row in `self` is replaced with an empty Row which does not allocate.
3009        Self::SHARED_ROW.set(Some(std::mem::take(&mut self.0)))
3010    }
3011}
3012
3013#[cfg(test)]
3014mod tests {
3015    use std::cmp::Ordering;
3016    use std::collections::hash_map::DefaultHasher;
3017    use std::hash::{Hash, Hasher};
3018
3019    use chrono::{DateTime, NaiveDate};
3020    use itertools::Itertools;
3021    use mz_ore::{assert_err, assert_none};
3022    use ordered_float::OrderedFloat;
3023
3024    use crate::SqlScalarType;
3025
3026    use super::*;
3027
3028    fn hash<T: Hash>(t: &T) -> u64 {
3029        let mut hasher = DefaultHasher::new();
3030        t.hash(&mut hasher);
3031        hasher.finish()
3032    }
3033
3034    #[mz_ore::test]
3035    fn test_assumptions() {
3036        assert_eq!(size_of::<Tag>(), 1);
3037        #[cfg(target_endian = "big")]
3038        {
3039            // if you want to run this on a big-endian cpu, we'll need big-endian versions of the serialization code
3040            assert!(false);
3041        }
3042    }
3043
3044    #[mz_ore::test]
3045    fn miri_test_arena() {
3046        let arena = RowArena::new();
3047
3048        assert_eq!(arena.push_string("".to_owned()), "");
3049        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");
3050
3051        let empty: &[u8] = &[];
3052        assert_eq!(arena.push_bytes(vec![]), empty);
3053        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);
3054
3055        let mut row = Row::default();
3056        let mut packer = row.packer();
3057        packer.push_dict_with(|row| {
3058            row.push(Datum::String("a"));
3059            row.push_list_with(|row| {
3060                row.push(Datum::String("one"));
3061                row.push(Datum::String("two"));
3062                row.push(Datum::String("three"));
3063            });
3064            row.push(Datum::String("b"));
3065            row.push(Datum::String("c"));
3066        });
3067        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
3068    }
3069
3070    #[mz_ore::test]
3071    fn miri_test_round_trip() {
3072        fn round_trip(datums: Vec<Datum>) {
3073            let row = Row::pack(datums.clone());
3074
3075            // When run under miri this catches undefined bytes written to data
3076            // eg by calling push_copy! on a type which contains undefined padding values
3077            println!("{:?}", row.data());
3078
3079            let datums2 = row.iter().collect::<Vec<_>>();
3080            let datums3 = row.unpack();
3081            assert_eq!(datums, datums2);
3082            assert_eq!(datums, datums3);
3083        }
3084
3085        round_trip(vec![]);
3086        round_trip(
3087            SqlScalarType::enumerate()
3088                .iter()
3089                .flat_map(|r#type| r#type.interesting_datums())
3090                .collect(),
3091        );
3092        round_trip(vec![
3093            Datum::Null,
3094            Datum::Null,
3095            Datum::False,
3096            Datum::True,
3097            Datum::Int16(-21),
3098            Datum::Int32(-42),
3099            Datum::Int64(-2_147_483_648 - 42),
3100            Datum::UInt8(0),
3101            Datum::UInt8(1),
3102            Datum::UInt16(0),
3103            Datum::UInt16(1),
3104            Datum::UInt16(1 << 8),
3105            Datum::UInt32(0),
3106            Datum::UInt32(1),
3107            Datum::UInt32(1 << 8),
3108            Datum::UInt32(1 << 16),
3109            Datum::UInt32(1 << 24),
3110            Datum::UInt64(0),
3111            Datum::UInt64(1),
3112            Datum::UInt64(1 << 8),
3113            Datum::UInt64(1 << 16),
3114            Datum::UInt64(1 << 24),
3115            Datum::UInt64(1 << 32),
3116            Datum::UInt64(1 << 40),
3117            Datum::UInt64(1 << 48),
3118            Datum::UInt64(1 << 56),
3119            Datum::Float32(OrderedFloat::from(-42.12)),
3120            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
3121            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
3122            Datum::Timestamp(
3123                CheckedTimestamp::from_timestamplike(
3124                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
3125                        .unwrap()
3126                        .and_hms_opt(14, 32, 11)
3127                        .unwrap(),
3128                )
3129                .unwrap(),
3130            ),
3131            Datum::TimestampTz(
3132                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
3133                    .unwrap(),
3134            ),
3135            Datum::Interval(Interval {
3136                months: 312,
3137                ..Default::default()
3138            }),
3139            Datum::Interval(Interval::new(0, 0, 1_012_312)),
3140            Datum::Bytes(&[]),
3141            Datum::Bytes(&[0, 2, 1, 255]),
3142            Datum::String(""),
3143            Datum::String("العَرَبِيَّة"),
3144        ]);
3145    }
3146
3147    #[mz_ore::test]
3148    fn test_array() {
3149        // Construct an array using `Row::push_array` and verify that it unpacks
3150        // correctly.
3151        const DIM: ArrayDimension = ArrayDimension {
3152            lower_bound: 2,
3153            length: 2,
3154        };
3155        let mut row = Row::default();
3156        let mut packer = row.packer();
3157        packer
3158            .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3159            .unwrap();
3160        let arr1 = row.unpack_first().unwrap_array();
3161        assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3162        assert_eq!(
3163            arr1.elements().into_iter().collect::<Vec<_>>(),
3164            vec![Datum::Int32(1), Datum::Int32(2)]
3165        );
3166
3167        // Pack a previously-constructed `Datum::Array` and verify that it
3168        // unpacks correctly.
3169        let row = Row::pack_slice(&[Datum::Array(arr1)]);
3170        let arr2 = row.unpack_first().unwrap_array();
3171        assert_eq!(arr1, arr2);
3172    }
3173
3174    #[mz_ore::test]
3175    fn test_multidimensional_array() {
3176        let datums = vec![
3177            Datum::Int32(1),
3178            Datum::Int32(2),
3179            Datum::Int32(3),
3180            Datum::Int32(4),
3181            Datum::Int32(5),
3182            Datum::Int32(6),
3183            Datum::Int32(7),
3184            Datum::Int32(8),
3185        ];
3186
3187        let mut row = Row::default();
3188        let mut packer = row.packer();
3189        packer
3190            .try_push_array(
3191                &[
3192                    ArrayDimension {
3193                        lower_bound: 1,
3194                        length: 1,
3195                    },
3196                    ArrayDimension {
3197                        lower_bound: 1,
3198                        length: 4,
3199                    },
3200                    ArrayDimension {
3201                        lower_bound: 1,
3202                        length: 2,
3203                    },
3204                ],
3205                &datums,
3206            )
3207            .unwrap();
3208        let array = row.unpack_first().unwrap_array();
3209        assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3210    }
3211
3212    #[mz_ore::test]
3213    fn test_array_max_dimensions() {
3214        let mut row = Row::default();
3215        let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3216
3217        // An array with one too many dimensions should be rejected.
3218        let res = row.packer().try_push_array(
3219            &vec![
3220                ArrayDimension {
3221                    lower_bound: 1,
3222                    length: 1
3223                };
3224                max_dims + 1
3225            ],
3226            vec![Datum::Int32(4)],
3227        );
3228        assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3229        assert!(row.data.is_empty());
3230
3231        // An array with exactly the maximum allowable dimensions should be
3232        // accepted.
3233        row.packer()
3234            .try_push_array(
3235                &vec![
3236                    ArrayDimension {
3237                        lower_bound: 1,
3238                        length: 1
3239                    };
3240                    max_dims
3241                ],
3242                vec![Datum::Int32(4)],
3243            )
3244            .unwrap();
3245    }
3246
3247    #[mz_ore::test]
3248    fn test_array_wrong_cardinality() {
3249        let mut row = Row::default();
3250        let res = row.packer().try_push_array(
3251            &[
3252                ArrayDimension {
3253                    lower_bound: 1,
3254                    length: 2,
3255                },
3256                ArrayDimension {
3257                    lower_bound: 1,
3258                    length: 3,
3259                },
3260            ],
3261            vec![Datum::Int32(1), Datum::Int32(2)],
3262        );
3263        assert_eq!(
3264            res,
3265            Err(InvalidArrayError::WrongCardinality {
3266                actual: 2,
3267                expected: 6,
3268            })
3269        );
3270        assert!(row.data.is_empty());
3271    }
3272
3273    #[mz_ore::test]
3274    fn test_nesting() {
3275        let mut row = Row::default();
3276        row.packer().push_dict_with(|row| {
3277            row.push(Datum::String("favourites"));
3278            row.push_list_with(|row| {
3279                row.push(Datum::String("ice cream"));
3280                row.push(Datum::String("oreos"));
3281                row.push(Datum::String("cheesecake"));
3282            });
3283            row.push(Datum::String("name"));
3284            row.push(Datum::String("bob"));
3285        });
3286
3287        let mut iter = row.unpack_first().unwrap_map().iter();
3288
3289        let (k, v) = iter.next().unwrap();
3290        assert_eq!(k, "favourites");
3291        assert_eq!(
3292            v.unwrap_list().iter().collect::<Vec<_>>(),
3293            vec![
3294                Datum::String("ice cream"),
3295                Datum::String("oreos"),
3296                Datum::String("cheesecake"),
3297            ]
3298        );
3299
3300        let (k, v) = iter.next().unwrap();
3301        assert_eq!(k, "name");
3302        assert_eq!(v, Datum::String("bob"));
3303    }
3304
3305    #[mz_ore::test]
3306    fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3307        let pack = |ok| {
3308            let mut row = Row::default();
3309            row.packer().push_dict_with(|row| {
3310                if ok {
3311                    row.push(Datum::String("key"));
3312                    row.push(Datum::Int32(42));
3313                    Ok(7)
3314                } else {
3315                    Err("fail")
3316                }
3317            })?;
3318            Ok(row)
3319        };
3320
3321        assert_eq!(pack(false), Err("fail"));
3322
3323        let row = pack(true)?;
3324        let mut dict = row.unpack_first().unwrap_map().iter();
3325        assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3326        assert_eq!(dict.next(), None);
3327
3328        Ok(())
3329    }
3330
3331    #[mz_ore::test]
3332    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
3333    fn test_datum_sizes() {
3334        let arena = RowArena::new();
3335
3336        // Test the claims about various datum sizes.
3337        let values_of_interest = vec![
3338            Datum::Null,
3339            Datum::False,
3340            Datum::Int16(0),
3341            Datum::Int32(0),
3342            Datum::Int64(0),
3343            Datum::UInt8(0),
3344            Datum::UInt8(1),
3345            Datum::UInt16(0),
3346            Datum::UInt16(1),
3347            Datum::UInt16(1 << 8),
3348            Datum::UInt32(0),
3349            Datum::UInt32(1),
3350            Datum::UInt32(1 << 8),
3351            Datum::UInt32(1 << 16),
3352            Datum::UInt32(1 << 24),
3353            Datum::UInt64(0),
3354            Datum::UInt64(1),
3355            Datum::UInt64(1 << 8),
3356            Datum::UInt64(1 << 16),
3357            Datum::UInt64(1 << 24),
3358            Datum::UInt64(1 << 32),
3359            Datum::UInt64(1 << 40),
3360            Datum::UInt64(1 << 48),
3361            Datum::UInt64(1 << 56),
3362            Datum::Float32(OrderedFloat(0.0)),
3363            Datum::Float64(OrderedFloat(0.0)),
3364            Datum::from(numeric::Numeric::from(0)),
3365            Datum::from(numeric::Numeric::from(1000)),
3366            Datum::from(numeric::Numeric::from(9999)),
3367            Datum::Date(
3368                NaiveDate::from_ymd_opt(1, 1, 1)
3369                    .unwrap()
3370                    .try_into()
3371                    .unwrap(),
3372            ),
3373            Datum::Timestamp(
3374                CheckedTimestamp::from_timestamplike(
3375                    DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
3376                )
3377                .unwrap(),
3378            ),
3379            Datum::TimestampTz(
3380                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
3381                    .unwrap(),
3382            ),
3383            Datum::Interval(Interval::default()),
3384            Datum::Bytes(&[]),
3385            Datum::String(""),
3386            Datum::JsonNull,
3387            Datum::Range(Range { inner: None }),
3388            arena.make_datum(|packer| {
3389                packer
3390                    .push_range(Range::new(Some((
3391                        RangeLowerBound::new(Datum::Int32(-1), true),
3392                        RangeUpperBound::new(Datum::Int32(1), true),
3393                    ))))
3394                    .unwrap();
3395            }),
3396        ];
3397        for value in values_of_interest {
3398            if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
3399                panic!("Disparity in claimed size for {:?}", value);
3400            }
3401        }
3402    }
3403
3404    #[mz_ore::test]
3405    fn test_range_errors() {
3406        fn test_range_errors_inner<'a>(
3407            datums: Vec<Vec<Datum<'a>>>,
3408        ) -> Result<(), InvalidRangeError> {
3409            let mut row = Row::default();
3410            let row_len = row.byte_len();
3411            let mut packer = row.packer();
3412            let r = packer.push_range_with(
3413                RangeLowerBound {
3414                    inclusive: true,
3415                    bound: Some(|row: &mut RowPacker| {
3416                        for d in &datums[0] {
3417                            row.push(d);
3418                        }
3419                        Ok(())
3420                    }),
3421                },
3422                RangeUpperBound {
3423                    inclusive: true,
3424                    bound: Some(|row: &mut RowPacker| {
3425                        for d in &datums[1] {
3426                            row.push(d);
3427                        }
3428                        Ok(())
3429                    }),
3430                },
3431            );
3432
3433            assert_eq!(row_len, row.byte_len());
3434
3435            r
3436        }
3437
3438        for panicking_case in [
3439            vec![vec![Datum::Int32(1)], vec![]],
3440            vec![
3441                vec![Datum::Int32(1), Datum::Int32(2)],
3442                vec![Datum::Int32(3)],
3443            ],
3444            vec![
3445                vec![Datum::Int32(1)],
3446                vec![Datum::Int32(2), Datum::Int32(3)],
3447            ],
3448            vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
3449            vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
3450            vec![vec![Datum::Null], vec![Datum::Int32(2)]],
3451            vec![vec![Datum::Int32(1)], vec![Datum::Null]],
3452        ] {
3453            #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
3454            let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
3455            assert_err!(result);
3456        }
3457
3458        let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
3459        assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
3460    }
3461
3462    /// Lists have a variable-length encoding for their lengths. We test each case here.
3463    #[mz_ore::test]
3464    #[cfg_attr(miri, ignore)] // slow
3465    fn test_list_encoding() {
3466        fn test_list_encoding_inner(len: usize) {
3467            let list_elem = |i: usize| {
3468                if i % 2 == 0 {
3469                    Datum::False
3470                } else {
3471                    Datum::True
3472                }
3473            };
3474            let mut row = Row::default();
3475            {
3476                // Push some stuff.
3477                let mut packer = row.packer();
3478                packer.push(Datum::String("start"));
3479                packer.push_list_with(|packer| {
3480                    for i in 0..len {
3481                        packer.push(list_elem(i));
3482                    }
3483                });
3484                packer.push(Datum::String("end"));
3485            }
3486            // Check that we read back exactly what we pushed.
3487            let mut row_it = row.iter();
3488            assert_eq!(row_it.next().unwrap(), Datum::String("start"));
3489            match row_it.next().unwrap() {
3490                Datum::List(list) => {
3491                    let mut list_it = list.iter();
3492                    for i in 0..len {
3493                        assert_eq!(list_it.next().unwrap(), list_elem(i));
3494                    }
3495                    assert_none!(list_it.next());
3496                }
3497                _ => panic!("expected Datum::List"),
3498            }
3499            assert_eq!(row_it.next().unwrap(), Datum::String("end"));
3500            assert_none!(row_it.next());
3501        }
3502
3503        test_list_encoding_inner(0);
3504        test_list_encoding_inner(1);
3505        test_list_encoding_inner(10);
3506        test_list_encoding_inner(TINY - 1); // tiny
3507        test_list_encoding_inner(TINY + 1); // short
3508        test_list_encoding_inner(SHORT + 1); // long
3509
3510        // The biggest one takes 40 s on my laptop, probably not worth it.
3511        //test_list_encoding_inner(LONG + 1); // huge
3512    }
3513
3514    /// Demonstrates that DatumList's Eq and Ord (datum-by-datum) are consistent.
3515    /// A list containing -0.0 and one containing +0.0 have different byte representations
3516    /// (IEEE 754 distinguishes them), so the old bytewise Eq considered them unequal. Now
3517    /// that Eq goes through Datum::cmp, it reports them equal, matching Ord, which compares
3518    /// via iter().cmp(other.iter()) and also treats them as equal.
3519    #[mz_ore::test]
3520    fn test_datum_list_eq_ord_consistency() {
3521        // Build list containing +0.0
3522        let mut row_pos = Row::default();
3523        row_pos.packer().push_list_with(|p| {
3524            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3525        });
3526        let list_pos = row_pos.unpack_first().unwrap_list();
3527
3528        // Build list containing -0.0 (distinct bit pattern from +0.0)
3529        let mut row_neg = Row::default();
3530        row_neg.packer().push_list_with(|p| {
3531            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3532        });
3533        let list_neg = row_neg.unpack_first().unwrap_list();
3534
3535        // Eq is semantic: the two encodings compare equal despite differing bytes.
3536        // A bytewise Eq used to get this wrong, so we test it.
3537        assert_eq!(
3538            list_pos, list_neg,
3539            "Eq should see different encodings as equal"
3540        );
3541
3542        // Ord is datum-by-datum: -0.0 and +0.0 compare equal as Datums
3543        assert_eq!(
3544            list_pos.cmp(&list_neg),
3545            Ordering::Equal,
3546            "Ord (datum-by-datum) should see -0.0 and +0.0 as equal"
3547        );
3548    }
3549
3550    /// Demonstrates that DatumMap's Eq is semantic rather than bytewise: maps with equal keys
3551    /// and values compare equal even when the values have different encodings (e.g. -0.0 vs +0.0).
3552    #[mz_ore::test]
3553    fn test_datum_map_eq_bytewise_consistency() {
3554        // Build map {"k": +0.0}
3555        let mut row_pos = Row::default();
3556        row_pos.packer().push_dict_with(|p| {
3557            p.push(Datum::String("k"));
3558            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3559        });
3560        let map_pos = row_pos.unpack_first().unwrap_map();
3561
3562        // Build map {"k": -0.0}
3563        let mut row_neg = Row::default();
3564        row_neg.packer().push_dict_with(|p| {
3565            p.push(Datum::String("k"));
3566            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3567        });
3568        let map_neg = row_neg.unpack_first().unwrap_map();
3569
3570        // Same keys and semantically equal values: Eq reports the maps equal even though the encodings differ.
3571        assert_eq!(
3572            map_pos, map_neg,
3573            "DatumMap Eq is semantic; -0.0 and +0.0 have different encodings but are equal"
3574        );
3575        // Verify they have the same logical content
3576        let entries_pos: Vec<_> = map_pos.iter().collect();
3577        let entries_neg: Vec<_> = map_neg.iter().collect();
3578        assert_eq!(entries_pos.len(), entries_neg.len());
3579        for ((k1, v1), (k2, v2)) in entries_pos.iter().zip_eq(entries_neg.iter()) {
3580            assert_eq!(k1, k2);
3581            assert_eq!(
3582                v1, v2,
3583                "Datum-level comparison treats -0.0 and +0.0 as equal"
3584            );
3585        }
3586    }
3587
3588    /// Hash must agree with Eq: equal lists must have the same hash.
3589    #[mz_ore::test]
3590    fn test_datum_list_hash_consistency() {
3591        // Equal lists (including -0.0 vs +0.0) must hash the same
3592        let mut row_pos = Row::default();
3593        row_pos.packer().push_list_with(|p| {
3594            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3595        });
3596        let list_pos = row_pos.unpack_first().unwrap_list();
3597
3598        let mut row_neg = Row::default();
3599        row_neg.packer().push_list_with(|p| {
3600            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3601        });
3602        let list_neg = row_neg.unpack_first().unwrap_list();
3603
3604        assert_eq!(list_pos, list_neg);
3605        assert_eq!(
3606            hash(&list_pos),
3607            hash(&list_neg),
3608            "equal lists must have same hash"
3609        );
3610
3611        // Unequal lists should (with overwhelming probability) have different hashes
3612        let mut row_a = Row::default();
3613        row_a.packer().push_list_with(|p| {
3614            p.push(Datum::Int32(1));
3615            p.push(Datum::Int32(2));
3616        });
3617        let list_a = row_a.unpack_first().unwrap_list();
3618
3619        let mut row_b = Row::default();
3620        row_b.packer().push_list_with(|p| {
3621            p.push(Datum::Int32(1));
3622            p.push(Datum::Int32(3));
3623        });
3624        let list_b = row_b.unpack_first().unwrap_list();
3625
3626        assert_ne!(list_a, list_b);
3627        assert_ne!(
3628            hash(&list_a),
3629            hash(&list_b),
3630            "unequal lists must have different hashes"
3631        );
3632    }
3633
3634    /// Ord/PartialOrd for DatumList: less, equal, greater.
3635    #[mz_ore::test]
3636    fn test_datum_list_ordering() {
3637        let mut row_12 = Row::default();
3638        row_12.packer().push_list_with(|p| {
3639            p.push(Datum::Int32(1));
3640            p.push(Datum::Int32(2));
3641        });
3642        let list_12 = row_12.unpack_first().unwrap_list();
3643
3644        let mut row_13 = Row::default();
3645        row_13.packer().push_list_with(|p| {
3646            p.push(Datum::Int32(1));
3647            p.push(Datum::Int32(3));
3648        });
3649        let list_13 = row_13.unpack_first().unwrap_list();
3650
3651        let mut row_123 = Row::default();
3652        row_123.packer().push_list_with(|p| {
3653            p.push(Datum::Int32(1));
3654            p.push(Datum::Int32(2));
3655            p.push(Datum::Int32(3));
3656        });
3657        let list_123 = row_123.unpack_first().unwrap_list();
3658
3659        // [1, 2] < [1, 3] due to the second element being different
3660        assert_eq!(list_12.cmp(&list_13), Ordering::Less);
3661        assert_eq!(list_13.cmp(&list_12), Ordering::Greater);
3662        assert_eq!(list_12.cmp(&list_12), Ordering::Equal);
3663        // shorter prefix compares less
3664        assert_eq!(list_12.cmp(&list_123), Ordering::Less);
3665    }
3666
3667    /// Hash must agree with Eq: equal maps must have the same hash.
3668    #[mz_ore::test]
3669    fn test_datum_map_hash_consistency() {
3670        let mut row_pos = Row::default();
3671        row_pos.packer().push_dict_with(|p| {
3672            p.push(Datum::String("x"));
3673            p.push(Datum::Float64(OrderedFloat::from(0.0)));
3674        });
3675        let map_pos = row_pos.unpack_first().unwrap_map();
3676
3677        let mut row_neg = Row::default();
3678        row_neg.packer().push_dict_with(|p| {
3679            p.push(Datum::String("x"));
3680            p.push(Datum::Float64(OrderedFloat::from(-0.0)));
3681        });
3682        let map_neg = row_neg.unpack_first().unwrap_map();
3683
3684        assert_eq!(map_pos, map_neg);
3685        assert_eq!(
3686            hash(&map_pos),
3687            hash(&map_neg),
3688            "equal maps must have same hash"
3689        );
3690
3691        let mut row_a = Row::default();
3692        row_a.packer().push_dict_with(|p| {
3693            p.push(Datum::String("a"));
3694            p.push(Datum::Int32(1));
3695        });
3696        let map_a = row_a.unpack_first().unwrap_map();
3697
3698        let mut row_b = Row::default();
3699        row_b.packer().push_dict_with(|p| {
3700            p.push(Datum::String("a"));
3701            p.push(Datum::Int32(2));
3702        });
3703        let map_b = row_b.unpack_first().unwrap_map();
3704
3705        assert_ne!(map_a, map_b);
3706        assert_ne!(
3707            hash(&map_a),
3708            hash(&map_b),
3709            "unequal maps must have different hashes"
3710        );
3711    }
3712
3713    /// Ord/PartialOrd for DatumMap: less, equal, greater (by key then value).
3714    #[mz_ore::test]
3715    fn test_datum_map_ordering() {
3716        let mut row_a1 = Row::default();
3717        row_a1.packer().push_dict_with(|p| {
3718            p.push(Datum::String("a"));
3719            p.push(Datum::Int32(1));
3720        });
3721        let map_a1 = row_a1.unpack_first().unwrap_map();
3722
3723        let mut row_a2 = Row::default();
3724        row_a2.packer().push_dict_with(|p| {
3725            p.push(Datum::String("a"));
3726            p.push(Datum::Int32(2));
3727        });
3728        let map_a2 = row_a2.unpack_first().unwrap_map();
3729
3730        let mut row_b1 = Row::default();
3731        row_b1.packer().push_dict_with(|p| {
3732            p.push(Datum::String("b"));
3733            p.push(Datum::Int32(1));
3734        });
3735        let map_b1 = row_b1.unpack_first().unwrap_map();
3736
3737        assert_eq!(map_a1.cmp(&map_a2), Ordering::Less);
3738        assert_eq!(map_a2.cmp(&map_a1), Ordering::Greater);
3739        assert_eq!(map_a1.cmp(&map_a1), Ordering::Equal);
3740        assert_eq!(map_a1.cmp(&map_b1), Ordering::Less); // "a" < "b"
3741    }
3742
3743    /// Datum puts Null last in the enum so that nulls sort last (PostgreSQL default).
3744    /// This ordering is used when comparing DatumList/DatumMap (e.g. jsonb_agg tiebreaker).
3745    #[mz_ore::test]
3746    fn test_datum_list_and_map_null_sorts_last() {
3747        // DatumList: [1] < [null] so non-null sorts before null
3748        let mut row_list_1 = Row::default();
3749        row_list_1
3750            .packer()
3751            .push_list_with(|p| p.push(Datum::Int32(1)));
3752        let list_1 = row_list_1.unpack_first().unwrap_list();
3753
3754        let mut row_list_null = Row::default();
3755        row_list_null
3756            .packer()
3757            .push_list_with(|p| p.push(Datum::Null));
3758        let list_null = row_list_null.unpack_first().unwrap_list();
3759
3760        assert_eq!(list_1.cmp(&list_null), Ordering::Less);
3761        assert_eq!(list_null.cmp(&list_1), Ordering::Greater);
3762
3763        // DatumMap: {"k": 1} < {"k": null} so non-null sorts before null (same as jsonb_agg)
3764        let mut row_map_1 = Row::default();
3765        row_map_1.packer().push_dict_with(|p| {
3766            p.push(Datum::String("k"));
3767            p.push(Datum::Int32(1));
3768        });
3769        let map_1 = row_map_1.unpack_first().unwrap_map();
3770
3771        let mut row_map_null = Row::default();
3772        row_map_null.packer().push_dict_with(|p| {
3773            p.push(Datum::String("k"));
3774            p.push(Datum::Null);
3775        });
3776        let map_null = row_map_null.unpack_first().unwrap_map();
3777
3778        assert_eq!(map_1.cmp(&map_null), Ordering::Less);
3779        assert_eq!(map_null.cmp(&map_1), Ordering::Greater);
3780    }
3781}