1pub use self::container::DatumContainer;
22pub use self::container::DatumSeq;
23pub use self::offset_opt::OffsetOptimized;
24pub use self::spines::{
25 RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowColPagedBuilder, RowRowSpine,
26 RowSpine, RowValBatcher, RowValBuilder, RowValSpine, ValRowBatcher, ValRowBuilder, ValRowSpine,
27};
28use differential_dataflow::trace::implementations::OffsetList;
29
30mod spines {
32 use std::rc::Rc;
33
34 use columnation::Columnation;
35 use differential_dataflow::trace::implementations::Layout;
36 use differential_dataflow::trace::implementations::Update;
37 use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
38 use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
39 use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
40 use differential_dataflow::trace::implementations::spine_fueled::Spine;
41 use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
42 use mz_repr::Row;
43 use mz_timely_util::columnar::Column;
44 use mz_timely_util::columnation::{ColInternalMerger, ColumnationStack};
45
46 use crate::{DatumContainer, OffsetOptimized};
47
/// Merge batcher over `(K, V)` pairs; `T` and `D` are forwarded unchanged to
/// `ColInternalMerger`.
type KeyValBatcher<K, V, T, D> = MergeBatcher<ColInternalMerger<(K, V), T, D>>;
/// A `KeyValBatcher` whose value type is fixed to `()`, i.e. for key-only data.
type KeyBatcher<K, T, D> = KeyValBatcher<K, (), T, D>;
52
/// Spine of reference-counted `OrdValBatch`es using `RowRowLayout`: `Row` keys and `Row` values.
pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
/// Batcher for updates with `Row` keys and `Row` values.
pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
/// Builder of `Rc`-wrapped `RowRowLayout` batches, parameterized with `ColumnationStack`
/// (presumably the builder's input container; confirm against `OrdValBuilder`).
pub type RowRowBuilder<T, R> = RcBuilder<
    OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, ColumnationStack<((Row, Row), T, R)>>,
>;

/// Like `RowRowBuilder`, but parameterized with `Column` instead of `ColumnationStack`.
pub type RowRowColPagedBuilder<T, R> =
    RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, Column<((Row, Row), T, R)>>>;

/// Spine of reference-counted `OrdValBatch`es using `RowValLayout`: `Row` keys, values of type `V`.
pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
/// Batcher for updates with `Row` keys and values of type `V`.
pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
/// Builder of `Rc`-wrapped `RowValLayout` batches, parameterized with `ColumnationStack`.
pub type RowValBuilder<V, T, R> = RcBuilder<
    OrdValBuilder<RowValLayout<((Row, V), T, R)>, ColumnationStack<((Row, V), T, R)>>,
>;

/// Spine of reference-counted `OrdKeyBatch`es using `RowLayout`: `Row` keys and `()` values.
pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
/// Batcher for key-only updates with `Row` keys.
pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
/// Builder of `Rc`-wrapped `RowLayout` batches, parameterized with `ColumnationStack`.
pub type RowBuilder<T, R> =
    RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, ColumnationStack<((Row, ()), T, R)>>>;

/// Spine of reference-counted `OrdValBatch`es using `ValRowLayout`: keys of type `K`, `Row` values.
pub type ValRowSpine<K, T, R> = Spine<Rc<OrdValBatch<ValRowLayout<((K, Row), T, R)>>>>;
/// Batcher for updates with keys of type `K` and `Row` values.
pub type ValRowBatcher<K, T, R> = KeyValBatcher<K, Row, T, R>;
/// Builder of `Rc`-wrapped `ValRowLayout` batches, parameterized with `ColumnationStack`.
pub type ValRowBuilder<K, T, R> = RcBuilder<
    OrdValBuilder<ValRowLayout<((K, Row), T, R)>, ColumnationStack<((K, Row), T, R)>>,
>;
82
/// Layout marker for updates whose key and value are both `Row`.
pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
    // Zero-sized; only ties the layout to its update type `U`.
    phantom: std::marker::PhantomData<U>,
}
/// Layout marker for updates whose key is a `Row`; the value type is left to `U`.
pub struct RowValLayout<U: Update<Key = Row>> {
    // Zero-sized; only ties the layout to its update type `U`.
    phantom: std::marker::PhantomData<U>,
}
/// Layout marker for key-only updates: `Row` keys with `()` values.
pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
    // Zero-sized; only ties the layout to its update type `U`.
    phantom: std::marker::PhantomData<U>,
}
/// Layout marker for updates whose value is a `Row`; the key type is left to `U`.
pub struct ValRowLayout<U: Update<Val = Row>> {
    // Zero-sized; only ties the layout to its update type `U`.
    phantom: std::marker::PhantomData<U>,
}
98
99 impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
100 where
101 U::Time: Columnation,
102 U::Diff: Columnation,
103 {
104 type KeyContainer = DatumContainer;
105 type ValContainer = DatumContainer;
106 type TimeContainer = ColumnationStack<U::Time>;
107 type DiffContainer = ColumnationStack<U::Diff>;
108 type OffsetContainer = OffsetOptimized;
109 }
110 impl<U: Update<Key = Row>> Layout for RowValLayout<U>
111 where
112 U::Val: Columnation,
113 U::Time: Columnation,
114 U::Diff: Columnation,
115 {
116 type KeyContainer = DatumContainer;
117 type ValContainer = ColumnationStack<U::Val>;
118 type TimeContainer = ColumnationStack<U::Time>;
119 type DiffContainer = ColumnationStack<U::Diff>;
120 type OffsetContainer = OffsetOptimized;
121 }
122 impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
123 where
124 U::Time: Columnation,
125 U::Diff: Columnation,
126 {
127 type KeyContainer = DatumContainer;
128 type ValContainer = ColumnationStack<()>;
129 type TimeContainer = ColumnationStack<U::Time>;
130 type DiffContainer = ColumnationStack<U::Diff>;
131 type OffsetContainer = OffsetOptimized;
132 }
133 impl<U: Update<Val = Row>> Layout for ValRowLayout<U>
134 where
135 U::Key: Columnation,
136 U::Time: Columnation,
137 U::Diff: Columnation,
138 {
139 type KeyContainer = ColumnationStack<U::Key>;
140 type ValContainer = DatumContainer;
141 type TimeContainer = ColumnationStack<U::Time>;
142 type DiffContainer = ColumnationStack<U::Diff>;
143 type OffsetContainer = OffsetOptimized;
144 }
145}
146
147mod container {
149
150 use std::cmp::Ordering;
151
152 use differential_dataflow::trace::implementations::BatchContainer;
153 use timely::container::PushInto;
154
155 use mz_repr::{Datum, Row, RowPacker, RowRef, read_datum};
156
157 use super::bytes_container::BytesContainer;
158
159 pub struct DatumContainer {
164 bytes: BytesContainer,
165 }
166
167 impl DatumContainer {
168 #[inline]
170 pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
171 self.bytes.heap_size(callback)
172 }
173 }
174
175 impl BatchContainer for DatumContainer {
176 type Owned = Row;
177 type ReadItem<'a> = DatumSeq<'a>;
178
179 #[inline(always)]
180 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
181 item.to_row()
182 }
183
184 #[inline]
185 fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
186 let mut packer = other.packer();
187 item.copy_into(&mut packer);
188 }
189
190 #[inline(always)]
191 fn push_ref(&mut self, item: Self::ReadItem<'_>) {
192 self.bytes.push_into(item.bytes);
193 }
194
195 #[inline(always)]
196 fn push_own(&mut self, item: &Self::Owned) {
197 self.bytes.push_into(item.data());
198 }
199
200 #[inline(always)]
201 fn clear(&mut self) {
202 self.bytes.clear();
203 }
204
205 #[inline(always)]
206 fn with_capacity(size: usize) -> Self {
207 Self {
208 bytes: BytesContainer::with_capacity(size),
209 }
210 }
211
212 #[inline(always)]
213 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
214 Self {
215 bytes: BytesContainer::merge_capacity(&cont1.bytes, &cont2.bytes),
216 }
217 }
218
219 #[inline(always)]
220 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
221 item
222 }
223
224 #[inline(always)]
225 fn index(&self, index: usize) -> Self::ReadItem<'_> {
226 DatumSeq {
227 bytes: self.bytes.index(index),
228 }
229 }
230
231 #[inline(always)]
232 fn len(&self) -> usize {
233 self.bytes.len()
234 }
235 }
236
237 impl PushInto<Row> for DatumContainer {
238 fn push_into(&mut self, item: Row) {
239 self.push_into(&item);
240 }
241 }
242
243 impl PushInto<&Row> for DatumContainer {
244 fn push_into(&mut self, item: &Row) {
245 self.push_own(item);
246 }
247 }
248
249 impl PushInto<DatumSeq<'_>> for DatumContainer {
250 fn push_into(&mut self, item: DatumSeq<'_>) {
251 self.bytes.push_into(item.as_bytes())
252 }
253 }
254
255 impl PushInto<&RowRef> for DatumContainer {
256 fn push_into(&mut self, item: &RowRef) {
257 self.bytes.push_into(item.data())
258 }
259 }
260
261 #[derive(Debug)]
262 pub struct DatumSeq<'a> {
263 bytes: &'a [u8],
264 }
265
266 impl<'a> DatumSeq<'a> {
267 #[inline]
270 pub fn from_row(row: &'a Row) -> Self {
271 Self { bytes: row.data() }
272 }
273
274 #[inline]
275 pub fn copy_into(&self, row: &mut RowPacker) {
276 unsafe { row.extend_by_slice_unchecked(self.bytes) }
278 }
279 #[inline]
280 fn as_bytes(&self) -> &'a [u8] {
281 self.bytes
282 }
283 #[inline]
284 pub fn to_row(&self) -> Row {
285 unsafe { Row::from_bytes_unchecked(self.bytes) }
287 }
288 }
289
// `DatumSeq` holds nothing but a shared slice reference, so it can be `Copy`; `Clone` is
// written by hand as the canonical `*self` rather than derived.
impl<'a> Copy for DatumSeq<'a> {}
impl<'a> Clone for DatumSeq<'a> {
    /// Returns a bitwise copy of the view; no bytes are duplicated.
    #[inline(always)]
    fn clone(&self) -> Self {
        *self
    }
}
297
298 impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
299 #[inline]
300 fn eq(&self, other: &DatumSeq<'a>) -> bool {
301 self.bytes.eq(other.bytes)
302 }
303 }
304 impl<'a> PartialEq<&Row> for DatumSeq<'a> {
305 #[inline]
306 fn eq(&self, other: &&Row) -> bool {
307 self.bytes.eq(other.data())
308 }
309 }
310 impl<'a> PartialEq<&RowRef> for DatumSeq<'a> {
311 #[inline]
312 fn eq(&self, other: &&RowRef) -> bool {
313 self.bytes.eq(other.data())
314 }
315 }
316 impl<'a> Eq for DatumSeq<'a> {}
317 impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
318 #[inline]
319 fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
320 Some(self.cmp(other))
321 }
322 }
323 impl<'a> Ord for DatumSeq<'a> {
324 #[inline]
325 fn cmp(&self, other: &Self) -> Ordering {
326 match self.bytes.len().cmp(&other.bytes.len()) {
327 std::cmp::Ordering::Less => std::cmp::Ordering::Less,
328 std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
329 std::cmp::Ordering::Equal => self.bytes.cmp(other.bytes),
330 }
331 }
332 }
333 impl<'a> Iterator for DatumSeq<'a> {
334 type Item = Datum<'a>;
335 #[inline]
336 fn next(&mut self) -> Option<Self::Item> {
337 if self.bytes.is_empty() {
338 None
339 } else {
340 let result = unsafe { read_datum(&mut self.bytes) };
341 Some(result)
342 }
343 }
344 }
345
346 use mz_repr::fixed_length::ToDatumIter;
/// Lets a `DatumSeq` act as its own datum iterator.
impl<'long> ToDatumIter for DatumSeq<'long> {
    /// The iterator is again a `DatumSeq`, at the lifetime of the borrow of `self`.
    type DatumIter<'short>
        = DatumSeq<'short>
    where
        Self: 'short;
    /// Returns a copy of `self`; advancing the copy leaves `self` untouched.
    #[inline]
    fn to_datum_iter<'short>(&'short self) -> Self::DatumIter<'short> {
        *self
    }
}
357
#[cfg(test)]
mod tests {
    use crate::DatumContainer;
    use differential_dataflow::trace::implementations::BatchContainer;
    use mz_repr::adt::date::Date;
    use mz_repr::adt::interval::Interval;
    use mz_repr::{Datum, Row, SqlScalarType};

    /// Packs datums into a `Row`, stores the row in a `DatumContainer`, and checks that
    /// iterating the stored item yields the original datums.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_round_trip() {
        fn round_trip(expected: Vec<Datum>) {
            let row = Row::pack(expected.clone());

            let mut container = DatumContainer::with_capacity(row.byte_len());
            container.push_own(&row);

            // Printing forces a read of every stored byte.
            let stored = container.index(0);
            println!("{:?}", stored.bytes);

            let decoded: Vec<_> = stored.collect();
            assert_eq!(expected, decoded);
        }

        // The empty row.
        round_trip(vec![]);
        // The interesting datums of every enumerated scalar type, packed into one row.
        round_trip(
            SqlScalarType::enumerate()
                .iter()
                .flat_map(|r#type| r#type.interesting_datums())
                .collect(),
        );
        // A hand-picked mix, including integers at byte-width boundaries.
        round_trip(vec![
            Datum::Null,
            Datum::Null,
            Datum::False,
            Datum::True,
            Datum::Int16(-21),
            Datum::Int32(-42),
            Datum::Int64(-2_147_483_648 - 42),
            Datum::UInt8(0),
            Datum::UInt8(1),
            Datum::UInt16(0),
            Datum::UInt16(1),
            Datum::UInt16(1 << 8),
            Datum::UInt32(0),
            Datum::UInt32(1),
            Datum::UInt32(1 << 8),
            Datum::UInt32(1 << 16),
            Datum::UInt32(1 << 24),
            Datum::UInt64(0),
            Datum::UInt64(1),
            Datum::UInt64(1 << 8),
            Datum::UInt64(1 << 16),
            Datum::UInt64(1 << 24),
            Datum::UInt64(1 << 32),
            Datum::UInt64(1 << 40),
            Datum::UInt64(1 << 48),
            Datum::UInt64(1 << 56),
            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
            Datum::Interval(Interval {
                months: 312,
                ..Default::default()
            }),
            Datum::Interval(Interval::new(0, 0, 1_012_312)),
            Datum::Bytes(&[]),
            Datum::Bytes(&[0, 2, 1, 255]),
            Datum::String(""),
            Datum::String("العَرَبِيَّة"),
        ]);
    }
}
430}
431
432mod bytes_container {
433
434 use differential_dataflow::trace::implementations::BatchContainer;
435 use timely::container::PushInto;
436
437 use mz_ore::region::Region;
438
439 pub struct BytesContainer {
441 length: usize,
443 batches: Vec<BytesBatch>,
444 }
445
446 impl BytesContainer {
447 #[inline]
449 pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
450 callback(
452 self.batches.len() * std::mem::size_of::<BytesBatch>(),
453 self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
454 );
455 for batch in self.batches.iter() {
456 batch.offsets.heap_size(&mut callback);
457 callback(batch.storage.len(), batch.storage.capacity());
458 }
459 }
460 }
461
462 impl BatchContainer for BytesContainer {
463 type Owned = Vec<u8>;
464 type ReadItem<'a> = &'a [u8];
465
466 #[inline]
467 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
468 item.to_vec()
469 }
470
471 #[inline]
472 fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
473 other.clear();
474 other.extend_from_slice(item);
475 }
476
477 #[inline(always)]
478 fn push_ref(&mut self, item: Self::ReadItem<'_>) {
479 self.push_into(item);
480 }
481
482 #[inline(always)]
483 fn push_own(&mut self, item: &Self::Owned) {
484 self.push_into(item.as_slice())
485 }
486
487 fn clear(&mut self) {
488 self.batches.clear();
489 self.batches.push(BytesBatch::with_capacities(0, 0));
490 self.length = 0;
491 }
492
493 fn with_capacity(size: usize) -> Self {
494 Self {
495 length: 0,
496 batches: vec![BytesBatch::with_capacities(size, size)],
497 }
498 }
499
500 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
501 let mut item_cap = 1;
502 let mut byte_cap = 0;
503 for batch in cont1.batches.iter() {
504 item_cap += batch.offsets.len() - 1;
505 byte_cap += batch.storage.len();
506 }
507 for batch in cont2.batches.iter() {
508 item_cap += batch.offsets.len() - 1;
509 byte_cap += batch.storage.len();
510 }
511 Self {
512 length: 0,
513 batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
514 }
515 }
516
517 #[inline(always)]
518 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
519 item
520 }
521
522 #[inline]
523 fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
524 for batch in self.batches.iter() {
525 if index < batch.len() {
526 return batch.index(index);
527 }
528 index -= batch.len();
529 }
530 panic!("Index out of bounds");
531 }
532
533 #[inline(always)]
534 fn len(&self) -> usize {
535 self.length
536 }
537 }
538
539 impl PushInto<&[u8]> for BytesContainer {
540 #[inline]
541 fn push_into(&mut self, item: &[u8]) {
542 self.length += 1;
543 if let Some(batch) = self.batches.last_mut() {
544 let success = batch.try_push(item);
545 if !success {
546 let item_cap = 2 * batch.offsets.len();
548 let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
549 let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
550 assert!(new_batch.try_push(item));
551 self.batches.push(new_batch);
552 }
553 }
554 }
555 }
556
557 pub struct BytesBatch {
561 offsets: crate::OffsetOptimized,
562 storage: Region<u8>,
563 len: usize,
564 }
565
566 impl BytesBatch {
567 fn try_push(&mut self, slice: &[u8]) -> bool {
570 if self.storage.len() + slice.len() <= self.storage.capacity() {
571 self.storage.extend_from_slice(slice);
572 self.offsets.push_into(self.storage.len());
573 self.len += 1;
574 true
575 } else {
576 false
577 }
578 }
579 #[inline]
580 fn index(&self, index: usize) -> &[u8] {
581 let lower = self.offsets.index(index);
582 let upper = self.offsets.index(index + 1);
583 &self.storage[lower..upper]
584 }
585 #[inline(always)]
586 fn len(&self) -> usize {
587 debug_assert_eq!(self.len, self.offsets.len() - 1);
588 self.len
589 }
590
591 fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
592 let mut offsets = crate::OffsetOptimized::with_capacity(item_cap + 1);
594 offsets.push_into(0);
595 Self {
596 offsets,
597 storage: Region::new_auto(byte_cap.next_power_of_two()),
598 len: 0,
599 }
600 }
601 }
602}
603
604mod offset_opt {
605 use differential_dataflow::trace::implementations::BatchContainer;
606 use differential_dataflow::trace::implementations::OffsetList;
607 use timely::container::PushInto;
608
/// A compact description of an offset sequence that starts at zero, grows by a fixed
/// stride, and may end in a run of repeats of its last value.
enum OffsetStride {
    /// No offsets.
    Empty,
    /// The single offset `0`.
    Zero,
    /// `Striding(stride, count)`: the offsets `0, stride, ..., (count - 1) * stride`.
    Striding(usize, usize),
    /// `Saturated(stride, count, reps)`: as `Striding(stride, count)`, followed by `reps`
    /// further copies of the last offset `(count - 1) * stride`.
    Saturated(usize, usize, usize),
}

impl OffsetStride {
    /// Tries to append `item`. Returns `true` and updates the description when `item`
    /// continues the pattern; returns `false` and leaves `self` unchanged otherwise.
    #[inline]
    fn push(&mut self, item: usize) -> bool {
        let next = match *self {
            OffsetStride::Empty if item == 0 => OffsetStride::Zero,
            // Whatever follows the initial zero defines the stride.
            OffsetStride::Zero => OffsetStride::Striding(item, 2),
            OffsetStride::Striding(stride, count) if item == stride * count => {
                OffsetStride::Striding(stride, count + 1)
            }
            // A repeat of the last offset starts the saturated tail.
            OffsetStride::Striding(stride, count) if item == stride * (count - 1) => {
                OffsetStride::Saturated(stride, count, 1)
            }
            OffsetStride::Saturated(stride, count, reps) if item == stride * (count - 1) => {
                OffsetStride::Saturated(stride, count, reps + 1)
            }
            _ => return false,
        };
        *self = next;
        true
    }

    /// Returns the offset at position `index`. Positions are not checked against `len`.
    ///
    /// # Panics
    ///
    /// Panics with "Empty OffsetStride" when called on `Empty`.
    #[inline]
    fn index(&self, index: usize) -> usize {
        match *self {
            OffsetStride::Empty => panic!("Empty OffsetStride"),
            OffsetStride::Zero => 0,
            OffsetStride::Striding(stride, _) => stride * index,
            OffsetStride::Saturated(stride, steps, _) => {
                // Positions inside the repeated tail all map to the last striding step.
                let position = if index < steps { index } else { steps - 1 };
                stride * position
            }
        }
    }

    /// Number of offsets described.
    #[inline]
    fn len(&self) -> usize {
        match *self {
            OffsetStride::Empty => 0,
            OffsetStride::Zero => 1,
            OffsetStride::Striding(_, steps) => steps,
            OffsetStride::Saturated(_, steps, reps) => steps + reps,
        }
    }
}
683
/// A list of offsets stored as a stride-compressed prefix followed by explicitly stored
/// offsets.
pub struct OffsetOptimized {
    // Compact encoding of the leading offsets, for as long as pushes fit its pattern.
    strided: OffsetStride,
    // Every offset from the first one `strided` rejected onwards.
    spilled: OffsetList,
}
688
689 impl BatchContainer for OffsetOptimized {
690 type Owned = usize;
691 type ReadItem<'a> = usize;
692
693 #[inline]
694 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
695 item
696 }
697
698 #[inline]
699 fn push_ref(&mut self, item: Self::ReadItem<'_>) {
700 self.push_into(item)
701 }
702
703 #[inline]
704 fn push_own(&mut self, item: &Self::Owned) {
705 self.push_into(*item)
706 }
707
708 fn clear(&mut self) {
709 self.strided = OffsetStride::Empty;
710 self.spilled.clear();
711 }
712
713 fn with_capacity(_size: usize) -> Self {
714 Self {
715 strided: OffsetStride::Empty,
716 spilled: OffsetList::with_capacity(0),
717 }
718 }
719
720 fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
721 Self {
722 strided: OffsetStride::Empty,
723 spilled: OffsetList::with_capacity(0),
724 }
725 }
726
727 #[inline]
728 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
729 item
730 }
731
732 #[inline]
733 fn index(&self, index: usize) -> Self::ReadItem<'_> {
734 if index < self.strided.len() {
735 self.strided.index(index)
736 } else {
737 self.spilled.index(index - self.strided.len())
738 }
739 }
740
741 #[inline]
742 fn len(&self) -> usize {
743 self.strided.len() + self.spilled.len()
744 }
745 }
746
747 impl PushInto<usize> for OffsetOptimized {
748 #[inline]
749 fn push_into(&mut self, item: usize) {
750 if !self.spilled.is_empty() {
751 self.spilled.push(item);
752 } else {
753 let inserted = self.strided.push(item);
754 if !inserted {
755 self.spilled.push(item);
756 }
757 }
758 }
759 }
760
761 impl OffsetOptimized {
762 pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
763 crate::offset_list_size(&self.spilled, callback);
764 }
765 }
766}
767
768#[inline]
770pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
771 #[inline(always)]
774 fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
775 let size_of_t = std::mem::size_of::<T>();
776 callback(data.len() * size_of_t, data.capacity() * size_of_t);
777 }
778
779 vec_size(&data.smol, &mut callback);
780 vec_size(&data.chonk, callback);
781}