1pub use self::container::DatumContainer;
22pub use self::container::DatumSeq;
23pub use self::offset_opt::OffsetOptimized;
24pub use self::spines::{
25 RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowSpine, RowSpine, RowValBatcher,
26 RowValBuilder, RowValSpine, ValRowBatcher, ValRowBuilder, ValRowSpine,
27};
28use differential_dataflow::trace::implementations::OffsetList;
29
30mod spines {
32 use std::rc::Rc;
33
34 use columnation::Columnation;
35 use differential_dataflow::trace::implementations::Layout;
36 use differential_dataflow::trace::implementations::Update;
37 use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
38 use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
39 use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
40 use differential_dataflow::trace::implementations::spine_fueled::Spine;
41 use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
42 use mz_repr::Row;
43 use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker, ColumnationStack};
44
45 use crate::{DatumContainer, OffsetOptimized};
46
/// Merge batcher for `((K, V), T, D)` updates.
///
/// It is a `MergeBatcher` parameterized with a `Vec` of updates, a `ColumnationChunker` over the
/// same update type, and a `ColInternalMerger` over `(K, V)`, `T` and `D`.
type KeyValBatcher<K, V, T, D> = MergeBatcher<
    Vec<((K, V), T, D)>,
    ColumnationChunker<((K, V), T, D)>,
    ColInternalMerger<(K, V), T, D>,
>;
/// Key-only flavor of [`KeyValBatcher`]: the value type is fixed to `()`.
type KeyBatcher<K, T, D> = KeyValBatcher<K, (), T, D>;
55
/// Spine of reference-counted `OrdValBatch`es laid out by [`RowRowLayout`]; keys and values are
/// both [`Row`]s.
pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
/// Batcher for `((Row, Row), T, R)` updates.
pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
/// Builder for [`RowRowSpine`] batches: an `RcBuilder` around an `OrdValBuilder` that uses
/// [`RowRowLayout`] and a `ColumnationStack` of updates.
pub type RowRowBuilder<T, R> = RcBuilder<
    OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, ColumnationStack<((Row, Row), T, R)>>,
>;
61
/// Spine of reference-counted `OrdValBatch`es laid out by [`RowValLayout`]; keys are [`Row`]s and
/// values are of type `V`.
pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
/// Batcher for `((Row, V), T, R)` updates.
pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
/// Builder for [`RowValSpine`] batches: an `RcBuilder` around an `OrdValBuilder` that uses
/// [`RowValLayout`] and a `ColumnationStack` of updates.
pub type RowValBuilder<V, T, R> = RcBuilder<
    OrdValBuilder<RowValLayout<((Row, V), T, R)>, ColumnationStack<((Row, V), T, R)>>,
>;
67
/// Spine of reference-counted `OrdKeyBatch`es laid out by [`RowLayout`]; keys are [`Row`]s and the
/// value is `()`.
pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
/// Batcher for `((Row, ()), T, R)` updates.
pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
/// Builder for [`RowSpine`] batches: an `RcBuilder` around an `OrdKeyBuilder` that uses
/// [`RowLayout`] and a `ColumnationStack` of updates.
pub type RowBuilder<T, R> =
    RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, ColumnationStack<((Row, ()), T, R)>>>;
72
/// Spine of reference-counted `OrdValBatch`es laid out by [`ValRowLayout`]; keys are of type `K`
/// and values are [`Row`]s.
pub type ValRowSpine<K, T, R> = Spine<Rc<OrdValBatch<ValRowLayout<((K, Row), T, R)>>>>;
/// Batcher for `((K, Row), T, R)` updates.
pub type ValRowBatcher<K, T, R> = KeyValBatcher<K, Row, T, R>;
/// Builder for [`ValRowSpine`] batches: an `RcBuilder` around an `OrdValBuilder` that uses
/// [`ValRowLayout`] and a `ColumnationStack` of updates.
pub type ValRowBuilder<K, T, R> = RcBuilder<
    OrdValBuilder<ValRowLayout<((K, Row), T, R)>, ColumnationStack<((K, Row), T, R)>>,
>;
78
79 pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
81 phantom: std::marker::PhantomData<U>,
82 }
83 pub struct RowValLayout<U: Update<Key = Row>> {
84 phantom: std::marker::PhantomData<U>,
85 }
86 pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
87 phantom: std::marker::PhantomData<U>,
88 }
89 pub struct ValRowLayout<U: Update<Val = Row>> {
92 phantom: std::marker::PhantomData<U>,
93 }
94
95 impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
96 where
97 U::Time: Columnation,
98 U::Diff: Columnation,
99 {
100 type KeyContainer = DatumContainer;
101 type ValContainer = DatumContainer;
102 type TimeContainer = ColumnationStack<U::Time>;
103 type DiffContainer = ColumnationStack<U::Diff>;
104 type OffsetContainer = OffsetOptimized;
105 }
106 impl<U: Update<Key = Row>> Layout for RowValLayout<U>
107 where
108 U::Val: Columnation,
109 U::Time: Columnation,
110 U::Diff: Columnation,
111 {
112 type KeyContainer = DatumContainer;
113 type ValContainer = ColumnationStack<U::Val>;
114 type TimeContainer = ColumnationStack<U::Time>;
115 type DiffContainer = ColumnationStack<U::Diff>;
116 type OffsetContainer = OffsetOptimized;
117 }
118 impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
119 where
120 U::Time: Columnation,
121 U::Diff: Columnation,
122 {
123 type KeyContainer = DatumContainer;
124 type ValContainer = ColumnationStack<()>;
125 type TimeContainer = ColumnationStack<U::Time>;
126 type DiffContainer = ColumnationStack<U::Diff>;
127 type OffsetContainer = OffsetOptimized;
128 }
129 impl<U: Update<Val = Row>> Layout for ValRowLayout<U>
130 where
131 U::Key: Columnation,
132 U::Time: Columnation,
133 U::Diff: Columnation,
134 {
135 type KeyContainer = ColumnationStack<U::Key>;
136 type ValContainer = DatumContainer;
137 type TimeContainer = ColumnationStack<U::Time>;
138 type DiffContainer = ColumnationStack<U::Diff>;
139 type OffsetContainer = OffsetOptimized;
140 }
141}
142
143mod container {
145
146 use std::cmp::Ordering;
147
148 use differential_dataflow::trace::implementations::BatchContainer;
149 use timely::container::PushInto;
150
151 use mz_repr::{Datum, Row, RowPacker, read_datum};
152
153 use super::bytes_container::BytesContainer;
154
/// A container of [`Row`] encodings, kept as raw byte slices in a [`BytesContainer`].
///
/// Items are read back as [`DatumSeq`] views over the stored bytes.
pub struct DatumContainer {
    /// Backing storage; holds one byte slice per pushed item.
    bytes: BytesContainer,
}
162
163 impl DatumContainer {
164 #[inline]
166 pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
167 self.bytes.heap_size(callback)
168 }
169 }
170
171 impl BatchContainer for DatumContainer {
172 type Owned = Row;
173 type ReadItem<'a> = DatumSeq<'a>;
174
175 #[inline(always)]
176 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
177 item.to_row()
178 }
179
180 #[inline]
181 fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
182 let mut packer = other.packer();
183 item.copy_into(&mut packer);
184 }
185
186 #[inline(always)]
187 fn push_ref(&mut self, item: Self::ReadItem<'_>) {
188 self.bytes.push_into(item.bytes);
189 }
190
191 #[inline(always)]
192 fn push_own(&mut self, item: &Self::Owned) {
193 self.bytes.push_into(item.data());
194 }
195
196 #[inline(always)]
197 fn clear(&mut self) {
198 self.bytes.clear();
199 }
200
201 #[inline(always)]
202 fn with_capacity(size: usize) -> Self {
203 Self {
204 bytes: BytesContainer::with_capacity(size),
205 }
206 }
207
208 #[inline(always)]
209 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
210 Self {
211 bytes: BytesContainer::merge_capacity(&cont1.bytes, &cont2.bytes),
212 }
213 }
214
215 #[inline(always)]
216 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
217 item
218 }
219
220 #[inline(always)]
221 fn index(&self, index: usize) -> Self::ReadItem<'_> {
222 DatumSeq {
223 bytes: self.bytes.index(index),
224 }
225 }
226
227 #[inline(always)]
228 fn len(&self) -> usize {
229 self.bytes.len()
230 }
231 }
232
233 impl PushInto<Row> for DatumContainer {
234 fn push_into(&mut self, item: Row) {
235 self.push_into(&item);
236 }
237 }
238
239 impl PushInto<&Row> for DatumContainer {
240 fn push_into(&mut self, item: &Row) {
241 self.push_own(item);
242 }
243 }
244
245 impl PushInto<DatumSeq<'_>> for DatumContainer {
246 fn push_into(&mut self, item: DatumSeq<'_>) {
247 self.bytes.push_into(item.as_bytes())
248 }
249 }
250
/// A borrowed view over the encoded bytes of a row.
///
/// Within this module the bytes always originate from `Row::data()`, either directly
/// (`from_row`) or via a [`DatumContainer`]. The type is also an iterator that decodes one
/// `Datum` at a time from the front of the remaining bytes.
#[derive(Debug)]
pub struct DatumSeq<'a> {
    /// The not-yet-consumed encoded bytes.
    bytes: &'a [u8],
}
255
256 impl<'a> DatumSeq<'a> {
257 #[inline]
260 pub fn from_row(row: &'a Row) -> Self {
261 Self { bytes: row.data() }
262 }
263
264 #[inline]
265 pub fn copy_into(&self, row: &mut RowPacker) {
266 unsafe { row.extend_by_slice_unchecked(self.bytes) }
268 }
269 #[inline]
270 fn as_bytes(&self) -> &'a [u8] {
271 self.bytes
272 }
273 #[inline]
274 pub fn to_row(&self) -> Row {
275 unsafe { Row::from_bytes_unchecked(self.bytes) }
277 }
278 }
279
// `DatumSeq` only holds a shared slice reference, so it is freely copyable.
impl<'a> Copy for DatumSeq<'a> {}
impl<'a> Clone for DatumSeq<'a> {
    /// Returns a bitwise copy of the view; the underlying bytes are not duplicated.
    #[inline(always)]
    fn clone(&self) -> Self {
        *self
    }
}
287
288 impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
289 #[inline]
290 fn eq(&self, other: &DatumSeq<'a>) -> bool {
291 self.bytes.eq(other.bytes)
292 }
293 }
294 impl<'a> PartialEq<&Row> for DatumSeq<'a> {
295 #[inline]
296 fn eq(&self, other: &&Row) -> bool {
297 self.bytes.eq(other.data())
298 }
299 }
300 impl<'a> Eq for DatumSeq<'a> {}
/// Partial ordering between views of possibly different lifetimes; always defined, and
/// consistent with the `Ord` implementation.
impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
    #[inline]
    fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
307 impl<'a> Ord for DatumSeq<'a> {
308 #[inline]
309 fn cmp(&self, other: &Self) -> Ordering {
310 match self.bytes.len().cmp(&other.bytes.len()) {
311 std::cmp::Ordering::Less => std::cmp::Ordering::Less,
312 std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
313 std::cmp::Ordering::Equal => self.bytes.cmp(other.bytes),
314 }
315 }
316 }
317 impl<'a> Iterator for DatumSeq<'a> {
318 type Item = Datum<'a>;
319 #[inline]
320 fn next(&mut self) -> Option<Self::Item> {
321 if self.bytes.is_empty() {
322 None
323 } else {
324 let result = unsafe { read_datum(&mut self.bytes) };
325 Some(result)
326 }
327 }
328 }
329
330 use mz_repr::fixed_length::ToDatumIter;
/// A `DatumSeq` is its own datum iterator: producing one is a plain copy of the view.
impl<'long> ToDatumIter for DatumSeq<'long> {
    type DatumIter<'short>
        = DatumSeq<'short>
    where
        Self: 'short;
    /// Returns a copy of the view, so iterating it leaves `self` untouched.
    #[inline]
    fn to_datum_iter<'short>(&'short self) -> Self::DatumIter<'short> {
        *self
    }
}
341
#[cfg(test)]
mod tests {
    use crate::DatumContainer;
    use differential_dataflow::trace::implementations::BatchContainer;
    use mz_repr::adt::date::Date;
    use mz_repr::adt::interval::Interval;
    use mz_repr::{Datum, Row, SqlScalarType};

    /// Packs datums into a `Row`, pushes it into a `DatumContainer`, and checks that iterating
    /// the stored item yields the original datums.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_round_trip() {
        fn round_trip(datums: Vec<Datum>) {
            let row = Row::pack(datums.clone());

            let mut container = DatumContainer::with_capacity(row.byte_len());
            container.push_own(&row);

            // Printing reads every stored byte of the item.
            println!("{:?}", container.index(0).bytes);

            let datums2 = container.index(0).collect::<Vec<_>>();
            assert_eq!(datums, datums2);
        }

        // Empty row.
        round_trip(vec![]);
        // Every "interesting" datum of every enumerated scalar type, in a single row.
        round_trip(
            SqlScalarType::enumerate()
                .iter()
                .flat_map(|r#type| r#type.interesting_datums())
                .collect(),
        );
        // Hand-picked values, including integers at successive byte-width boundaries.
        round_trip(vec![
            Datum::Null,
            Datum::Null,
            Datum::False,
            Datum::True,
            Datum::Int16(-21),
            Datum::Int32(-42),
            Datum::Int64(-2_147_483_648 - 42),
            Datum::UInt8(0),
            Datum::UInt8(1),
            Datum::UInt16(0),
            Datum::UInt16(1),
            Datum::UInt16(1 << 8),
            Datum::UInt32(0),
            Datum::UInt32(1),
            Datum::UInt32(1 << 8),
            Datum::UInt32(1 << 16),
            Datum::UInt32(1 << 24),
            Datum::UInt64(0),
            Datum::UInt64(1),
            Datum::UInt64(1 << 8),
            Datum::UInt64(1 << 16),
            Datum::UInt64(1 << 24),
            Datum::UInt64(1 << 32),
            Datum::UInt64(1 << 40),
            Datum::UInt64(1 << 48),
            Datum::UInt64(1 << 56),
            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
            Datum::Interval(Interval {
                months: 312,
                ..Default::default()
            }),
            Datum::Interval(Interval::new(0, 0, 1_012_312)),
            Datum::Bytes(&[]),
            Datum::Bytes(&[0, 2, 1, 255]),
            Datum::String(""),
            Datum::String("العَرَبِيَّة"),
        ]);
    }
}
414}
415
416mod bytes_container {
417
418 use differential_dataflow::trace::implementations::BatchContainer;
419 use timely::container::PushInto;
420
421 use mz_ore::region::Region;
422
/// A container of byte slices, stored across a sequence of [`BytesBatch`]es.
pub struct BytesContainer {
    /// Total number of slices pushed since construction or the last `clear`.
    length: usize,
    /// Batches in insertion order. The constructors and `clear` always leave one batch in place,
    /// and pushes only ever append further batches.
    batches: Vec<BytesBatch>,
}
429
430 impl BytesContainer {
431 #[inline]
433 pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
434 callback(
436 self.batches.len() * std::mem::size_of::<BytesBatch>(),
437 self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
438 );
439 for batch in self.batches.iter() {
440 batch.offsets.heap_size(&mut callback);
441 callback(batch.storage.len(), batch.storage.capacity());
442 }
443 }
444 }
445
446 impl BatchContainer for BytesContainer {
447 type Owned = Vec<u8>;
448 type ReadItem<'a> = &'a [u8];
449
450 #[inline]
451 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
452 item.to_vec()
453 }
454
455 #[inline]
456 fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
457 other.clear();
458 other.extend_from_slice(item);
459 }
460
461 #[inline(always)]
462 fn push_ref(&mut self, item: Self::ReadItem<'_>) {
463 self.push_into(item);
464 }
465
466 #[inline(always)]
467 fn push_own(&mut self, item: &Self::Owned) {
468 self.push_into(item.as_slice())
469 }
470
471 fn clear(&mut self) {
472 self.batches.clear();
473 self.batches.push(BytesBatch::with_capacities(0, 0));
474 self.length = 0;
475 }
476
477 fn with_capacity(size: usize) -> Self {
478 Self {
479 length: 0,
480 batches: vec![BytesBatch::with_capacities(size, size)],
481 }
482 }
483
484 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
485 let mut item_cap = 1;
486 let mut byte_cap = 0;
487 for batch in cont1.batches.iter() {
488 item_cap += batch.offsets.len() - 1;
489 byte_cap += batch.storage.len();
490 }
491 for batch in cont2.batches.iter() {
492 item_cap += batch.offsets.len() - 1;
493 byte_cap += batch.storage.len();
494 }
495 Self {
496 length: 0,
497 batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
498 }
499 }
500
501 #[inline(always)]
502 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
503 item
504 }
505
506 #[inline]
507 fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
508 for batch in self.batches.iter() {
509 if index < batch.len() {
510 return batch.index(index);
511 }
512 index -= batch.len();
513 }
514 panic!("Index out of bounds");
515 }
516
517 #[inline(always)]
518 fn len(&self) -> usize {
519 self.length
520 }
521 }
522
523 impl PushInto<&[u8]> for BytesContainer {
524 #[inline]
525 fn push_into(&mut self, item: &[u8]) {
526 self.length += 1;
527 if let Some(batch) = self.batches.last_mut() {
528 let success = batch.try_push(item);
529 if !success {
530 let item_cap = 2 * batch.offsets.len();
532 let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
533 let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
534 assert!(new_batch.try_push(item));
535 self.batches.push(new_batch);
536 }
537 }
538 }
539 }
540
/// A batch of byte slices, concatenated in `storage` and delimited by `offsets`.
///
/// Pushes that would exceed the capacity of `storage` are refused rather than growing it.
pub struct BytesBatch {
    /// Boundaries into `storage`: slice `i` spans `offsets[i]..offsets[i + 1]`, so there is
    /// always one more offset than there are slices.
    offsets: crate::OffsetOptimized,
    /// The concatenated bytes of all stored slices.
    storage: Region<u8>,
    /// Number of slices stored.
    len: usize,
}
549
550 impl BytesBatch {
551 fn try_push(&mut self, slice: &[u8]) -> bool {
554 if self.storage.len() + slice.len() <= self.storage.capacity() {
555 self.storage.extend_from_slice(slice);
556 self.offsets.push_into(self.storage.len());
557 self.len += 1;
558 true
559 } else {
560 false
561 }
562 }
563 #[inline]
564 fn index(&self, index: usize) -> &[u8] {
565 let lower = self.offsets.index(index);
566 let upper = self.offsets.index(index + 1);
567 &self.storage[lower..upper]
568 }
569 #[inline(always)]
570 fn len(&self) -> usize {
571 debug_assert_eq!(self.len, self.offsets.len() - 1);
572 self.len
573 }
574
575 fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
576 let mut offsets = crate::OffsetOptimized::with_capacity(item_cap + 1);
578 offsets.push_into(0);
579 Self {
580 offsets,
581 storage: Region::new_auto(byte_cap.next_power_of_two()),
582 len: 0,
583 }
584 }
585 }
586}
587
588mod offset_opt {
589 use differential_dataflow::trace::implementations::BatchContainer;
590 use differential_dataflow::trace::implementations::OffsetList;
591 use timely::container::PushInto;
592
/// Compact description of an offset sequence that starts at zero and grows by a fixed stride.
enum OffsetStride {
    /// No offsets yet.
    Empty,
    /// Exactly one offset, with value `0`.
    Zero,
    /// `Striding(stride, count)`: the offsets `0, stride, ..., stride * (count - 1)`.
    Striding(usize, usize),
    /// `Saturated(stride, count, reps)`: the `Striding(stride, count)` offsets followed by
    /// `reps` further copies of the final value `stride * (count - 1)`.
    Saturated(usize, usize, usize),
}

impl OffsetStride {
    /// Tries to append `item`.
    ///
    /// Returns `true` when the sequence can still be described after the append, and `false`
    /// (leaving `self` unchanged) when `item` does not continue the pattern.
    #[inline]
    fn push(&mut self, item: usize) -> bool {
        let next = match *self {
            // The very first offset has to be zero.
            OffsetStride::Empty if item == 0 => OffsetStride::Zero,
            OffsetStride::Empty => return false,
            // The second offset fixes the stride, whatever its value.
            OffsetStride::Zero => OffsetStride::Striding(item, 2),
            // One more regular step.
            OffsetStride::Striding(stride, count) if item == stride * count => {
                OffsetStride::Striding(stride, count + 1)
            }
            // The final value repeats for the first time.
            OffsetStride::Striding(stride, count) if item == stride * (count - 1) => {
                OffsetStride::Saturated(stride, count, 1)
            }
            // The final value repeats once more.
            OffsetStride::Saturated(stride, count, reps) if item == stride * (count - 1) => {
                OffsetStride::Saturated(stride, count, reps + 1)
            }
            OffsetStride::Striding(..) | OffsetStride::Saturated(..) => return false,
        };
        *self = next;
        true
    }

    /// Returns the offset at position `index`.
    ///
    /// No bounds check is made against `len()`; callers are expected to stay in range.
    ///
    /// # Panics
    ///
    /// Panics with "Empty OffsetStride" when called on `Empty`.
    #[inline]
    fn index(&self, index: usize) -> usize {
        match *self {
            OffsetStride::Empty => panic!("Empty OffsetStride"),
            OffsetStride::Zero => 0,
            OffsetStride::Striding(stride, _) => stride * index,
            // Positions at or beyond the strided prefix all read the final strided value.
            OffsetStride::Saturated(stride, steps, _) => stride * index.min(steps - 1),
        }
    }

    /// Number of offsets described.
    #[inline]
    fn len(&self) -> usize {
        match *self {
            OffsetStride::Empty => 0,
            OffsetStride::Zero => 1,
            OffsetStride::Striding(_, steps) => steps,
            OffsetStride::Saturated(_, steps, reps) => steps + reps,
        }
    }
}
667
/// A list of offsets that stores a regular leading run compactly and everything after it in an
/// [`OffsetList`].
pub struct OffsetOptimized {
    /// Compact description of the leading offsets, for as long as they follow an
    /// `OffsetStride` pattern.
    strided: OffsetStride,
    /// Every offset from the first one `strided` could not absorb onwards.
    spilled: OffsetList,
}
672
673 impl BatchContainer for OffsetOptimized {
674 type Owned = usize;
675 type ReadItem<'a> = usize;
676
677 #[inline]
678 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
679 item
680 }
681
682 #[inline]
683 fn push_ref(&mut self, item: Self::ReadItem<'_>) {
684 self.push_into(item)
685 }
686
687 #[inline]
688 fn push_own(&mut self, item: &Self::Owned) {
689 self.push_into(*item)
690 }
691
692 fn clear(&mut self) {
693 self.strided = OffsetStride::Empty;
694 self.spilled.clear();
695 }
696
697 fn with_capacity(_size: usize) -> Self {
698 Self {
699 strided: OffsetStride::Empty,
700 spilled: OffsetList::with_capacity(0),
701 }
702 }
703
704 fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
705 Self {
706 strided: OffsetStride::Empty,
707 spilled: OffsetList::with_capacity(0),
708 }
709 }
710
711 #[inline]
712 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
713 item
714 }
715
716 #[inline]
717 fn index(&self, index: usize) -> Self::ReadItem<'_> {
718 if index < self.strided.len() {
719 self.strided.index(index)
720 } else {
721 self.spilled.index(index - self.strided.len())
722 }
723 }
724
725 #[inline]
726 fn len(&self) -> usize {
727 self.strided.len() + self.spilled.len()
728 }
729 }
730
731 impl PushInto<usize> for OffsetOptimized {
732 #[inline]
733 fn push_into(&mut self, item: usize) {
734 if !self.spilled.is_empty() {
735 self.spilled.push(item);
736 } else {
737 let inserted = self.strided.push(item);
738 if !inserted {
739 self.spilled.push(item);
740 }
741 }
742 }
743 }
744
impl OffsetOptimized {
    /// Reports heap usage through `callback`.
    ///
    /// Only the `spilled` list is measured; the `strided` part is an enum of plain integers and
    /// owns no heap memory.
    pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
        crate::offset_list_size(&self.spilled, callback);
    }
}
750}
751
752#[inline]
754pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
755 #[inline(always)]
758 fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
759 let size_of_t = std::mem::size_of::<T>();
760 callback(data.len() * size_of_t, data.capacity() * size_of_t);
761 }
762
763 vec_size(&data.smol, &mut callback);
764 vec_size(&data.chonk, callback);
765}