1use std::borrow::Borrow;
11use std::cell::{Cell, RefCell};
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::mem::{size_of, transmute};
16use std::ops::Deref;
17use std::str;
18
19use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
20use compact_bytes::CompactBytes;
21use mz_ore::cast::{CastFrom, ReinterpretCast};
22use mz_ore::soft_assert_no_log;
23use mz_ore::vec::Vector;
24use mz_persist_types::Codec64;
25use num_enum::{IntoPrimitive, TryFromPrimitive};
26use ordered_float::OrderedFloat;
27use proptest::prelude::*;
28use proptest::strategy::{BoxedStrategy, Strategy};
29use serde::{Deserialize, Serialize};
30use uuid::Uuid;
31
32use crate::adt::array::{
33 Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
34};
35use crate::adt::date::Date;
36use crate::adt::interval::Interval;
37use crate::adt::mz_acl_item::{AclItem, MzAclItem};
38use crate::adt::numeric;
39use crate::adt::numeric::Numeric;
40use crate::adt::range::{
41 self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
42};
43use crate::adt::timestamp::CheckedTimestamp;
44use crate::scalar::{DatumKind, arb_datum};
45use crate::{Datum, RelationDesc, Timestamp};
46
47pub(crate) mod encode;
48pub mod iter;
49
50include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
51
/// An owned row whose encoded contents live in a single [`CompactBytes`] buffer.
///
/// Datums are written through the [`RowPacker`] returned by [`Row::packer`] and read back
/// through the [`RowRef`] view returned by [`Row::as_row_ref`] (also reachable via `Deref`).
#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
    /// The encoded bytes of every datum in the row, back to back.
    data: CompactBytes,
}
112
113impl Row {
114 const SIZE: usize = CompactBytes::MAX_INLINE;
115
116 pub fn decode_from_proto(
119 &mut self,
120 proto: &ProtoRow,
121 desc: &RelationDesc,
122 ) -> Result<(), String> {
123 let mut packer = self.packer();
124 for (col_idx, _, _) in desc.iter_all() {
125 let d = match proto.datums.get(col_idx.to_raw()) {
126 Some(x) => x,
127 None => {
128 packer.push(Datum::Null);
129 continue;
130 }
131 };
132 packer.try_push_proto(d)?;
133 }
134
135 Ok(())
136 }
137
138 #[inline]
140 pub fn with_capacity(cap: usize) -> Self {
141 Self {
142 data: CompactBytes::with_capacity(cap),
143 }
144 }
145
146 #[inline]
148 pub const fn empty() -> Self {
149 Self {
150 data: CompactBytes::empty(),
151 }
152 }
153
154 pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
161 Row {
162 data: CompactBytes::new(data),
163 }
164 }
165
166 pub fn packer(&mut self) -> RowPacker<'_> {
172 self.clear();
173 RowPacker { row: self }
174 }
175
176 pub fn pack<'a, I, D>(iter: I) -> Row
184 where
185 I: IntoIterator<Item = D>,
186 D: Borrow<Datum<'a>>,
187 {
188 let mut row = Row::default();
189 row.packer().extend(iter);
190 row
191 }
192
193 pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
198 where
199 I: IntoIterator<Item = D>,
200 D: Borrow<Datum<'a>>,
201 {
202 self.packer().extend(iter);
203 self.clone()
204 }
205
206 pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
210 where
211 I: IntoIterator<Item = Result<D, E>>,
212 D: Borrow<Datum<'a>>,
213 {
214 let mut row = Row::default();
215 row.packer().try_extend(iter)?;
216 Ok(row)
217 }
218
219 pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
225 let mut row = Row::with_capacity(datums_size(slice.iter()));
227 row.packer().extend(slice.iter());
228 row
229 }
230
231 pub fn byte_len(&self) -> usize {
233 let heap_size = if self.data.spilled() {
234 self.data.len()
235 } else {
236 0
237 };
238 let inline_size = std::mem::size_of::<Self>();
239 inline_size.saturating_add(heap_size)
240 }
241
242 pub fn data_len(&self) -> usize {
244 self.data.len()
245 }
246
247 pub fn byte_capacity(&self) -> usize {
249 self.data.capacity()
250 }
251
252 #[inline]
254 pub fn as_row_ref(&self) -> &RowRef {
255 RowRef::from_slice(self.data.as_slice())
256 }
257
258 #[inline]
260 fn clear(&mut self) {
261 self.data.clear();
262 }
263}
264
265impl Borrow<RowRef> for Row {
266 #[inline]
267 fn borrow(&self) -> &RowRef {
268 self.as_row_ref()
269 }
270}
271
272impl AsRef<RowRef> for Row {
273 #[inline]
274 fn as_ref(&self) -> &RowRef {
275 self.as_row_ref()
276 }
277}
278
279impl Deref for Row {
280 type Target = RowRef;
281
282 #[inline]
283 fn deref(&self) -> &Self::Target {
284 self.as_row_ref()
285 }
286}
287
// Compile-time guard: a `Row` must occupy exactly 24 bytes.
static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
290
291impl Clone for Row {
292 fn clone(&self) -> Self {
293 Row {
294 data: self.data.clone(),
295 }
296 }
297
298 fn clone_from(&mut self, source: &Self) {
299 self.data.clone_from(&source.data);
300 }
301}
302
303impl std::hash::Hash for Row {
305 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
306 self.as_row_ref().hash(state)
307 }
308}
309
310impl Arbitrary for Row {
311 type Parameters = prop::collection::SizeRange;
312 type Strategy = BoxedStrategy<Row>;
313
314 fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
315 prop::collection::vec(arb_datum(), size)
316 .prop_map(|items| {
317 let mut row = Row::default();
318 let mut packer = row.packer();
319 for item in items.iter() {
320 let datum: Datum<'_> = item.into();
321 packer.push(datum);
322 }
323 row
324 })
325 .boxed()
326 }
327}
328
329impl PartialOrd for Row {
330 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
331 Some(self.cmp(other))
332 }
333}
334
335impl Ord for Row {
336 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
337 self.as_ref().cmp(other.as_ref())
338 }
339}
340
341#[allow(missing_debug_implementations)]
342mod columnation {
343 use columnation::{Columnation, Region};
344 use mz_ore::region::LgAllocRegion;
345
346 use crate::Row;
347
348 pub struct RowStack {
353 region: LgAllocRegion<u8>,
354 }
355
356 impl RowStack {
357 const LIMIT: usize = 2 << 20;
358 }
359
360 impl Default for RowStack {
362 fn default() -> Self {
363 Self {
364 region: LgAllocRegion::with_limit(Self::LIMIT),
366 }
367 }
368 }
369
370 impl Columnation for Row {
371 type InnerRegion = RowStack;
372 }
373
374 impl Region for RowStack {
375 type Item = Row;
376 #[inline]
377 fn clear(&mut self) {
378 self.region.clear();
379 }
380 #[inline(always)]
381 unsafe fn copy(&mut self, item: &Row) -> Row {
382 if item.data.spilled() {
383 let bytes = self.region.copy_slice(&item.data[..]);
384 Row {
385 data: compact_bytes::CompactBytes::from_raw_parts(
386 bytes.as_mut_ptr(),
387 item.data.len(),
388 item.data.capacity(),
389 ),
390 }
391 } else {
392 item.clone()
393 }
394 }
395
396 fn reserve_items<'a, I>(&mut self, items: I)
397 where
398 Self: 'a,
399 I: Iterator<Item = &'a Self::Item> + Clone,
400 {
401 let size = items
402 .filter(|row| row.data.spilled())
403 .map(|row| row.data.len())
404 .sum();
405 let size = std::cmp::min(size, Self::LIMIT);
406 self.region.reserve(size);
407 }
408
409 fn reserve_regions<'a, I>(&mut self, regions: I)
410 where
411 Self: 'a,
412 I: Iterator<Item = &'a Self> + Clone,
413 {
414 let size = regions.map(|r| r.region.len()).sum();
415 let size = std::cmp::min(size, Self::LIMIT);
416 self.region.reserve(size);
417 }
418
419 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
420 self.region.heap_size(callback)
421 }
422 }
423}
424
425mod columnar {
426 use columnar::{
427 AsBytes, Clear, Columnar, Container, FromBytes, HeapSize, Index, IndexAs, Len, Push,
428 };
429 use mz_ore::cast::CastFrom;
430
431 use crate::{Row, RowRef};
432
433 #[derive(Copy, Clone, Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
434 pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
435 pub bounds: BC,
437 pub values: VC,
439 }
440
441 impl Columnar for Row {
442 type Ref<'a> = &'a RowRef;
443 fn copy_from(&mut self, other: Self::Ref<'_>) {
444 self.clear();
445 self.data.extend_from_slice(other.data());
446 }
447 fn into_owned(other: Self::Ref<'_>) -> Self {
448 other.to_owned()
449 }
450 type Container = Rows;
451 }
452
453 impl<'b, BC: Container<u64>> Container<Row> for Rows<BC, &'b [u8]> {
454 type Borrowed<'a>
455 = Rows<BC::Borrowed<'a>, &'a [u8]>
456 where
457 Self: 'a;
458 fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
459 Rows {
460 bounds: self.bounds.borrow(),
461 values: self.values,
462 }
463 }
464 }
465 impl<BC: Container<u64>> Container<Row> for Rows<BC, Vec<u8>> {
466 type Borrowed<'a>
467 = Rows<BC::Borrowed<'a>, &'a [u8]>
468 where
469 BC: 'a;
470 fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
471 Rows {
472 bounds: self.bounds.borrow(),
473 values: self.values.borrow(),
474 }
475 }
476 }
477
478 impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
479 fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
480 self.bounds.as_bytes().chain(self.values.as_bytes())
481 }
482 }
483 impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
484 fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
485 Self {
486 bounds: FromBytes::from_bytes(bytes),
487 values: FromBytes::from_bytes(bytes),
488 }
489 }
490 }
491
492 impl<BC: Len, VC> Len for Rows<BC, VC> {
493 #[inline(always)]
494 fn len(&self) -> usize {
495 self.bounds.len()
496 }
497 }
498
499 impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
500 type Ref = &'a RowRef;
501 #[inline(always)]
502 fn get(&self, index: usize) -> Self::Ref {
503 let lower = if index == 0 {
504 0
505 } else {
506 self.bounds.index_as(index - 1)
507 };
508 let upper = self.bounds.index_as(index);
509 let lower = usize::cast_from(lower);
510 let upper = usize::cast_from(upper);
511 RowRef::from_slice(&self.values[lower..upper])
512 }
513 }
514 impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
515 type Ref = &'a RowRef;
516 #[inline(always)]
517 fn get(&self, index: usize) -> Self::Ref {
518 let lower = if index == 0 {
519 0
520 } else {
521 self.bounds.index_as(index - 1)
522 };
523 let upper = self.bounds.index_as(index);
524 let lower = usize::cast_from(lower);
525 let upper = usize::cast_from(upper);
526 RowRef::from_slice(&self.values[lower..upper])
527 }
528 }
529
530 impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
531 #[inline(always)]
532 fn push(&mut self, item: &Row) {
533 self.values.extend_from_slice(item.data.as_slice());
534 self.bounds.push(u64::cast_from(self.values.len()));
535 }
536 }
537 impl<BC: Push<u64>> Push<&RowRef> for Rows<BC> {
538 fn push(&mut self, item: &RowRef) {
539 self.values.extend_from_slice(item.data());
540 self.bounds.push(u64::cast_from(self.values.len()));
541 }
542 }
543 impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
544 fn clear(&mut self) {
545 self.bounds.clear();
546 self.values.clear();
547 }
548 }
549 impl<BC: HeapSize, VC: HeapSize> HeapSize for Rows<BC, VC> {
550 fn heap_size(&self) -> (usize, usize) {
551 let (l0, c0) = self.bounds.heap_size();
552 let (l1, c1) = self.values.heap_size();
553 (l0 + l1, c0 + c1)
554 }
555 }
556}
557
/// A borrowed, unsized view over the encoded bytes of a row.
///
/// The type is `#[repr(transparent)]` over `[u8]`; `RowRef::from_slice` relies on that to
/// turn a `&[u8]` into a `&RowRef` with a pointer cast.
#[derive(PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct RowRef([u8]);
564
565impl RowRef {
566 pub fn from_slice(row: &[u8]) -> &RowRef {
571 #[allow(clippy::as_conversions)]
572 let ptr = row as *const [u8] as *const RowRef;
573 unsafe { &*ptr }
575 }
576
577 pub fn unpack(&self) -> Vec<Datum> {
579 let len = self.iter().count();
581 let mut vec = Vec::with_capacity(len);
582 vec.extend(self.iter());
583 vec
584 }
585
586 pub fn unpack_first(&self) -> Datum {
590 self.iter().next().unwrap()
591 }
592
593 pub fn iter(&self) -> DatumListIter {
595 DatumListIter { data: &self.0 }
596 }
597
598 pub fn byte_len(&self) -> usize {
600 self.0.len()
601 }
602
603 pub fn data(&self) -> &[u8] {
605 &self.0
606 }
607
608 pub fn is_empty(&self) -> bool {
610 self.0.is_empty()
611 }
612}
613
614impl ToOwned for RowRef {
615 type Owned = Row;
616
617 fn to_owned(&self) -> Self::Owned {
618 unsafe { Row::from_bytes_unchecked(&self.0) }
620 }
621}
622
623impl<'a> IntoIterator for &'a RowRef {
624 type Item = Datum<'a>;
625 type IntoIter = DatumListIter<'a>;
626
627 fn into_iter(self) -> DatumListIter<'a> {
628 DatumListIter { data: &self.0 }
629 }
630}
631
632impl PartialOrd for RowRef {
636 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
637 Some(self.cmp(other))
638 }
639}
640
641impl Ord for RowRef {
642 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
643 match self.0.len().cmp(&other.0.len()) {
644 std::cmp::Ordering::Less => std::cmp::Ordering::Less,
645 std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
646 std::cmp::Ordering::Equal => self.0.cmp(&other.0),
647 }
648 }
649}
650
651impl fmt::Debug for RowRef {
652 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
654 f.write_str("RowRef{")?;
655 f.debug_list().entries(self.into_iter()).finish()?;
656 f.write_str("}")
657 }
658}
659
/// Handle for writing datums into a [`Row`].
///
/// Obtained from `Row::packer`, which clears the row before handing out the packer. The
/// packer holds the exclusive borrow of the row for as long as it lives.
#[derive(Debug)]
pub struct RowPacker<'a> {
    /// The row being written to.
    row: &'a mut Row,
}
671
/// Iterator over encoded datums, as returned by `RowRef::iter`.
#[derive(Debug, Clone)]
pub struct DatumListIter<'a> {
    /// Encoded bytes the iterator reads datums from.
    data: &'a [u8],
}
676
/// Iterator state over the encoded entries of a dictionary.
///
/// NOTE(review): the iterator implementation is not in view here; field descriptions below
/// are inferred from names and types — confirm.
#[derive(Debug, Clone)]
pub struct DatumDictIter<'a> {
    /// Encoded bytes the iterator reads from.
    data: &'a [u8],
    /// Presumably the key returned by the previous step, if any — confirm against the
    /// `Iterator` impl.
    prev_key: Option<&'a str>,
}
682
/// A set of owned byte buffers behind a `RefCell`, so buffers can be added through a shared
/// reference.
///
/// NOTE(review): the methods are not in view here; presumably this keeps temporary
/// allocations alive while datums borrow from them — confirm.
#[derive(Debug)]
pub struct RowArena {
    /// The owned buffers.
    inner: RefCell<Vec<Vec<u8>>>,
}
694
/// A borrowed list of datums, kept in encoded form.
///
/// Equality and hashing are derived and therefore act on the raw bytes; ordering and
/// `Debug` are implemented separately and work on the decoded elements.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumList<'a> {
    /// Encoded bytes of the list's elements.
    data: &'a [u8],
}
703
704impl<'a> Debug for DatumList<'a> {
705 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706 f.debug_list().entries(self.iter()).finish()
707 }
708}
709
710impl Ord for DatumList<'_> {
711 fn cmp(&self, other: &DatumList) -> Ordering {
712 self.iter().cmp(other.iter())
713 }
714}
715
716impl PartialOrd for DatumList<'_> {
717 fn partial_cmp(&self, other: &DatumList) -> Option<Ordering> {
718 Some(self.cmp(other))
719 }
720}
721
/// A borrowed map of datums, kept in encoded form.
///
/// All comparison traits are derived, so equality, hashing and ordering act on the raw
/// encoded bytes.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct DatumMap<'a> {
    /// Encoded bytes of the map's entries.
    data: &'a [u8],
}
728
/// A single datum kept in encoded form, decoded on demand by `DatumNested::datum`.
///
/// Equality and hashing are derived and act on the raw bytes; ordering, `Display` and
/// `Debug` are implemented separately and work on the decoded datum.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumNested<'a> {
    /// The encoded bytes of exactly one datum (tag included).
    val: &'a [u8],
}
735
736impl<'a> std::fmt::Display for DatumNested<'a> {
737 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
738 std::fmt::Display::fmt(&self.datum(), f)
739 }
740}
741
742impl<'a> std::fmt::Debug for DatumNested<'a> {
743 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
744 f.debug_struct("DatumNested")
745 .field("val", &self.datum())
746 .finish()
747 }
748}
749
750impl<'a> DatumNested<'a> {
751 pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
755 let prev = *data;
756 let _ = unsafe { read_datum(data) };
757 DatumNested {
758 val: &prev[..(prev.len() - data.len())],
759 }
760 }
761
762 pub fn datum(&self) -> Datum<'a> {
764 let mut temp = self.val;
765 unsafe { read_datum(&mut temp) }
766 }
767}
768
769impl<'a> Ord for DatumNested<'a> {
770 fn cmp(&self, other: &Self) -> Ordering {
771 self.datum().cmp(&other.datum())
772 }
773}
774
775impl<'a> PartialOrd for DatumNested<'a> {
776 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
777 Some(self.cmp(other))
778 }
779}
780
/// The one-byte tag that starts every encoded datum and tells `read_datum` how to decode the
/// bytes that follow.
///
/// The enum is `#[repr(u8)]` with implicit discriminants, and `read_datum` maps the tag byte
/// back to a variant with `try_from_primitive`. The numeric value of each variant is
/// therefore its position in this list: inserting, removing or reordering variants changes
/// the byte encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
enum Tag {
    Null,
    False,
    True,
    // Fixed-width integers: the full little-endian value follows the tag.
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt32,
    Float32,
    Float64,
    Date,
    Time,
    // Timestamps stored as a date (year, ordinal) followed by a time (seconds, nanoseconds).
    Timestamp,
    TimestampTz,
    Interval,
    // Length-prefixed payloads. The suffix selects the width of the length prefix read by
    // `read_lengthed_datum`: Tiny = 1 byte, Short = 2, Long = 4, Huge = 8.
    BytesTiny,
    BytesShort,
    BytesLong,
    BytesHuge,
    StringTiny,
    StringShort,
    StringLong,
    StringHuge,
    Uuid,
    Array,
    ListTiny,
    ListShort,
    ListLong,
    ListHuge,
    Dict,
    JsonNull,
    Dummy,
    Numeric,
    UInt16,
    UInt64,
    MzTimestamp,
    Range,
    MzAclItem,
    AclItem,
    // Timestamps stored as one little-endian `i64` that `read_datum` splits into seconds and
    // nanoseconds.
    CheapTimestamp,
    CheapTimestampTz,
    // Variable-width integers. The `_N` suffix is the number of payload *bits* that follow
    // the tag (`actual_int_length` returns N / 8 bytes); the remaining high bytes are filled
    // with 0x00 for the NonNegative*/UInt* tags and with 0xFF for the Negative* tags.
    NonNegativeInt16_0, NonNegativeInt16_8,
    NonNegativeInt16_16,

    NonNegativeInt32_0,
    NonNegativeInt32_8,
    NonNegativeInt32_16,
    NonNegativeInt32_24,
    NonNegativeInt32_32,

    NonNegativeInt64_0,
    NonNegativeInt64_8,
    NonNegativeInt64_16,
    NonNegativeInt64_24,
    NonNegativeInt64_32,
    NonNegativeInt64_40,
    NonNegativeInt64_48,
    NonNegativeInt64_56,
    NonNegativeInt64_64,

    NegativeInt16_0, NegativeInt16_8,
    NegativeInt16_16,

    NegativeInt32_0,
    NegativeInt32_8,
    NegativeInt32_16,
    NegativeInt32_24,
    NegativeInt32_32,

    NegativeInt64_0,
    NegativeInt64_8,
    NegativeInt64_16,
    NegativeInt64_24,
    NegativeInt64_32,
    NegativeInt64_40,
    NegativeInt64_48,
    NegativeInt64_56,
    NegativeInt64_64,

    UInt8_0, UInt8_8,

    UInt16_0,
    UInt16_8,
    UInt16_16,

    UInt32_0,
    UInt32_8,
    UInt32_16,
    UInt32_24,
    UInt32_32,

    UInt64_0,
    UInt64_8,
    UInt64_16,
    UInt64_24,
    UInt64_32,
    UInt64_40,
    UInt64_48,
    UInt64_56,
    UInt64_64,
}
910
911impl Tag {
912 fn actual_int_length(self) -> Option<usize> {
913 use Tag::*;
914 let val = match self {
915 NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
916 | UInt32_0 | UInt64_0 => 0,
917 NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
918 | UInt32_8 | UInt64_8 => 1,
919 NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
920 | UInt32_16 | UInt64_16 => 2,
921 NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
922 NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
923 NonNegativeInt64_40 | UInt64_40 => 5,
924 NonNegativeInt64_48 | UInt64_48 => 6,
925 NonNegativeInt64_56 | UInt64_56 => 7,
926 NonNegativeInt64_64 | UInt64_64 => 8,
927 NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
928 NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
929 NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
930 NegativeInt32_24 | NegativeInt64_24 => 3,
931 NegativeInt32_32 | NegativeInt64_32 => 4,
932 NegativeInt64_40 => 5,
933 NegativeInt64_48 => 6,
934 NegativeInt64_56 => 7,
935 NegativeInt64_64 => 8,
936
937 _ => return None,
938 };
939 Some(val)
940 }
941}
942
943fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
950 let len = u64::from_le_bytes(read_byte_array(data));
951 let len = usize::cast_from(len);
952 let (bytes, next) = data.split_at(len);
953 *data = next;
954 bytes
955}
956
957unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
966 let len = match tag {
967 Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
968 Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
969 usize::from(u16::from_le_bytes(read_byte_array(data)))
970 }
971 Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
972 usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
973 }
974 Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
975 usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
976 }
977 _ => unreachable!(),
978 };
979 let (bytes, next) = data.split_at(len);
980 *data = next;
981 match tag {
982 Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
983 Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
984 Datum::String(str::from_utf8_unchecked(bytes))
985 }
986 Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
987 Datum::List(DatumList { data: bytes })
988 }
989 _ => unreachable!(),
990 }
991}
992
/// Pops the first byte off `data` and returns it.
///
/// Panics if `data` is empty.
fn read_byte(data: &mut &[u8]) -> u8 {
    let remaining = *data;
    let first = remaining[0];
    *data = &remaining[1..];
    first
}
998
/// Reads `length` bytes from `data` into the low end of an `N`-byte array whose remaining
/// bytes are set to `FILL`, and advances `data` past the bytes read.
///
/// With little-endian integers this widens a truncated value: `FILL = 0` for non-negative
/// values, `FILL = 255` for negative ones.
///
/// Panics if `length` exceeds the length of `data` or `N`.
fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    let mut widened = [FILL; N];
    let (head, tail) = data.split_at(length);
    let low = &mut widened[..head.len()];
    low.copy_from_slice(head);
    *data = tail;
    widened
}
1016fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
1024 read_byte_array_sign_extending::<N, 255>(data, length)
1025}
1026
1027fn read_byte_array_extending_nonnegative<const N: usize>(
1035 data: &mut &[u8],
1036 length: usize,
1037) -> [u8; N] {
1038 read_byte_array_sign_extending::<N, 0>(data, length)
1039}
1040
1041pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1042 let (prev, next) = data.split_first_chunk().unwrap();
1043 *data = next;
1044 *prev
1045}
1046
1047pub(super) fn read_date(data: &mut &[u8]) -> Date {
1048 let days = i32::from_le_bytes(read_byte_array(data));
1049 Date::from_pg_epoch(days).expect("unexpected date")
1050}
1051
1052pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1053 let year = i32::from_le_bytes(read_byte_array(data));
1054 let ordinal = u32::from_le_bytes(read_byte_array(data));
1055 NaiveDate::from_yo_opt(year, ordinal).unwrap()
1056}
1057
1058pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1059 let secs = u32::from_le_bytes(read_byte_array(data));
1060 let nanos = u32::from_le_bytes(read_byte_array(data));
1061 NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1062}
1063
1064pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
1073 let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
1074 match tag {
1075 Tag::Null => Datum::Null,
1076 Tag::False => Datum::False,
1077 Tag::True => Datum::True,
1078 Tag::UInt8_0 | Tag::UInt8_8 => {
1079 let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
1080 data,
1081 tag.actual_int_length()
1082 .expect("returns a value for variable-length-encoded integer tags"),
1083 ));
1084 Datum::UInt8(i)
1085 }
1086 Tag::Int16 => {
1087 let i = i16::from_le_bytes(read_byte_array(data));
1088 Datum::Int16(i)
1089 }
1090 Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
1091 let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
1095 data,
1096 tag.actual_int_length()
1097 .expect("returns a value for variable-length-encoded integer tags"),
1098 ));
1099 Datum::Int16(i)
1100 }
1101 Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
1102 let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
1103 data,
1104 tag.actual_int_length()
1105 .expect("returns a value for variable-length-encoded integer tags"),
1106 ));
1107 Datum::UInt16(i)
1108 }
1109 Tag::Int32 => {
1110 let i = i32::from_le_bytes(read_byte_array(data));
1111 Datum::Int32(i)
1112 }
1113 Tag::NonNegativeInt32_0
1114 | Tag::NonNegativeInt32_32
1115 | Tag::NonNegativeInt32_8
1116 | Tag::NonNegativeInt32_16
1117 | Tag::NonNegativeInt32_24 => {
1118 let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
1122 data,
1123 tag.actual_int_length()
1124 .expect("returns a value for variable-length-encoded integer tags"),
1125 ));
1126 Datum::Int32(i)
1127 }
1128 Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
1129 let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
1130 data,
1131 tag.actual_int_length()
1132 .expect("returns a value for variable-length-encoded integer tags"),
1133 ));
1134 Datum::UInt32(i)
1135 }
1136 Tag::Int64 => {
1137 let i = i64::from_le_bytes(read_byte_array(data));
1138 Datum::Int64(i)
1139 }
1140 Tag::NonNegativeInt64_0
1141 | Tag::NonNegativeInt64_64
1142 | Tag::NonNegativeInt64_8
1143 | Tag::NonNegativeInt64_16
1144 | Tag::NonNegativeInt64_24
1145 | Tag::NonNegativeInt64_32
1146 | Tag::NonNegativeInt64_40
1147 | Tag::NonNegativeInt64_48
1148 | Tag::NonNegativeInt64_56 => {
1149 let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
1154 data,
1155 tag.actual_int_length()
1156 .expect("returns a value for variable-length-encoded integer tags"),
1157 ));
1158 Datum::Int64(i)
1159 }
1160 Tag::UInt64_0
1161 | Tag::UInt64_8
1162 | Tag::UInt64_16
1163 | Tag::UInt64_24
1164 | Tag::UInt64_32
1165 | Tag::UInt64_40
1166 | Tag::UInt64_48
1167 | Tag::UInt64_56
1168 | Tag::UInt64_64 => {
1169 let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
1170 data,
1171 tag.actual_int_length()
1172 .expect("returns a value for variable-length-encoded integer tags"),
1173 ));
1174 Datum::UInt64(i)
1175 }
1176 Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
1177 let i = i16::from_le_bytes(read_byte_array_extending_negative(
1181 data,
1182 tag.actual_int_length()
1183 .expect("returns a value for variable-length-encoded integer tags"),
1184 ));
1185 Datum::Int16(i)
1186 }
1187 Tag::NegativeInt32_0
1188 | Tag::NegativeInt32_32
1189 | Tag::NegativeInt32_8
1190 | Tag::NegativeInt32_16
1191 | Tag::NegativeInt32_24 => {
1192 let i = i32::from_le_bytes(read_byte_array_extending_negative(
1196 data,
1197 tag.actual_int_length()
1198 .expect("returns a value for variable-length-encoded integer tags"),
1199 ));
1200 Datum::Int32(i)
1201 }
1202 Tag::NegativeInt64_0
1203 | Tag::NegativeInt64_64
1204 | Tag::NegativeInt64_8
1205 | Tag::NegativeInt64_16
1206 | Tag::NegativeInt64_24
1207 | Tag::NegativeInt64_32
1208 | Tag::NegativeInt64_40
1209 | Tag::NegativeInt64_48
1210 | Tag::NegativeInt64_56 => {
1211 let i = i64::from_le_bytes(read_byte_array_extending_negative(
1215 data,
1216 tag.actual_int_length()
1217 .expect("returns a value for variable-length-encoded integer tags"),
1218 ));
1219 Datum::Int64(i)
1220 }
1221
1222 Tag::UInt8 => {
1223 let i = u8::from_le_bytes(read_byte_array(data));
1224 Datum::UInt8(i)
1225 }
1226 Tag::UInt16 => {
1227 let i = u16::from_le_bytes(read_byte_array(data));
1228 Datum::UInt16(i)
1229 }
1230 Tag::UInt32 => {
1231 let i = u32::from_le_bytes(read_byte_array(data));
1232 Datum::UInt32(i)
1233 }
1234 Tag::UInt64 => {
1235 let i = u64::from_le_bytes(read_byte_array(data));
1236 Datum::UInt64(i)
1237 }
1238 Tag::Float32 => {
1239 let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
1240 Datum::Float32(OrderedFloat::from(f))
1241 }
1242 Tag::Float64 => {
1243 let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
1244 Datum::Float64(OrderedFloat::from(f))
1245 }
1246 Tag::Date => Datum::Date(read_date(data)),
1247 Tag::Time => Datum::Time(read_time(data)),
1248 Tag::CheapTimestamp => {
1249 let ts = i64::from_le_bytes(read_byte_array(data));
1250 let secs = ts.div_euclid(1_000_000_000);
1251 let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1252 let ndt = DateTime::from_timestamp(secs, nsecs)
1253 .expect("We only write round-trippable timestamps")
1254 .naive_utc();
1255 Datum::Timestamp(
1256 CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
1257 )
1258 }
1259 Tag::CheapTimestampTz => {
1260 let ts = i64::from_le_bytes(read_byte_array(data));
1261 let secs = ts.div_euclid(1_000_000_000);
1262 let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1263 let dt = DateTime::from_timestamp(secs, nsecs)
1264 .expect("We only write round-trippable timestamps");
1265 Datum::TimestampTz(
1266 CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
1267 )
1268 }
1269 Tag::Timestamp => {
1270 let date = read_naive_date(data);
1271 let time = read_time(data);
1272 Datum::Timestamp(
1273 CheckedTimestamp::from_timestamplike(date.and_time(time))
1274 .expect("unexpected timestamp"),
1275 )
1276 }
1277 Tag::TimestampTz => {
1278 let date = read_naive_date(data);
1279 let time = read_time(data);
1280 Datum::TimestampTz(
1281 CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
1282 date.and_time(time),
1283 Utc,
1284 ))
1285 .expect("unexpected timestamptz"),
1286 )
1287 }
1288 Tag::Interval => {
1289 let months = i32::from_le_bytes(read_byte_array(data));
1290 let days = i32::from_le_bytes(read_byte_array(data));
1291 let micros = i64::from_le_bytes(read_byte_array(data));
1292 Datum::Interval(Interval {
1293 months,
1294 days,
1295 micros,
1296 })
1297 }
1298 Tag::BytesTiny
1299 | Tag::BytesShort
1300 | Tag::BytesLong
1301 | Tag::BytesHuge
1302 | Tag::StringTiny
1303 | Tag::StringShort
1304 | Tag::StringLong
1305 | Tag::StringHuge
1306 | Tag::ListTiny
1307 | Tag::ListShort
1308 | Tag::ListLong
1309 | Tag::ListHuge => read_lengthed_datum(data, tag),
1310 Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
1311 Tag::Array => {
1312 let ndims = read_byte(data);
1315 let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
1316 let (dims, next) = data.split_at(dims_size);
1317 *data = next;
1318 let bytes = read_untagged_bytes(data);
1319 Datum::Array(Array {
1320 dims: ArrayDimensions { data: dims },
1321 elements: DatumList { data: bytes },
1322 })
1323 }
1324 Tag::Dict => {
1325 let bytes = read_untagged_bytes(data);
1326 Datum::Map(DatumMap { data: bytes })
1327 }
1328 Tag::JsonNull => Datum::JsonNull,
1329 Tag::Dummy => Datum::Dummy,
1330 Tag::Numeric => {
1331 let digits = read_byte(data).into();
1332 let exponent = i8::reinterpret_cast(read_byte(data));
1333 let bits = read_byte(data);
1334
1335 let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
1336 let lsu_u8_len = lsu_u16_len * 2;
1337 let (lsu_u8, next) = data.split_at(lsu_u8_len);
1338 *data = next;
1339
1340 let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
1344 for (i, c) in lsu_u8.chunks(2).enumerate() {
1345 lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
1346 }
1347
1348 let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
1349 Datum::from(d)
1350 }
1351 Tag::MzTimestamp => {
1352 let t = Timestamp::decode(read_byte_array(data));
1353 Datum::MzTimestamp(t)
1354 }
1355 Tag::Range => {
1356 let flag_byte = read_byte(data);
1358 let flags = range::InternalFlags::from_bits(flag_byte)
1359 .expect("range flags must be encoded validly");
1360
1361 if flags.contains(range::InternalFlags::EMPTY) {
1362 assert!(
1363 flags == range::InternalFlags::EMPTY,
1364 "empty ranges contain only RANGE_EMPTY flag"
1365 );
1366
1367 return Datum::Range(Range { inner: None });
1368 }
1369
1370 let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
1371 None
1372 } else {
1373 Some(DatumNested::extract(data))
1374 };
1375
1376 let lower = RangeBound {
1377 inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
1378 bound: lower_bound,
1379 };
1380
1381 let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
1382 None
1383 } else {
1384 Some(DatumNested::extract(data))
1385 };
1386
1387 let upper = RangeBound {
1388 inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
1389 bound: upper_bound,
1390 };
1391
1392 Datum::Range(Range {
1393 inner: Some(RangeInner { lower, upper }),
1394 })
1395 }
1396 Tag::MzAclItem => {
1397 const N: usize = MzAclItem::binary_size();
1398 let mz_acl_item =
1399 MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
1400 Datum::MzAclItem(mz_acl_item)
1401 }
1402 Tag::AclItem => {
1403 const N: usize = AclItem::binary_size();
1404 let acl_item =
1405 AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
1406 Datum::AclItem(acl_item)
1407 }
1408 }
1409}
1410
1411fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1415where
1416 D: Vector<u8>,
1417{
1418 let len = u64::cast_from(bytes.len());
1419 data.extend_from_slice(&len.to_le_bytes());
1420 data.extend_from_slice(bytes);
1421}
1422
1423fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1424where
1425 D: Vector<u8>,
1426{
1427 match tag {
1428 Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1429 let len = bytes.len().to_le_bytes();
1430 data.push(len[0]);
1431 }
1432 Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1433 let len = bytes.len().to_le_bytes();
1434 data.extend_from_slice(&len[0..2]);
1435 }
1436 Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1437 let len = bytes.len().to_le_bytes();
1438 data.extend_from_slice(&len[0..4]);
1439 }
1440 Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1441 let len = bytes.len().to_le_bytes();
1442 data.extend_from_slice(&len);
1443 }
1444 _ => unreachable!(),
1445 }
1446 data.extend_from_slice(bytes);
1447}
1448
1449pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1450 i32::to_le_bytes(date.pg_epoch_days())
1451}
1452
1453fn push_date<D>(data: &mut D, date: Date)
1454where
1455 D: Vector<u8>,
1456{
1457 data.extend_from_slice(&date_to_array(date));
1458}
1459
1460pub(super) fn naive_date_to_arrays(
1461 date: NaiveDate,
1462) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1463 (
1464 i32::to_le_bytes(date.year()),
1465 u32::to_le_bytes(date.ordinal()),
1466 )
1467}
1468
1469fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1470where
1471 D: Vector<u8>,
1472{
1473 let (ds1, ds2) = naive_date_to_arrays(date);
1474 data.extend_from_slice(&ds1);
1475 data.extend_from_slice(&ds2);
1476}
1477
1478pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1479 (
1480 u32::to_le_bytes(time.num_seconds_from_midnight()),
1481 u32::to_le_bytes(time.nanosecond()),
1482 )
1483}
1484
1485fn push_time<D>(data: &mut D, time: NaiveTime)
1486where
1487 D: Vector<u8>,
1488{
1489 let (ts1, ts2) = time_to_arrays(time);
1490 data.extend_from_slice(&ts1);
1491 data.extend_from_slice(&ts2);
1492}
1493
1494fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1504 let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1505 if subsec_nanos >= 1_000_000_000 {
1506 return None;
1507 }
1508 let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1509 as_ns.checked_add(i64::from(subsec_nanos))
1510}
1511
/// Returns how many low-order bytes of `i` carry information once the
/// redundant sign-extension bits are stripped.
///
/// For negative values the leading one bits are discarded, otherwise the
/// leading zero bits; the remaining bit count is rounded up to whole bytes.
/// The sign bit itself is not counted, so both `0` and `-1` need zero bytes
/// and `128` needs one.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_signed<T>(i: T) -> u8
where
    T: Into<i64>,
{
    let value: i64 = i.into();

    let redundant_bits: u32 = if value.is_negative() {
        value.leading_ones()
    } else {
        value.leading_zeros()
    };
    // At most 64, so the narrowing cast cannot truncate.
    let significant_bits = 64 - redundant_bits as u8;

    // Round up to whole bytes.
    (significant_bits + 7) / 8
}
1536
/// Returns how many low-order bytes of `i` are needed to hold its value once
/// leading zero bits are stripped, rounded up to whole bytes.
///
/// `0` needs zero bytes; `u64::MAX` needs eight.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_unsigned<T>(i: T) -> u8
where
    T: Into<u64>,
{
    let value: u64 = i.into();

    // `leading_zeros` is at most 64, so the narrowing cast cannot truncate.
    let unused_bits = value.leading_zeros() as u8;
    let used_bits = 64 - unused_bits;

    // Round up to whole bytes.
    (used_bits + 7) / 8
}
1556
// Exclusive upper bounds on a payload's byte length for each length-prefix
// width used by the bytes, string and list encodings: a payload shorter than
// `TINY` gets a 1-byte length, shorter than `SHORT` a 2-byte length, shorter
// than `LONG` a 4-byte length, and anything else the widest ("huge") length.
//
// NOTE: `1 << 32` only fits in a `usize` on targets where `usize` is wider
// than 32 bits.
const TINY: usize = 1 << 8;
const SHORT: usize = 1 << 16;
const LONG: usize = 1 << 32;
1560
/// Encodes `datum` and appends the encoding to `data`.
///
/// Every encoding starts with a one-byte [`Tag`], followed by a tag-specific
/// payload. Multi-byte integers are written in little-endian byte order.
///
/// # Panics
///
/// * If a `Datum::Range` bound is `Datum::Null`.
/// * If a `Datum::Numeric`'s digit count or exponent does not fit in one byte.
fn push_datum<D>(data: &mut D, datum: Datum)
where
    D: Vector<u8>,
{
    match datum {
        Datum::Null => data.push(Tag::Null.into()),
        Datum::False => data.push(Tag::False.into()),
        Datum::True => data.push(Tag::True.into()),
        // Signed integers: the tag encodes both the sign and the number of
        // payload bytes (base tag + byte count); the payload is the low `mbs`
        // bytes of the value. Stripped bytes are pure sign extension.
        Datum::Int16(i) => {
            let mbs = min_bytes_signed(i);
            let tag = u8::from(if i.is_negative() {
                Tag::NegativeInt16_0
            } else {
                Tag::NonNegativeInt16_0
            }) + mbs;

            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
        }
        Datum::Int32(i) => {
            let mbs = min_bytes_signed(i);
            let tag = u8::from(if i.is_negative() {
                Tag::NegativeInt32_0
            } else {
                Tag::NonNegativeInt32_0
            }) + mbs;

            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
        }
        Datum::Int64(i) => {
            let mbs = min_bytes_signed(i);
            let tag = u8::from(if i.is_negative() {
                Tag::NegativeInt64_0
            } else {
                Tag::NonNegativeInt64_0
            }) + mbs;

            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
        }
        // Unsigned integers: same scheme without a sign; the tag is the base
        // tag plus the number of payload bytes.
        Datum::UInt8(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt8_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        Datum::UInt16(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt16_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        Datum::UInt32(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt32_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        Datum::UInt64(i) => {
            let mbu = min_bytes_unsigned(i);
            let tag = u8::from(Tag::UInt64_0) + mbu;
            data.push(tag);
            data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
        }
        // Floats are stored as their raw bit pattern.
        Datum::Float32(f) => {
            data.push(Tag::Float32.into());
            data.extend_from_slice(&f.to_bits().to_le_bytes());
        }
        Datum::Float64(f) => {
            data.push(Tag::Float64.into());
            data.extend_from_slice(&f.to_bits().to_le_bytes());
        }
        Datum::Date(d) => {
            data.push(Tag::Date.into());
            push_date(data, d);
        }
        Datum::Time(t) => {
            data.push(Tag::Time.into());
            push_time(data, t);
        }
        // Timestamps use a compact 8-byte form (nanoseconds since the epoch)
        // whenever the value fits; otherwise they fall back to the 16-byte
        // date + time form.
        Datum::Timestamp(t) => {
            let datetime = t.to_naive();
            if let Some(nanos) = checked_timestamp_nanos(datetime) {
                data.push(Tag::CheapTimestamp.into());
                data.extend_from_slice(&nanos.to_le_bytes());
            } else {
                data.push(Tag::Timestamp.into());
                push_naive_date(data, datetime.date());
                push_time(data, datetime.time());
            }
        }
        Datum::TimestampTz(t) => {
            let datetime = t.to_naive();
            if let Some(nanos) = checked_timestamp_nanos(datetime) {
                data.push(Tag::CheapTimestampTz.into());
                data.extend_from_slice(&nanos.to_le_bytes());
            } else {
                data.push(Tag::TimestampTz.into());
                push_naive_date(data, datetime.date());
                push_time(data, datetime.time());
            }
        }
        Datum::Interval(i) => {
            data.push(Tag::Interval.into());
            data.extend_from_slice(&i.months.to_le_bytes());
            data.extend_from_slice(&i.days.to_le_bytes());
            data.extend_from_slice(&i.micros.to_le_bytes());
        }
        // Bytes, strings and lists pick the narrowest length prefix that can
        // hold the payload length; the chosen width is recorded in the tag.
        Datum::Bytes(bytes) => {
            let tag = match bytes.len() {
                0..TINY => Tag::BytesTiny,
                TINY..SHORT => Tag::BytesShort,
                SHORT..LONG => Tag::BytesLong,
                _ => Tag::BytesHuge,
            };
            data.push(tag.into());
            push_lengthed_bytes(data, bytes, tag);
        }
        Datum::String(string) => {
            let tag = match string.len() {
                0..TINY => Tag::StringTiny,
                TINY..SHORT => Tag::StringShort,
                SHORT..LONG => Tag::StringLong,
                _ => Tag::StringHuge,
            };
            data.push(tag.into());
            push_lengthed_bytes(data, string.as_bytes(), tag);
        }
        Datum::List(list) => {
            // `list.data` is the already-encoded element bytes.
            let tag = match list.data.len() {
                0..TINY => Tag::ListTiny,
                TINY..SHORT => Tag::ListShort,
                SHORT..LONG => Tag::ListLong,
                _ => Tag::ListHuge,
            };
            data.push(tag.into());
            push_lengthed_bytes(data, list.data, tag);
        }
        Datum::Uuid(u) => {
            data.push(Tag::Uuid.into());
            data.extend_from_slice(u.as_bytes());
        }
        Datum::Array(array) => {
            // Layout: tag, dimension count, the raw dimension bytes, then the
            // element bytes behind a u64 length.
            data.push(Tag::Array.into());
            data.push(array.dims.ndims());
            data.extend_from_slice(array.dims.data);
            push_untagged_bytes(data, array.elements.data);
        }
        Datum::Map(dict) => {
            data.push(Tag::Dict.into());
            push_untagged_bytes(data, dict.data);
        }
        Datum::JsonNull => data.push(Tag::JsonNull.into()),
        Datum::MzTimestamp(t) => {
            data.push(Tag::MzTimestamp.into());
            data.extend_from_slice(&t.encode());
        }
        Datum::Dummy => data.push(Tag::Dummy.into()),
        Datum::Numeric(mut n) => {
            // Reduce before encoding so that the stored parts are those of the
            // reduced value (presumably to keep equal values byte-identical —
            // confirm against `numeric::cx_datum`).
            numeric::cx_datum().reduce(&mut n.0);
            let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
            data.push(Tag::Numeric.into());
            data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
            data.push(
                i8::try_from(exponent)
                    .expect("exponent to fit within i8; should not exceed +/- 39")
                    .to_le_bytes()[0],
            );
            data.push(bits);

            // Only the coefficient units actually needed for `digits` digits
            // are stored.
            let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];

            if cfg!(target_endian = "little") {
                // On little-endian targets the in-memory layout of the units
                // already matches the wire format, so view them as raw bytes.
                // SAFETY (as far as visible here): the units are plain
                // two-byte integers (see the length check below and the
                // `to_le_bytes` fallback), for which any byte view is valid.
                let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
                soft_assert_no_log!(
                    // Each unit contributes exactly two bytes.
                    lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
                    "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
                    Numeric::digits_to_lsu_elements_len(digits) * 2,
                    lsu_bytes.len()
                );
                // `u8` has alignment 1, so nothing may be left unaligned.
                soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
                data.extend_from_slice(lsu_bytes);
            } else {
                // Elsewhere, convert unit by unit to little-endian.
                for u in lsu {
                    data.extend_from_slice(&u.to_le_bytes());
                }
            }
        }
        Datum::Range(range) => {
            // Layout: tag, one flags byte, then each finite bound as a
            // regular datum (lower first).
            data.push(Tag::Range.into());
            data.push(range.internal_flag_bits());

            if let Some(RangeInner { lower, upper }) = range.inner {
                for bound in [lower.bound, upper.bound] {
                    if let Some(bound) = bound {
                        match bound.datum() {
                            Datum::Null => panic!("cannot push Datum::Null into range"),
                            d => push_datum::<D>(data, d),
                        }
                    }
                }
            }
        }
        Datum::MzAclItem(mz_acl_item) => {
            data.push(Tag::MzAclItem.into());
            data.extend_from_slice(&mz_acl_item.encode_binary());
        }
        Datum::AclItem(acl_item) => {
            data.push(Tag::AclItem.into());
            data.extend_from_slice(&acl_item.encode_binary());
        }
    }
}
1788
1789pub fn row_size<'a, I>(a: I) -> usize
1791where
1792 I: IntoIterator<Item = Datum<'a>>,
1793{
1794 let sz = datums_size::<_, _>(a);
1799 let size_of_row = std::mem::size_of::<Row>();
1800 if sz > Row::SIZE {
1804 sz + size_of_row
1805 } else {
1806 size_of_row
1807 }
1808}
1809
1810pub fn datum_size(datum: &Datum) -> usize {
1813 match datum {
1814 Datum::Null => 1,
1815 Datum::False => 1,
1816 Datum::True => 1,
1817 Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
1818 Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
1819 Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
1820 Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1821 Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1822 Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1823 Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1824 Datum::Float32(_) => 1 + size_of::<f32>(),
1825 Datum::Float64(_) => 1 + size_of::<f64>(),
1826 Datum::Date(_) => 1 + size_of::<i32>(),
1827 Datum::Time(_) => 1 + 8,
1828 Datum::Timestamp(t) => {
1829 1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
1830 8
1831 } else {
1832 16
1833 }
1834 }
1835 Datum::TimestampTz(t) => {
1836 1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
1837 8
1838 } else {
1839 16
1840 }
1841 }
1842 Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
1843 Datum::Bytes(bytes) => {
1844 let bytes_for_length = match bytes.len() {
1846 0..TINY => 1,
1847 TINY..SHORT => 2,
1848 SHORT..LONG => 4,
1849 _ => 8,
1850 };
1851 1 + bytes_for_length + bytes.len()
1852 }
1853 Datum::String(string) => {
1854 let bytes_for_length = match string.len() {
1856 0..TINY => 1,
1857 TINY..SHORT => 2,
1858 SHORT..LONG => 4,
1859 _ => 8,
1860 };
1861 1 + bytes_for_length + string.len()
1862 }
1863 Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
1864 Datum::Array(array) => {
1865 1 + size_of::<u8>()
1866 + array.dims.data.len()
1867 + size_of::<u64>()
1868 + array.elements.data.len()
1869 }
1870 Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
1871 Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
1872 Datum::JsonNull => 1,
1873 Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
1874 Datum::Dummy => 1,
1875 Datum::Numeric(d) => {
1876 let mut d = d.0.clone();
1877 numeric::cx_datum().reduce(&mut d);
1880 4 + (d.coefficient_units().len() * 2)
1882 }
1883 Datum::Range(Range { inner }) => {
1884 2 + match inner {
1886 None => 0,
1887 Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
1888 .iter()
1889 .map(|bound| match bound {
1890 None => 0,
1891 Some(bound) => bound.val.len(),
1892 })
1893 .sum(),
1894 }
1895 }
1896 Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
1897 Datum::AclItem(_) => 1 + AclItem::binary_size(),
1898 }
1899}
1900
1901pub fn datums_size<'a, I, D>(iter: I) -> usize
1906where
1907 I: IntoIterator<Item = D>,
1908 D: Borrow<Datum<'a>>,
1909{
1910 iter.into_iter().map(|d| datum_size(d.borrow())).sum()
1911}
1912
1913pub fn datum_list_size<'a, I, D>(iter: I) -> usize
1918where
1919 I: IntoIterator<Item = D>,
1920 D: Borrow<Datum<'a>>,
1921{
1922 1 + size_of::<u64>() + datums_size(iter)
1923}
1924
1925impl RowPacker<'_> {
1926 pub fn for_existing_row(row: &mut Row) -> RowPacker {
1933 RowPacker { row }
1934 }
1935
1936 #[inline]
1938 pub fn push<'a, D>(&mut self, datum: D)
1939 where
1940 D: Borrow<Datum<'a>>,
1941 {
1942 push_datum(&mut self.row.data, *datum.borrow());
1943 }
1944
1945 #[inline]
1947 pub fn extend<'a, I, D>(&mut self, iter: I)
1948 where
1949 I: IntoIterator<Item = D>,
1950 D: Borrow<Datum<'a>>,
1951 {
1952 for datum in iter {
1953 push_datum(&mut self.row.data, *datum.borrow())
1954 }
1955 }
1956
1957 #[inline]
1963 pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
1964 where
1965 I: IntoIterator<Item = Result<D, E>>,
1966 D: Borrow<Datum<'a>>,
1967 {
1968 for datum in iter {
1969 push_datum(&mut self.row.data, *datum?.borrow());
1970 }
1971 Ok(())
1972 }
1973
1974 pub fn extend_by_row(&mut self, row: &Row) {
1976 self.row.data.extend_from_slice(row.data.as_slice());
1977 }
1978
1979 #[inline]
1987 pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
1988 self.row.data.extend_from_slice(data)
1989 }
1990
    /// Pushes a list datum whose elements are whatever `f` pushes onto the
    /// packer it is handed, and returns `f`'s result.
    ///
    /// The list is written optimistically with the one-byte length form
    /// (`ListTiny`). Once `f` has run and the real payload length is known,
    /// either the length byte is patched in place, or — for payloads of
    /// `TINY` bytes or more — the header is widened and the payload shifted.
    #[inline]
    pub fn push_list_with<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&mut RowPacker) -> R,
    {
        let start = self.row.data.len();
        // Assume the payload fits a one-byte length; fixed up below if not.
        self.row.data.push(Tag::ListTiny.into());
        // Placeholder length.
        self.row.data.push(0);

        let out = f(self);

        // Payload length: everything after the tag byte and the length byte.
        let len = self.row.data.len() - start - 1 - 1;
        if len < TINY {
            // Common case: only the placeholder needs patching.
            self.row.data[start + 1] = len.to_le_bytes()[0];
        } else {
            // Rare case, kept out of line in a `#[cold]` function.
            long_list(&mut self.row.data, start, len);
        }

        /// Rewrites a list header for a payload too long for `ListTiny`.
        ///
        /// * `data`: the row's backing bytes.
        /// * `start`: offset of the list's tag byte in `data`.
        /// * `len`: payload length, excluding tag and length prefix.
        #[cold]
        fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
            // `len_len` is the width of the new length prefix (2, 4 or 8).
            let long_list_inner = |data: &mut CompactBytes, len_len| {
                const ZEROS: [u8; 8] = [0; 8];
                // Grow the buffer by the extra prefix bytes; one length byte
                // is already present, hence `- 1`.
                data.extend_from_slice(&ZEROS[0..len_len - 1]);
                // Shift the payload right so it starts after the tag and the
                // wider length prefix.
                data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
                // Write the real length into the widened prefix.
                data[start + 1..start + 1 + len_len]
                    .copy_from_slice(&len.to_le_bytes()[0..len_len]);
            };
            match len {
                0..TINY => {
                    // The caller handles this range itself.
                    unreachable!()
                }
                TINY..SHORT => {
                    data[start] = Tag::ListShort.into();
                    long_list_inner(data, 2);
                }
                SHORT..LONG => {
                    data[start] = Tag::ListLong.into();
                    long_list_inner(data, 4);
                }
                _ => {
                    data[start] = Tag::ListHuge.into();
                    long_list_inner(data, 8);
                }
            };
        }

        out
    }
2087
2088 pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2126 where
2127 F: FnOnce(&mut RowPacker) -> R,
2128 {
2129 self.row.data.push(Tag::Dict.into());
2130 let start = self.row.data.len();
2131 self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2133
2134 let res = f(self);
2135
2136 let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2137 self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2139
2140 res
2141 }
2142
2143 pub fn try_push_array<'a, I, D>(
2150 &mut self,
2151 dims: &[ArrayDimension],
2152 iter: I,
2153 ) -> Result<(), InvalidArrayError>
2154 where
2155 I: IntoIterator<Item = D>,
2156 D: Borrow<Datum<'a>>,
2157 {
2158 unsafe {
2160 self.push_array_with_unchecked(dims, |packer| {
2161 let mut nelements = 0;
2162 for datum in iter {
2163 packer.push(datum);
2164 nelements += 1;
2165 }
2166 Ok::<_, InvalidArrayError>(nelements)
2167 })
2168 }
2169 }
2170
    /// Pushes an array datum with the given dimensions, whose elements are
    /// whatever `f` pushes onto the packer it is handed.
    ///
    /// `f` returns the number of elements it pushed. The encoding is the
    /// `Array` tag, a one-byte dimension count, each dimension as an `i64`
    /// lower bound plus a `u64` length, then a `u64` payload length and the
    /// element bytes.
    ///
    /// # Errors
    ///
    /// * `InvalidArrayError::TooManyDimensions` if `dims` has more than
    ///   `MAX_ARRAY_DIMENSIONS` entries (nothing is written).
    /// * Any error returned by `f`.
    /// * `InvalidArrayError::WrongCardinality` if the count reported by `f`
    ///   differs from the product of the dimension lengths (or from zero when
    ///   `dims` is empty).
    ///
    /// In the latter two cases the row is truncated back to its length on
    /// entry.
    ///
    /// # Safety
    ///
    /// The count returned by `f` is compared against the dimensions but never
    /// checked against the bytes actually written. Presumably the caller must
    /// guarantee that it is the exact number of datums pushed, otherwise the
    /// stored array is inconsistent — confirm against the array decoder.
    pub unsafe fn push_array_with_unchecked<F, E>(
        &mut self,
        dims: &[ArrayDimension],
        f: F,
    ) -> Result<(), E>
    where
        F: FnOnce(&mut RowPacker) -> Result<usize, E>,
        E: From<InvalidArrayError>,
    {
        // Reject before writing anything so there is nothing to roll back.
        if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
            return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
        }

        // Remember where the array starts so failures can be rolled back.
        let start = self.row.data.len();
        self.row.data.push(Tag::Array.into());

        // Dimension count followed by (lower bound, length) pairs.
        self.row
            .data
            .push(dims.len().try_into().expect("ndims verified to fit in u8"));
        for dim in dims {
            self.row
                .data
                .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
            self.row
                .data
                .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
        }

        // Placeholder for the payload length, patched after `f` has run.
        let off = self.row.data.len();
        self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
        let nelements = match f(self) {
            Ok(nelements) => nelements,
            Err(e) => {
                self.row.data.truncate(start);
                return Err(e);
            }
        };
        let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
        self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());

        // A zero-dimensional array holds no elements; an empty product would
        // otherwise evaluate to 1.
        let cardinality = match dims {
            [] => 0,
            dims => dims.iter().map(|d| d.length).product(),
        };
        if nelements != cardinality {
            self.row.data.truncate(start);
            return Err(InvalidArrayError::WrongCardinality {
                actual: nelements,
                expected: cardinality,
            }
            .into());
        }

        Ok(())
    }
2249
2250 pub fn push_array_with_row_major<F, I>(
2260 &mut self,
2261 dims: I,
2262 f: F,
2263 ) -> Result<(), InvalidArrayError>
2264 where
2265 I: IntoIterator<Item = ArrayDimension>,
2266 F: FnOnce(&mut RowPacker) -> usize,
2267 {
2268 let start = self.row.data.len();
2269 self.row.data.push(Tag::Array.into());
2270
2271 let dims_start = self.row.data.len();
2273 self.row.data.push(42);
2274
2275 let mut num_dims: u8 = 0;
2276 let mut cardinality: usize = 1;
2277 for dim in dims {
2278 num_dims += 1;
2279 cardinality *= dim.length;
2280
2281 self.row
2282 .data
2283 .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2284 self.row
2285 .data
2286 .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2287 }
2288
2289 if num_dims > MAX_ARRAY_DIMENSIONS {
2290 self.row.data.truncate(start);
2292 return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2293 }
2294 self.row.data[dims_start..dims_start + size_of::<u8>()]
2296 .copy_from_slice(&num_dims.to_le_bytes());
2297
2298 let off = self.row.data.len();
2300 self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2301
2302 let nelements = f(self);
2303
2304 let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2305 self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2306
2307 let cardinality = match num_dims {
2310 0 => 0,
2311 _ => cardinality,
2312 };
2313 if nelements != cardinality {
2314 self.row.data.truncate(start);
2315 return Err(InvalidArrayError::WrongCardinality {
2316 actual: nelements,
2317 expected: cardinality,
2318 });
2319 }
2320
2321 Ok(())
2322 }
2323
2324 pub fn push_list<'a, I, D>(&mut self, iter: I)
2328 where
2329 I: IntoIterator<Item = D>,
2330 D: Borrow<Datum<'a>>,
2331 {
2332 self.push_list_with(|packer| {
2333 for elem in iter {
2334 packer.push(*elem.borrow())
2335 }
2336 });
2337 }
2338
2339 pub fn push_dict<'a, I, D>(&mut self, iter: I)
2341 where
2342 I: IntoIterator<Item = (&'a str, D)>,
2343 D: Borrow<Datum<'a>>,
2344 {
2345 self.push_dict_with(|packer| {
2346 for (k, v) in iter {
2347 packer.push(Datum::String(k));
2348 packer.push(*v.borrow())
2349 }
2350 })
2351 }
2352
2353 pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
2369 range.canonicalize()?;
2370 match range.inner {
2371 None => {
2372 self.row.data.push(Tag::Range.into());
2373 self.row.data.push(range::InternalFlags::EMPTY.bits());
2375 Ok(())
2376 }
2377 Some(inner) => self.push_range_with(
2378 RangeLowerBound {
2379 inclusive: inner.lower.inclusive,
2380 bound: inner
2381 .lower
2382 .bound
2383 .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2384 },
2385 RangeUpperBound {
2386 inclusive: inner.upper.inclusive,
2387 bound: inner
2388 .upper
2389 .bound
2390 .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2391 },
2392 ),
2393 }
2394 }
2395
2396 pub fn push_range_with<L, U, E>(
2419 &mut self,
2420 lower: RangeLowerBound<L>,
2421 upper: RangeUpperBound<U>,
2422 ) -> Result<(), E>
2423 where
2424 L: FnOnce(&mut RowPacker) -> Result<(), E>,
2425 U: FnOnce(&mut RowPacker) -> Result<(), E>,
2426 E: From<InvalidRangeError>,
2427 {
2428 let start = self.row.data.len();
2429 self.row.data.push(Tag::Range.into());
2430
2431 let mut flags = range::InternalFlags::empty();
2432
2433 flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
2434 flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
2435 flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
2436 flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);
2437
2438 let mut expected_datums = 0;
2439
2440 self.row.data.push(flags.bits());
2441
2442 let datum_check = self.row.data.len();
2443
2444 if let Some(value) = lower.bound {
2445 let start = self.row.data.len();
2446 value(self)?;
2447 assert!(
2448 start < self.row.data.len(),
2449 "finite values must each push exactly one value; expected 1 but got 0"
2450 );
2451 expected_datums += 1;
2452 }
2453
2454 if let Some(value) = upper.bound {
2455 let start = self.row.data.len();
2456 value(self)?;
2457 assert!(
2458 start < self.row.data.len(),
2459 "finite values must each push exactly one value; expected 1 but got 0"
2460 );
2461 expected_datums += 1;
2462 }
2463
2464 let mut actual_datums = 0;
2468 let mut seen = None;
2469 let mut dataz = &self.row.data[datum_check..];
2470 while !dataz.is_empty() {
2471 let d = unsafe { read_datum(&mut dataz) };
2472 assert!(d != Datum::Null, "cannot push Datum::Null into range");
2473
2474 match seen {
2475 None => seen = Some(d),
2476 Some(seen) => {
2477 let seen_kind = DatumKind::from(seen);
2478 let d_kind = DatumKind::from(d);
2479 assert!(
2480 seen_kind == d_kind,
2481 "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
2482 );
2483
2484 if seen > d {
2485 self.row.data.truncate(start);
2486 return Err(InvalidRangeError::MisorderedRangeBounds.into());
2487 }
2488 }
2489 }
2490 actual_datums += 1;
2491 }
2492
2493 assert!(
2494 actual_datums == expected_datums,
2495 "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
2496 );
2497
2498 Ok(())
2499 }
2500
    /// Removes all data from the underlying row, leaving it empty.
    pub fn clear(&mut self) {
        self.row.data.clear();
    }
2505
    /// Truncates the underlying row to `pos` bytes.
    ///
    /// # Safety
    ///
    /// No validation is performed. Presumably `pos` must lie on a datum
    /// boundary: cutting through the middle of a datum would leave bytes that
    /// the unchecked decoding routines cannot interpret — confirm against
    /// `read_datum`. See `truncate_datums` for a safe alternative.
    pub unsafe fn truncate(&mut self, pos: usize) {
        self.row.data.truncate(pos)
    }
2521
2522 pub fn truncate_datums(&mut self, n: usize) {
2524 let prev_len = self.row.data.len();
2525 let mut iter = self.row.iter();
2526 for _ in iter.by_ref().take(n) {}
2527 let next_len = iter.data.len();
2528 unsafe { self.truncate(prev_len - next_len) }
2530 }
2531
    /// Returns the underlying row's `byte_len()`.
    pub fn byte_len(&self) -> usize {
        self.row.byte_len()
    }
2536}
2537
2538impl<'a> IntoIterator for &'a Row {
2539 type Item = Datum<'a>;
2540 type IntoIter = DatumListIter<'a>;
2541 fn into_iter(self) -> DatumListIter<'a> {
2542 self.iter()
2543 }
2544}
2545
2546impl fmt::Debug for Row {
2547 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2549 f.write_str("Row{")?;
2550 f.debug_list().entries(self.iter()).finish()?;
2551 f.write_str("}")
2552 }
2553}
2554
2555impl fmt::Display for Row {
2556 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2558 f.write_str("(")?;
2559 for (i, datum) in self.iter().enumerate() {
2560 if i != 0 {
2561 f.write_str(", ")?;
2562 }
2563 write!(f, "{}", datum)?;
2564 }
2565 f.write_str(")")
2566 }
2567}
2568
2569impl<'a> DatumList<'a> {
2570 pub fn empty() -> DatumList<'static> {
2571 DatumList { data: &[] }
2572 }
2573
2574 pub fn iter(&self) -> DatumListIter<'a> {
2575 DatumListIter { data: self.data }
2576 }
2577
2578 pub fn data(&self) -> &'a [u8] {
2580 self.data
2581 }
2582}
2583
2584impl<'a> IntoIterator for &'a DatumList<'a> {
2585 type Item = Datum<'a>;
2586 type IntoIter = DatumListIter<'a>;
2587 fn into_iter(self) -> DatumListIter<'a> {
2588 self.iter()
2589 }
2590}
2591
2592impl<'a> Iterator for DatumListIter<'a> {
2593 type Item = Datum<'a>;
2594 fn next(&mut self) -> Option<Self::Item> {
2595 if self.data.is_empty() {
2596 None
2597 } else {
2598 Some(unsafe { read_datum(&mut self.data) })
2599 }
2600 }
2601}
2602
2603impl<'a> DatumMap<'a> {
2604 pub fn empty() -> DatumMap<'static> {
2605 DatumMap { data: &[] }
2606 }
2607
2608 pub fn iter(&self) -> DatumDictIter<'a> {
2609 DatumDictIter {
2610 data: self.data,
2611 prev_key: None,
2612 }
2613 }
2614
2615 pub fn data(&self) -> &'a [u8] {
2617 self.data
2618 }
2619}
2620
2621impl<'a> Debug for DatumMap<'a> {
2622 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2623 f.debug_map().entries(self.iter()).finish()
2624 }
2625}
2626
2627impl<'a> IntoIterator for &'a DatumMap<'a> {
2628 type Item = (&'a str, Datum<'a>);
2629 type IntoIter = DatumDictIter<'a>;
2630 fn into_iter(self) -> DatumDictIter<'a> {
2631 self.iter()
2632 }
2633}
2634
2635impl<'a> Iterator for DatumDictIter<'a> {
2636 type Item = (&'a str, Datum<'a>);
2637 fn next(&mut self) -> Option<Self::Item> {
2638 if self.data.is_empty() {
2639 None
2640 } else {
2641 let key_tag =
2642 Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
2643 assert!(
2644 key_tag == Tag::StringTiny
2645 || key_tag == Tag::StringShort
2646 || key_tag == Tag::StringLong
2647 || key_tag == Tag::StringHuge,
2648 "Dict keys must be strings, got {:?}",
2649 key_tag
2650 );
2651 let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
2652 let val = unsafe { read_datum(&mut self.data) };
2653
2654 if cfg!(debug_assertions) {
2656 if let Some(prev_key) = self.prev_key {
2657 debug_assert!(
2658 prev_key < key,
2659 "Dict keys must be unique and given in ascending order: {} came before {}",
2660 prev_key,
2661 key
2662 );
2663 }
2664 self.prev_key = Some(key);
2665 }
2666
2667 Some((key, val))
2668 }
2669 }
2670}
2671
impl RowArena {
    /// Creates an empty arena.
    pub fn new() -> Self {
        RowArena {
            inner: RefCell::new(vec![]),
        }
    }

    /// Creates an empty arena with room for `capacity` stored byte vectors
    /// before the outer vector has to grow.
    pub fn with_capacity(capacity: usize) -> Self {
        RowArena {
            inner: RefCell::new(Vec::with_capacity(capacity)),
        }
    }

    /// Reserves room for at least `additional` more stored byte vectors.
    pub fn reserve(&self, additional: usize) {
        self.inner.borrow_mut().reserve(additional);
    }

    /// Takes ownership of `bytes` and returns a reference to them that lives
    /// as long as the borrow of the arena.
    #[allow(clippy::transmute_ptr_to_ptr)]
    pub fn push_bytes<'a>(&'a self, bytes: Vec<u8>) -> &'a [u8] {
        let mut inner = self.inner.borrow_mut();
        inner.push(bytes);
        let owned_bytes = &inner[inner.len() - 1];
        unsafe {
            // Extends the lifetime from that of the `RefCell` guard to `'a`.
            // This relies on the following, as far as this impl shows:
            // * through `&self` the arena is only appended to, so the stored
            //   vector is not dropped while the arena is shared (`clear`
            //   needs `&mut self`);
            // * the reference points at the vector's heap buffer, which stays
            //   in place even if the outer vector reallocates;
            // * the stored vector is never handed out, so it is not resized.
            transmute::<&[u8], &'a [u8]>(owned_bytes)
        }
    }

    /// Takes ownership of `string` and returns a `&str` that lives as long as
    /// the borrow of the arena.
    pub fn push_string<'a>(&'a self, string: String) -> &'a str {
        let owned_bytes = self.push_bytes(string.into_bytes());
        unsafe {
            // SAFETY: the bytes came out of a `String` and are unchanged, so
            // they are valid UTF-8.
            std::str::from_utf8_unchecked(owned_bytes)
        }
    }

    /// Takes ownership of `row` and returns the first datum stored in it,
    /// borrowed for as long as the borrow of the arena.
    ///
    /// Only the first datum is decoded; any further datums in `row` are kept
    /// in the arena but not returned.
    pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
        let mut inner = self.inner.borrow_mut();
        inner.push(row.data.into_vec());
        unsafe {
            // Decode from a temporary slice over the bytes just stored.
            // NOTE(review): presumably `row` must contain at least one datum;
            // `read_datum` is called without an emptiness check.
            let datum = read_datum(&mut &inner[inner.len() - 1][..]);
            // Same lifetime-extension argument as in `push_bytes`: the datum
            // borrows from a heap buffer that stays put while the arena is
            // shared.
            transmute::<Datum<'_>, Datum<'a>>(datum)
        }
    }

    /// Like `push_unary_row`, but returns the first datum in its
    /// `DatumNested` form.
    fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
        let mut inner = self.inner.borrow_mut();
        inner.push(row.data.into_vec());
        unsafe {
            let nested = DatumNested::extract(&mut &inner[inner.len() - 1][..]);
            // Same lifetime-extension argument as in `push_bytes`.
            transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
        }
    }

    /// Builds a datum by letting `f` pack into a fresh temporary row, then
    /// stores that row in the arena and returns its first datum.
    pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
    where
        F: FnOnce(&mut RowPacker),
    {
        let mut row = Row::default();
        f(&mut row.packer());
        self.push_unary_row(row)
    }

    /// Like `make_datum`, but returns the datum in its `DatumNested` form.
    pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
    where
        F: FnOnce(&mut RowPacker),
    {
        let mut row = Row::default();
        f(&mut row.packer());
        self.push_unary_row_datum_nested(row)
    }

    /// Fallible variant of `make_datum`: if `f` returns an error it is
    /// propagated and nothing is added to the arena.
    pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
    where
        F: FnOnce(&mut RowPacker) -> Result<(), E>,
    {
        let mut row = Row::default();
        f(&mut row.packer())?;
        Ok(self.push_unary_row(row))
    }

    /// Drops everything stored in the arena. Requires exclusive access, so no
    /// reference previously handed out can still be alive.
    pub fn clear(&mut self) {
        self.inner.borrow_mut().clear();
    }
}
2810
2811impl Default for RowArena {
2812 fn default() -> RowArena {
2813 RowArena::new()
2814 }
2815}
2816
/// A handle to a per-thread reusable `Row`.
///
/// `SharedRow::get` takes the row out of a thread-local slot and dropping the
/// handle puts it back. Only one handle can exist per thread at a time;
/// requesting a second one while the first is alive panics.
#[derive(Debug)]
pub struct SharedRow(Row);
2836
2837impl SharedRow {
2838 thread_local! {
2839 static SHARED_ROW: Cell<Option<Row>> = const { Cell::new(Some(Row::empty())) }
2844 }
2845
2846 pub fn get() -> Self {
2854 let mut row = Self::SHARED_ROW
2855 .take()
2856 .expect("attempted to borrow already borrowed SharedRow");
2857 row.packer();
2859 Self(row)
2860 }
2861
2862 pub fn pack<'a, I, D>(iter: I) -> Row
2864 where
2865 I: IntoIterator<Item = D>,
2866 D: Borrow<Datum<'a>>,
2867 {
2868 let mut row_builder = Self::get();
2869 let mut row_packer = row_builder.packer();
2870 row_packer.extend(iter);
2871 row_builder.clone()
2872 }
2873}
2874
2875impl std::ops::Deref for SharedRow {
2876 type Target = Row;
2877
2878 fn deref(&self) -> &Self::Target {
2879 &self.0
2880 }
2881}
2882
2883impl std::ops::DerefMut for SharedRow {
2884 fn deref_mut(&mut self) -> &mut Self::Target {
2885 &mut self.0
2886 }
2887}
2888
2889impl Drop for SharedRow {
2890 fn drop(&mut self) {
2891 Self::SHARED_ROW.set(Some(std::mem::take(&mut self.0)))
2894 }
2895}
2896
2897#[cfg(test)]
2898mod tests {
2899 use chrono::{DateTime, NaiveDate};
2900 use mz_ore::{assert_err, assert_none};
2901
2902 use crate::ScalarType;
2903
2904 use super::*;
2905
    /// Checks platform assumptions of the encoding: a `Tag` occupies one
    /// byte, and the test fails outright on big-endian targets.
    #[mz_ore::test]
    fn test_assumptions() {
        assert_eq!(size_of::<Tag>(), 1);
        #[cfg(target_endian = "big")]
        {
            // Deliberate failure on big-endian targets.
            assert!(false);
        }
    }
2915
    /// Exercises `RowArena`: strings and byte vectors pushed into the arena
    /// read back unchanged, and `push_unary_row` returns the same datum as
    /// `unpack_first` on the original row.
    #[mz_ore::test]
    fn miri_test_arena() {
        let arena = RowArena::new();

        // Strings, including the empty string and non-ASCII text.
        assert_eq!(arena.push_string("".to_owned()), "");
        assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");

        // Byte vectors, including the empty one.
        let empty: &[u8] = &[];
        assert_eq!(arena.push_bytes(vec![]), empty);
        assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);

        // A dict with a nested list as one of its values.
        let mut row = Row::default();
        let mut packer = row.packer();
        packer.push_dict_with(|row| {
            row.push(Datum::String("a"));
            row.push_list_with(|row| {
                row.push(Datum::String("one"));
                row.push(Datum::String("two"));
                row.push(Datum::String("three"));
            });
            row.push(Datum::String("b"));
            row.push(Datum::String("c"));
        });
        assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
    }
2941
    /// Packs various sets of datums into a row and checks that both
    /// iterating and unpacking the row return the original datums.
    #[mz_ore::test]
    fn miri_test_round_trip() {
        /// Packs `datums`, then decodes them back via `iter` and `unpack`.
        fn round_trip(datums: Vec<Datum>) {
            let row = Row::pack(datums.clone());

            // Print the raw encoding to help diagnose failures.
            println!("{:?}", row.data());

            let datums2 = row.iter().collect::<Vec<_>>();
            let datums3 = row.unpack();
            assert_eq!(datums, datums2);
            assert_eq!(datums, datums3);
        }

        // The empty row.
        round_trip(vec![]);
        // The interesting datums of every enumerated scalar type.
        round_trip(
            ScalarType::enumerate()
                .iter()
                .flat_map(|r#type| r#type.interesting_datums())
                .collect(),
        );
        // A hand-picked mix, including unsigned values on either side of
        // each byte-width boundary of the variable-length integer encoding.
        round_trip(vec![
            Datum::Null,
            Datum::Null,
            Datum::False,
            Datum::True,
            Datum::Int16(-21),
            Datum::Int32(-42),
            Datum::Int64(-2_147_483_648 - 42),
            Datum::UInt8(0),
            Datum::UInt8(1),
            Datum::UInt16(0),
            Datum::UInt16(1),
            Datum::UInt16(1 << 8),
            Datum::UInt32(0),
            Datum::UInt32(1),
            Datum::UInt32(1 << 8),
            Datum::UInt32(1 << 16),
            Datum::UInt32(1 << 24),
            Datum::UInt64(0),
            Datum::UInt64(1),
            Datum::UInt64(1 << 8),
            Datum::UInt64(1 << 16),
            Datum::UInt64(1 << 24),
            Datum::UInt64(1 << 32),
            Datum::UInt64(1 << 40),
            Datum::UInt64(1 << 48),
            Datum::UInt64(1 << 56),
            Datum::Float32(OrderedFloat::from(-42.12)),
            Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
            Datum::Timestamp(
                CheckedTimestamp::from_timestamplike(
                    NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
                        .unwrap()
                        .and_hms_opt(14, 32, 11)
                        .unwrap(),
                )
                .unwrap(),
            ),
            Datum::TimestampTz(
                CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
                    .unwrap(),
            ),
            Datum::Interval(Interval {
                months: 312,
                ..Default::default()
            }),
            Datum::Interval(Interval::new(0, 0, 1_012_312)),
            Datum::Bytes(&[]),
            Datum::Bytes(&[0, 2, 1, 255]),
            Datum::String(""),
            Datum::String("العَرَبِيَّة"),
        ]);
    }
3018
3019 #[mz_ore::test]
3020 fn test_array() {
3021 const DIM: ArrayDimension = ArrayDimension {
3024 lower_bound: 2,
3025 length: 2,
3026 };
3027 let mut row = Row::default();
3028 let mut packer = row.packer();
3029 packer
3030 .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3031 .unwrap();
3032 let arr1 = row.unpack_first().unwrap_array();
3033 assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3034 assert_eq!(
3035 arr1.elements().into_iter().collect::<Vec<_>>(),
3036 vec![Datum::Int32(1), Datum::Int32(2)]
3037 );
3038
3039 let row = Row::pack_slice(&[Datum::Array(arr1)]);
3042 let arr2 = row.unpack_first().unwrap_array();
3043 assert_eq!(arr1, arr2);
3044 }
3045
3046 #[mz_ore::test]
3047 fn test_multidimensional_array() {
3048 let datums = vec![
3049 Datum::Int32(1),
3050 Datum::Int32(2),
3051 Datum::Int32(3),
3052 Datum::Int32(4),
3053 Datum::Int32(5),
3054 Datum::Int32(6),
3055 Datum::Int32(7),
3056 Datum::Int32(8),
3057 ];
3058
3059 let mut row = Row::default();
3060 let mut packer = row.packer();
3061 packer
3062 .try_push_array(
3063 &[
3064 ArrayDimension {
3065 lower_bound: 1,
3066 length: 1,
3067 },
3068 ArrayDimension {
3069 lower_bound: 1,
3070 length: 4,
3071 },
3072 ArrayDimension {
3073 lower_bound: 1,
3074 length: 2,
3075 },
3076 ],
3077 &datums,
3078 )
3079 .unwrap();
3080 let array = row.unpack_first().unwrap_array();
3081 assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3082 }
3083
3084 #[mz_ore::test]
3085 fn test_array_max_dimensions() {
3086 let mut row = Row::default();
3087 let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3088
3089 let res = row.packer().try_push_array(
3091 &vec![
3092 ArrayDimension {
3093 lower_bound: 1,
3094 length: 1
3095 };
3096 max_dims + 1
3097 ],
3098 vec![Datum::Int32(4)],
3099 );
3100 assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3101 assert!(row.data.is_empty());
3102
3103 row.packer()
3106 .try_push_array(
3107 &vec![
3108 ArrayDimension {
3109 lower_bound: 1,
3110 length: 1
3111 };
3112 max_dims
3113 ],
3114 vec![Datum::Int32(4)],
3115 )
3116 .unwrap();
3117 }
3118
3119 #[mz_ore::test]
3120 fn test_array_wrong_cardinality() {
3121 let mut row = Row::default();
3122 let res = row.packer().try_push_array(
3123 &[
3124 ArrayDimension {
3125 lower_bound: 1,
3126 length: 2,
3127 },
3128 ArrayDimension {
3129 lower_bound: 1,
3130 length: 3,
3131 },
3132 ],
3133 vec![Datum::Int32(1), Datum::Int32(2)],
3134 );
3135 assert_eq!(
3136 res,
3137 Err(InvalidArrayError::WrongCardinality {
3138 actual: 2,
3139 expected: 6,
3140 })
3141 );
3142 assert!(row.data.is_empty());
3143 }
3144
3145 #[mz_ore::test]
3146 fn test_nesting() {
3147 let mut row = Row::default();
3148 row.packer().push_dict_with(|row| {
3149 row.push(Datum::String("favourites"));
3150 row.push_list_with(|row| {
3151 row.push(Datum::String("ice cream"));
3152 row.push(Datum::String("oreos"));
3153 row.push(Datum::String("cheesecake"));
3154 });
3155 row.push(Datum::String("name"));
3156 row.push(Datum::String("bob"));
3157 });
3158
3159 let mut iter = row.unpack_first().unwrap_map().iter();
3160
3161 let (k, v) = iter.next().unwrap();
3162 assert_eq!(k, "favourites");
3163 assert_eq!(
3164 v.unwrap_list().iter().collect::<Vec<_>>(),
3165 vec![
3166 Datum::String("ice cream"),
3167 Datum::String("oreos"),
3168 Datum::String("cheesecake"),
3169 ]
3170 );
3171
3172 let (k, v) = iter.next().unwrap();
3173 assert_eq!(k, "name");
3174 assert_eq!(v, Datum::String("bob"));
3175 }
3176
3177 #[mz_ore::test]
3178 fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3179 let pack = |ok| {
3180 let mut row = Row::default();
3181 row.packer().push_dict_with(|row| {
3182 if ok {
3183 row.push(Datum::String("key"));
3184 row.push(Datum::Int32(42));
3185 Ok(7)
3186 } else {
3187 Err("fail")
3188 }
3189 })?;
3190 Ok(row)
3191 };
3192
3193 assert_eq!(pack(false), Err("fail"));
3194
3195 let row = pack(true)?;
3196 let mut dict = row.unpack_first().unwrap_map().iter();
3197 assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3198 assert_eq!(dict.next(), None);
3199
3200 Ok(())
3201 }
3202
3203 #[mz_ore::test]
3204 #[cfg_attr(miri, ignore)] fn test_datum_sizes() {
3206 let arena = RowArena::new();
3207
3208 let values_of_interest = vec![
3210 Datum::Null,
3211 Datum::False,
3212 Datum::Int16(0),
3213 Datum::Int32(0),
3214 Datum::Int64(0),
3215 Datum::UInt8(0),
3216 Datum::UInt8(1),
3217 Datum::UInt16(0),
3218 Datum::UInt16(1),
3219 Datum::UInt16(1 << 8),
3220 Datum::UInt32(0),
3221 Datum::UInt32(1),
3222 Datum::UInt32(1 << 8),
3223 Datum::UInt32(1 << 16),
3224 Datum::UInt32(1 << 24),
3225 Datum::UInt64(0),
3226 Datum::UInt64(1),
3227 Datum::UInt64(1 << 8),
3228 Datum::UInt64(1 << 16),
3229 Datum::UInt64(1 << 24),
3230 Datum::UInt64(1 << 32),
3231 Datum::UInt64(1 << 40),
3232 Datum::UInt64(1 << 48),
3233 Datum::UInt64(1 << 56),
3234 Datum::Float32(OrderedFloat(0.0)),
3235 Datum::Float64(OrderedFloat(0.0)),
3236 Datum::from(numeric::Numeric::from(0)),
3237 Datum::from(numeric::Numeric::from(1000)),
3238 Datum::from(numeric::Numeric::from(9999)),
3239 Datum::Date(
3240 NaiveDate::from_ymd_opt(1, 1, 1)
3241 .unwrap()
3242 .try_into()
3243 .unwrap(),
3244 ),
3245 Datum::Timestamp(
3246 CheckedTimestamp::from_timestamplike(
3247 DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
3248 )
3249 .unwrap(),
3250 ),
3251 Datum::TimestampTz(
3252 CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
3253 .unwrap(),
3254 ),
3255 Datum::Interval(Interval::default()),
3256 Datum::Bytes(&[]),
3257 Datum::String(""),
3258 Datum::JsonNull,
3259 Datum::Range(Range { inner: None }),
3260 arena.make_datum(|packer| {
3261 packer
3262 .push_range(Range::new(Some((
3263 RangeLowerBound::new(Datum::Int32(-1), true),
3264 RangeUpperBound::new(Datum::Int32(1), true),
3265 ))))
3266 .unwrap();
3267 }),
3268 ];
3269 for value in values_of_interest {
3270 if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
3271 panic!("Disparity in claimed size for {:?}", value);
3272 }
3273 }
3274 }
3275
3276 #[mz_ore::test]
3277 fn test_range_errors() {
3278 fn test_range_errors_inner<'a>(
3279 datums: Vec<Vec<Datum<'a>>>,
3280 ) -> Result<(), InvalidRangeError> {
3281 let mut row = Row::default();
3282 let row_len = row.byte_len();
3283 let mut packer = row.packer();
3284 let r = packer.push_range_with(
3285 RangeLowerBound {
3286 inclusive: true,
3287 bound: Some(|row: &mut RowPacker| {
3288 for d in &datums[0] {
3289 row.push(d);
3290 }
3291 Ok(())
3292 }),
3293 },
3294 RangeUpperBound {
3295 inclusive: true,
3296 bound: Some(|row: &mut RowPacker| {
3297 for d in &datums[1] {
3298 row.push(d);
3299 }
3300 Ok(())
3301 }),
3302 },
3303 );
3304
3305 assert_eq!(row_len, row.byte_len());
3306
3307 r
3308 }
3309
3310 for panicking_case in [
3311 vec![vec![Datum::Int32(1)], vec![]],
3312 vec![
3313 vec![Datum::Int32(1), Datum::Int32(2)],
3314 vec![Datum::Int32(3)],
3315 ],
3316 vec![
3317 vec![Datum::Int32(1)],
3318 vec![Datum::Int32(2), Datum::Int32(3)],
3319 ],
3320 vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
3321 vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
3322 vec![vec![Datum::Null], vec![Datum::Int32(2)]],
3323 vec![vec![Datum::Int32(1)], vec![Datum::Null]],
3324 ] {
3325 #[allow(clippy::disallowed_methods)] let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
3327 assert_err!(result);
3328 }
3329
3330 let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
3331 assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
3332 }
3333
3334 #[mz_ore::test]
3336 #[cfg_attr(miri, ignore)] fn test_list_encoding() {
3338 fn test_list_encoding_inner(len: usize) {
3339 let list_elem = |i: usize| {
3340 if i % 2 == 0 {
3341 Datum::False
3342 } else {
3343 Datum::True
3344 }
3345 };
3346 let mut row = Row::default();
3347 {
3348 let mut packer = row.packer();
3350 packer.push(Datum::String("start"));
3351 packer.push_list_with(|packer| {
3352 for i in 0..len {
3353 packer.push(list_elem(i));
3354 }
3355 });
3356 packer.push(Datum::String("end"));
3357 }
3358 let mut row_it = row.iter();
3360 assert_eq!(row_it.next().unwrap(), Datum::String("start"));
3361 match row_it.next().unwrap() {
3362 Datum::List(list) => {
3363 let mut list_it = list.iter();
3364 for i in 0..len {
3365 assert_eq!(list_it.next().unwrap(), list_elem(i));
3366 }
3367 assert_none!(list_it.next());
3368 }
3369 _ => panic!("expected Datum::List"),
3370 }
3371 assert_eq!(row_it.next().unwrap(), Datum::String("end"));
3372 assert_none!(row_it.next());
3373 }
3374
3375 test_list_encoding_inner(0);
3376 test_list_encoding_inner(1);
3377 test_list_encoding_inner(10);
3378 test_list_encoding_inner(TINY - 1); test_list_encoding_inner(TINY + 1); test_list_encoding_inner(SHORT + 1); }
3385}