1use std::borrow::Borrow;
11use std::cell::{Cell, RefCell};
12use std::cmp::Ordering;
13use std::convert::{TryFrom, TryInto};
14use std::fmt::{self, Debug};
15use std::mem::{size_of, transmute};
16use std::ops::Deref;
17use std::str;
18
19use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
20use compact_bytes::CompactBytes;
21use mz_ore::cast::{CastFrom, ReinterpretCast};
22use mz_ore::soft_assert_no_log;
23use mz_ore::vec::Vector;
24use mz_persist_types::Codec64;
25use num_enum::{IntoPrimitive, TryFromPrimitive};
26use ordered_float::OrderedFloat;
27use proptest::prelude::*;
28use proptest::strategy::{BoxedStrategy, Strategy};
29use serde::{Deserialize, Serialize};
30use uuid::Uuid;
31
32use crate::adt::array::{
33 Array, ArrayDimension, ArrayDimensions, InvalidArrayError, MAX_ARRAY_DIMENSIONS,
34};
35use crate::adt::date::Date;
36use crate::adt::interval::Interval;
37use crate::adt::mz_acl_item::{AclItem, MzAclItem};
38use crate::adt::numeric;
39use crate::adt::numeric::Numeric;
40use crate::adt::range::{
41 self, InvalidRangeError, Range, RangeBound, RangeInner, RangeLowerBound, RangeUpperBound,
42};
43use crate::adt::timestamp::CheckedTimestamp;
44use crate::scalar::{DatumKind, arb_datum};
45use crate::{Datum, RelationDesc, Timestamp};
46
47pub(crate) mod encode;
48pub mod iter;
49
50include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
51
/// An owned, packed row: a sequence of datums stored in their byte encoding.
///
/// The bytes are held in a [`CompactBytes`] buffer. Read access goes through
/// [`RowRef`] (see the `Deref`, `Borrow` and `AsRef` impls); new contents are
/// written through a [`RowPacker`] obtained from [`Row::packer`].
#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
    /// The encoded bytes of every datum in the row, back to back.
    data: CompactBytes,
}
112
113impl Row {
114 const SIZE: usize = CompactBytes::MAX_INLINE;
115
116 pub fn decode_from_proto(
119 &mut self,
120 proto: &ProtoRow,
121 desc: &RelationDesc,
122 ) -> Result<(), String> {
123 let mut packer = self.packer();
124 for (col_idx, _, _) in desc.iter_all() {
125 let d = match proto.datums.get(col_idx.to_raw()) {
126 Some(x) => x,
127 None => {
128 packer.push(Datum::Null);
129 continue;
130 }
131 };
132 packer.try_push_proto(d)?;
133 }
134
135 Ok(())
136 }
137
138 #[inline]
140 pub fn with_capacity(cap: usize) -> Self {
141 Self {
142 data: CompactBytes::with_capacity(cap),
143 }
144 }
145
146 #[inline]
148 pub const fn empty() -> Self {
149 Self {
150 data: CompactBytes::empty(),
151 }
152 }
153
154 pub unsafe fn from_bytes_unchecked(data: &[u8]) -> Self {
161 Row {
162 data: CompactBytes::new(data),
163 }
164 }
165
166 pub fn packer(&mut self) -> RowPacker<'_> {
172 self.clear();
173 RowPacker { row: self }
174 }
175
176 pub fn pack<'a, I, D>(iter: I) -> Row
184 where
185 I: IntoIterator<Item = D>,
186 D: Borrow<Datum<'a>>,
187 {
188 let mut row = Row::default();
189 row.packer().extend(iter);
190 row
191 }
192
193 pub fn pack_using<'a, I, D>(&mut self, iter: I) -> Row
198 where
199 I: IntoIterator<Item = D>,
200 D: Borrow<Datum<'a>>,
201 {
202 self.packer().extend(iter);
203 self.clone()
204 }
205
206 pub fn try_pack<'a, I, D, E>(iter: I) -> Result<Row, E>
210 where
211 I: IntoIterator<Item = Result<D, E>>,
212 D: Borrow<Datum<'a>>,
213 {
214 let mut row = Row::default();
215 row.packer().try_extend(iter)?;
216 Ok(row)
217 }
218
219 pub fn pack_slice<'a>(slice: &[Datum<'a>]) -> Row {
225 let mut row = Row::with_capacity(datums_size(slice.iter()));
227 row.packer().extend(slice.iter());
228 row
229 }
230
231 pub fn byte_len(&self) -> usize {
233 let heap_size = if self.data.spilled() {
234 self.data.len()
235 } else {
236 0
237 };
238 let inline_size = std::mem::size_of::<Self>();
239 inline_size.saturating_add(heap_size)
240 }
241
242 pub fn data_len(&self) -> usize {
244 self.data.len()
245 }
246
247 pub fn byte_capacity(&self) -> usize {
249 self.data.capacity()
250 }
251
252 #[inline]
254 pub fn as_row_ref(&self) -> &RowRef {
255 RowRef::from_slice(self.data.as_slice())
256 }
257
258 #[inline]
260 fn clear(&mut self) {
261 self.data.clear();
262 }
263}
264
265impl Borrow<RowRef> for Row {
266 #[inline]
267 fn borrow(&self) -> &RowRef {
268 self.as_row_ref()
269 }
270}
271
272impl AsRef<RowRef> for Row {
273 #[inline]
274 fn as_ref(&self) -> &RowRef {
275 self.as_row_ref()
276 }
277}
278
279impl Deref for Row {
280 type Target = RowRef;
281
282 #[inline]
283 fn deref(&self) -> &Self::Target {
284 self.as_row_ref()
285 }
286}
287
// Compile-time guard: the build fails if `Row` is ever not exactly 24 bytes.
static_assertions::const_assert_eq!(std::mem::size_of::<Row>(), 24);
290
291impl Clone for Row {
292 fn clone(&self) -> Self {
293 Row {
294 data: self.data.clone(),
295 }
296 }
297
298 fn clone_from(&mut self, source: &Self) {
299 self.data.clone_from(&source.data);
300 }
301}
302
303impl std::hash::Hash for Row {
305 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
306 self.as_row_ref().hash(state)
307 }
308}
309
310impl Arbitrary for Row {
311 type Parameters = prop::collection::SizeRange;
312 type Strategy = BoxedStrategy<Row>;
313
314 fn arbitrary_with(size: Self::Parameters) -> Self::Strategy {
315 prop::collection::vec(arb_datum(), size)
316 .prop_map(|items| {
317 let mut row = Row::default();
318 let mut packer = row.packer();
319 for item in items.iter() {
320 let datum: Datum<'_> = item.into();
321 packer.push(datum);
322 }
323 row
324 })
325 .boxed()
326 }
327}
328
329impl PartialOrd for Row {
330 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
331 Some(self.cmp(other))
332 }
333}
334
335impl Ord for Row {
336 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
337 self.as_ref().cmp(other.as_ref())
338 }
339}
340
/// `columnation` support for [`Row`]: lets rows be copied into a shared
/// byte region instead of each owning its own buffer.
#[allow(missing_debug_implementations)]
mod columnation {
    use columnation::{Columnation, Region};
    use mz_ore::region::LgAllocRegion;

    use crate::Row;

    /// Region that stores the bytes of copied [`Row`]s.
    pub struct RowStack {
        /// Backing byte storage for rows whose data is spilled.
        region: LgAllocRegion<u8>,
    }

    impl RowStack {
        /// `2 << 20` (2_097_152): the limit handed to the region at
        /// construction and the cap applied to every reservation request.
        const LIMIT: usize = 2 << 20;
    }

    impl Default for RowStack {
        /// Creates an empty region configured with [`RowStack::LIMIT`].
        fn default() -> Self {
            Self {
                region: LgAllocRegion::with_limit(Self::LIMIT),
            }
        }
    }

    impl Columnation for Row {
        type InnerRegion = RowStack;
    }

    impl Region for RowStack {
        type Item = Row;
        #[inline]
        fn clear(&mut self) {
            self.region.clear();
        }
        /// Copies `item` into this region.
        ///
        /// Rows whose buffer is spilled have their bytes copied into the
        /// region, and the returned `Row` is assembled from raw parts pointing
        /// at that copy. Rows that are not spilled are simply cloned.
        ///
        /// # Safety
        ///
        /// The returned `Row` for spilled input points into region storage
        /// rather than a buffer of its own. Presumably it must not outlive the
        /// region, nor be dropped or grown as if it owned its allocation —
        /// TODO confirm against the `columnation::Region` contract.
        #[inline(always)]
        unsafe fn copy(&mut self, item: &Row) -> Row {
            if item.data.spilled() {
                let bytes = self.region.copy_slice(&item.data[..]);
                Row {
                    // NOTE(review): the capacity passed here is the *source*
                    // row's capacity, while only `item.data.len()` bytes were
                    // copied into the region — confirm this is acceptable for
                    // `CompactBytes::from_raw_parts`.
                    data: compact_bytes::CompactBytes::from_raw_parts(
                        bytes.as_mut_ptr(),
                        item.data.len(),
                        item.data.capacity(),
                    ),
                }
            } else {
                item.clone()
            }
        }

        /// Reserves room for the spilled rows in `items`, capped at
        /// [`RowStack::LIMIT`]. Rows that are not spilled need no region space.
        fn reserve_items<'a, I>(&mut self, items: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            let size = items
                .filter(|row| row.data.spilled())
                .map(|row| row.data.len())
                .sum();
            let size = std::cmp::min(size, Self::LIMIT);
            self.region.reserve(size);
        }

        /// Reserves room for the combined length of `regions`, capped at
        /// [`RowStack::LIMIT`].
        fn reserve_regions<'a, I>(&mut self, regions: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            let size = regions.map(|r| r.region.len()).sum();
            let size = std::cmp::min(size, Self::LIMIT);
            self.region.reserve(size);
        }

        /// Forwards heap-size reporting to the underlying region.
        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.region.heap_size(callback)
        }
    }
}
424
425mod columnar {
426 use columnar::common::PushIndexAs;
427 use columnar::{
428 AsBytes, Clear, Columnar, Container, FromBytes, HeapSize, Index, IndexAs, Len, Push,
429 };
430 use mz_ore::cast::CastFrom;
431
432 use crate::{Row, RowRef};
433
434 #[derive(Copy, Clone, Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
435 pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
436 pub bounds: BC,
438 pub values: VC,
440 }
441
442 impl Columnar for Row {
443 #[inline(always)]
444 fn copy_from(&mut self, other: columnar::Ref<'_, Self>) {
445 self.clear();
446 self.data.extend_from_slice(other.data());
447 }
448 #[inline(always)]
449 fn into_owned(other: columnar::Ref<'_, Self>) -> Self {
450 other.to_owned()
451 }
452 type Container = Rows;
453 #[inline(always)]
454 fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
455 where
456 Self: 'a,
457 {
458 thing
459 }
460 }
461
462 impl<BC: PushIndexAs<u64>> Container for Rows<BC, Vec<u8>> {
463 type Ref<'a> = &'a RowRef;
464 type Borrowed<'a>
465 = Rows<BC::Borrowed<'a>, &'a [u8]>
466 where
467 Self: 'a;
468 #[inline(always)]
469 fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
470 Rows {
471 bounds: self.bounds.borrow(),
472 values: self.values.borrow(),
473 }
474 }
475 #[inline(always)]
476 fn reborrow<'c, 'a: 'c>(item: Self::Borrowed<'a>) -> Self::Borrowed<'c>
477 where
478 Self: 'a,
479 {
480 Rows {
481 bounds: BC::reborrow(item.bounds),
482 values: item.values,
483 }
484 }
485
486 fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
487 where
488 Self: 'a,
489 {
490 item
491 }
492
493 fn reserve_for<'a, I>(&mut self, selves: I)
494 where
495 Self: 'a,
496 I: Iterator<Item = Self::Borrowed<'a>> + Clone,
497 {
498 self.bounds.reserve_for(selves.clone().map(|r| r.bounds));
499 self.values.reserve_for(selves.map(|r| r.values));
500 }
501 }
502
503 impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
504 #[inline(always)]
505 fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
506 columnar::chain(self.bounds.as_bytes(), self.values.as_bytes())
507 }
508 }
509 impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
510 #[inline(always)]
511 fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
512 Self {
513 bounds: FromBytes::from_bytes(bytes),
514 values: FromBytes::from_bytes(bytes),
515 }
516 }
517 }
518
519 impl<BC: Len, VC> Len for Rows<BC, VC> {
520 #[inline(always)]
521 fn len(&self) -> usize {
522 self.bounds.len()
523 }
524 }
525
526 impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
527 type Ref = &'a RowRef;
528 #[inline(always)]
529 fn get(&self, index: usize) -> Self::Ref {
530 let lower = if index == 0 {
531 0
532 } else {
533 self.bounds.index_as(index - 1)
534 };
535 let upper = self.bounds.index_as(index);
536 let lower = usize::cast_from(lower);
537 let upper = usize::cast_from(upper);
538 RowRef::from_slice(&self.values[lower..upper])
539 }
540 }
541 impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
542 type Ref = &'a RowRef;
543 #[inline(always)]
544 fn get(&self, index: usize) -> Self::Ref {
545 let lower = if index == 0 {
546 0
547 } else {
548 self.bounds.index_as(index - 1)
549 };
550 let upper = self.bounds.index_as(index);
551 let lower = usize::cast_from(lower);
552 let upper = usize::cast_from(upper);
553 RowRef::from_slice(&self.values[lower..upper])
554 }
555 }
556
557 impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
558 #[inline(always)]
559 fn push(&mut self, item: &Row) {
560 self.values.extend_from_slice(item.data.as_slice());
561 self.bounds.push(u64::cast_from(self.values.len()));
562 }
563 }
564 impl<BC: for<'a> Push<&'a u64>> Push<&RowRef> for Rows<BC> {
565 #[inline(always)]
566 fn push(&mut self, item: &RowRef) {
567 self.values.extend_from_slice(item.data());
568 self.bounds.push(&u64::cast_from(self.values.len()));
569 }
570 }
571 impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
572 #[inline(always)]
573 fn clear(&mut self) {
574 self.bounds.clear();
575 self.values.clear();
576 }
577 }
578 impl<BC: HeapSize, VC: HeapSize> HeapSize for Rows<BC, VC> {
579 #[inline(always)]
580 fn heap_size(&self) -> (usize, usize) {
581 let (l0, c0) = self.bounds.heap_size();
582 let (l1, c1) = self.values.heap_size();
583 (l0 + l1, c0 + c1)
584 }
585 }
586}
587
/// A borrowed view of a packed row: an unsized wrapper around the row's
/// encoded bytes.
///
/// `#[repr(transparent)]` gives it the same layout as `[u8]`, which is what
/// [`RowRef::from_slice`] relies on to reinterpret a byte slice in place.
#[derive(PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct RowRef([u8]);
594
595impl RowRef {
596 pub fn from_slice(row: &[u8]) -> &RowRef {
601 #[allow(clippy::as_conversions)]
602 let ptr = row as *const [u8] as *const RowRef;
603 unsafe { &*ptr }
605 }
606
607 pub fn unpack(&self) -> Vec<Datum<'_>> {
609 let len = self.iter().count();
611 let mut vec = Vec::with_capacity(len);
612 vec.extend(self.iter());
613 vec
614 }
615
616 pub fn unpack_first(&self) -> Datum<'_> {
620 self.iter().next().unwrap()
621 }
622
623 pub fn iter(&self) -> DatumListIter<'_> {
625 DatumListIter { data: &self.0 }
626 }
627
628 pub fn byte_len(&self) -> usize {
630 self.0.len()
631 }
632
633 pub fn data(&self) -> &[u8] {
635 &self.0
636 }
637
638 pub fn is_empty(&self) -> bool {
640 self.0.is_empty()
641 }
642}
643
644impl ToOwned for RowRef {
645 type Owned = Row;
646
647 fn to_owned(&self) -> Self::Owned {
648 unsafe { Row::from_bytes_unchecked(&self.0) }
650 }
651}
652
653impl<'a> IntoIterator for &'a RowRef {
654 type Item = Datum<'a>;
655 type IntoIter = DatumListIter<'a>;
656
657 fn into_iter(self) -> DatumListIter<'a> {
658 DatumListIter { data: &self.0 }
659 }
660}
661
662impl PartialOrd for RowRef {
666 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
667 Some(self.cmp(other))
668 }
669}
670
671impl Ord for RowRef {
672 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
673 match self.0.len().cmp(&other.0.len()) {
674 std::cmp::Ordering::Less => std::cmp::Ordering::Less,
675 std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
676 std::cmp::Ordering::Equal => self.0.cmp(&other.0),
677 }
678 }
679}
680
681impl fmt::Debug for RowRef {
682 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
684 f.write_str("RowRef{")?;
685 f.debug_list().entries(self.into_iter()).finish()?;
686 f.write_str("}")
687 }
688}
689
/// Writer that packs datums into a [`Row`].
///
/// Obtained from [`Row::packer`], which clears the row before handing out the
/// packer. It holds the row mutably for its whole lifetime.
#[derive(Debug)]
pub struct RowPacker<'a> {
    /// The row being written to.
    row: &'a mut Row,
}
701
/// Iterator over the datums encoded in a byte slice, e.g. the contents of a
/// [`RowRef`].
#[derive(Debug, Clone)]
pub struct DatumListIter<'a> {
    /// The encoded bytes backing the iteration.
    // NOTE(review): presumably the not-yet-decoded remainder — the iterator
    // impl is not visible here; confirm.
    data: &'a [u8],
}
706
/// Iterator state for walking the entries of an encoded dictionary/map.
// NOTE(review): the iterator impl is outside this view; the field notes below
// are inferred from names and types only — confirm against it.
#[derive(Debug, Clone)]
pub struct DatumDictIter<'a> {
    /// The encoded bytes backing the iteration.
    data: &'a [u8],
    /// The previously seen key, if any (presumably used to check key order).
    prev_key: Option<&'a str>,
}
712
/// A collection of owned byte buffers behind a `RefCell`.
// NOTE(review): presumably a scratch arena for temporary datum storage; its
// methods are not visible here — confirm intended use.
#[derive(Debug)]
pub struct RowArena {
    /// The buffers held by the arena; interior mutability allows adding
    /// buffers through a shared reference.
    inner: RefCell<Vec<Vec<u8>>>,
}
724
/// A borrowed, encoded sequence of datums (the payload of `Datum::List`, and
/// the element storage of an `Array`).
///
/// Equality and hashing are derived, so they operate on the raw bytes;
/// ordering is implemented separately by comparing decoded datums.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumList<'a> {
    /// The encoded bytes of the list's elements.
    data: &'a [u8],
}
733
734impl<'a> Debug for DatumList<'a> {
735 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
736 f.debug_list().entries(self.iter()).finish()
737 }
738}
739
740impl Ord for DatumList<'_> {
741 fn cmp(&self, other: &DatumList) -> Ordering {
742 self.iter().cmp(other.iter())
743 }
744}
745
746impl PartialOrd for DatumList<'_> {
747 fn partial_cmp(&self, other: &DatumList) -> Option<Ordering> {
748 Some(self.cmp(other))
749 }
750}
751
/// A borrowed, encoded map (the payload of `Datum::Map`).
///
/// All comparison traits are derived, so equality, hashing and ordering
/// operate on the raw encoded bytes.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct DatumMap<'a> {
    /// The encoded bytes of the map's entries.
    data: &'a [u8],
}
758
/// A single datum kept in its encoded form, so it can be nested inside
/// another datum (used for the bounds of a range).
///
/// Equality and hashing are derived over the raw bytes; ordering, `Display`
/// and `Debug` decode the datum first.
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatumNested<'a> {
    /// The encoded bytes of exactly one datum, tag included.
    val: &'a [u8],
}
765
766impl<'a> std::fmt::Display for DatumNested<'a> {
767 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
768 std::fmt::Display::fmt(&self.datum(), f)
769 }
770}
771
772impl<'a> std::fmt::Debug for DatumNested<'a> {
773 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
774 f.debug_struct("DatumNested")
775 .field("val", &self.datum())
776 .finish()
777 }
778}
779
780impl<'a> DatumNested<'a> {
781 pub fn extract(data: &mut &'a [u8]) -> DatumNested<'a> {
785 let prev = *data;
786 let _ = unsafe { read_datum(data) };
787 DatumNested {
788 val: &prev[..(prev.len() - data.len())],
789 }
790 }
791
792 pub fn datum(&self) -> Datum<'a> {
794 let mut temp = self.val;
795 unsafe { read_datum(&mut temp) }
796 }
797}
798
799impl<'a> Ord for DatumNested<'a> {
800 fn cmp(&self, other: &Self) -> Ordering {
801 self.datum().cmp(&other.datum())
802 }
803}
804
805impl<'a> PartialOrd for DatumNested<'a> {
806 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
807 Some(self.cmp(other))
808 }
809}
810
/// The one-byte tag that starts every encoded datum and tells the decoder
/// (`read_datum`) how to interpret the bytes that follow.
///
/// The enum is `#[repr(u8)]` with implicit discriminants, and tags are decoded
/// with `try_from_primitive`, so each variant's numeric value is fixed by its
/// position in this list. Inserting or reordering variants changes the
/// encoding of existing tags.
#[derive(Debug, Clone, Copy, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
#[repr(u8)]
enum Tag {
    Null,
    False,
    True,
    // Fixed-width integer encodings (decoded with `read_byte_array`).
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt32,
    Float32,
    Float64,
    Date,
    Time,
    // Timestamps stored as a naive date followed by a time.
    Timestamp,
    TimestampTz,
    Interval,
    // Bytes/String/List payloads are prefixed with a length whose width
    // depends on the suffix: Tiny = 1 byte, Short = 2, Long = 4, Huge = 8.
    BytesTiny,
    BytesShort,
    BytesLong,
    BytesHuge,
    StringTiny,
    StringShort,
    StringLong,
    StringHuge,
    Uuid,
    Array,
    ListTiny,
    ListShort,
    ListLong,
    ListHuge,
    Dict,
    JsonNull,
    Dummy,
    Numeric,
    UInt16,
    UInt64,
    MzTimestamp,
    Range,
    MzAclItem,
    AclItem,
    // Timestamps stored as a single i64 count of nanoseconds.
    CheapTimestamp,
    CheapTimestampTz,
    // Variable-length integer encodings. The numeric suffix is the number of
    // payload *bits* that follow the tag (see `Tag::actual_int_length`, which
    // reports the same width in bytes). Missing high bytes are filled with
    // 0x00 when decoding the NonNegative/UInt tags and with 0xFF when decoding
    // the Negative tags.
    NonNegativeInt16_0, // no payload bytes
    NonNegativeInt16_8,
    NonNegativeInt16_16,

    NonNegativeInt32_0,
    NonNegativeInt32_8,
    NonNegativeInt32_16,
    NonNegativeInt32_24,
    NonNegativeInt32_32,

    NonNegativeInt64_0,
    NonNegativeInt64_8,
    NonNegativeInt64_16,
    NonNegativeInt64_24,
    NonNegativeInt64_32,
    NonNegativeInt64_40,
    NonNegativeInt64_48,
    NonNegativeInt64_56,
    NonNegativeInt64_64,

    NegativeInt16_0, // no payload bytes
    NegativeInt16_8,
    NegativeInt16_16,

    NegativeInt32_0,
    NegativeInt32_8,
    NegativeInt32_16,
    NegativeInt32_24,
    NegativeInt32_32,

    NegativeInt64_0,
    NegativeInt64_8,
    NegativeInt64_16,
    NegativeInt64_24,
    NegativeInt64_32,
    NegativeInt64_40,
    NegativeInt64_48,
    NegativeInt64_56,
    NegativeInt64_64,

    UInt8_0, // no payload bytes
    UInt8_8,

    UInt16_0,
    UInt16_8,
    UInt16_16,

    UInt32_0,
    UInt32_8,
    UInt32_16,
    UInt32_24,
    UInt32_32,

    UInt64_0,
    UInt64_8,
    UInt64_16,
    UInt64_24,
    UInt64_32,
    UInt64_40,
    UInt64_48,
    UInt64_56,
    UInt64_64,
}
940
941impl Tag {
942 fn actual_int_length(self) -> Option<usize> {
943 use Tag::*;
944 let val = match self {
945 NonNegativeInt16_0 | NonNegativeInt32_0 | NonNegativeInt64_0 | UInt8_0 | UInt16_0
946 | UInt32_0 | UInt64_0 => 0,
947 NonNegativeInt16_8 | NonNegativeInt32_8 | NonNegativeInt64_8 | UInt8_8 | UInt16_8
948 | UInt32_8 | UInt64_8 => 1,
949 NonNegativeInt16_16 | NonNegativeInt32_16 | NonNegativeInt64_16 | UInt16_16
950 | UInt32_16 | UInt64_16 => 2,
951 NonNegativeInt32_24 | NonNegativeInt64_24 | UInt32_24 | UInt64_24 => 3,
952 NonNegativeInt32_32 | NonNegativeInt64_32 | UInt32_32 | UInt64_32 => 4,
953 NonNegativeInt64_40 | UInt64_40 => 5,
954 NonNegativeInt64_48 | UInt64_48 => 6,
955 NonNegativeInt64_56 | UInt64_56 => 7,
956 NonNegativeInt64_64 | UInt64_64 => 8,
957 NegativeInt16_0 | NegativeInt32_0 | NegativeInt64_0 => 0,
958 NegativeInt16_8 | NegativeInt32_8 | NegativeInt64_8 => 1,
959 NegativeInt16_16 | NegativeInt32_16 | NegativeInt64_16 => 2,
960 NegativeInt32_24 | NegativeInt64_24 => 3,
961 NegativeInt32_32 | NegativeInt64_32 => 4,
962 NegativeInt64_40 => 5,
963 NegativeInt64_48 => 6,
964 NegativeInt64_56 => 7,
965 NegativeInt64_64 => 8,
966
967 _ => return None,
968 };
969 Some(val)
970 }
971}
972
973fn read_untagged_bytes<'a>(data: &mut &'a [u8]) -> &'a [u8] {
980 let len = u64::from_le_bytes(read_byte_array(data));
981 let len = usize::cast_from(len);
982 let (bytes, next) = data.split_at(len);
983 *data = next;
984 bytes
985}
986
987unsafe fn read_lengthed_datum<'a>(data: &mut &'a [u8], tag: Tag) -> Datum<'a> {
996 let len = match tag {
997 Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => usize::from(read_byte(data)),
998 Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
999 usize::from(u16::from_le_bytes(read_byte_array(data)))
1000 }
1001 Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1002 usize::cast_from(u32::from_le_bytes(read_byte_array(data)))
1003 }
1004 Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1005 usize::cast_from(u64::from_le_bytes(read_byte_array(data)))
1006 }
1007 _ => unreachable!(),
1008 };
1009 let (bytes, next) = data.split_at(len);
1010 *data = next;
1011 match tag {
1012 Tag::BytesTiny | Tag::BytesShort | Tag::BytesLong | Tag::BytesHuge => Datum::Bytes(bytes),
1013 Tag::StringTiny | Tag::StringShort | Tag::StringLong | Tag::StringHuge => {
1014 Datum::String(str::from_utf8_unchecked(bytes))
1015 }
1016 Tag::ListTiny | Tag::ListShort | Tag::ListLong | Tag::ListHuge => {
1017 Datum::List(DatumList { data: bytes })
1018 }
1019 _ => unreachable!(),
1020 }
1021}
1022
/// Pops the first byte off `data` and returns it.
///
/// # Panics
///
/// Panics if `data` is empty.
fn read_byte(data: &mut &[u8]) -> u8 {
    let current = *data;
    let first = current[0];
    *data = &current[1..];
    first
}
1028
/// Reads `length` bytes from the front of `data` into the low end of an
/// `N`-byte array, padding the remaining bytes with `FILL`, and advances
/// `data` past the bytes read.
///
/// Because the result is meant to be interpreted as little-endian, padding
/// the high bytes with `0` or `255` zero- or sign-extends the stored value.
///
/// # Panics
///
/// Panics if `length` exceeds `data.len()` or `N`.
fn read_byte_array_sign_extending<const N: usize, const FILL: u8>(
    data: &mut &[u8],
    length: usize,
) -> [u8; N] {
    let (stored, rest) = data.split_at(length);
    let mut widened = [FILL; N];
    widened[..length].copy_from_slice(stored);
    *data = rest;
    widened
}
1046fn read_byte_array_extending_negative<const N: usize>(data: &mut &[u8], length: usize) -> [u8; N] {
1054 read_byte_array_sign_extending::<N, 255>(data, length)
1055}
1056
1057fn read_byte_array_extending_nonnegative<const N: usize>(
1065 data: &mut &[u8],
1066 length: usize,
1067) -> [u8; N] {
1068 read_byte_array_sign_extending::<N, 0>(data, length)
1069}
1070
1071pub(super) fn read_byte_array<const N: usize>(data: &mut &[u8]) -> [u8; N] {
1072 let (prev, next) = data.split_first_chunk().unwrap();
1073 *data = next;
1074 *prev
1075}
1076
1077pub(super) fn read_date(data: &mut &[u8]) -> Date {
1078 let days = i32::from_le_bytes(read_byte_array(data));
1079 Date::from_pg_epoch(days).expect("unexpected date")
1080}
1081
1082pub(super) fn read_naive_date(data: &mut &[u8]) -> NaiveDate {
1083 let year = i32::from_le_bytes(read_byte_array(data));
1084 let ordinal = u32::from_le_bytes(read_byte_array(data));
1085 NaiveDate::from_yo_opt(year, ordinal).unwrap()
1086}
1087
1088pub(super) fn read_time(data: &mut &[u8]) -> NaiveTime {
1089 let secs = u32::from_le_bytes(read_byte_array(data));
1090 let nanos = u32::from_le_bytes(read_byte_array(data));
1091 NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap()
1092}
1093
1094pub unsafe fn read_datum<'a>(data: &mut &'a [u8]) -> Datum<'a> {
1103 let tag = Tag::try_from_primitive(read_byte(data)).expect("unknown row tag");
1104 match tag {
1105 Tag::Null => Datum::Null,
1106 Tag::False => Datum::False,
1107 Tag::True => Datum::True,
1108 Tag::UInt8_0 | Tag::UInt8_8 => {
1109 let i = u8::from_le_bytes(read_byte_array_extending_nonnegative(
1110 data,
1111 tag.actual_int_length()
1112 .expect("returns a value for variable-length-encoded integer tags"),
1113 ));
1114 Datum::UInt8(i)
1115 }
1116 Tag::Int16 => {
1117 let i = i16::from_le_bytes(read_byte_array(data));
1118 Datum::Int16(i)
1119 }
1120 Tag::NonNegativeInt16_0 | Tag::NonNegativeInt16_16 | Tag::NonNegativeInt16_8 => {
1121 let i = i16::from_le_bytes(read_byte_array_extending_nonnegative(
1125 data,
1126 tag.actual_int_length()
1127 .expect("returns a value for variable-length-encoded integer tags"),
1128 ));
1129 Datum::Int16(i)
1130 }
1131 Tag::UInt16_0 | Tag::UInt16_8 | Tag::UInt16_16 => {
1132 let i = u16::from_le_bytes(read_byte_array_extending_nonnegative(
1133 data,
1134 tag.actual_int_length()
1135 .expect("returns a value for variable-length-encoded integer tags"),
1136 ));
1137 Datum::UInt16(i)
1138 }
1139 Tag::Int32 => {
1140 let i = i32::from_le_bytes(read_byte_array(data));
1141 Datum::Int32(i)
1142 }
1143 Tag::NonNegativeInt32_0
1144 | Tag::NonNegativeInt32_32
1145 | Tag::NonNegativeInt32_8
1146 | Tag::NonNegativeInt32_16
1147 | Tag::NonNegativeInt32_24 => {
1148 let i = i32::from_le_bytes(read_byte_array_extending_nonnegative(
1152 data,
1153 tag.actual_int_length()
1154 .expect("returns a value for variable-length-encoded integer tags"),
1155 ));
1156 Datum::Int32(i)
1157 }
1158 Tag::UInt32_0 | Tag::UInt32_8 | Tag::UInt32_16 | Tag::UInt32_24 | Tag::UInt32_32 => {
1159 let i = u32::from_le_bytes(read_byte_array_extending_nonnegative(
1160 data,
1161 tag.actual_int_length()
1162 .expect("returns a value for variable-length-encoded integer tags"),
1163 ));
1164 Datum::UInt32(i)
1165 }
1166 Tag::Int64 => {
1167 let i = i64::from_le_bytes(read_byte_array(data));
1168 Datum::Int64(i)
1169 }
1170 Tag::NonNegativeInt64_0
1171 | Tag::NonNegativeInt64_64
1172 | Tag::NonNegativeInt64_8
1173 | Tag::NonNegativeInt64_16
1174 | Tag::NonNegativeInt64_24
1175 | Tag::NonNegativeInt64_32
1176 | Tag::NonNegativeInt64_40
1177 | Tag::NonNegativeInt64_48
1178 | Tag::NonNegativeInt64_56 => {
1179 let i = i64::from_le_bytes(read_byte_array_extending_nonnegative(
1184 data,
1185 tag.actual_int_length()
1186 .expect("returns a value for variable-length-encoded integer tags"),
1187 ));
1188 Datum::Int64(i)
1189 }
1190 Tag::UInt64_0
1191 | Tag::UInt64_8
1192 | Tag::UInt64_16
1193 | Tag::UInt64_24
1194 | Tag::UInt64_32
1195 | Tag::UInt64_40
1196 | Tag::UInt64_48
1197 | Tag::UInt64_56
1198 | Tag::UInt64_64 => {
1199 let i = u64::from_le_bytes(read_byte_array_extending_nonnegative(
1200 data,
1201 tag.actual_int_length()
1202 .expect("returns a value for variable-length-encoded integer tags"),
1203 ));
1204 Datum::UInt64(i)
1205 }
1206 Tag::NegativeInt16_0 | Tag::NegativeInt16_16 | Tag::NegativeInt16_8 => {
1207 let i = i16::from_le_bytes(read_byte_array_extending_negative(
1211 data,
1212 tag.actual_int_length()
1213 .expect("returns a value for variable-length-encoded integer tags"),
1214 ));
1215 Datum::Int16(i)
1216 }
1217 Tag::NegativeInt32_0
1218 | Tag::NegativeInt32_32
1219 | Tag::NegativeInt32_8
1220 | Tag::NegativeInt32_16
1221 | Tag::NegativeInt32_24 => {
1222 let i = i32::from_le_bytes(read_byte_array_extending_negative(
1226 data,
1227 tag.actual_int_length()
1228 .expect("returns a value for variable-length-encoded integer tags"),
1229 ));
1230 Datum::Int32(i)
1231 }
1232 Tag::NegativeInt64_0
1233 | Tag::NegativeInt64_64
1234 | Tag::NegativeInt64_8
1235 | Tag::NegativeInt64_16
1236 | Tag::NegativeInt64_24
1237 | Tag::NegativeInt64_32
1238 | Tag::NegativeInt64_40
1239 | Tag::NegativeInt64_48
1240 | Tag::NegativeInt64_56 => {
1241 let i = i64::from_le_bytes(read_byte_array_extending_negative(
1245 data,
1246 tag.actual_int_length()
1247 .expect("returns a value for variable-length-encoded integer tags"),
1248 ));
1249 Datum::Int64(i)
1250 }
1251
1252 Tag::UInt8 => {
1253 let i = u8::from_le_bytes(read_byte_array(data));
1254 Datum::UInt8(i)
1255 }
1256 Tag::UInt16 => {
1257 let i = u16::from_le_bytes(read_byte_array(data));
1258 Datum::UInt16(i)
1259 }
1260 Tag::UInt32 => {
1261 let i = u32::from_le_bytes(read_byte_array(data));
1262 Datum::UInt32(i)
1263 }
1264 Tag::UInt64 => {
1265 let i = u64::from_le_bytes(read_byte_array(data));
1266 Datum::UInt64(i)
1267 }
1268 Tag::Float32 => {
1269 let f = f32::from_bits(u32::from_le_bytes(read_byte_array(data)));
1270 Datum::Float32(OrderedFloat::from(f))
1271 }
1272 Tag::Float64 => {
1273 let f = f64::from_bits(u64::from_le_bytes(read_byte_array(data)));
1274 Datum::Float64(OrderedFloat::from(f))
1275 }
1276 Tag::Date => Datum::Date(read_date(data)),
1277 Tag::Time => Datum::Time(read_time(data)),
1278 Tag::CheapTimestamp => {
1279 let ts = i64::from_le_bytes(read_byte_array(data));
1280 let secs = ts.div_euclid(1_000_000_000);
1281 let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1282 let ndt = DateTime::from_timestamp(secs, nsecs)
1283 .expect("We only write round-trippable timestamps")
1284 .naive_utc();
1285 Datum::Timestamp(
1286 CheckedTimestamp::from_timestamplike(ndt).expect("unexpected timestamp"),
1287 )
1288 }
1289 Tag::CheapTimestampTz => {
1290 let ts = i64::from_le_bytes(read_byte_array(data));
1291 let secs = ts.div_euclid(1_000_000_000);
1292 let nsecs: u32 = ts.rem_euclid(1_000_000_000).try_into().unwrap();
1293 let dt = DateTime::from_timestamp(secs, nsecs)
1294 .expect("We only write round-trippable timestamps");
1295 Datum::TimestampTz(
1296 CheckedTimestamp::from_timestamplike(dt).expect("unexpected timestamp"),
1297 )
1298 }
1299 Tag::Timestamp => {
1300 let date = read_naive_date(data);
1301 let time = read_time(data);
1302 Datum::Timestamp(
1303 CheckedTimestamp::from_timestamplike(date.and_time(time))
1304 .expect("unexpected timestamp"),
1305 )
1306 }
1307 Tag::TimestampTz => {
1308 let date = read_naive_date(data);
1309 let time = read_time(data);
1310 Datum::TimestampTz(
1311 CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
1312 date.and_time(time),
1313 Utc,
1314 ))
1315 .expect("unexpected timestamptz"),
1316 )
1317 }
1318 Tag::Interval => {
1319 let months = i32::from_le_bytes(read_byte_array(data));
1320 let days = i32::from_le_bytes(read_byte_array(data));
1321 let micros = i64::from_le_bytes(read_byte_array(data));
1322 Datum::Interval(Interval {
1323 months,
1324 days,
1325 micros,
1326 })
1327 }
1328 Tag::BytesTiny
1329 | Tag::BytesShort
1330 | Tag::BytesLong
1331 | Tag::BytesHuge
1332 | Tag::StringTiny
1333 | Tag::StringShort
1334 | Tag::StringLong
1335 | Tag::StringHuge
1336 | Tag::ListTiny
1337 | Tag::ListShort
1338 | Tag::ListLong
1339 | Tag::ListHuge => read_lengthed_datum(data, tag),
1340 Tag::Uuid => Datum::Uuid(Uuid::from_bytes(read_byte_array(data))),
1341 Tag::Array => {
1342 let ndims = read_byte(data);
1345 let dims_size = usize::from(ndims) * size_of::<u64>() * 2;
1346 let (dims, next) = data.split_at(dims_size);
1347 *data = next;
1348 let bytes = read_untagged_bytes(data);
1349 Datum::Array(Array {
1350 dims: ArrayDimensions { data: dims },
1351 elements: DatumList { data: bytes },
1352 })
1353 }
1354 Tag::Dict => {
1355 let bytes = read_untagged_bytes(data);
1356 Datum::Map(DatumMap { data: bytes })
1357 }
1358 Tag::JsonNull => Datum::JsonNull,
1359 Tag::Dummy => Datum::Dummy,
1360 Tag::Numeric => {
1361 let digits = read_byte(data).into();
1362 let exponent = i8::reinterpret_cast(read_byte(data));
1363 let bits = read_byte(data);
1364
1365 let lsu_u16_len = Numeric::digits_to_lsu_elements_len(digits);
1366 let lsu_u8_len = lsu_u16_len * 2;
1367 let (lsu_u8, next) = data.split_at(lsu_u8_len);
1368 *data = next;
1369
1370 let mut lsu = [0; numeric::NUMERIC_DATUM_WIDTH_USIZE];
1374 for (i, c) in lsu_u8.chunks(2).enumerate() {
1375 lsu[i] = u16::from_le_bytes(c.try_into().unwrap());
1376 }
1377
1378 let d = Numeric::from_raw_parts(digits, exponent.into(), bits, lsu);
1379 Datum::from(d)
1380 }
1381 Tag::MzTimestamp => {
1382 let t = Timestamp::decode(read_byte_array(data));
1383 Datum::MzTimestamp(t)
1384 }
1385 Tag::Range => {
1386 let flag_byte = read_byte(data);
1388 let flags = range::InternalFlags::from_bits(flag_byte)
1389 .expect("range flags must be encoded validly");
1390
1391 if flags.contains(range::InternalFlags::EMPTY) {
1392 assert!(
1393 flags == range::InternalFlags::EMPTY,
1394 "empty ranges contain only RANGE_EMPTY flag"
1395 );
1396
1397 return Datum::Range(Range { inner: None });
1398 }
1399
1400 let lower_bound = if flags.contains(range::InternalFlags::LB_INFINITE) {
1401 None
1402 } else {
1403 Some(DatumNested::extract(data))
1404 };
1405
1406 let lower = RangeBound {
1407 inclusive: flags.contains(range::InternalFlags::LB_INCLUSIVE),
1408 bound: lower_bound,
1409 };
1410
1411 let upper_bound = if flags.contains(range::InternalFlags::UB_INFINITE) {
1412 None
1413 } else {
1414 Some(DatumNested::extract(data))
1415 };
1416
1417 let upper = RangeBound {
1418 inclusive: flags.contains(range::InternalFlags::UB_INCLUSIVE),
1419 bound: upper_bound,
1420 };
1421
1422 Datum::Range(Range {
1423 inner: Some(RangeInner { lower, upper }),
1424 })
1425 }
1426 Tag::MzAclItem => {
1427 const N: usize = MzAclItem::binary_size();
1428 let mz_acl_item =
1429 MzAclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid mz_aclitem");
1430 Datum::MzAclItem(mz_acl_item)
1431 }
1432 Tag::AclItem => {
1433 const N: usize = AclItem::binary_size();
1434 let acl_item =
1435 AclItem::decode_binary(&read_byte_array::<N>(data)).expect("invalid aclitem");
1436 Datum::AclItem(acl_item)
1437 }
1438 }
1439}
1440
1441fn push_untagged_bytes<D>(data: &mut D, bytes: &[u8])
1445where
1446 D: Vector<u8>,
1447{
1448 let len = u64::cast_from(bytes.len());
1449 data.extend_from_slice(&len.to_le_bytes());
1450 data.extend_from_slice(bytes);
1451}
1452
1453fn push_lengthed_bytes<D>(data: &mut D, bytes: &[u8], tag: Tag)
1454where
1455 D: Vector<u8>,
1456{
1457 match tag {
1458 Tag::BytesTiny | Tag::StringTiny | Tag::ListTiny => {
1459 let len = bytes.len().to_le_bytes();
1460 data.push(len[0]);
1461 }
1462 Tag::BytesShort | Tag::StringShort | Tag::ListShort => {
1463 let len = bytes.len().to_le_bytes();
1464 data.extend_from_slice(&len[0..2]);
1465 }
1466 Tag::BytesLong | Tag::StringLong | Tag::ListLong => {
1467 let len = bytes.len().to_le_bytes();
1468 data.extend_from_slice(&len[0..4]);
1469 }
1470 Tag::BytesHuge | Tag::StringHuge | Tag::ListHuge => {
1471 let len = bytes.len().to_le_bytes();
1472 data.extend_from_slice(&len);
1473 }
1474 _ => unreachable!(),
1475 }
1476 data.extend_from_slice(bytes);
1477}
1478
1479pub(super) fn date_to_array(date: Date) -> [u8; size_of::<i32>()] {
1480 i32::to_le_bytes(date.pg_epoch_days())
1481}
1482
1483fn push_date<D>(data: &mut D, date: Date)
1484where
1485 D: Vector<u8>,
1486{
1487 data.extend_from_slice(&date_to_array(date));
1488}
1489
1490pub(super) fn naive_date_to_arrays(
1491 date: NaiveDate,
1492) -> ([u8; size_of::<i32>()], [u8; size_of::<u32>()]) {
1493 (
1494 i32::to_le_bytes(date.year()),
1495 u32::to_le_bytes(date.ordinal()),
1496 )
1497}
1498
1499fn push_naive_date<D>(data: &mut D, date: NaiveDate)
1500where
1501 D: Vector<u8>,
1502{
1503 let (ds1, ds2) = naive_date_to_arrays(date);
1504 data.extend_from_slice(&ds1);
1505 data.extend_from_slice(&ds2);
1506}
1507
1508pub(super) fn time_to_arrays(time: NaiveTime) -> ([u8; size_of::<u32>()], [u8; size_of::<u32>()]) {
1509 (
1510 u32::to_le_bytes(time.num_seconds_from_midnight()),
1511 u32::to_le_bytes(time.nanosecond()),
1512 )
1513}
1514
1515fn push_time<D>(data: &mut D, time: NaiveTime)
1516where
1517 D: Vector<u8>,
1518{
1519 let (ts1, ts2) = time_to_arrays(time);
1520 data.extend_from_slice(&ts1);
1521 data.extend_from_slice(&ts2);
1522}
1523
1524fn checked_timestamp_nanos(dt: NaiveDateTime) -> Option<i64> {
1534 let subsec_nanos = dt.and_utc().timestamp_subsec_nanos();
1535 if subsec_nanos >= 1_000_000_000 {
1536 return None;
1537 }
1538 let as_ns = dt.and_utc().timestamp().checked_mul(1_000_000_000)?;
1539 as_ns.checked_add(i64::from(subsec_nanos))
1540}
1541
/// Returns how many low-order bytes of `i` (widened to `i64`) are needed to
/// hold every bit that is not a redundant leading sign bit.
///
/// All leading copies of the sign bit are discarded, including the sign bit
/// itself, so `0` and `-1` both need zero bytes and `128` needs one. The sign
/// therefore has to be recorded separately by the caller.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_signed<T>(i: T) -> u8
where
    T: Into<i64>,
{
    let value: i64 = i.into();

    // Count the run of identical bits at the top of the value.
    let redundant_bits = match value.is_negative() {
        true => value.leading_ones(),
        false => value.leading_zeros(),
    } as u8;

    // Round the remaining bit count up to whole bytes.
    let significant_bits = 64 - redundant_bits;
    significant_bits.div_ceil(8)
}
1566
/// Returns how many low-order bytes of `i` (widened to `u64`) are needed to
/// hold all of its non-zero bits; `0` needs zero bytes.
#[inline(always)]
#[allow(clippy::as_conversions)]
fn min_bytes_unsigned<T>(i: T) -> u8
where
    T: Into<u64>,
{
    let value: u64 = i.into();

    // Bits left once the leading zeros are dropped, rounded up to whole bytes.
    let significant_bits = 64 - value.leading_zeros() as u8;
    significant_bits.div_ceil(8)
}
1586
/// Exclusive upper bound on lengths that fit in a one-byte length prefix.
const TINY: usize = 1 << u8::BITS;
/// Exclusive upper bound on lengths that fit in a two-byte length prefix.
const SHORT: usize = 1 << u16::BITS;
/// Exclusive upper bound on lengths that fit in a four-byte length prefix.
const LONG: usize = 1 << u32::BITS;
1590
1591fn push_datum<D>(data: &mut D, datum: Datum)
1592where
1593 D: Vector<u8>,
1594{
1595 match datum {
1596 Datum::Null => data.push(Tag::Null.into()),
1597 Datum::False => data.push(Tag::False.into()),
1598 Datum::True => data.push(Tag::True.into()),
1599 Datum::Int16(i) => {
1600 let mbs = min_bytes_signed(i);
1601 let tag = u8::from(if i.is_negative() {
1602 Tag::NegativeInt16_0
1603 } else {
1604 Tag::NonNegativeInt16_0
1605 }) + mbs;
1606
1607 data.push(tag);
1608 data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1609 }
1610 Datum::Int32(i) => {
1611 let mbs = min_bytes_signed(i);
1612 let tag = u8::from(if i.is_negative() {
1613 Tag::NegativeInt32_0
1614 } else {
1615 Tag::NonNegativeInt32_0
1616 }) + mbs;
1617
1618 data.push(tag);
1619 data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1620 }
1621 Datum::Int64(i) => {
1622 let mbs = min_bytes_signed(i);
1623 let tag = u8::from(if i.is_negative() {
1624 Tag::NegativeInt64_0
1625 } else {
1626 Tag::NonNegativeInt64_0
1627 }) + mbs;
1628
1629 data.push(tag);
1630 data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbs)]);
1631 }
1632 Datum::UInt8(i) => {
1633 let mbu = min_bytes_unsigned(i);
1634 let tag = u8::from(Tag::UInt8_0) + mbu;
1635 data.push(tag);
1636 data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1637 }
1638 Datum::UInt16(i) => {
1639 let mbu = min_bytes_unsigned(i);
1640 let tag = u8::from(Tag::UInt16_0) + mbu;
1641 data.push(tag);
1642 data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1643 }
1644 Datum::UInt32(i) => {
1645 let mbu = min_bytes_unsigned(i);
1646 let tag = u8::from(Tag::UInt32_0) + mbu;
1647 data.push(tag);
1648 data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1649 }
1650 Datum::UInt64(i) => {
1651 let mbu = min_bytes_unsigned(i);
1652 let tag = u8::from(Tag::UInt64_0) + mbu;
1653 data.push(tag);
1654 data.extend_from_slice(&i.to_le_bytes()[0..usize::from(mbu)]);
1655 }
1656 Datum::Float32(f) => {
1657 data.push(Tag::Float32.into());
1658 data.extend_from_slice(&f.to_bits().to_le_bytes());
1659 }
1660 Datum::Float64(f) => {
1661 data.push(Tag::Float64.into());
1662 data.extend_from_slice(&f.to_bits().to_le_bytes());
1663 }
1664 Datum::Date(d) => {
1665 data.push(Tag::Date.into());
1666 push_date(data, d);
1667 }
1668 Datum::Time(t) => {
1669 data.push(Tag::Time.into());
1670 push_time(data, t);
1671 }
1672 Datum::Timestamp(t) => {
1673 let datetime = t.to_naive();
1674 if let Some(nanos) = checked_timestamp_nanos(datetime) {
1675 data.push(Tag::CheapTimestamp.into());
1676 data.extend_from_slice(&nanos.to_le_bytes());
1677 } else {
1678 data.push(Tag::Timestamp.into());
1679 push_naive_date(data, datetime.date());
1680 push_time(data, datetime.time());
1681 }
1682 }
1683 Datum::TimestampTz(t) => {
1684 let datetime = t.to_naive();
1685 if let Some(nanos) = checked_timestamp_nanos(datetime) {
1686 data.push(Tag::CheapTimestampTz.into());
1687 data.extend_from_slice(&nanos.to_le_bytes());
1688 } else {
1689 data.push(Tag::TimestampTz.into());
1690 push_naive_date(data, datetime.date());
1691 push_time(data, datetime.time());
1692 }
1693 }
1694 Datum::Interval(i) => {
1695 data.push(Tag::Interval.into());
1696 data.extend_from_slice(&i.months.to_le_bytes());
1697 data.extend_from_slice(&i.days.to_le_bytes());
1698 data.extend_from_slice(&i.micros.to_le_bytes());
1699 }
1700 Datum::Bytes(bytes) => {
1701 let tag = match bytes.len() {
1702 0..TINY => Tag::BytesTiny,
1703 TINY..SHORT => Tag::BytesShort,
1704 SHORT..LONG => Tag::BytesLong,
1705 _ => Tag::BytesHuge,
1706 };
1707 data.push(tag.into());
1708 push_lengthed_bytes(data, bytes, tag);
1709 }
1710 Datum::String(string) => {
1711 let tag = match string.len() {
1712 0..TINY => Tag::StringTiny,
1713 TINY..SHORT => Tag::StringShort,
1714 SHORT..LONG => Tag::StringLong,
1715 _ => Tag::StringHuge,
1716 };
1717 data.push(tag.into());
1718 push_lengthed_bytes(data, string.as_bytes(), tag);
1719 }
1720 Datum::List(list) => {
1721 let tag = match list.data.len() {
1722 0..TINY => Tag::ListTiny,
1723 TINY..SHORT => Tag::ListShort,
1724 SHORT..LONG => Tag::ListLong,
1725 _ => Tag::ListHuge,
1726 };
1727 data.push(tag.into());
1728 push_lengthed_bytes(data, list.data, tag);
1729 }
1730 Datum::Uuid(u) => {
1731 data.push(Tag::Uuid.into());
1732 data.extend_from_slice(u.as_bytes());
1733 }
1734 Datum::Array(array) => {
1735 data.push(Tag::Array.into());
1738 data.push(array.dims.ndims());
1739 data.extend_from_slice(array.dims.data);
1740 push_untagged_bytes(data, array.elements.data);
1741 }
1742 Datum::Map(dict) => {
1743 data.push(Tag::Dict.into());
1744 push_untagged_bytes(data, dict.data);
1745 }
1746 Datum::JsonNull => data.push(Tag::JsonNull.into()),
1747 Datum::MzTimestamp(t) => {
1748 data.push(Tag::MzTimestamp.into());
1749 data.extend_from_slice(&t.encode());
1750 }
1751 Datum::Dummy => data.push(Tag::Dummy.into()),
1752 Datum::Numeric(mut n) => {
1753 numeric::cx_datum().reduce(&mut n.0);
1758 let (digits, exponent, bits, lsu) = n.0.to_raw_parts();
1759 data.push(Tag::Numeric.into());
1760 data.push(u8::try_from(digits).expect("digits to fit within u8; should not exceed 39"));
1761 data.push(
1762 i8::try_from(exponent)
1763 .expect("exponent to fit within i8; should not exceed +/- 39")
1764 .to_le_bytes()[0],
1765 );
1766 data.push(bits);
1767
1768 let lsu = &lsu[..Numeric::digits_to_lsu_elements_len(digits)];
1769
1770 if cfg!(target_endian = "little") {
1772 let (prefix, lsu_bytes, suffix) = unsafe { lsu.align_to::<u8>() };
1775 soft_assert_no_log!(
1778 lsu_bytes.len() == Numeric::digits_to_lsu_elements_len(digits) * 2,
1779 "u8 version of numeric LSU contained the wrong number of elements; expected {}, but got {}",
1780 Numeric::digits_to_lsu_elements_len(digits) * 2,
1781 lsu_bytes.len()
1782 );
1783 soft_assert_no_log!(prefix.is_empty() && suffix.is_empty());
1785 data.extend_from_slice(lsu_bytes);
1786 } else {
1787 for u in lsu {
1788 data.extend_from_slice(&u.to_le_bytes());
1789 }
1790 }
1791 }
1792 Datum::Range(range) => {
1793 data.push(Tag::Range.into());
1795 data.push(range.internal_flag_bits());
1796
1797 if let Some(RangeInner { lower, upper }) = range.inner {
1798 for bound in [lower.bound, upper.bound] {
1799 if let Some(bound) = bound {
1800 match bound.datum() {
1801 Datum::Null => panic!("cannot push Datum::Null into range"),
1802 d => push_datum::<D>(data, d),
1803 }
1804 }
1805 }
1806 }
1807 }
1808 Datum::MzAclItem(mz_acl_item) => {
1809 data.push(Tag::MzAclItem.into());
1810 data.extend_from_slice(&mz_acl_item.encode_binary());
1811 }
1812 Datum::AclItem(acl_item) => {
1813 data.push(Tag::AclItem.into());
1814 data.extend_from_slice(&acl_item.encode_binary());
1815 }
1816 }
1817}
1818
1819pub fn row_size<'a, I>(a: I) -> usize
1821where
1822 I: IntoIterator<Item = Datum<'a>>,
1823{
1824 let sz = datums_size::<_, _>(a);
1829 let size_of_row = std::mem::size_of::<Row>();
1830 if sz > Row::SIZE {
1834 sz + size_of_row
1835 } else {
1836 size_of_row
1837 }
1838}
1839
1840pub fn datum_size(datum: &Datum) -> usize {
1843 match datum {
1844 Datum::Null => 1,
1845 Datum::False => 1,
1846 Datum::True => 1,
1847 Datum::Int16(i) => 1 + usize::from(min_bytes_signed(*i)),
1848 Datum::Int32(i) => 1 + usize::from(min_bytes_signed(*i)),
1849 Datum::Int64(i) => 1 + usize::from(min_bytes_signed(*i)),
1850 Datum::UInt8(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1851 Datum::UInt16(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1852 Datum::UInt32(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1853 Datum::UInt64(i) => 1 + usize::from(min_bytes_unsigned(*i)),
1854 Datum::Float32(_) => 1 + size_of::<f32>(),
1855 Datum::Float64(_) => 1 + size_of::<f64>(),
1856 Datum::Date(_) => 1 + size_of::<i32>(),
1857 Datum::Time(_) => 1 + 8,
1858 Datum::Timestamp(t) => {
1859 1 + if checked_timestamp_nanos(t.to_naive()).is_some() {
1860 8
1861 } else {
1862 16
1863 }
1864 }
1865 Datum::TimestampTz(t) => {
1866 1 + if checked_timestamp_nanos(t.naive_utc()).is_some() {
1867 8
1868 } else {
1869 16
1870 }
1871 }
1872 Datum::Interval(_) => 1 + size_of::<i32>() + size_of::<i32>() + size_of::<i64>(),
1873 Datum::Bytes(bytes) => {
1874 let bytes_for_length = match bytes.len() {
1876 0..TINY => 1,
1877 TINY..SHORT => 2,
1878 SHORT..LONG => 4,
1879 _ => 8,
1880 };
1881 1 + bytes_for_length + bytes.len()
1882 }
1883 Datum::String(string) => {
1884 let bytes_for_length = match string.len() {
1886 0..TINY => 1,
1887 TINY..SHORT => 2,
1888 SHORT..LONG => 4,
1889 _ => 8,
1890 };
1891 1 + bytes_for_length + string.len()
1892 }
1893 Datum::Uuid(_) => 1 + size_of::<uuid::Bytes>(),
1894 Datum::Array(array) => {
1895 1 + size_of::<u8>()
1896 + array.dims.data.len()
1897 + size_of::<u64>()
1898 + array.elements.data.len()
1899 }
1900 Datum::List(list) => 1 + size_of::<u64>() + list.data.len(),
1901 Datum::Map(dict) => 1 + size_of::<u64>() + dict.data.len(),
1902 Datum::JsonNull => 1,
1903 Datum::MzTimestamp(_) => 1 + size_of::<Timestamp>(),
1904 Datum::Dummy => 1,
1905 Datum::Numeric(d) => {
1906 let mut d = d.0.clone();
1907 numeric::cx_datum().reduce(&mut d);
1910 4 + (d.coefficient_units().len() * 2)
1912 }
1913 Datum::Range(Range { inner }) => {
1914 2 + match inner {
1916 None => 0,
1917 Some(RangeInner { lower, upper }) => [lower.bound, upper.bound]
1918 .iter()
1919 .map(|bound| match bound {
1920 None => 0,
1921 Some(bound) => bound.val.len(),
1922 })
1923 .sum(),
1924 }
1925 }
1926 Datum::MzAclItem(_) => 1 + MzAclItem::binary_size(),
1927 Datum::AclItem(_) => 1 + AclItem::binary_size(),
1928 }
1929}
1930
1931pub fn datums_size<'a, I, D>(iter: I) -> usize
1936where
1937 I: IntoIterator<Item = D>,
1938 D: Borrow<Datum<'a>>,
1939{
1940 iter.into_iter().map(|d| datum_size(d.borrow())).sum()
1941}
1942
1943pub fn datum_list_size<'a, I, D>(iter: I) -> usize
1948where
1949 I: IntoIterator<Item = D>,
1950 D: Borrow<Datum<'a>>,
1951{
1952 1 + size_of::<u64>() + datums_size(iter)
1953}
1954
1955impl RowPacker<'_> {
1956 pub fn for_existing_row(row: &mut Row) -> RowPacker<'_> {
1963 RowPacker { row }
1964 }
1965
1966 #[inline]
1968 pub fn push<'a, D>(&mut self, datum: D)
1969 where
1970 D: Borrow<Datum<'a>>,
1971 {
1972 push_datum(&mut self.row.data, *datum.borrow());
1973 }
1974
1975 #[inline]
1977 pub fn extend<'a, I, D>(&mut self, iter: I)
1978 where
1979 I: IntoIterator<Item = D>,
1980 D: Borrow<Datum<'a>>,
1981 {
1982 for datum in iter {
1983 push_datum(&mut self.row.data, *datum.borrow())
1984 }
1985 }
1986
1987 #[inline]
1993 pub fn try_extend<'a, I, E, D>(&mut self, iter: I) -> Result<(), E>
1994 where
1995 I: IntoIterator<Item = Result<D, E>>,
1996 D: Borrow<Datum<'a>>,
1997 {
1998 for datum in iter {
1999 push_datum(&mut self.row.data, *datum?.borrow());
2000 }
2001 Ok(())
2002 }
2003
2004 pub fn extend_by_row(&mut self, row: &Row) {
2006 self.row.data.extend_from_slice(row.data.as_slice());
2007 }
2008
2009 #[inline]
2017 pub unsafe fn extend_by_slice_unchecked(&mut self, data: &[u8]) {
2018 self.row.data.extend_from_slice(data)
2019 }
2020
2021 #[inline]
2043 pub fn push_list_with<F, R>(&mut self, f: F) -> R
2044 where
2045 F: FnOnce(&mut RowPacker) -> R,
2046 {
2047 let start = self.row.data.len();
2050 self.row.data.push(Tag::ListTiny.into());
2051 self.row.data.push(0);
2053
2054 let out = f(self);
2055
2056 let len = self.row.data.len() - start - 1 - 1;
2058 if len < TINY {
2060 self.row.data[start + 1] = len.to_le_bytes()[0];
2062 } else {
2063 long_list(&mut self.row.data, start, len);
2066 }
2067
2068 #[cold]
2075 fn long_list(data: &mut CompactBytes, start: usize, len: usize) {
2076 let long_list_inner = |data: &mut CompactBytes, len_len| {
2079 const ZEROS: [u8; 8] = [0; 8];
2082 data.extend_from_slice(&ZEROS[0..len_len - 1]);
2083 data.copy_within(start + 1 + 1..start + 1 + 1 + len, start + 1 + len_len);
2092 data[start + 1..start + 1 + len_len]
2094 .copy_from_slice(&len.to_le_bytes()[0..len_len]);
2095 };
2096 match len {
2097 0..TINY => {
2098 unreachable!()
2099 }
2100 TINY..SHORT => {
2101 data[start] = Tag::ListShort.into();
2102 long_list_inner(data, 2);
2103 }
2104 SHORT..LONG => {
2105 data[start] = Tag::ListLong.into();
2106 long_list_inner(data, 4);
2107 }
2108 _ => {
2109 data[start] = Tag::ListHuge.into();
2110 long_list_inner(data, 8);
2111 }
2112 };
2113 }
2114
2115 out
2116 }
2117
2118 pub fn push_dict_with<F, R>(&mut self, f: F) -> R
2156 where
2157 F: FnOnce(&mut RowPacker) -> R,
2158 {
2159 self.row.data.push(Tag::Dict.into());
2160 let start = self.row.data.len();
2161 self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2163
2164 let res = f(self);
2165
2166 let len = u64::cast_from(self.row.data.len() - start - size_of::<u64>());
2167 self.row.data[start..start + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2169
2170 res
2171 }
2172
2173 pub fn try_push_array<'a, I, D>(
2180 &mut self,
2181 dims: &[ArrayDimension],
2182 iter: I,
2183 ) -> Result<(), InvalidArrayError>
2184 where
2185 I: IntoIterator<Item = D>,
2186 D: Borrow<Datum<'a>>,
2187 {
2188 unsafe {
2190 self.push_array_with_unchecked(dims, |packer| {
2191 let mut nelements = 0;
2192 for datum in iter {
2193 packer.push(datum);
2194 nelements += 1;
2195 }
2196 Ok::<_, InvalidArrayError>(nelements)
2197 })
2198 }
2199 }
2200
2201 pub unsafe fn push_array_with_unchecked<F, E>(
2210 &mut self,
2211 dims: &[ArrayDimension],
2212 f: F,
2213 ) -> Result<(), E>
2214 where
2215 F: FnOnce(&mut RowPacker) -> Result<usize, E>,
2216 E: From<InvalidArrayError>,
2217 {
2218 if dims.len() > usize::from(MAX_ARRAY_DIMENSIONS) {
2230 return Err(InvalidArrayError::TooManyDimensions(dims.len()).into());
2231 }
2232
2233 let start = self.row.data.len();
2234 self.row.data.push(Tag::Array.into());
2235
2236 self.row
2238 .data
2239 .push(dims.len().try_into().expect("ndims verified to fit in u8"));
2240 for dim in dims {
2241 self.row
2242 .data
2243 .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2244 self.row
2245 .data
2246 .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2247 }
2248
2249 let off = self.row.data.len();
2251 self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2252 let nelements = match f(self) {
2253 Ok(nelements) => nelements,
2254 Err(e) => {
2255 self.row.data.truncate(start);
2256 return Err(e);
2257 }
2258 };
2259 let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2260 self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2261
2262 let cardinality = match dims {
2265 [] => 0,
2266 dims => dims.iter().map(|d| d.length).product(),
2267 };
2268 if nelements != cardinality {
2269 self.row.data.truncate(start);
2270 return Err(InvalidArrayError::WrongCardinality {
2271 actual: nelements,
2272 expected: cardinality,
2273 }
2274 .into());
2275 }
2276
2277 Ok(())
2278 }
2279
2280 pub fn push_array_with_row_major<F, I>(
2290 &mut self,
2291 dims: I,
2292 f: F,
2293 ) -> Result<(), InvalidArrayError>
2294 where
2295 I: IntoIterator<Item = ArrayDimension>,
2296 F: FnOnce(&mut RowPacker) -> usize,
2297 {
2298 let start = self.row.data.len();
2299 self.row.data.push(Tag::Array.into());
2300
2301 let dims_start = self.row.data.len();
2303 self.row.data.push(42);
2304
2305 let mut num_dims: u8 = 0;
2306 let mut cardinality: usize = 1;
2307 for dim in dims {
2308 num_dims += 1;
2309 cardinality *= dim.length;
2310
2311 self.row
2312 .data
2313 .extend_from_slice(&i64::cast_from(dim.lower_bound).to_le_bytes());
2314 self.row
2315 .data
2316 .extend_from_slice(&u64::cast_from(dim.length).to_le_bytes());
2317 }
2318
2319 if num_dims > MAX_ARRAY_DIMENSIONS {
2320 self.row.data.truncate(start);
2322 return Err(InvalidArrayError::TooManyDimensions(usize::from(num_dims)));
2323 }
2324 self.row.data[dims_start..dims_start + size_of::<u8>()]
2326 .copy_from_slice(&num_dims.to_le_bytes());
2327
2328 let off = self.row.data.len();
2330 self.row.data.extend_from_slice(&[0; size_of::<u64>()]);
2331
2332 let nelements = f(self);
2333
2334 let len = u64::cast_from(self.row.data.len() - off - size_of::<u64>());
2335 self.row.data[off..off + size_of::<u64>()].copy_from_slice(&len.to_le_bytes());
2336
2337 let cardinality = match num_dims {
2340 0 => 0,
2341 _ => cardinality,
2342 };
2343 if nelements != cardinality {
2344 self.row.data.truncate(start);
2345 return Err(InvalidArrayError::WrongCardinality {
2346 actual: nelements,
2347 expected: cardinality,
2348 });
2349 }
2350
2351 Ok(())
2352 }
2353
2354 pub fn push_list<'a, I, D>(&mut self, iter: I)
2358 where
2359 I: IntoIterator<Item = D>,
2360 D: Borrow<Datum<'a>>,
2361 {
2362 self.push_list_with(|packer| {
2363 for elem in iter {
2364 packer.push(*elem.borrow())
2365 }
2366 });
2367 }
2368
2369 pub fn push_dict<'a, I, D>(&mut self, iter: I)
2371 where
2372 I: IntoIterator<Item = (&'a str, D)>,
2373 D: Borrow<Datum<'a>>,
2374 {
2375 self.push_dict_with(|packer| {
2376 for (k, v) in iter {
2377 packer.push(Datum::String(k));
2378 packer.push(*v.borrow())
2379 }
2380 })
2381 }
2382
2383 pub fn push_range<'a>(&mut self, mut range: Range<Datum<'a>>) -> Result<(), InvalidRangeError> {
2399 range.canonicalize()?;
2400 match range.inner {
2401 None => {
2402 self.row.data.push(Tag::Range.into());
2403 self.row.data.push(range::InternalFlags::EMPTY.bits());
2405 Ok(())
2406 }
2407 Some(inner) => self.push_range_with(
2408 RangeLowerBound {
2409 inclusive: inner.lower.inclusive,
2410 bound: inner
2411 .lower
2412 .bound
2413 .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2414 },
2415 RangeUpperBound {
2416 inclusive: inner.upper.inclusive,
2417 bound: inner
2418 .upper
2419 .bound
2420 .map(|value| move |row: &mut RowPacker| Ok(row.push(value))),
2421 },
2422 ),
2423 }
2424 }
2425
2426 pub fn push_range_with<L, U, E>(
2449 &mut self,
2450 lower: RangeLowerBound<L>,
2451 upper: RangeUpperBound<U>,
2452 ) -> Result<(), E>
2453 where
2454 L: FnOnce(&mut RowPacker) -> Result<(), E>,
2455 U: FnOnce(&mut RowPacker) -> Result<(), E>,
2456 E: From<InvalidRangeError>,
2457 {
2458 let start = self.row.data.len();
2459 self.row.data.push(Tag::Range.into());
2460
2461 let mut flags = range::InternalFlags::empty();
2462
2463 flags.set(range::InternalFlags::LB_INFINITE, lower.bound.is_none());
2464 flags.set(range::InternalFlags::UB_INFINITE, upper.bound.is_none());
2465 flags.set(range::InternalFlags::LB_INCLUSIVE, lower.inclusive);
2466 flags.set(range::InternalFlags::UB_INCLUSIVE, upper.inclusive);
2467
2468 let mut expected_datums = 0;
2469
2470 self.row.data.push(flags.bits());
2471
2472 let datum_check = self.row.data.len();
2473
2474 if let Some(value) = lower.bound {
2475 let start = self.row.data.len();
2476 value(self)?;
2477 assert!(
2478 start < self.row.data.len(),
2479 "finite values must each push exactly one value; expected 1 but got 0"
2480 );
2481 expected_datums += 1;
2482 }
2483
2484 if let Some(value) = upper.bound {
2485 let start = self.row.data.len();
2486 value(self)?;
2487 assert!(
2488 start < self.row.data.len(),
2489 "finite values must each push exactly one value; expected 1 but got 0"
2490 );
2491 expected_datums += 1;
2492 }
2493
2494 let mut actual_datums = 0;
2498 let mut seen = None;
2499 let mut dataz = &self.row.data[datum_check..];
2500 while !dataz.is_empty() {
2501 let d = unsafe { read_datum(&mut dataz) };
2502 assert!(d != Datum::Null, "cannot push Datum::Null into range");
2503
2504 match seen {
2505 None => seen = Some(d),
2506 Some(seen) => {
2507 let seen_kind = DatumKind::from(seen);
2508 let d_kind = DatumKind::from(d);
2509 assert!(
2510 seen_kind == d_kind,
2511 "range contains inconsistent data; expected {seen_kind:?} but got {d_kind:?}"
2512 );
2513
2514 if seen > d {
2515 self.row.data.truncate(start);
2516 return Err(InvalidRangeError::MisorderedRangeBounds.into());
2517 }
2518 }
2519 }
2520 actual_datums += 1;
2521 }
2522
2523 assert!(
2524 actual_datums == expected_datums,
2525 "finite values must each push exactly one value; expected {expected_datums} but got {actual_datums}"
2526 );
2527
2528 Ok(())
2529 }
2530
2531 pub fn clear(&mut self) {
2533 self.row.data.clear();
2534 }
2535
2536 pub unsafe fn truncate(&mut self, pos: usize) {
2549 self.row.data.truncate(pos)
2550 }
2551
2552 pub fn truncate_datums(&mut self, n: usize) {
2554 let prev_len = self.row.data.len();
2555 let mut iter = self.row.iter();
2556 for _ in iter.by_ref().take(n) {}
2557 let next_len = iter.data.len();
2558 unsafe { self.truncate(prev_len - next_len) }
2560 }
2561
2562 pub fn byte_len(&self) -> usize {
2564 self.row.byte_len()
2565 }
2566}
2567
2568impl<'a> IntoIterator for &'a Row {
2569 type Item = Datum<'a>;
2570 type IntoIter = DatumListIter<'a>;
2571 fn into_iter(self) -> DatumListIter<'a> {
2572 self.iter()
2573 }
2574}
2575
2576impl fmt::Debug for Row {
2577 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2579 f.write_str("Row{")?;
2580 f.debug_list().entries(self.iter()).finish()?;
2581 f.write_str("}")
2582 }
2583}
2584
2585impl fmt::Display for Row {
2586 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2588 f.write_str("(")?;
2589 for (i, datum) in self.iter().enumerate() {
2590 if i != 0 {
2591 f.write_str(", ")?;
2592 }
2593 write!(f, "{}", datum)?;
2594 }
2595 f.write_str(")")
2596 }
2597}
2598
2599impl<'a> DatumList<'a> {
2600 pub fn empty() -> DatumList<'static> {
2601 DatumList { data: &[] }
2602 }
2603
2604 pub fn iter(&self) -> DatumListIter<'a> {
2605 DatumListIter { data: self.data }
2606 }
2607
2608 pub fn data(&self) -> &'a [u8] {
2610 self.data
2611 }
2612}
2613
2614impl<'a> IntoIterator for &'a DatumList<'a> {
2615 type Item = Datum<'a>;
2616 type IntoIter = DatumListIter<'a>;
2617 fn into_iter(self) -> DatumListIter<'a> {
2618 self.iter()
2619 }
2620}
2621
2622impl<'a> Iterator for DatumListIter<'a> {
2623 type Item = Datum<'a>;
2624 fn next(&mut self) -> Option<Self::Item> {
2625 if self.data.is_empty() {
2626 None
2627 } else {
2628 Some(unsafe { read_datum(&mut self.data) })
2629 }
2630 }
2631}
2632
2633impl<'a> DatumMap<'a> {
2634 pub fn empty() -> DatumMap<'static> {
2635 DatumMap { data: &[] }
2636 }
2637
2638 pub fn iter(&self) -> DatumDictIter<'a> {
2639 DatumDictIter {
2640 data: self.data,
2641 prev_key: None,
2642 }
2643 }
2644
2645 pub fn data(&self) -> &'a [u8] {
2647 self.data
2648 }
2649}
2650
2651impl<'a> Debug for DatumMap<'a> {
2652 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2653 f.debug_map().entries(self.iter()).finish()
2654 }
2655}
2656
2657impl<'a> IntoIterator for &'a DatumMap<'a> {
2658 type Item = (&'a str, Datum<'a>);
2659 type IntoIter = DatumDictIter<'a>;
2660 fn into_iter(self) -> DatumDictIter<'a> {
2661 self.iter()
2662 }
2663}
2664
2665impl<'a> Iterator for DatumDictIter<'a> {
2666 type Item = (&'a str, Datum<'a>);
2667 fn next(&mut self) -> Option<Self::Item> {
2668 if self.data.is_empty() {
2669 None
2670 } else {
2671 let key_tag =
2672 Tag::try_from_primitive(read_byte(&mut self.data)).expect("unknown row tag");
2673 assert!(
2674 key_tag == Tag::StringTiny
2675 || key_tag == Tag::StringShort
2676 || key_tag == Tag::StringLong
2677 || key_tag == Tag::StringHuge,
2678 "Dict keys must be strings, got {:?}",
2679 key_tag
2680 );
2681 let key = unsafe { read_lengthed_datum(&mut self.data, key_tag).unwrap_str() };
2682 let val = unsafe { read_datum(&mut self.data) };
2683
2684 if cfg!(debug_assertions) {
2686 if let Some(prev_key) = self.prev_key {
2687 debug_assert!(
2688 prev_key < key,
2689 "Dict keys must be unique and given in ascending order: {} came before {}",
2690 prev_key,
2691 key
2692 );
2693 }
2694 self.prev_key = Some(key);
2695 }
2696
2697 Some((key, val))
2698 }
2699 }
2700}
2701
2702impl RowArena {
2703 pub fn new() -> Self {
2704 RowArena {
2705 inner: RefCell::new(vec![]),
2706 }
2707 }
2708
2709 pub fn with_capacity(capacity: usize) -> Self {
2712 RowArena {
2713 inner: RefCell::new(Vec::with_capacity(capacity)),
2714 }
2715 }
2716
2717 pub fn reserve(&self, additional: usize) {
2720 self.inner.borrow_mut().reserve(additional);
2721 }
2722
2723 #[allow(clippy::transmute_ptr_to_ptr)]
2725 pub fn push_bytes<'a>(&'a self, bytes: Vec<u8>) -> &'a [u8] {
2726 let mut inner = self.inner.borrow_mut();
2727 inner.push(bytes);
2728 let owned_bytes = &inner[inner.len() - 1];
2729 unsafe {
2730 transmute::<&[u8], &'a [u8]>(owned_bytes)
2739 }
2740 }
2741
2742 pub fn push_string<'a>(&'a self, string: String) -> &'a str {
2744 let owned_bytes = self.push_bytes(string.into_bytes());
2745 unsafe {
2746 std::str::from_utf8_unchecked(owned_bytes)
2748 }
2749 }
2750
2751 pub fn push_unary_row<'a>(&'a self, row: Row) -> Datum<'a> {
2757 let mut inner = self.inner.borrow_mut();
2758 inner.push(row.data.into_vec());
2759 unsafe {
2760 let datum = read_datum(&mut &inner[inner.len() - 1][..]);
2770 transmute::<Datum<'_>, Datum<'a>>(datum)
2771 }
2772 }
2773
2774 fn push_unary_row_datum_nested<'a>(&'a self, row: Row) -> DatumNested<'a> {
2777 let mut inner = self.inner.borrow_mut();
2778 inner.push(row.data.into_vec());
2779 unsafe {
2780 let nested = DatumNested::extract(&mut &inner[inner.len() - 1][..]);
2790 transmute::<DatumNested<'_>, DatumNested<'a>>(nested)
2791 }
2792 }
2793
2794 pub fn make_datum<'a, F>(&'a self, f: F) -> Datum<'a>
2806 where
2807 F: FnOnce(&mut RowPacker),
2808 {
2809 let mut row = Row::default();
2810 f(&mut row.packer());
2811 self.push_unary_row(row)
2812 }
2813
2814 pub fn make_datum_nested<'a, F>(&'a self, f: F) -> DatumNested<'a>
2817 where
2818 F: FnOnce(&mut RowPacker),
2819 {
2820 let mut row = Row::default();
2821 f(&mut row.packer());
2822 self.push_unary_row_datum_nested(row)
2823 }
2824
2825 pub fn try_make_datum<'a, F, E>(&'a self, f: F) -> Result<Datum<'a>, E>
2827 where
2828 F: FnOnce(&mut RowPacker) -> Result<(), E>,
2829 {
2830 let mut row = Row::default();
2831 f(&mut row.packer())?;
2832 Ok(self.push_unary_row(row))
2833 }
2834
2835 pub fn clear(&mut self) {
2837 self.inner.borrow_mut().clear();
2838 }
2839}
2840
2841impl Default for RowArena {
2842 fn default() -> RowArena {
2843 RowArena::new()
2844 }
2845}
2846
2847#[derive(Debug)]
2865pub struct SharedRow(Row);
2866
2867impl SharedRow {
2868 thread_local! {
2869 static SHARED_ROW: Cell<Option<Row>> = const { Cell::new(Some(Row::empty())) }
2874 }
2875
2876 pub fn get() -> Self {
2884 let mut row = Self::SHARED_ROW
2885 .take()
2886 .expect("attempted to borrow already borrowed SharedRow");
2887 row.packer();
2889 Self(row)
2890 }
2891
2892 pub fn pack<'a, I, D>(iter: I) -> Row
2894 where
2895 I: IntoIterator<Item = D>,
2896 D: Borrow<Datum<'a>>,
2897 {
2898 let mut row_builder = Self::get();
2899 let mut row_packer = row_builder.packer();
2900 row_packer.extend(iter);
2901 row_builder.clone()
2902 }
2903}
2904
2905impl std::ops::Deref for SharedRow {
2906 type Target = Row;
2907
2908 fn deref(&self) -> &Self::Target {
2909 &self.0
2910 }
2911}
2912
2913impl std::ops::DerefMut for SharedRow {
2914 fn deref_mut(&mut self) -> &mut Self::Target {
2915 &mut self.0
2916 }
2917}
2918
2919impl Drop for SharedRow {
2920 fn drop(&mut self) {
2921 Self::SHARED_ROW.set(Some(std::mem::take(&mut self.0)))
2924 }
2925}
2926
2927#[cfg(test)]
2928mod tests {
2929 use chrono::{DateTime, NaiveDate};
2930 use mz_ore::{assert_err, assert_none};
2931
2932 use crate::ScalarType;
2933
2934 use super::*;
2935
2936 #[mz_ore::test]
2937 fn test_assumptions() {
2938 assert_eq!(size_of::<Tag>(), 1);
2939 #[cfg(target_endian = "big")]
2940 {
2941 assert!(false);
2943 }
2944 }
2945
2946 #[mz_ore::test]
2947 fn miri_test_arena() {
2948 let arena = RowArena::new();
2949
2950 assert_eq!(arena.push_string("".to_owned()), "");
2951 assert_eq!(arena.push_string("العَرَبِيَّة".to_owned()), "العَرَبِيَّة");
2952
2953 let empty: &[u8] = &[];
2954 assert_eq!(arena.push_bytes(vec![]), empty);
2955 assert_eq!(arena.push_bytes(vec![0, 2, 1, 255]), &[0, 2, 1, 255]);
2956
2957 let mut row = Row::default();
2958 let mut packer = row.packer();
2959 packer.push_dict_with(|row| {
2960 row.push(Datum::String("a"));
2961 row.push_list_with(|row| {
2962 row.push(Datum::String("one"));
2963 row.push(Datum::String("two"));
2964 row.push(Datum::String("three"));
2965 });
2966 row.push(Datum::String("b"));
2967 row.push(Datum::String("c"));
2968 });
2969 assert_eq!(arena.push_unary_row(row.clone()), row.unpack_first());
2970 }
2971
2972 #[mz_ore::test]
2973 fn miri_test_round_trip() {
2974 fn round_trip(datums: Vec<Datum>) {
2975 let row = Row::pack(datums.clone());
2976
2977 println!("{:?}", row.data());
2980
2981 let datums2 = row.iter().collect::<Vec<_>>();
2982 let datums3 = row.unpack();
2983 assert_eq!(datums, datums2);
2984 assert_eq!(datums, datums3);
2985 }
2986
2987 round_trip(vec![]);
2988 round_trip(
2989 ScalarType::enumerate()
2990 .iter()
2991 .flat_map(|r#type| r#type.interesting_datums())
2992 .collect(),
2993 );
2994 round_trip(vec![
2995 Datum::Null,
2996 Datum::Null,
2997 Datum::False,
2998 Datum::True,
2999 Datum::Int16(-21),
3000 Datum::Int32(-42),
3001 Datum::Int64(-2_147_483_648 - 42),
3002 Datum::UInt8(0),
3003 Datum::UInt8(1),
3004 Datum::UInt16(0),
3005 Datum::UInt16(1),
3006 Datum::UInt16(1 << 8),
3007 Datum::UInt32(0),
3008 Datum::UInt32(1),
3009 Datum::UInt32(1 << 8),
3010 Datum::UInt32(1 << 16),
3011 Datum::UInt32(1 << 24),
3012 Datum::UInt64(0),
3013 Datum::UInt64(1),
3014 Datum::UInt64(1 << 8),
3015 Datum::UInt64(1 << 16),
3016 Datum::UInt64(1 << 24),
3017 Datum::UInt64(1 << 32),
3018 Datum::UInt64(1 << 40),
3019 Datum::UInt64(1 << 48),
3020 Datum::UInt64(1 << 56),
3021 Datum::Float32(OrderedFloat::from(-42.12)),
3022 Datum::Float64(OrderedFloat::from(-2_147_483_648.0 - 42.12)),
3023 Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
3024 Datum::Timestamp(
3025 CheckedTimestamp::from_timestamplike(
3026 NaiveDate::from_isoywd_opt(2019, 30, chrono::Weekday::Wed)
3027 .unwrap()
3028 .and_hms_opt(14, 32, 11)
3029 .unwrap(),
3030 )
3031 .unwrap(),
3032 ),
3033 Datum::TimestampTz(
3034 CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(61, 0).unwrap())
3035 .unwrap(),
3036 ),
3037 Datum::Interval(Interval {
3038 months: 312,
3039 ..Default::default()
3040 }),
3041 Datum::Interval(Interval::new(0, 0, 1_012_312)),
3042 Datum::Bytes(&[]),
3043 Datum::Bytes(&[0, 2, 1, 255]),
3044 Datum::String(""),
3045 Datum::String("العَرَبِيَّة"),
3046 ]);
3047 }
3048
3049 #[mz_ore::test]
3050 fn test_array() {
3051 const DIM: ArrayDimension = ArrayDimension {
3054 lower_bound: 2,
3055 length: 2,
3056 };
3057 let mut row = Row::default();
3058 let mut packer = row.packer();
3059 packer
3060 .try_push_array(&[DIM], vec![Datum::Int32(1), Datum::Int32(2)])
3061 .unwrap();
3062 let arr1 = row.unpack_first().unwrap_array();
3063 assert_eq!(arr1.dims().into_iter().collect::<Vec<_>>(), vec![DIM]);
3064 assert_eq!(
3065 arr1.elements().into_iter().collect::<Vec<_>>(),
3066 vec![Datum::Int32(1), Datum::Int32(2)]
3067 );
3068
3069 let row = Row::pack_slice(&[Datum::Array(arr1)]);
3072 let arr2 = row.unpack_first().unwrap_array();
3073 assert_eq!(arr1, arr2);
3074 }
3075
3076 #[mz_ore::test]
3077 fn test_multidimensional_array() {
3078 let datums = vec![
3079 Datum::Int32(1),
3080 Datum::Int32(2),
3081 Datum::Int32(3),
3082 Datum::Int32(4),
3083 Datum::Int32(5),
3084 Datum::Int32(6),
3085 Datum::Int32(7),
3086 Datum::Int32(8),
3087 ];
3088
3089 let mut row = Row::default();
3090 let mut packer = row.packer();
3091 packer
3092 .try_push_array(
3093 &[
3094 ArrayDimension {
3095 lower_bound: 1,
3096 length: 1,
3097 },
3098 ArrayDimension {
3099 lower_bound: 1,
3100 length: 4,
3101 },
3102 ArrayDimension {
3103 lower_bound: 1,
3104 length: 2,
3105 },
3106 ],
3107 &datums,
3108 )
3109 .unwrap();
3110 let array = row.unpack_first().unwrap_array();
3111 assert_eq!(array.elements().into_iter().collect::<Vec<_>>(), datums);
3112 }
3113
3114 #[mz_ore::test]
3115 fn test_array_max_dimensions() {
3116 let mut row = Row::default();
3117 let max_dims = usize::from(MAX_ARRAY_DIMENSIONS);
3118
3119 let res = row.packer().try_push_array(
3121 &vec![
3122 ArrayDimension {
3123 lower_bound: 1,
3124 length: 1
3125 };
3126 max_dims + 1
3127 ],
3128 vec![Datum::Int32(4)],
3129 );
3130 assert_eq!(res, Err(InvalidArrayError::TooManyDimensions(max_dims + 1)));
3131 assert!(row.data.is_empty());
3132
3133 row.packer()
3136 .try_push_array(
3137 &vec![
3138 ArrayDimension {
3139 lower_bound: 1,
3140 length: 1
3141 };
3142 max_dims
3143 ],
3144 vec![Datum::Int32(4)],
3145 )
3146 .unwrap();
3147 }
3148
3149 #[mz_ore::test]
3150 fn test_array_wrong_cardinality() {
3151 let mut row = Row::default();
3152 let res = row.packer().try_push_array(
3153 &[
3154 ArrayDimension {
3155 lower_bound: 1,
3156 length: 2,
3157 },
3158 ArrayDimension {
3159 lower_bound: 1,
3160 length: 3,
3161 },
3162 ],
3163 vec![Datum::Int32(1), Datum::Int32(2)],
3164 );
3165 assert_eq!(
3166 res,
3167 Err(InvalidArrayError::WrongCardinality {
3168 actual: 2,
3169 expected: 6,
3170 })
3171 );
3172 assert!(row.data.is_empty());
3173 }
3174
3175 #[mz_ore::test]
3176 fn test_nesting() {
3177 let mut row = Row::default();
3178 row.packer().push_dict_with(|row| {
3179 row.push(Datum::String("favourites"));
3180 row.push_list_with(|row| {
3181 row.push(Datum::String("ice cream"));
3182 row.push(Datum::String("oreos"));
3183 row.push(Datum::String("cheesecake"));
3184 });
3185 row.push(Datum::String("name"));
3186 row.push(Datum::String("bob"));
3187 });
3188
3189 let mut iter = row.unpack_first().unwrap_map().iter();
3190
3191 let (k, v) = iter.next().unwrap();
3192 assert_eq!(k, "favourites");
3193 assert_eq!(
3194 v.unwrap_list().iter().collect::<Vec<_>>(),
3195 vec![
3196 Datum::String("ice cream"),
3197 Datum::String("oreos"),
3198 Datum::String("cheesecake"),
3199 ]
3200 );
3201
3202 let (k, v) = iter.next().unwrap();
3203 assert_eq!(k, "name");
3204 assert_eq!(v, Datum::String("bob"));
3205 }
3206
3207 #[mz_ore::test]
3208 fn test_dict_errors() -> Result<(), Box<dyn std::error::Error>> {
3209 let pack = |ok| {
3210 let mut row = Row::default();
3211 row.packer().push_dict_with(|row| {
3212 if ok {
3213 row.push(Datum::String("key"));
3214 row.push(Datum::Int32(42));
3215 Ok(7)
3216 } else {
3217 Err("fail")
3218 }
3219 })?;
3220 Ok(row)
3221 };
3222
3223 assert_eq!(pack(false), Err("fail"));
3224
3225 let row = pack(true)?;
3226 let mut dict = row.unpack_first().unwrap_map().iter();
3227 assert_eq!(dict.next(), Some(("key", Datum::Int32(42))));
3228 assert_eq!(dict.next(), None);
3229
3230 Ok(())
3231 }
3232
3233 #[mz_ore::test]
3234 #[cfg_attr(miri, ignore)] fn test_datum_sizes() {
3236 let arena = RowArena::new();
3237
3238 let values_of_interest = vec![
3240 Datum::Null,
3241 Datum::False,
3242 Datum::Int16(0),
3243 Datum::Int32(0),
3244 Datum::Int64(0),
3245 Datum::UInt8(0),
3246 Datum::UInt8(1),
3247 Datum::UInt16(0),
3248 Datum::UInt16(1),
3249 Datum::UInt16(1 << 8),
3250 Datum::UInt32(0),
3251 Datum::UInt32(1),
3252 Datum::UInt32(1 << 8),
3253 Datum::UInt32(1 << 16),
3254 Datum::UInt32(1 << 24),
3255 Datum::UInt64(0),
3256 Datum::UInt64(1),
3257 Datum::UInt64(1 << 8),
3258 Datum::UInt64(1 << 16),
3259 Datum::UInt64(1 << 24),
3260 Datum::UInt64(1 << 32),
3261 Datum::UInt64(1 << 40),
3262 Datum::UInt64(1 << 48),
3263 Datum::UInt64(1 << 56),
3264 Datum::Float32(OrderedFloat(0.0)),
3265 Datum::Float64(OrderedFloat(0.0)),
3266 Datum::from(numeric::Numeric::from(0)),
3267 Datum::from(numeric::Numeric::from(1000)),
3268 Datum::from(numeric::Numeric::from(9999)),
3269 Datum::Date(
3270 NaiveDate::from_ymd_opt(1, 1, 1)
3271 .unwrap()
3272 .try_into()
3273 .unwrap(),
3274 ),
3275 Datum::Timestamp(
3276 CheckedTimestamp::from_timestamplike(
3277 DateTime::from_timestamp(0, 0).unwrap().naive_utc(),
3278 )
3279 .unwrap(),
3280 ),
3281 Datum::TimestampTz(
3282 CheckedTimestamp::from_timestamplike(DateTime::from_timestamp(0, 0).unwrap())
3283 .unwrap(),
3284 ),
3285 Datum::Interval(Interval::default()),
3286 Datum::Bytes(&[]),
3287 Datum::String(""),
3288 Datum::JsonNull,
3289 Datum::Range(Range { inner: None }),
3290 arena.make_datum(|packer| {
3291 packer
3292 .push_range(Range::new(Some((
3293 RangeLowerBound::new(Datum::Int32(-1), true),
3294 RangeUpperBound::new(Datum::Int32(1), true),
3295 ))))
3296 .unwrap();
3297 }),
3298 ];
3299 for value in values_of_interest {
3300 if datum_size(&value) != Row::pack_slice(&[value]).data.len() {
3301 panic!("Disparity in claimed size for {:?}", value);
3302 }
3303 }
3304 }
3305
3306 #[mz_ore::test]
3307 fn test_range_errors() {
3308 fn test_range_errors_inner<'a>(
3309 datums: Vec<Vec<Datum<'a>>>,
3310 ) -> Result<(), InvalidRangeError> {
3311 let mut row = Row::default();
3312 let row_len = row.byte_len();
3313 let mut packer = row.packer();
3314 let r = packer.push_range_with(
3315 RangeLowerBound {
3316 inclusive: true,
3317 bound: Some(|row: &mut RowPacker| {
3318 for d in &datums[0] {
3319 row.push(d);
3320 }
3321 Ok(())
3322 }),
3323 },
3324 RangeUpperBound {
3325 inclusive: true,
3326 bound: Some(|row: &mut RowPacker| {
3327 for d in &datums[1] {
3328 row.push(d);
3329 }
3330 Ok(())
3331 }),
3332 },
3333 );
3334
3335 assert_eq!(row_len, row.byte_len());
3336
3337 r
3338 }
3339
3340 for panicking_case in [
3341 vec![vec![Datum::Int32(1)], vec![]],
3342 vec![
3343 vec![Datum::Int32(1), Datum::Int32(2)],
3344 vec![Datum::Int32(3)],
3345 ],
3346 vec![
3347 vec![Datum::Int32(1)],
3348 vec![Datum::Int32(2), Datum::Int32(3)],
3349 ],
3350 vec![vec![Datum::Int32(1), Datum::Int32(2)], vec![]],
3351 vec![vec![Datum::Int32(1)], vec![Datum::UInt16(2)]],
3352 vec![vec![Datum::Null], vec![Datum::Int32(2)]],
3353 vec![vec![Datum::Int32(1)], vec![Datum::Null]],
3354 ] {
3355 #[allow(clippy::disallowed_methods)] let result = std::panic::catch_unwind(|| test_range_errors_inner(panicking_case));
3357 assert_err!(result);
3358 }
3359
3360 let e = test_range_errors_inner(vec![vec![Datum::Int32(2)], vec![Datum::Int32(1)]]);
3361 assert_eq!(e, Err(InvalidRangeError::MisorderedRangeBounds));
3362 }
3363
3364 #[mz_ore::test]
3366 #[cfg_attr(miri, ignore)] fn test_list_encoding() {
3368 fn test_list_encoding_inner(len: usize) {
3369 let list_elem = |i: usize| {
3370 if i % 2 == 0 {
3371 Datum::False
3372 } else {
3373 Datum::True
3374 }
3375 };
3376 let mut row = Row::default();
3377 {
3378 let mut packer = row.packer();
3380 packer.push(Datum::String("start"));
3381 packer.push_list_with(|packer| {
3382 for i in 0..len {
3383 packer.push(list_elem(i));
3384 }
3385 });
3386 packer.push(Datum::String("end"));
3387 }
3388 let mut row_it = row.iter();
3390 assert_eq!(row_it.next().unwrap(), Datum::String("start"));
3391 match row_it.next().unwrap() {
3392 Datum::List(list) => {
3393 let mut list_it = list.iter();
3394 for i in 0..len {
3395 assert_eq!(list_it.next().unwrap(), list_elem(i));
3396 }
3397 assert_none!(list_it.next());
3398 }
3399 _ => panic!("expected Datum::List"),
3400 }
3401 assert_eq!(row_it.next().unwrap(), Datum::String("end"));
3402 assert_none!(row_it.next());
3403 }
3404
3405 test_list_encoding_inner(0);
3406 test_list_encoding_inner(1);
3407 test_list_encoding_inner(10);
3408 test_list_encoding_inner(TINY - 1); test_list_encoding_inner(TINY + 1); test_list_encoding_inner(SHORT + 1); }
3415}