1use std::borrow::Borrow;
11use std::cell::RefCell;
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::mem::{size_of, transmute};
16use std::ops::Deref;
17use std::rc::Rc;
18use std::str;
19
20use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
21use compact_bytes::CompactBytes;
22use mz_ore::cast::{CastFrom, ReinterpretCast};
23use mz_ore::soft_assert_no_log;
24use mz_ore::vec::Vector;
25use mz_persist_types::Codec64;
26use num_enum::{IntoPrimitive, TryFromPrimitive};
27use ordered_float::OrderedFloat;
28use proptest::prelude::*;
29use proptest::strategy::{BoxedStrategy, Strategy};
30use serde::{Deserialize, Serialize};
31use uuid::Uuid;
32
33use crate::adt::array::{
34 Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
35};
36use crate::adt::date::Date;
37use crate::adt::interval::Interval;
38use crate::adt::mz_acl_item::{AclItem, MzAclItem};
39use crate::adt::numeric;
40use crate::adt::numeric::Numeric;
41use crate::adt::range::{
42 self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
43};
44use crate::adt::timestamp::CheckedTimestamp;
45use crate::scalar::{DatumKind, arb_datum};
46use crate::{Datum, RelationDesc, Timestamp};
47
48pub(crate) mod encode;
49pub mod iter;
50
51include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
52
/// An owned, byte-encoded row.
///
/// The encoded datums are stored contiguously in `data`; the encoding is the
/// tag-prefixed format decoded by `read_datum` in this file.
#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
    /// The encoded bytes of every datum in the row, back to back.
    data: CompactBytes,
}
113
114impl Row {
115 const SIZE: usize = CompactBytes::MAX_INLINE;
116
117 pub fn decode_from_proto(
120 &mut self,
121 proto: &ProtoRow,
122 desc: &RelationDesc,
123 ) -> Result<(), String> {
124 let mut packer = self.packer();
125 for (col_idx, _, _) in desc.iter_all() {
126 let d = match proto.datums.get(col_idx.to_raw()) {
127 Some(x) => x,
128 None => {
129 packer.push(Datum::Null);
130 continue;
131 }
132 };
133 packer.try_push_proto(d)?;
134 }
135
136 Ok(())
137 }
138
139 #[inline]
141 pub fn with_capacity(cap: usize) -> Self {
142 Self {
143 data: CompactBytes::with_capacity(cap),
144 }
145 }
146
147 pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
154 Row {
155 data: CompactBytes::new(data),
156 }
157 }
158
159 pub fn packer(&mut self) -> RowPacker<'_> {
165 self.clear();
166 RowPacker { row: self }
167 }
168
169 pub fn pack<'a, I, D>(iter: I) -> Row
177 where
178 I: IntoIterator<Item = D>,
179 D: Borrow<Datum<'a>>,
180 {
181 let mut row = Row::default();
182 row.packer().extend(iter);
183 row
184 }
185
186 pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
191 where
192 I: IntoIterator<Item = D>,
193 D: Borrow<Datum<'a>>,
194 {
195 self.packer().extend(iter);
196 self.clone()
197 }
198
199 pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
203 where
204 I: IntoIterator<Item = Result<D, E>>,
205 D: Borrow<Datum<'a>>,
206 {
207 let mut row = Row::default();
208 row.packer().try_extend(iter)?;
209 Ok(row)
210 }
211
212 pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
218 let mut row = Row::with_capacity(datums_size(slice.iter()));
220 row.packer().extend(slice.iter());
221 row
222 }
223
224 pub fn byte_len(&self) -> usize {
226 let heap_size = if self.data.spilled() {
227 self.data.len()
228 } else {
229 0
230 };
231 let inline_size = std::mem::size_of::<Self>();
232 inline_size.saturating_add(heap_size)
233 }
234
235 pub fn data_len(&self) -> usize {
237 self.data.len()
238 }
239
240 pub fn byte_capacity(&self) -> usize {
242 self.data.capacity()
243 }
244
245 #[inline]
247 pub fn as_row_ref(&self) -> &RowRef {
248 RowRef::from_slice(self.data.as_slice())
249 }
250
251 #[inline]
253 fn clear(&mut self) {
254 self.data.clear();
255 }
256}
257
258impl Borrow<RowRef> for Row {
259 #[inline]
260 fn borrow(&self) -> &RowRef {
261 self.as_row_ref()
262 }
263}
264
265impl AsRef<RowRef> for Row {
266 #[inline]
267 fn as_ref(&self) -> &RowRef {
268 self.as_row_ref()
269 }
270}
271
272impl Deref for Row {
273 type Target = RowRef;
274
275 #[inline]
276 fn deref(&self) -> &Self::Target {
277 self.as_row_ref()
278 }
279}
280
// Compile-time check: fails the build if `Row` is not exactly 24 bytes.
static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
283
284impl Clone for Row {
285 fn clone(&self) -> Self {
286 Row {
287 data: self.data.clone(),
288 }
289 }
290
291 fn clone_from(&mut self, source: &Self) {
292 self.data.clone_from(&source.data);
293 }
294}
295
296impl std::hash::Hash for Row {
298 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
299 self.as_row_ref().hash(state)
300 }
301}
302
303impl Arbitrary for Row {
304 type Parameters = prop::collection::SizeRange;
305 type Strategy = BoxedStrategy<Row>;
306
307 fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
308 prop::collection::vec(arb_datum(), size)
309 .prop_map(|items| {
310 let mut row = Row::default();
311 let mut packer = row.packer();
312 for item in items.iter() {
313 let datum: Datum<'_> = item.into();
314 packer.push(datum);
315 }
316 row
317 })
318 .boxed()
319 }
320}
321
322impl PartialOrd for Row {
323 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
324 Some(self.cmp(other))
325 }
326}
327
328impl Ord for Row {
329 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
330 self.as_ref().cmp(other.as_ref())
331 }
332}
333
334#[allow(missing_debug_implementations)]
335mod columnation {
336 use columnation::{Columnation, Region};
337 use mz_ore::region::LgAllocRegion;
338
339 use crate::Row;
340
341 pub struct RowStack {
346 region: LgAllocRegion<u8>,
347 }
348
349 impl RowStack {
350 const LIMIT: usize = 2 << 20;
351 }
352
353 impl Default for RowStack {
355 fn default() -> Self {
356 Self {
357 region: LgAllocRegion::with_limit(Self::LIMIT),
359 }
360 }
361 }
362
363 impl Columnation for Row {
364 type InnerRegion = RowStack;
365 }
366
367 impl Region for RowStack {
368 type Item = Row;
369 #[inline]
370 fn clear(&mut self) {
371 self.region.clear();
372 }
373 #[inline(always)]
374 unsafe fn copy(&mut self, item: &Row) -> Row {
375 if item.data.spilled() {
376 let bytes = self.region.copy_slice(&item.data[..]);
377 Row {
378 data: compact_bytes::CompactBytes::from_raw_parts(
379 bytes.as_mut_ptr(),
380 item.data.len(),
381 item.data.capacity(),
382 ),
383 }
384 } else {
385 item.clone()
386 }
387 }
388
389 fn reserve_items<'a, I>(&mut self, items: I)
390 where
391 Self: 'a,
392 I: Iterator<Item = &'a Self::Item> + Clone,
393 {
394 let size = items
395 .filter(|row| row.data.spilled())
396 .map(|row| row.data.len())
397 .sum();
398 let size = std::cmp::min(size, Self::LIMIT);
399 self.region.reserve(size);
400 }
401
402 fn reserve_regions<'a, I>(&mut self, regions: I)
403 where
404 Self: 'a,
405 I: Iterator<Item = &'a Self> + Clone,
406 {
407 let size = regions.map(|r| r.region.len()).sum();
408 let size = std::cmp::min(size, Self::LIMIT);
409 self.region.reserve(size);
410 }
411
412 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
413 self.region.heap_size(callback)
414 }
415 }
416}
417
418mod columnar {
419 use columnar::{
420 AsBytes, Clear, Columnar, Container, FromBytes, HeapSize, Index, IndexAs, Len, Push,
421 };
422 use mz_ore::cast::CastFrom;
423
424 use crate::{Row, RowRef};
425
426 #[derive(Copy, Clone, Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
427 pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
428 pub bounds: BC,
430 pub values: VC,
432 }
433
434 impl Columnar for Row {
435 type Ref<'a> = &'a RowRef;
436 fn copy_from(&mut self, other: Self::Ref<'_>) {
437 self.clear();
438 self.data.extend_from_slice(other.data());
439 }
440 fn into_owned(other: Self::Ref<'_>) -> Self {
441 other.to_owned()
442 }
443 type Container = Rows;
444 }
445
446 impl<'b, BC: Container<u64>> Container<Row> for Rows<BC, &'b [u8]> {
447 type Borrowed<'a>
448 = Rows<BC::Borrowed<'a>, &'a [u8]>
449 where
450 Self: 'a;
451 fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
452 Rows {
453 bounds: self.bounds.borrow(),
454 values: self.values,
455 }
456 }
457 }
458 impl<BC: Container<u64>> Container<Row> for Rows<BC, Vec<u8>> {
459 type Borrowed<'a>
460 = Rows<BC::Borrowed<'a>, &'a [u8]>
461 where
462 BC: 'a;
463 fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
464 Rows {
465 bounds: self.bounds.borrow(),
466 values: self.values.borrow(),
467 }
468 }
469 }
470
471 impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
472 fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
473 self.bounds.as_bytes().chain(self.values.as_bytes())
474 }
475 }
476 impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
477 fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
478 Self {
479 bounds: FromBytes::from_bytes(bytes),
480 values: FromBytes::from_bytes(bytes),
481 }
482 }
483 }
484
485 impl<BC: Len, VC> Len for Rows<BC, VC> {
486 #[inline(always)]
487 fn len(&self) -> usize {
488 self.bounds.len()
489 }
490 }
491
492 impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
493 type Ref = &'a RowRef;
494 #[inline(always)]
495 fn get(&self, index: usize) -> Self::Ref {
496 let lower = if index == 0 {
497 0
498 } else {
499 self.bounds.index_as(index - 1)
500 };
501 let upper = self.bounds.index_as(index);
502 let lower = usize::cast_from(lower);
503 let upper = usize::cast_from(upper);
504 RowRef::from_slice(&self.values[lower..upper])
505 }
506 }
507 impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
508 type Ref = &'a RowRef;
509 #[inline(always)]
510 fn get(&self, index: usize) -> Self::Ref {
511 let lower = if index == 0 {
512 0
513 } else {
514 self.bounds.index_as(index - 1)
515 };
516 let upper = self.bounds.index_as(index);
517 let lower = usize::cast_from(lower);
518 let upper = usize::cast_from(upper);
519 RowRef::from_slice(&self.values[lower..upper])
520 }
521 }
522
523 impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
524 #[inline(always)]
525 fn push(&mut self, item: &Row) {
526 self.values.extend_from_slice(item.data.as_slice());
527 self.bounds.push(u64::cast_from(self.values.len()));
528 }
529 }
530 impl<BC: Push<u64>> Push<&RowRef> for Rows<BC> {
531 fn push(&mut self, item: &RowRef) {
532 self.values.extend_from_slice(item.data());
533 self.bounds.push(u64::cast_from(self.values.len()));
534 }
535 }
536 impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
537 fn clear(&mut self) {
538 self.bounds.clear();
539 self.values.clear();
540 }
541 }
542 impl<BC: HeapSize, VC: HeapSize> HeapSize for Rows<BC, VC> {
543 fn heap_size(&self) -> (usize, usize) {
544 let (l0, c0) = self.bounds.heap_size();
545 let (l1, c1) = self.values.heap_size();
546 (l0 + l1, c0 + c1)
547 }
548 }
549}
550
/// A borrowed view of encoded row bytes.
///
/// `#[repr(transparent)]` over `[u8]`, which is what allows
/// `RowRef::from_slice` to reinterpret a `&[u8]` as a `&RowRef`.
#[derive(PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct RowRef([u8]);
557
558impl RowRef {
559 pub fn from_slice(row: &[u8]) -> &RowRef {
564 #[allow(clippy::as_conversions)]
565 let ptr = row as *const [u8] as *const RowRef;
566 unsafe { &*ptr }
568 }
569
570 pub fn unpack(&self) -> Vec<Datum> {
572 let len = self.iter().count();
574 let mut vec = Vec::with_capacity(len);
575 vec.extend(self.iter());
576 vec
577 }
578
579 pub fn unpack_first(&self) -> Datum {
583 self.iter().next().unwrap()
584 }
585
586 pub fn iter(&self) -> DatumListIter {
588 DatumListIter { data: &self.0 }
589 }
590
591 pub fn byte_len(&self) -> usize {
593 self.0.len()
594 }
595
596 pub fn data(&self) -> &[u8] {
598 &self.0
599 }
600
601 pub fn is_empty(&self) -> bool {
603 self.0.is_empty()
604 }
605}
606
607impl ToOwned for RowRef {
608 type Owned = Row;
609
610 fn to_owned(&self) -> Self::Owned {
611 unsafe { Row::from_bytes_unchecked(&self.0) }
613 }
614}
615
616impl<'a> IntoIterator for &'a RowRef {
617 type Item = Datum<'a>;
618 type IntoIter = DatumListIter<'a>;
619
620 fn into_iter(self) -> DatumListIter<'a> {
621 DatumListIter { data: &self.0 }
622 }
623}
624
625impl PartialOrd for RowRef {
629 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
630 Some(self.cmp(other))
631 }
632}
633
634impl Ord for RowRef {
635 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
636 match self.0.len().cmp(&other.0.len()) {
637 std::cmp::Ordering::Less => std::cmp::Ordering::Less,
638 std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
639 std::cmp::Ordering::Equal => self.0.cmp(&other.0),
640 }
641 }
642}
643
644impl fmt::Debug for RowRef {
645 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
647 f.write_str("RowRef{")?;
648 f.debug_list().entries(self.into_iter()).finish()?;
649 f.write_str("}")
650 }
651}
652
/// Writes datums into a [`Row`].
///
/// Created by `Row::packer`, which clears the row before handing out the
/// packer.
#[derive(Debug)]
pub struct RowPacker<'a> {
    /// The row being packed, borrowed mutably for the packer's lifetime.
    row: &'a mut Row,
}
664
/// Iterator over a sequence of encoded datums, as returned by `RowRef::iter`.
#[derive(Debug, Clone)]
pub struct DatumListIter<'a> {
    /// Encoded bytes of the datums to iterate over.
    data: &'a [u8],
}
669
/// Iterator state over encoded dictionary data.
// NOTE(review): the `Iterator` impl is not visible in this part of the file;
// field roles below are inferred from their names and types — confirm.
#[derive(Debug, Clone)]
pub struct DatumDictIter<'a> {
    /// Encoded bytes being iterated.
    data: &'a [u8],
    /// Presumably the most recently seen key, if any — TODO confirm against
    /// the `Iterator` implementation.
    prev_key: Option<&'a str>,
}
675
/// A collection of owned byte buffers behind a `RefCell`, so buffers can be
/// modified through a shared reference.
// NOTE(review): how the buffers are used is defined by methods outside this
// part of the file.
#[derive(Debug)]
pub struct RowArena {
    /// The owned byte buffers.
    inner: RefCell<Vec<Vec<u8>>>,
}
687
/// A borrowed list of encoded datums.
///
/// `read_lengthed_datum` builds this from the payload bytes of a `List*` tag.
/// Equality and hashing are derived, i.e. they act on the raw bytes; ordering
/// is implemented separately by comparing the decoded datums.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumList<'a> {
    /// Encoded bytes of the list's elements.
    data: &'a [u8],
}
696
697impl<'a> Debug for DatumList<'a> {
698 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
699 f.debug_list().entries(self.iter()).finish()
700 }
701}
702
703impl Ord for DatumList<'_> {
704 fn cmp(&self, other: &DatumList) -> Ordering {
705 self.iter().cmp(other.iter())
706 }
707}
708
709impl PartialOrd for DatumList<'_> {
710 fn partial_cmp(&self, other: &DatumList) -> Option<Ordering> {
711 Some(self.cmp(other))
712 }
713}
714
/// A borrowed, encoded map.
///
/// `read_datum` builds this from the payload bytes of a `Dict` tag. All
/// comparison traits are derived, so equality, ordering and hashing act on the
/// raw encoded bytes.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct DatumMap<'a> {
    /// Encoded bytes of the map's entries.
    data: &'a [u8],
}
721
/// A single encoded datum embedded inside another datum.
///
/// `DatumNested::extract` produces this from the bytes consumed by one
/// `read_datum` call; `read_datum` uses it for range bounds.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumNested<'a> {
    /// The encoded bytes of exactly one datum.
    val: &'a [u8],
}
728
729impl<'a> std::fmt::Display for DatumNested<'a> {
730 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
731 std::fmt::Display::fmt(&self.datum(), f)
732 }
733}
734
735impl<'a> std::fmt::Debug for DatumNested<'a> {
736 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
737 f.debug_struct("DatumNested")
738 .field("val", &self.datum())
739 .finish()
740 }
741}
742
743impl<'a> DatumNested<'a> {
744 pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
748 let prev = *data;
749 let _ = unsafe { read_datum(data) };
750 DatumNested {
751 val: &prev[..(prev.len() - data.len())],
752 }
753 }
754
755 pub fn datum(&self) -> Datum<'a> {
757 let mut temp = self.val;
758 unsafe { read_datum(&mut temp) }
759 }
760}
761
762impl<'a> Ord for DatumNested<'a> {
763 fn cmp(&self, other: &Self) -> Ordering {
764 self.datum().cmp(&other.datum())
765 }
766}
767
768impl<'a> PartialOrd for DatumNested<'a> {
769 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
770 Some(self.cmp(other))
771 }
772}
773
/// The one-byte tag that starts every encoded datum and selects how the bytes
/// that follow are decoded (see `read_datum`).
///
/// The enum is `#[repr(u8)]` without explicit discriminants, so each variant's
/// byte value is its position in this declaration. `read_datum` converts raw
/// bytes back with `try_from_primitive`, so reordering or inserting variants
/// changes the meaning of already-encoded data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
enum Tag {
    Null,
    False,
    True,
    // Fixed-width integers: the full little-endian value follows the tag.
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt32,
    Float32,
    Float64,
    Date,
    Time,
    // Timestamps stored as a naive date (year, ordinal) followed by a time.
    Timestamp,
    TimestampTz,
    Interval,
    // Length-prefixed payloads; Tiny/Short/Long/Huge use a 1/2/4/8-byte
    // little-endian length respectively (see `read_lengthed_datum`).
    BytesTiny,
    BytesShort,
    BytesLong,
    BytesHuge,
    StringTiny,
    StringShort,
    StringLong,
    StringHuge,
    Uuid,
    Array,
    ListTiny,
    ListShort,
    ListLong,
    ListHuge,
    Dict,
    JsonNull,
    Dummy,
    Numeric,
    UInt16,
    UInt64,
    MzTimestamp,
    Range,
    MzAclItem,
    AclItem,
    // Timestamps stored as a single i64 that `read_datum` splits into seconds
    // and nanoseconds by dividing by 1_000_000_000.
    CheapTimestamp,
    CheapTimestampTz,
    // Variable-length integers. The numeric suffix is the number of payload
    // bits that follow the tag (`actual_int_length` maps it to bytes);
    // `read_datum` fills the remaining high bytes with 0x00 for the
    // `NonNegative*`/`UInt*` tags and with 0xFF for the `Negative*` tags.
    NonNegativeInt16_0,
    NonNegativeInt16_8,
    NonNegativeInt16_16,

    NonNegativeInt32_0,
    NonNegativeInt32_8,
    NonNegativeInt32_16,
    NonNegativeInt32_24,
    NonNegativeInt32_32,

    NonNegativeInt64_0,
    NonNegativeInt64_8,
    NonNegativeInt64_16,
    NonNegativeInt64_24,
    NonNegativeInt64_32,
    NonNegativeInt64_40,
    NonNegativeInt64_48,
    NonNegativeInt64_56,
    NonNegativeInt64_64,

    NegativeInt16_0,
    NegativeInt16_8,
    NegativeInt16_16,

    NegativeInt32_0,
    NegativeInt32_8,
    NegativeInt32_16,
    NegativeInt32_24,
    NegativeInt32_32,

    NegativeInt64_0,
    NegativeInt64_8,
    NegativeInt64_16,
    NegativeInt64_24,
    NegativeInt64_32,
    NegativeInt64_40,
    NegativeInt64_48,
    NegativeInt64_56,
    NegativeInt64_64,

    UInt8_0,
    UInt8_8,

    UInt16_0,
    UInt16_8,
    UInt16_16,

    UInt32_0,
    UInt32_8,
    UInt32_16,
    UInt32_24,
    UInt32_32,

    UInt64_0,
    UInt64_8,
    UInt64_16,
    UInt64_24,
    UInt64_32,
    UInt64_40,
    UInt64_48,
    UInt64_56,
    UInt64_64,
}
903
904impl Tag {
905 fn actual_int_length(self) -> Option<usize> {
906 use Tag::*;
907 let val = match self {
908 NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
909 | UInt32_0 | UInt64_0 => 0,
910 NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
911 | UInt32_8 | UInt64_8 => 1,
912 NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
913 | UInt32_16 | UInt64_16 => 2,
914 NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
915 NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
916 NonNegativeInt64_40 | UInt64_40 => 5,
917 NonNegativeInt64_48 | UInt64_48 => 6,
918 NonNegativeInt64_56 | UInt64_56 => 7,
919 NonNegativeInt64_64 | UInt64_64 => 8,
920 NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
921 NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
922 NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
923 NegativeInt32_24 | NegativeInt64_24 => 3,
924 NegativeInt32_32 | NegativeInt64_32 => 4,
925 NegativeInt64_40 => 5,
926 NegativeInt64_48 => 6,
927 NegativeInt64_56 => 7,
928 NegativeInt64_64 => 8,
929
930 _ => return None,
931 };
932 Some(val)
933 }
934}
935
936fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
943 let len = u64::from_le_bytes(read_byte_array(data));
944 let len = usize::cast_from(len);
945 let (bytes, next) = data.split_at(len);
946 *data = next;
947 bytes
948}
949
950unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
959 let len = match tag {
960 Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
961 Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
962 usize::from(u16::from_le_bytes(read_byte_array(data)))
963 }
964 Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
965 usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
966 }
967 Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
968 usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
969 }
970 _ => unreachable!(),
971 };
972 let (bytes, next) = data.split_at(len);
973 *data = next;
974 match tag {
975 Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
976 Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
977 Datum::String(str::from_utf8_unchecked(bytes))
978 }
979 Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
980 Datum::List(DatumList { data: bytes })
981 }
982 _ => unreachable!(),
983 }
984}
985
/// Removes and returns the first byte of `data`.
///
/// # Panics
///
/// Panics if `data` is empty.
fn read_byte(data: &mut &[u8]) -> u8 {
    let first = data[0];
    let rest = &data[1..];
    *data = rest;
    first
}
991
/// Reads `length` bytes from the front of `data` into the low end of an
/// `N`-byte array whose remaining bytes are set to `FILL`, and advances
/// `data` past the bytes read.
///
/// # Panics
///
/// Panics if `length` exceeds `data.len()` or `N`.
fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    let (head, tail) = data.split_at(length);
    let mut out = [FILL; N];
    out[..head.len()].copy_from_slice(head);
    *data = tail;
    out
}
1009fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
1017 read_byte_array_sign_extending::<N, 255>(data, length)
1018}
1019
1020fn read_byte_array_extending_nonnegative<const N: usize>(
1028 data: &mut &[u8],
1029 length: usize,
1030) -> [u8; N] {
1031 read_byte_array_sign_extending::<N, 0>(data, length)
1032}
1033
1034pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1035 let (prev, next) = data.split_first_chunk().unwrap();
1036 *data = next;
1037 *prev
1038}
1039
1040pub(super) fn read_date(data: &mut &[u8]) -> Date {
1041 let days = i32::from_le_bytes(read_byte_array(data));
1042 Date::from_pg_epoch(days).expect("unexpected date")
1043}
1044
1045pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1046 let year = i32::from_le_bytes(read_byte_array(data));
1047 let ordinal = u32::from_le_bytes(read_byte_array(data));
1048 NaiveDate::from_yo_opt(year, ordinal).unwrap()
1049}
1050
1051pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1052 let secs = u32::from_le_bytes(read_byte_array(data));
1053 let nanos = u32::from_le_bytes(read_byte_array(data));
1054 NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1055}
1056
/// Decodes one datum from the front of `data`, advancing `data` past the
/// bytes that were consumed.
///
/// The first byte is interpreted as a [`Tag`]; the tag determines how the
/// following bytes are read.
///
/// # Safety
///
/// `data` must start with a valid datum encoding. In particular, string
/// payloads are converted to `str` without UTF-8 validation (see
/// `read_lengthed_datum`).
///
/// # Panics
///
/// Panics if the tag byte is unknown, if `data` is shorter than the encoding
/// requires, or if a decoded value is rejected by its type's constructor
/// (dates, times, timestamps, range flags, ACL items).
pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
    let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
    match tag {
        Tag::Null => Datum::Null,
        Tag::False => Datum::False,
        Tag::True => Datum::True,
        // Variable-length unsigned 8-bit: zero or one payload byte,
        // zero-extended.
        Tag::UInt8_0 | Tag::UInt8_8 => {
            let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt8(i)
        }
        Tag::Int16 => {
            let i = i16::from_le_bytes(read_byte_array(data));
            Datum::Int16(i)
        }
        // Variable-length non-negative signed integers: the missing high
        // bytes are filled with 0x00.
        Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
            let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int16(i)
        }
        Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
            let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt16(i)
        }
        Tag::Int32 => {
            let i = i32::from_le_bytes(read_byte_array(data));
            Datum::Int32(i)
        }
        Tag::NonNegativeInt32_0
        | Tag::NonNegativeInt32_32
        | Tag::NonNegativeInt32_8
        | Tag::NonNegativeInt32_16
        | Tag::NonNegativeInt32_24 => {
            let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int32(i)
        }
        Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
            let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt32(i)
        }
        Tag::Int64 => {
            let i = i64::from_le_bytes(read_byte_array(data));
            Datum::Int64(i)
        }
        Tag::NonNegativeInt64_0
        | Tag::NonNegativeInt64_64
        | Tag::NonNegativeInt64_8
        | Tag::NonNegativeInt64_16
        | Tag::NonNegativeInt64_24
        | Tag::NonNegativeInt64_32
        | Tag::NonNegativeInt64_40
        | Tag::NonNegativeInt64_48
        | Tag::NonNegativeInt64_56 => {
            let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int64(i)
        }
        Tag::UInt64_0
        | Tag::UInt64_8
        | Tag::UInt64_16
        | Tag::UInt64_24
        | Tag::UInt64_32
        | Tag::UInt64_40
        | Tag::UInt64_48
        | Tag::UInt64_56
        | Tag::UInt64_64 => {
            let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::UInt64(i)
        }
        // Variable-length negative signed integers: the missing high bytes
        // are filled with 0xFF.
        Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
            let i = i16::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int16(i)
        }
        Tag::NegativeInt32_0
        | Tag::NegativeInt32_32
        | Tag::NegativeInt32_8
        | Tag::NegativeInt32_16
        | Tag::NegativeInt32_24 => {
            let i = i32::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int32(i)
        }
        Tag::NegativeInt64_0
        | Tag::NegativeInt64_64
        | Tag::NegativeInt64_8
        | Tag::NegativeInt64_16
        | Tag::NegativeInt64_24
        | Tag::NegativeInt64_32
        | Tag::NegativeInt64_40
        | Tag::NegativeInt64_48
        | Tag::NegativeInt64_56 => {
            let i = i64::from_le_bytes(read_byte_array_extending_negative(
                data,
                tag.actual_int_length()
                    .expect("returns a value for variable-length-encoded integer tags"),
            ));
            Datum::Int64(i)
        }

        // Fixed-width unsigned integers: the full little-endian value follows.
        Tag::UInt8 => {
            let i = u8::from_le_bytes(read_byte_array(data));
            Datum::UInt8(i)
        }
        Tag::UInt16 => {
            let i = u16::from_le_bytes(read_byte_array(data));
            Datum::UInt16(i)
        }
        Tag::UInt32 => {
            let i = u32::from_le_bytes(read_byte_array(data));
            Datum::UInt32(i)
        }
        Tag::UInt64 => {
            let i = u64::from_le_bytes(read_byte_array(data));
            Datum::UInt64(i)
        }
        // Floats are stored as the little-endian bytes of their bit pattern.
        Tag::Float32 => {
            let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
            Datum::Float32(OrderedFloat::from(f))
        }
        Tag::Float64 => {
            let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
            Datum::Float64(OrderedFloat::from(f))
        }
        Tag::Date => Datum::Date(read_date(data)),
        Tag::Time => Datum::Time(read_time(data)),
        // A single i64 split into whole seconds and sub-second nanoseconds;
        // Euclidean division keeps the nanosecond part non-negative.
        Tag::CheapTimestamp => {
            let ts = i64::from_le_bytes(read_byte_array(data));
            let secs = ts.div_euclid(1_000_000_000);
            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
            let ndt = DateTime::from_timestamp(secs, nsecs)
                .expect("We only write round-trippable timestamps")
                .naive_utc();
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
            )
        }
        Tag::CheapTimestampTz => {
            let ts = i64::from_le_bytes(read_byte_array(data));
            let secs = ts.div_euclid(1_000_000_000);
            let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
            let dt = DateTime::from_timestamp(secs, nsecs)
                .expect("We only write round-trippable timestamps");
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
            )
        }
        // Timestamps stored as a naive date followed by a time of day.
        Tag::Timestamp => {
            let date = read_naive_date(data);
            let time = read_time(data);
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(date.and_time(time))
                    .expect("unexpected timestamp"),
            )
        }
        Tag::TimestampTz => {
            let date = read_naive_date(data);
            let time = read_time(data);
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
                    date.and_time(time),
                    Utc,
                ))
                .expect("unexpected timestamptz"),
            )
        }
        // Three little-endian fields in order: months, days, micros.
        Tag::Interval => {
            let months = i32::from_le_bytes(read_byte_array(data));
            let days = i32::from_le_bytes(read_byte_array(data));
            let micros = i64::from_le_bytes(read_byte_array(data));
            Datum::Interval(Interval {
                months,
                days,
                micros,
            })
        }
        Tag::BytesTiny
        | Tag::BytesShort
        | Tag::BytesLong
        | Tag::BytesHuge
        | Tag::StringTiny
        | Tag::StringShort
        | Tag::StringLong
        | Tag::StringHuge
        | Tag::ListTiny
        | Tag::ListShort
        | Tag::ListLong
        | Tag::ListHuge => read_lengthed_datum(data, tag),
        Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
        // One byte holding the number of dimensions, then two u64-sized
        // values per dimension, then the length-prefixed element bytes.
        Tag::Array => {
            let ndims = read_byte(data);
            let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
            let (dims, next) = data.split_at(dims_size);
            *data = next;
            let bytes = read_untagged_bytes(data);
            Datum::Array(Array {
                dims: ArrayDimensions { data: dims },
                elements: DatumList { data: bytes },
            })
        }
        Tag::Dict => {
            let bytes = read_untagged_bytes(data);
            Datum::Map(DatumMap { data: bytes })
        }
        Tag::JsonNull => Datum::JsonNull,
        Tag::Dummy => Datum::Dummy,
        // Three header bytes (digits, exponent, bits) followed by the
        // coefficient units, each stored as a little-endian u16.
        Tag::Numeric => {
            let digits = read_byte(data).into();
            let exponent = i8::reinterpret_cast(read_byte(data));
            let bits = read_byte(data);

            let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
            let lsu_u8_len = lsu_u16_len * 2;
            let (lsu_u8, next) = data.split_at(lsu_u8_len);
            *data = next;

            // Units that are not present in the encoding stay zero.
            let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
            for (i, c) in lsu_u8.chunks(2).enumerate() {
                lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
            }

            let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
            Datum::from(d)
        }
        Tag::MzTimestamp => {
            let t = Timestamp::decode(read_byte_array(data));
            Datum::MzTimestamp(t)
        }
        // One flag byte, then the lower and upper bounds as nested datums;
        // a bound flagged as infinite has no encoded datum.
        Tag::Range => {
            let flag_byte = read_byte(data);
            let flags = range::InternalFlags::from_bits(flag_byte)
                .expect("range flags must be encoded validly");

            if flags.contains(range::InternalFlags::EMPTY) {
                assert!(
                    flags == range::InternalFlags::EMPTY,
                    "empty ranges contain only RANGE_EMPTY flag"
                );

                return Datum::Range(Range { inner: None });
            }

            let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
                None
            } else {
                Some(DatumNested::extract(data))
            };

            let lower = RangeBound {
                inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
                bound: lower_bound,
            };

            let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
                None
            } else {
                Some(DatumNested::extract(data))
            };

            let upper = RangeBound {
                inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
                bound: upper_bound,
            };

            Datum::Range(Range {
                inner: Some(RangeInner { lower, upper }),
            })
        }
        // ACL items are fixed-size binary blobs decoded by their own types.
        Tag::MzAclItem => {
            const N: usize = MzAclItem::binary_size();
            let mz_acl_item =
                MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
            Datum::MzAclItem(mz_acl_item)
        }
        Tag::AclItem => {
            const N: usize = AclItem::binary_size();
            let acl_item =
                AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
            Datum::AclItem(acl_item)
        }
    }
}
1403
1404fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1408where
1409 D: Vector<u8>,
1410{
1411 let len = u64::cast_from(bytes.len());
1412 data.extend_from_slice(&len.to_le_bytes());
1413 data.extend_from_slice(bytes);
1414}
1415
1416fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1417where
1418 D: Vector<u8>,
1419{
1420 match tag {
1421 Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1422 let len = bytes.len().to_le_bytes();
1423 data.push(len[0]);
1424 }
1425 Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1426 let len = bytes.len().to_le_bytes();
1427 data.extend_from_slice(&len[0..2]);
1428 }
1429 Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1430 let len = bytes.len().to_le_bytes();
1431 data.extend_from_slice(&len[0..4]);
1432 }
1433 Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1434 let len = bytes.len().to_le_bytes();
1435 data.extend_from_slice(&len);
1436 }
1437 _ => unreachable!(),
1438 }
1439 data.extend_from_slice(bytes);
1440}
1441
1442pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1443 i32::to_le_bytes(date.pg_epoch_days())
1444}
1445
1446fn push_date<D>(data: &mut D, date: Date)
1447where
1448 D: Vector<u8>,
1449{
1450 data.extend_from_slice(&date_to_array(date));
1451}
1452
1453pub(super) fn naive_date_to_arrays(
1454 date: NaiveDate,
1455) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1456 (
1457 i32::to_le_bytes(date.year()),
1458 u32::to_le_bytes(date.ordinal()),
1459 )
1460}
1461
1462fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1463where
1464 D: Vector<u8>,
1465{
1466 let (ds1, ds2) = naive_date_to_arrays(date);
1467 data.extend_from_slice(&ds1);
1468 data.extend_from_slice(&ds2);
1469}
1470
1471pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1472 (
1473 u32::to_le_bytes(time.num_seconds_from_midnight()),
1474 u32::to_le_bytes(time.nanosecond()),
1475 )
1476}
1477
1478fn push_time<D>(data: &mut D, time: NaiveTime)
1479where
1480 D: Vector<u8>,
1481{
1482 let (ts1, ts2) = time_to_arrays(time);
1483 data.extend_from_slice(&ts1);
1484 data.extend_from_slice(&ts2);
1485}
1486
1487fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1497 let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1498 if subsec_nanos >= 1_000_000_000 {
1499 return None;
1500 }
1501 let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1502 as_ns.checked_add(i64::from(subsec_nanos))
1503}
1504
/// Returns the number of low-order bytes needed to hold every bit of `i` that
/// is not a redundant sign bit.
///
/// Leading ones are discounted for negative values and leading zeros for
/// non-negative values, so `0` and `-1` both need zero bytes. The sign is not
/// part of the count; callers record it separately (via the tag).
#[inline(always)]
// The `as` cast is lossless: the value is at most 8.
#[allow(clippy::as_conversions)]
fn min_bytes_signed<T>(i: T) -> u8
where
    T: Into<i64>,
{
    let value: i64 = i.into();

    let redundant_bits = if value < 0 {
        value.leading_ones()
    } else {
        value.leading_zeros()
    };

    // Round the count of significant bits up to whole bytes.
    (i64::BITS - redundant_bits).div_ceil(u8::BITS) as u8
}
1529
/// Returns the number of low-order bytes needed to hold every non-zero bit of
/// `i`; `0` needs zero bytes.
#[inline(always)]
// The `as` cast is lossless: the value is at most 8.
#[allow(clippy::as_conversions)]
fn min_bytes_unsigned<T>(i: T) -> u8
where
    T: Into<u64>,
{
    let value: u64 = i.into();

    // Round the count of significant bits up to whole bytes.
    let significant_bits = u64::BITS - value.leading_zeros();
    significant_bits.div_ceil(u8::BITS) as u8
}
1549
/// Payload lengths below this use the `*Tiny` tags (one-byte length prefix).
const TINY: usize = 1 << 8;
/// Payload lengths below this (and at least `TINY`) use the `*Short` tags
/// (two-byte length prefix).
const SHORT: usize = 1 << 16;
/// Payload lengths below this (and at least `SHORT`) use the `*Long` tags
/// (four-byte length prefix); anything larger uses the `*Huge` tags.
const LONG: usize = 1 << 32;
1553
/// Appends the binary encoding of `datum` to `data`.
///
/// Every encoding starts with a one-byte tag; what follows depends on the
/// variant (see the per-arm comments).
///
/// # Panics
///
/// Panics if a range bound is `Datum::Null`, or if a numeric's digit count or
/// exponent does not fit in a single byte.
fn push_datum<D>(data: &mut D, datum: Datum)
where
    D: Vector<u8>,
{
    match datum {
        Datum::Null => data.push(Tag::Null.into()),
        Datum::False => data.push(Tag::False.into()),
        Datum::True => data.push(Tag::True.into()),
        // Integers: the tag is a sign-specific base tag offset by the number of
        // payload bytes, followed by only that many low-order bytes.
        Datum::Int16(i) => {
            let mbs = min_bytes_signed(i);
            let tag = u8::from(if i.is_negative() {
                Tag::NegativeInt16_0
            } else {
                Tag::NonNegativeInt16_0
            }) + mbs;

            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
        }
        Datum::Int32(i) => {
            let mbs = min_bytes_signed(i);
            let tag = u8::from(if i.is_negative() {
                Tag::NegativeInt32_0
            } else {
                Tag::NonNegativeInt32_0
            }) + mbs;

            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
        }
        Datum::Int64(i) => {
            let mbs = min_bytes_signed(i);
            let tag = u8::from(if i.is_negative() {
                Tag::NegativeInt64_0
            } else {
                Tag::NonNegativeInt64_0
            }) + mbs;

            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
        }
        // Unsigned integers: same scheme, with a single base tag per width.
        Datum::UInt8(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt8_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        Datum::UInt16(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt16_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        Datum::UInt32(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt32_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        Datum::UInt64(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt64_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        // Floats are stored as the little-endian bytes of their bit pattern.
        Datum::Float32(f) => {
            data.push(Tag::Float32.into());
            data.extend_from_slice(&f.to_bits().to_le_bytes());
        }
        Datum::Float64(f) => {
            data.push(Tag::Float64.into());
            data.extend_from_slice(&f.to_bits().to_le_bytes());
        }
        Datum::Date(d) => {
            data.push(Tag::Date.into());
            push_date(data, d);
        }
        Datum::Time(t) => {
            data.push(Tag::Time.into());
            push_time(data, t);
        }
        // Timestamps use the compact 8-byte nanosecond form when
        // `checked_timestamp_nanos` can produce one, and otherwise fall back to
        // the 16-byte date + time form.
        Datum::Timestamp(t) => {
            let datetime = t.to_naive();
            if let Some(nanos) = checked_timestamp_nanos(datetime) {
                data.push(Tag::CheapTimestamp.into());
                data.extend_from_slice(&nanos.to_le_bytes());
            } else {
                data.push(Tag::Timestamp.into());
                push_naive_date(data, datetime.date());
                push_time(data, datetime.time());
            }
        }
        Datum::TimestampTz(t) => {
            let datetime = t.to_naive();
            if let Some(nanos) = checked_timestamp_nanos(datetime) {
                data.push(Tag::CheapTimestampTz.into());
                data.extend_from_slice(&nanos.to_le_bytes());
            } else {
                data.push(Tag::TimestampTz.into());
                push_naive_date(data, datetime.date());
                push_time(data, datetime.time());
            }
        }
        Datum::Interval(i) => {
            data.push(Tag::Interval.into());
            data.extend_from_slice(&i.months.to_le_bytes());
            data.extend_from_slice(&i.days.to_le_bytes());
            data.extend_from_slice(&i.micros.to_le_bytes());
        }
        // Bytes, strings and lists pick a tag by payload length; the tag in
        // turn fixes the width of the length prefix written by
        // `push_lengthed_bytes`.
        Datum::Bytes(bytes) => {
            let tag = match bytes.len() {
                0..TINY => Tag::BytesTiny,
                TINY..SHORT => Tag::BytesShort,
                SHORT..LONG => Tag::BytesLong,
                _ => Tag::BytesHuge,
            };
            data.push(tag.into());
            push_lengthed_bytes(data, bytes, tag);
        }
        Datum::String(string) => {
            let tag = match string.len() {
                0..TINY => Tag::StringTiny,
                TINY..SHORT => Tag::StringShort,
                SHORT..LONG => Tag::StringLong,
                _ => Tag::StringHuge,
            };
            data.push(tag.into());
            push_lengthed_bytes(data, string.as_bytes(), tag);
        }
        Datum::List(list) => {
            let tag = match list.data.len() {
                0..TINY => Tag::ListTiny,
                TINY..SHORT => Tag::ListShort,
                SHORT..LONG => Tag::ListLong,
                _ => Tag::ListHuge,
            };
            data.push(tag.into());
            push_lengthed_bytes(data, list.data, tag);
        }
        Datum::Uuid(u) => {
            data.push(Tag::Uuid.into());
            data.extend_from_slice(u.as_bytes());
        }
        // Arrays: tag, dimension count, raw dimension bytes, then the elements
        // behind a fixed 8-byte length.
        Datum::Array(array) => {
            data.push(Tag::Array.into());
            data.push(array.dims.ndims());
            data.extend_from_slice(array.dims.data);
            push_untagged_bytes(data, array.elements.data);
        }
        // Maps: tag, then the already-encoded entries behind a fixed 8-byte
        // length.
        Datum::Map(dict) => {
            data.push(Tag::Dict.into());
            push_untagged_bytes(data, dict.data);
        }
        Datum::JsonNull => data.push(Tag::JsonNull.into()),
        Datum::MzTimestamp(t) => {
            data.push(Tag::MzTimestamp.into());
            data.extend_from_slice(&t.encode());
        }
        Datum::Dummy => data.push(Tag::Dummy.into()),
        Datum::Numeric(mut n) => {
            // Reduce first so the stored form (and its digit count) is the
            // reduced one; `datum_size` performs the same reduction.
            numeric::cx_datum().reduce(&mut n.0);
            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
            data.push(Tag::Numeric.into());
            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
            data.push(
                i8::try_from(exponent)
                    .expect("exponent to fit within i8; should not exceed +/- 39")
                    .to_le_bytes()[0],
            );
            data.push(bits);

            // Only the coefficient units actually in use for `digits` are stored.
            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];

            if cfg!(target_endian = "little") {
                // On little-endian targets the in-memory units already match the
                // little-endian wire layout, so they can be copied as raw bytes.
                // SAFETY: every bit pattern is a valid `u8` and `u8` has
                // alignment 1, so viewing the unit slice as bytes is sound.
                // (The units look like `u16`s given the `* 2` below — confirm.)
                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
                soft_assert_no_log!(
                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
                    Numeric::digits_to_lsu_elements_len(digits) * 2,
                    lsu_bytes.len()
                );
                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
                data.extend_from_slice(lsu_bytes);
            } else {
                // Other targets convert each unit to little-endian explicitly.
                for u in lsu {
                    data.extend_from_slice(&u.to_le_bytes());
                }
            }
        }
        // Ranges: tag, flag byte, then each finite bound as a nested datum
        // (lower first).
        Datum::Range(range) => {
            data.push(Tag::Range.into());
            data.push(range.internal_flag_bits());

            if let Some(RangeInner { lower, upper }) = range.inner {
                for bound in [lower.bound, upper.bound] {
                    if let Some(bound) = bound {
                        match bound.datum() {
                            Datum::Null => panic!("cannot push Datum::Null into range"),
                            d => push_datum::<D>(data, d),
                        }
                    }
                }
            }
        }
        Datum::MzAclItem(mz_acl_item) => {
            data.push(Tag::MzAclItem.into());
            data.extend_from_slice(&mz_acl_item.encode_binary());
        }
        Datum::AclItem(acl_item) => {
            data.push(Tag::AclItem.into());
            data.extend_from_slice(&acl_item.encode_binary());
        }
    }
}
1781
1782pub fn row_size<'a, I>(a: I) -> usize
1784where
1785 I: IntoIterator<Item = Datum<'a>>,
1786{
1787 let sz = datums_size::<_, _>(a);
1792 let size_of_row = std::mem::size_of::<Row>();
1793 if sz > Row::SIZE {
1797 sz + size_of_row
1798 } else {
1799 size_of_row
1800 }
1801}
1802
1803pub fn datum_size(datum: &Datum) -> usize {
1806 match datum {
1807 Datum::Null => 1,
1808 Datum::False => 1,
1809 Datum::True => 1,
1810 Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
1811 Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
1812 Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
1813 Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1814 Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1815 Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1816 Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1817 Datum::Float32(_) => 1 + size_of::<f32>(),
1818 Datum::Float64(_) => 1 + size_of::<f64>(),
1819 Datum::Date(_) => 1 + size_of::<i32>(),
1820 Datum::Time(_) => 1 + 8,
1821 Datum::Timestamp(t) => {
1822 1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
1823 8
1824 } else {
1825 16
1826 }
1827 }
1828 Datum::TimestampTz(t) => {
1829 1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
1830 8
1831 } else {
1832 16
1833 }
1834 }
1835 Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
1836 Datum::Bytes(bytes) => {
1837 let bytes_for_length = match bytes.len() {
1839 0..TINY => 1,
1840 TINY..SHORT => 2,
1841 SHORT..LONG => 4,
1842 _ => 8,
1843 };
1844 1 + bytes_for_length + bytes.len()
1845 }
1846 Datum::String(string) => {
1847 let bytes_for_length = match string.len() {
1849 0..TINY => 1,
1850 TINY..SHORT => 2,
1851 SHORT..LONG => 4,
1852 _ => 8,
1853 };
1854 1 + bytes_for_length + string.len()
1855 }
1856 Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
1857 Datum::Array(array) => {
1858 1 + size_of::<u8>()
1859 + array.dims.data.len()
1860 + size_of::<u64>()
1861 + array.elements.data.len()
1862 }
1863 Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
1864 Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
1865 Datum::JsonNull => 1,
1866 Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
1867 Datum::Dummy => 1,
1868 Datum::Numeric(d) => {
1869 let mut d = d.0.clone();
1870 numeric::cx_datum().reduce(&mut d);
1873 4 + (d.coefficient_units().len() * 2)
1875 }
1876 Datum::Range(Range { inner }) => {
1877 2 + match inner {
1879 None => 0,
1880 Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
1881 .iter()
1882 .map(|bound| match bound {
1883 None => 0,
1884 Some(bound) => bound.val.len(),
1885 })
1886 .sum(),
1887 }
1888 }
1889 Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
1890 Datum::AclItem(_) => 1 + AclItem::binary_size(),
1891 }
1892}
1893
1894pub fn datums_size<'a, I, D>(iter: I) -> usize
1899where
1900 I: IntoIterator<Item = D>,
1901 D: Borrow<Datum<'a>>,
1902{
1903 iter.into_iter().map(|d| datum_size(d.borrow())).sum()
1904}
1905
1906pub fn datum_list_size<'a, I, D>(iter: I) -> usize
1911where
1912 I: IntoIterator<Item = D>,
1913 D: Borrow<Datum<'a>>,
1914{
1915 1 + size_of::<u64>() + datums_size(iter)
1916}
1917
1918impl RowPacker<'_> {
    /// Wraps `row` in a `RowPacker` without modifying the row's current
    /// contents; subsequent pushes append to whatever the row already holds.
    pub fn for_existing_row(row: &mut Row) -> RowPacker {
        RowPacker { row }
    }
1928
1929 #[inline]
1931 pub fn push<'a, D>(&mut self, datum: D)
1932 where
1933 D: Borrow<Datum<'a>>,
1934 {
1935 push_datum(&mut self.row.data, *datum.borrow());
1936 }
1937
1938 #[inline]
1940 pub fn extend<'a, I, D>(&mut self, iter: I)
1941 where
1942 I: IntoIterator<Item = D>,
1943 D: Borrow<Datum<'a>>,
1944 {
1945 for datum in iter {
1946 push_datum(&mut self.row.data, *datum.borrow())
1947 }
1948 }
1949
1950 #[inline]
1956 pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
1957 where
1958 I: IntoIterator<Item = Result<D, E>>,
1959 D: Borrow<Datum<'a>>,
1960 {
1961 for datum in iter {
1962 push_datum(&mut self.row.data, *datum?.borrow());
1963 }
1964 Ok(())
1965 }
1966
    /// Appends all of `row`'s encoded bytes to the row being packed, without
    /// decoding and re-encoding the individual datums.
    pub fn extend_by_row(&mut self, row: &Row) {
        self.row.data.extend_from_slice(row.data.as_slice());
    }
1971
    /// Appends the raw bytes in `data` to the row being packed, with no
    /// validation.
    ///
    /// # Safety
    ///
    /// The bytes are copied verbatim, so `data` presumably must already be a
    /// valid sequence of encoded datums; otherwise later reads of the row would
    /// decode garbage. (Exact contract not visible here — confirm with callers.)
    #[inline]
    pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
        self.row.data.extend_from_slice(data)
    }
1983
    /// Pushes a list datum whose elements are whatever `f` pushes onto the
    /// packer, and returns `f`'s result.
    ///
    /// The list is optimistically started as a `ListTiny` with a one-byte
    /// length. If the elements turn out to need a wider length prefix, the tag
    /// is rewritten and the element bytes are shifted to make room.
    #[inline]
    pub fn push_list_with<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&mut RowPacker) -> R,
    {
        // Remember where the list starts so the header can be patched later.
        let start = self.row.data.len();
        // Provisional header: tiny tag plus a placeholder one-byte length.
        self.row.data.push(Tag::ListTiny.into());
        self.row.data.push(0);

        let out = f(self);

        // Payload length: everything after the tag byte and the length byte.
        let len = self.row.data.len() - start - 1 - 1;
        if len < TINY {
            // Fast path: the provisional header was right; fill in the length.
            self.row.data[start + 1] = len.to_le_bytes()[0];
        } else {
            // Slow path, kept out of line: widen the header.
            long_list(&mut self.row.data, start, len);
        }

        /// Rewrites the header of the list starting at `start` for a payload of
        /// `len` bytes that does not fit the one-byte length form.
        #[cold]
        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
            // Grows the buffer by the extra length bytes, moves the payload
            // right to open the gap, then writes the `len_len`-byte length.
            let long_list_inner = |data: &mut CompactBytes, len_len| {
                const ZEROS: [u8; 8] = [0; 8];
                // One length byte already exists, so only `len_len - 1` more.
                data.extend_from_slice(&ZEROS[0..len_len - 1]);
                // Payload currently begins right after tag + 1-byte length.
                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
                data[start + 1..start + 1 + len_len]
                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
            };
            match len {
                0..TINY => {
                    // The caller only gets here when `len >= TINY`.
                    unreachable!()
                }
                TINY..SHORT => {
                    data[start] = Tag::ListShort.into();
                    long_list_inner(data, 2);
                }
                SHORT..LONG => {
                    data[start] = Tag::ListLong.into();
                    long_list_inner(data, 4);
                }
                _ => {
                    data[start] = Tag::ListHuge.into();
                    long_list_inner(data, 8);
                }
            };
        }

        out
    }
2080
2081 pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2119 where
2120 F: FnOnce(&mut RowPacker) -> R,
2121 {
2122 self.row.data.push(Tag::Dict.into());
2123 let start = self.row.data.len();
2124 self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2126
2127 let res = f(self);
2128
2129 let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2130 self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2132
2133 res
2134 }
2135
2136 pub fn try_push_array<'a, I, D>(
2143 &mut self,
2144 dims: &[ArrayDimension],
2145 iter: I,
2146 ) -> Result<(), InvalidArrayError>
2147 where
2148 I: IntoIterator<Item = D>,
2149 D: Borrow<Datum<'a>>,
2150 {
2151 unsafe {
2153 self.push_array_with_unchecked(dims, |packer| {
2154 let mut nelements = 0;
2155 for datum in iter {
2156 packer.push(datum);
2157 nelements += 1;
2158 }
2159 Ok::<_, InvalidArrayError>(nelements)
2160 })
2161 }
2162 }
2163
    /// Pushes an array datum with dimensions `dims` whose element bytes are
    /// produced by `f`, which must return the number of elements it pushed.
    ///
    /// # Errors
    ///
    /// Returns `InvalidArrayError::TooManyDimensions` if `dims` has more than
    /// `MAX_ARRAY_DIMENSIONS` entries, propagates any error from `f`, and
    /// returns `InvalidArrayError::WrongCardinality` if the count `f` reports
    /// differs from the product of the dimension lengths (zero for no
    /// dimensions). In every error case the row is truncated back to its state
    /// before the call.
    ///
    /// # Safety
    ///
    /// The element count returned by `f` is trusted and the bytes `f` writes
    /// are not inspected, so `f` presumably must push exactly that many valid
    /// datums. (Exact contract not visible here — confirm.)
    pub unsafe fn push_array_with_unchecked<F, E>(
        &mut self,
        dims: &[ArrayDimension],
        f: F,
    ) -> Result<(), E>
    where
        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
        E: From<InvalidArrayError>,
    {
        // Reject before writing anything, so no cleanup is needed.
        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
        }

        // Rollback point for the error paths below.
        let start = self.row.data.len();
        self.row.data.push(Tag::Array.into());

        // Dimension count, then each dimension as (lower bound, length).
        self.row
            .data
            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
        for dim in dims {
            self.row
                .data
                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
            self.row
                .data
                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
        }

        // Placeholder for the element byte length, back-filled after `f` runs.
        let off = self.row.data.len();
        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
        let nelements = match f(self) {
            Ok(nelements) => nelements,
            Err(e) => {
                self.row.data.truncate(start);
                return Err(e);
            }
        };
        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());

        // An array without dimensions holds no elements; otherwise the
        // element count must equal the product of the dimension lengths.
        let cardinality = match dims {
            [] => 0,
            dims => dims.iter().map(|d| d.length).product(),
        };
        if nelements != cardinality {
            self.row.data.truncate(start);
            return Err(InvalidArrayError::WrongCardinality {
                actual: nelements,
                expected: cardinality,
            }
            .into());
        }

        Ok(())
    }
2242
2243 pub fn push_array_with_row_major<F, I>(
2253 &mut self,
2254 dims: I,
2255 f: F,
2256 ) -> Result<(), InvalidArrayError>
2257 where
2258 I: IntoIterator<Item = ArrayDimension>,
2259 F: FnOnce(&mut RowPacker) -> usize,
2260 {
2261 let start = self.row.data.len();
2262 self.row.data.push(Tag::Array.into());
2263
2264 let dims_start = self.row.data.len();
2266 self.row.data.push(42);
2267
2268 let mut num_dims: u8 = 0;
2269 let mut cardinality: usize = 1;
2270 for dim in dims {
2271 num_dims += 1;
2272 cardinality *= dim.length;
2273
2274 self.row
2275 .data
2276 .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2277 self.row
2278 .data
2279 .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2280 }
2281
2282 if num_dims > MAX_ARRAY_DIMENSIONS {
2283 self.row.data.truncate(start);
2285 return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2286 }
2287 self.row.data[dims_start..dims_start + size_of::<u8>()]
2289 .copy_from_slice(&num_dims.to_le_bytes());
2290
2291 let off = self.row.data.len();
2293 self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2294
2295 let nelements = f(self);
2296
2297 let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2298 self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2299
2300 let cardinality = match num_dims {
2303 0 => 0,
2304 _ => cardinality,
2305 };
2306 if nelements != cardinality {
2307 self.row.data.truncate(start);
2308 return Err(InvalidArrayError::WrongCardinality {
2309 actual: nelements,
2310 expected: cardinality,
2311 });
2312 }
2313
2314 Ok(())
2315 }
2316
2317 pub fn push_list<'a, I, D>(&mut self, iter: I)
2321 where
2322 I: IntoIterator<Item = D>,
2323 D: Borrow<Datum<'a>>,
2324 {
2325 self.push_list_with(|packer| {
2326 for elem in iter {
2327 packer.push(*elem.borrow())
2328 }
2329 });
2330 }
2331
2332 pub fn push_dict<'a, I, D>(&mut self, iter: I)
2334 where
2335 I: IntoIterator<Item = (&'a str, D)>,
2336 D: Borrow<Datum<'a>>,
2337 {
2338 self.push_dict_with(|packer| {
2339 for (k, v) in iter {
2340 packer.push(Datum::String(k));
2341 packer.push(*v.borrow())
2342 }
2343 })
2344 }
2345
2346 pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
2362 range.canonicalize()?;
2363 match range.inner {
2364 None => {
2365 self.row.data.push(Tag::Range.into());
2366 self.row.data.push(range::InternalFlags::EMPTY.bits());
2368 Ok(())
2369 }
2370 Some(inner) => self.push_range_with(
2371 RangeLowerBound {
2372 inclusive: inner.lower.inclusive,
2373 bound: inner
2374 .lower
2375 .bound
2376 .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2377 },
2378 RangeUpperBound {
2379 inclusive: inner.upper.inclusive,
2380 bound: inner
2381 .upper
2382 .bound
2383 .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2384 },
2385 ),
2386 }
2387 }
2388
2389 pub fn push_range_with<L, U, E>(
2412 &mut self,
2413 lower: RangeLowerBound<L>,
2414 upper: RangeUpperBound<U>,
2415 ) -> Result<(), E>
2416 where
2417 L: FnOnce(&mut RowPacker) -> Result<(), E>,
2418 U: FnOnce(&mut RowPacker) -> Result<(), E>,
2419 E: From<InvalidRangeError>,
2420 {
2421 let start = self.row.data.len();
2422 self.row.data.push(Tag::Range.into());
2423
2424 let mut flags = range::InternalFlags::empty();
2425
2426 flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
2427 flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
2428 flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
2429 flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);
2430
2431 let mut expected_datums = 0;
2432
2433 self.row.data.push(flags.bits());
2434
2435 let datum_check = self.row.data.len();
2436
2437 if let Some(value) = lower.bound {
2438 let start = self.row.data.len();
2439 value(self)?;
2440 assert!(
2441 start < self.row.data.len(),
2442 "finite values must each push exactly one value; expected 1 but got 0"
2443 );
2444 expected_datums += 1;
2445 }
2446
2447 if let Some(value) = upper.bound {
2448 let start = self.row.data.len();
2449 value(self)?;
2450 assert!(
2451 start < self.row.data.len(),
2452 "finite values must each push exactly one value; expected 1 but got 0"
2453 );
2454 expected_datums += 1;
2455 }
2456
2457 let mut actual_datums = 0;
2461 let mut seen = None;
2462 let mut dataz = &self.row.data[datum_check..];
2463 while !dataz.is_empty() {
2464 let d = unsafe { read_datum(&mut dataz) };
2465 assert!(d != Datum::Null, "cannot push Datum::Null into range");
2466
2467 match seen {
2468 None => seen = Some(d),
2469 Some(seen) => {
2470 let seen_kind = DatumKind::from(seen);
2471 let d_kind = DatumKind::from(d);
2472 assert!(
2473 seen_kind == d_kind,
2474 "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
2475 );
2476
2477 if seen > d {
2478 self.row.data.truncate(start);
2479 return Err(InvalidRangeError::MisorderedRangeBounds.into());
2480 }
2481 }
2482 }
2483 actual_datums += 1;
2484 }
2485
2486 assert!(
2487 actual_datums == expected_datums,
2488 "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
2489 );
2490
2491 Ok(())
2492 }
2493
    /// Removes all data from the row being packed.
    pub fn clear(&mut self) {
        self.row.data.clear();
    }
2498
    /// Shortens the row's encoded bytes to `pos` bytes.
    ///
    /// # Safety
    ///
    /// `pos` is a raw byte offset and is not validated; presumably it must fall
    /// on a datum boundary, since cutting through a datum would leave bytes
    /// that cannot be decoded. (Exact contract not visible here — confirm.)
    pub unsafe fn truncate(&mut self, pos: usize) {
        self.row.data.truncate(pos)
    }
2514
2515 pub fn truncate_datums(&mut self, n: usize) {
2517 let prev_len = self.row.data.len();
2518 let mut iter = self.row.iter();
2519 for _ in iter.by_ref().take(n) {}
2520 let next_len = iter.data.len();
2521 unsafe { self.truncate(prev_len - next_len) }
2523 }
2524
    /// Returns the row's `byte_len()`, i.e. the size reported by the
    /// underlying `Row`.
    pub fn byte_len(&self) -> usize {
        self.row.byte_len()
    }
2529}
2530
2531impl<'a> IntoIterator for &'a Row {
2532 type Item = Datum<'a>;
2533 type IntoIter = DatumListIter<'a>;
2534 fn into_iter(self) -> DatumListIter<'a> {
2535 self.iter()
2536 }
2537}
2538
2539impl fmt::Debug for Row {
2540 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2542 f.write_str("Row{")?;
2543 f.debug_list().entries(self.iter()).finish()?;
2544 f.write_str("}")
2545 }
2546}
2547
2548impl fmt::Display for Row {
2549 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2551 f.write_str("(")?;
2552 for (i, datum) in self.iter().enumerate() {
2553 if i != 0 {
2554 f.write_str(", ")?;
2555 }
2556 write!(f, "{}", datum)?;
2557 }
2558 f.write_str(")")
2559 }
2560}
2561
impl<'a> DatumList<'a> {
    /// Returns a list with no elements, backed by an empty byte slice.
    pub fn empty() -> DatumList<'static> {
        DatumList { data: &[] }
    }

    /// Returns an iterator that decodes the list's datums in order.
    pub fn iter(&self) -> DatumListIter<'a> {
        DatumListIter { data: self.data }
    }

    /// Returns the raw encoded bytes backing this list.
    pub fn data(&self) -> &'a [u8] {
        self.data
    }
}
2576
2577impl<'a> IntoIterator for &'a DatumList<'a> {
2578 type Item = Datum<'a>;
2579 type IntoIter = DatumListIter<'a>;
2580 fn into_iter(self) -> DatumListIter<'a> {
2581 self.iter()
2582 }
2583}
2584
2585impl<'a> Iterator for DatumListIter<'a> {
2586 type Item = Datum<'a>;
2587 fn next(&mut self) -> Option<Self::Item> {
2588 if self.data.is_empty() {
2589 None
2590 } else {
2591 Some(unsafe { read_datum(&mut self.data) })
2592 }
2593 }
2594}
2595
impl<'a> DatumMap<'a> {
    /// Returns a map with no entries, backed by an empty byte slice.
    pub fn empty() -> DatumMap<'static> {
        DatumMap { data: &[] }
    }

    /// Returns an iterator that decodes the map's `(key, value)` entries in
    /// stored order.
    pub fn iter(&self) -> DatumDictIter<'a> {
        DatumDictIter {
            data: self.data,
            // No key has been seen yet.
            prev_key: None,
        }
    }

    /// Returns the raw encoded bytes backing this map.
    pub fn data(&self) -> &'a [u8] {
        self.data
    }
}
2613
impl<'a> Debug for DatumMap<'a> {
    /// Formats the map as a debug map of its decoded `(key, value)` entries.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_map().entries(self.iter()).finish()
    }
}
2619
2620impl<'a> IntoIterator for &'a DatumMap<'a> {
2621 type Item = (&'a str, Datum<'a>);
2622 type IntoIter = DatumDictIter<'a>;
2623 fn into_iter(self) -> DatumDictIter<'a> {
2624 self.iter()
2625 }
2626}
2627
2628impl<'a> Iterator for DatumDictIter<'a> {
2629 type Item = (&'a str, Datum<'a>);
2630 fn next(&mut self) -> Option<Self::Item> {
2631 if self.data.is_empty() {
2632 None
2633 } else {
2634 let key_tag =
2635 Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
2636 assert!(
2637 key_tag == Tag::StringTiny
2638 || key_tag == Tag::StringShort
2639 || key_tag == Tag::StringLong
2640 || key_tag == Tag::StringHuge,
2641 "Dict keys must be strings, got {:?}",
2642 key_tag
2643 );
2644 let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
2645 let val = unsafe { read_datum(&mut self.data) };
2646
2647 if cfg!(debug_assertions) {
2649 if let Some(prev_key) = self.prev_key {
2650 debug_assert!(
2651 prev_key < key,
2652 "Dict keys must be unique and given in ascending order: {} came before {}",
2653 prev_key,
2654 key
2655 );
2656 }
2657 self.prev_key = Some(key);
2658 }
2659
2660 Some((key, val))
2661 }
2662 }
2663}
2664
impl RowArena {
    /// Creates an empty arena.
    pub fn new() -> Self {
        RowArena {
            inner: RefCell::new(vec![]),
        }
    }

    /// Creates an empty arena with room for `capacity` pushed buffers (not
    /// bytes) before the backing vector has to grow.
    pub fn with_capacity(capacity: usize) -> Self {
        RowArena {
            inner: RefCell::new(Vec::with_capacity(capacity)),
        }
    }

    /// Reserves room for at least `additional` more pushed buffers.
    pub fn reserve(&self, additional: usize) {
        self.inner.borrow_mut().reserve(additional);
    }

    /// Takes ownership of `bytes` and returns a slice of them that lives as
    /// long as the borrow of the arena.
    #[allow(clippy::transmute_ptr_to_ptr)]
    pub fn push_bytes<'a>(&'a self, bytes: Vec<u8>) -> &'a [u8] {
        let mut inner = self.inner.borrow_mut();
        inner.push(bytes);
        let owned_bytes = &inner[inner.len() - 1];
        unsafe {
            // SAFETY: this extends the lifetime past the `RefCell` borrow. The
            // slice points into the heap buffer of the `Vec<u8>` just stored,
            // which does not move when the outer vector grows. Buffers are only
            // dropped by `clear` (which takes `&mut self`, so no `'a` borrow
            // can be live) or when the arena itself is dropped.
            transmute::<&[u8], &'a [u8]>(owned_bytes)
        }
    }

    /// Takes ownership of `string` and returns a `&str` view of it that lives
    /// as long as the borrow of the arena.
    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
        let owned_bytes = self.push_bytes(string.into_bytes());
        unsafe {
            // SAFETY: the bytes came from a `String`, so they are valid UTF-8,
            // and nothing has modified them since.
            std::str::from_utf8_unchecked(owned_bytes)
        }
    }

    /// Takes ownership of `row`'s bytes and returns the first datum decoded
    /// from them, borrowed from the arena.
    ///
    /// The name suggests `row` is expected to hold exactly one datum; any
    /// further datums are stored but not returned. An empty row is presumably
    /// invalid here, since a datum is decoded unconditionally — confirm.
    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
        let mut inner = self.inner.borrow_mut();
        inner.push(row.data.into_vec());
        unsafe {
            // SAFETY (decode): the bytes come from a `Row`, which is expected
            // to contain validly encoded datums.
            let datum = read_datum(&mut &inner[inner.len() - 1][..]);
            // SAFETY (lifetime): same argument as in `push_bytes` — the datum
            // borrows from a heap buffer the arena keeps alive until `clear`
            // or drop.
            transmute::<Datum<'_>, Datum<'a>>(datum)
        }
    }

    /// Like `push_unary_row`, but returns the value produced by
    /// `DatumNested::extract` over the stored bytes instead of a `Datum`.
    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
        let mut inner = self.inner.borrow_mut();
        inner.push(row.data.into_vec());
        unsafe {
            // SAFETY (decode): the bytes come from a `Row`, which is expected
            // to contain validly encoded datums.
            let nested = DatumNested::extract(&mut &inner[inner.len() - 1][..]);
            // SAFETY (lifetime): same argument as in `push_bytes`.
            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
        }
    }

    /// Builds a datum by running `f` against a packer for a fresh row, stores
    /// the row in the arena, and returns its first datum.
    ///
    /// `f` is expected to push exactly one datum (see `push_unary_row`).
    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
    where
        F: FnOnce(&mut RowPacker),
    {
        let mut row = Row::default();
        f(&mut row.packer());
        self.push_unary_row(row)
    }

    /// Like `make_datum`, but returns a `DatumNested` (see
    /// `push_unary_row_datum_nested`).
    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
    where
        F: FnOnce(&mut RowPacker),
    {
        let mut row = Row::default();
        f(&mut row.packer());
        self.push_unary_row_datum_nested(row)
    }

    /// Fallible variant of `make_datum`: if `f` returns an error it is
    /// propagated and nothing is stored in the arena.
    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
    where
        F: FnOnce(&mut RowPacker) -> Result<(), E>,
    {
        let mut row = Row::default();
        f(&mut row.packer())?;
        Ok(self.push_unary_row(row))
    }

    /// Drops every buffer held by the arena.
    ///
    /// Taking `&mut self` guarantees that no reference previously handed out
    /// by the arena is still alive.
    pub fn clear(&mut self) {
        self.inner.borrow_mut().clear();
    }
}
2803
2804impl Default for RowArena {
2805 fn default() -> RowArena {
2806 RowArena::new()
2807 }
2808}
2809
/// A handle to a thread-local, reference-counted `Row` that can be reused as
/// packing scratch space instead of allocating a fresh `Row` each time.
///
/// The row sits behind a `RefCell`, so holding two mutable borrows at once
/// panics at runtime.
#[derive(Debug)]
pub struct SharedRow(Rc<RefCell<Row>>);
2830
2831impl SharedRow {
2832 thread_local! {
2833 static SHARED_ROW: Rc<RefCell<Row>> = Rc::new(RefCell::new(Row::default()));
2834 }
2835
2836 pub fn get() -> Self {
2844 let row = Self::SHARED_ROW.with(Rc::clone);
2845 row.borrow_mut().packer();
2847 Self(row)
2848 }
2849
2850 pub fn pack<'a, I, D>(iter: I) -> Row
2852 where
2853 I: IntoIterator<Item = D>,
2854 D: Borrow<Datum<'a>>,
2855 {
2856 let binding = Self::SHARED_ROW.with(Rc::clone);
2857 let mut row_builder = binding.borrow_mut();
2858 let mut row_packer = row_builder.packer();
2859 row_packer.extend(iter);
2860 row_builder.clone()
2861 }
2862
2863 pub fn pack_with<F, R>(&mut self, f: F) -> R
2869 where
2870 for<'a> F: FnOnce(&'a mut RowPacker<'a>) -> R,
2871 {
2872 let mut borrow = self.borrow_mut();
2873 let mut packer = borrow.packer();
2874 (f)(&mut packer)
2875 }
2876}
2877
2878impl std::ops::Deref for SharedRow {
2879 type Target = RefCell<Row>;
2880
2881 fn deref(&self) -> &Self::Target {
2882 &self.0
2883 }
2884}
2885
2886#[cfg(test)]
2887mod tests {
2888 use chrono::{DateTime, NaiveDate};
2889 use mz_ore::{assert_err, assert_none};
2890
2891 use crate::ScalarType;
2892
2893 use super::*;
2894
    /// Checks platform assumptions the encoding relies on: a `Tag` occupies
    /// exactly one byte, and the target is not big-endian.
    #[mz_ore::test]
    fn test_assumptions() {
        assert_eq!(size_of::<Tag>(), 1);
        #[cfg(target_endian = "big")]
        {
            // Fail loudly when compiled for a big-endian target.
            assert!(false);
        }
    }
2904
    /// Exercises `RowArena`: strings and byte vectors pushed into the arena
    /// come back unchanged, and a row holding a nested dict/list round-trips
    /// through `push_unary_row`.
    #[mz_ore::test]
    fn miri_test_arena() {
        let arena = RowArena::new();

        // Empty and non-ASCII strings.
        assert_eq!(arena.push_string("".to_owned()), "");
        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");

        // Empty and non-empty byte vectors.
        let empty: &[u8] = &[];
        assert_eq!(arena.push_bytes(vec![]), empty);
        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);

        // A dict with a list value ("a") and a string value ("b" -> "c").
        let mut row = Row::default();
        let mut packer = row.packer();
        packer.push_dict_with(|row| {
            row.push(Datum::String("a"));
            row.push_list_with(|row| {
                row.push(Datum::String("one"));
                row.push(Datum::String("two"));
                row.push(Datum::String("three"));
            });
            row.push(Datum::String("b"));
            row.push(Datum::String("c"));
        });
        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
    }
2930
    /// Checks that packing datums into a `Row` and reading them back (via both
    /// `iter` and `unpack`) yields the original datums.
    #[mz_ore::test]
    fn miri_test_round_trip() {
        /// Packs `datums`, then asserts both decoding paths reproduce them.
        fn round_trip(datums: Vec<Datum>) {
            let row = Row::pack(datums.clone());

            println!("{:?}", row.data());

            let datums2 = row.iter().collect::<Vec<_>>();
            let datums3 = row.unpack();
            assert_eq!(datums, datums2);
            assert_eq!(datums, datums3);
        }

        // The empty row.
        round_trip(vec![]);
        // Every "interesting" datum of every enumerated scalar type.
        round_trip(
            ScalarType::enumerate()
                .iter()
                .flat_map(|r#type| r#type.interesting_datums())
                .collect(),
        );
        // A hand-picked mix, including unsigned values at each byte-width
        // boundary.
        round_trip(vec![
            Datum::Null,
            Datum::Null,
            Datum::False,
            Datum::True,
            Datum::Int16(-21),
            Datum::Int32(-42),
            Datum::Int64(-2_147_483_648 - 42),
            Datum::UInt8(0),
            Datum::UInt8(1),
            Datum::UInt16(0),
            Datum::UInt16(1),
            Datum::UInt16(1 << 8),
            Datum::UInt32(0),
            Datum::UInt32(1),
            Datum::UInt32(1 << 8),
            Datum::UInt32(1 << 16),
            Datum::UInt32(1 << 24),
            Datum::UInt64(0),
            Datum::UInt64(1),
            Datum::UInt64(1 << 8),
            Datum::UInt64(1 << 16),
            Datum::UInt64(1 << 24),
            Datum::UInt64(1 << 32),
            Datum::UInt64(1 << 40),
            Datum::UInt64(1 << 48),
            Datum::UInt64(1 << 56),
            Datum::Float32(OrderedFloat::from(-42.12)),
            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(
                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
                        .unwrap()
                        .and_hms_opt(14, 32, 11)
                        .unwrap(),
                )
                .unwrap(),
            ),
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
                    .unwrap(),
            ),
            Datum::Interval(Interval {
                months: 312,
                ..Default::default()
            }),
            Datum::Interval(Interval::new(0, 0, 1_012_312)),
            Datum::Bytes(&[]),
            Datum::Bytes(&[0, 2, 1, 255]),
            Datum::String(""),
            Datum::String("العَرَبِيَّة"),
        ]);
    }
3007
3008 #[mz_ore::test]
3009 fn test_array() {
3010 const DIM: ArrayDimension = ArrayDimension {
3013 lower_bound: 2,
3014 length: 2,
3015 };
3016 let mut row = Row::default();
3017 let mut packer = row.packer();
3018 packer
3019 .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3020 .unwrap();
3021 let arr1 = row.unpack_first().unwrap_array();
3022 assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3023 assert_eq!(
3024 arr1.elements().into_iter().collect::<Vec<_>>(),
3025 vec![Datum::Int32(1), Datum::Int32(2)]
3026 );
3027
3028 let row = Row::pack_slice(&[Datum::Array(arr1)]);
3031 let arr2 = row.unpack_first().unwrap_array();
3032 assert_eq!(arr1, arr2);
3033 }
3034
3035 #[mz_ore::test]
3036 fn test_multidimensional_array() {
3037 let datums = vec![
3038 Datum::Int32(1),
3039 Datum::Int32(2),
3040 Datum::Int32(3),
3041 Datum::Int32(4),
3042 Datum::Int32(5),
3043 Datum::Int32(6),
3044 Datum::Int32(7),
3045 Datum::Int32(8),
3046 ];
3047
3048 let mut row = Row::default();
3049 let mut packer = row.packer();
3050 packer
3051 .try_push_array(
3052 &[
3053 ArrayDimension {
3054 lower_bound: 1,
3055 length: 1,
3056 },
3057 ArrayDimension {
3058 lower_bound: 1,
3059 length: 4,
3060 },
3061 ArrayDimension {
3062 lower_bound: 1,
3063 length: 2,
3064 },
3065 ],
3066 &datums,
3067 )
3068 .unwrap();
3069 let array = row.unpack_first().unwrap_array();
3070 assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3071 }
3072
3073 #[mz_ore::test]
3074 fn test_array_max_dimensions() {
3075 let mut row = Row::default();
3076 let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3077
3078 let res = row.packer().try_push_array(
3080 &vec![
3081 ArrayDimension {
3082 lower_bound: 1,
3083 length: 1
3084 };
3085 max_dims + 1
3086 ],
3087 vec![Datum::Int32(4)],
3088 );
3089 assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3090 assert!(row.data.is_empty());
3091
3092 row.packer()
3095 .try_push_array(
3096 &vec![
3097 ArrayDimension {
3098 lower_bound: 1,
3099 length: 1
3100 };
3101 max_dims
3102 ],
3103 vec![Datum::Int32(4)],
3104 )
3105 .unwrap();
3106 }
3107
3108 #[mz_ore::test]
3109 fn test_array_wrong_cardinality() {
3110 let mut row = Row::default();
3111 let res = row.packer().try_push_array(
3112 &[
3113 ArrayDimension {
3114 lower_bound: 1,
3115 length: 2,
3116 },
3117 ArrayDimension {
3118 lower_bound: 1,
3119 length: 3,
3120 },
3121 ],
3122 vec![Datum::Int32(1), Datum::Int32(2)],
3123 );
3124 assert_eq!(
3125 res,
3126 Err(InvalidArrayError::WrongCardinality {
3127 actual: 2,
3128 expected: 6,
3129 })
3130 );
3131 assert!(row.data.is_empty());
3132 }
3133
3134 #[mz_ore::test]
3135 fn test_nesting() {
3136 let mut row = Row::default();
3137 row.packer().push_dict_with(|row| {
3138 row.push(Datum::String("favourites"));
3139 row.push_list_with(|row| {
3140 row.push(Datum::String("ice cream"));
3141 row.push(Datum::String("oreos"));
3142 row.push(Datum::String("cheesecake"));
3143 });
3144 row.push(Datum::String("name"));
3145 row.push(Datum::String("bob"));
3146 });
3147
3148 let mut iter = row.unpack_first().unwrap_map().iter();
3149
3150 let (k, v) = iter.next().unwrap();
3151 assert_eq!(k, "favourites");
3152 assert_eq!(
3153 v.unwrap_list().iter().collect::<Vec<_>>(),
3154 vec![
3155 Datum::String("ice cream"),
3156 Datum::String("oreos"),
3157 Datum::String("cheesecake"),
3158 ]
3159 );
3160
3161 let (k, v) = iter.next().unwrap();
3162 assert_eq!(k, "name");
3163 assert_eq!(v, Datum::String("bob"));
3164 }
3165
3166 #[mz_ore::test]
3167 fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3168 let pack = |ok| {
3169 let mut row = Row::default();
3170 row.packer().push_dict_with(|row| {
3171 if ok {
3172 row.push(Datum::String("key"));
3173 row.push(Datum::Int32(42));
3174 Ok(7)
3175 } else {
3176 Err("fail")
3177 }
3178 })?;
3179 Ok(row)
3180 };
3181
3182 assert_eq!(pack(false), Err("fail"));
3183
3184 let row = pack(true)?;
3185 let mut dict = row.unpack_first().unwrap_map().iter();
3186 assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3187 assert_eq!(dict.next(), None);
3188
3189 Ok(())
3190 }
3191
3192 #[mz_ore::test]
3193 #[cfg_attr(miri, ignore)] fn test_datum_sizes() {
3195 let arena = RowArena::new();
3196
3197 let values_of_interest = vec![
3199 Datum::Null,
3200 Datum::False,
3201 Datum::Int16(0),
3202 Datum::Int32(0),
3203 Datum::Int64(0),
3204 Datum::UInt8(0),
3205 Datum::UInt8(1),
3206 Datum::UInt16(0),
3207 Datum::UInt16(1),
3208 Datum::UInt16(1 << 8),
3209 Datum::UInt32(0),
3210 Datum::UInt32(1),
3211 Datum::UInt32(1 << 8),
3212 Datum::UInt32(1 << 16),
3213 Datum::UInt32(1 << 24),
3214 Datum::UInt64(0),
3215 Datum::UInt64(1),
3216 Datum::UInt64(1 << 8),
3217 Datum::UInt64(1 << 16),
3218 Datum::UInt64(1 << 24),
3219 Datum::UInt64(1 << 32),
3220 Datum::UInt64(1 << 40),
3221 Datum::UInt64(1 << 48),
3222 Datum::UInt64(1 << 56),
3223 Datum::Float32(OrderedFloat(0.0)),
3224 Datum::Float64(OrderedFloat(0.0)),
3225 Datum::from(numeric::Numeric::from(0)),
3226 Datum::from(numeric::Numeric::from(1000)),
3227 Datum::from(numeric::Numeric::from(9999)),
3228 Datum::Date(
3229 NaiveDate::from_ymd_opt(1, 1, 1)
3230 .unwrap()
3231 .try_into()
3232 .unwrap(),
3233 ),
3234 Datum::Timestamp(
3235 CheckedTimestamp::from_timestamplike(
3236 DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
3237 )
3238 .unwrap(),
3239 ),
3240 Datum::TimestampTz(
3241 CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
3242 .unwrap(),
3243 ),
3244 Datum::Interval(Interval::default()),
3245 Datum::Bytes(&[]),
3246 Datum::String(""),
3247 Datum::JsonNull,
3248 Datum::Range(Range { inner: None }),
3249 arena.make_datum(|packer| {
3250 packer
3251 .push_range(Range::new(Some((
3252 RangeLowerBound::new(Datum::Int32(-1), true),
3253 RangeUpperBound::new(Datum::Int32(1), true),
3254 ))))
3255 .unwrap();
3256 }),
3257 ];
3258 for value in values_of_interest {
3259 if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
3260 panic!("Disparity in claimed size for {:?}", value);
3261 }
3262 }
3263 }
3264
3265 #[mz_ore::test]
3266 fn test_range_errors() {
3267 fn test_range_errors_inner<'a>(
3268 datums: Vec<Vec<Datum<'a>>>,
3269 ) -> Result<(), InvalidRangeError> {
3270 let mut row = Row::default();
3271 let row_len = row.byte_len();
3272 let mut packer = row.packer();
3273 let r = packer.push_range_with(
3274 RangeLowerBound {
3275 inclusive: true,
3276 bound: Some(|row: &mut RowPacker| {
3277 for d in &datums[0] {
3278 row.push(d);
3279 }
3280 Ok(())
3281 }),
3282 },
3283 RangeUpperBound {
3284 inclusive: true,
3285 bound: Some(|row: &mut RowPacker| {
3286 for d in &datums[1] {
3287 row.push(d);
3288 }
3289 Ok(())
3290 }),
3291 },
3292 );
3293
3294 assert_eq!(row_len, row.byte_len());
3295
3296 r
3297 }
3298
3299 for panicking_case in [
3300 vec![vec![Datum::Int32(1)], vec![]],
3301 vec![
3302 vec![Datum::Int32(1), Datum::Int32(2)],
3303 vec![Datum::Int32(3)],
3304 ],
3305 vec![
3306 vec![Datum::Int32(1)],
3307 vec![Datum::Int32(2), Datum::Int32(3)],
3308 ],
3309 vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
3310 vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
3311 vec![vec![Datum::Null], vec![Datum::Int32(2)]],
3312 vec![vec![Datum::Int32(1)], vec![Datum::Null]],
3313 ] {
3314 #[allow(clippy::disallowed_methods)] let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
3316 assert_err!(result);
3317 }
3318
3319 let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
3320 assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
3321 }
3322
3323 #[mz_ore::test]
3325 #[cfg_attr(miri, ignore)] fn test_list_encoding() {
3327 fn test_list_encoding_inner(len: usize) {
3328 let list_elem = |i: usize| {
3329 if i % 2 == 0 {
3330 Datum::False
3331 } else {
3332 Datum::True
3333 }
3334 };
3335 let mut row = Row::default();
3336 {
3337 let mut packer = row.packer();
3339 packer.push(Datum::String("start"));
3340 packer.push_list_with(|packer| {
3341 for i in 0..len {
3342 packer.push(list_elem(i));
3343 }
3344 });
3345 packer.push(Datum::String("end"));
3346 }
3347 let mut row_it = row.iter();
3349 assert_eq!(row_it.next().unwrap(), Datum::String("start"));
3350 match row_it.next().unwrap() {
3351 Datum::List(list) => {
3352 let mut list_it = list.iter();
3353 for i in 0..len {
3354 assert_eq!(list_it.next().unwrap(), list_elem(i));
3355 }
3356 assert_none!(list_it.next());
3357 }
3358 _ => panic!("expected Datum::List"),
3359 }
3360 assert_eq!(row_it.next().unwrap(), Datum::String("end"));
3361 assert_none!(row_it.next());
3362 }
3363
3364 test_list_encoding_inner(0);
3365 test_list_encoding_inner(1);
3366 test_list_encoding_inner(10);
3367 test_list_encoding_inner(TINY - 1); test_list_encoding_inner(TINY + 1); test_list_encoding_inner(SHORT + 1); }
3374}