1pub use self::container::DatumContainer;
11pub use self::container::DatumSeq;
12pub use self::offset_opt::OffsetOptimized;
13pub use self::spines::{
14 RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowSpine, RowSpine, RowValBatcher,
15 RowValBuilder, RowValSpine,
16};
17use differential_dataflow::trace::implementations::OffsetList;
18
mod spines {
    //! Spine, batcher, and builder type aliases for `Row`-keyed arrangements,
    //! wired to the columnar `DatumContainer` and the compact
    //! `OffsetOptimized` offset store defined in sibling modules.

    use std::rc::Rc;

    use columnation::Columnation;
    use differential_dataflow::trace::implementations::Layout;
    use differential_dataflow::trace::implementations::Update;
    use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
    use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
    use differential_dataflow::trace::implementations::spine_fueled::Spine;
    use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
    use mz_repr::Row;
    use mz_timely_util::columnation::ColumnationStack;

    use crate::row_spine::{DatumContainer, OffsetOptimized};
    use crate::typedefs::{KeyBatcher, KeyValBatcher};

    /// Spine for `Row` keys and `Row` values.
    pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
    /// Batcher for `Row` keys and `Row` values.
    pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
    /// Builder for `Row` keys and `Row` values.
    pub type RowRowBuilder<T, R> = RcBuilder<
        OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, ColumnationStack<((Row, Row), T, R)>>,
    >;

    /// Spine for `Row` keys and arbitrary columnated values `V`.
    pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
    /// Batcher for `Row` keys and arbitrary values `V`.
    pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
    /// Builder for `Row` keys and arbitrary columnated values `V`.
    pub type RowValBuilder<V, T, R> = RcBuilder<
        OrdValBuilder<RowValLayout<((Row, V), T, R)>, ColumnationStack<((Row, V), T, R)>>,
    >;

    /// Spine for `Row` keys with unit values.
    pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
    /// Batcher for `Row` keys with unit values.
    pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
    /// Builder for `Row` keys with unit values.
    pub type RowBuilder<T, R> =
        RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, ColumnationStack<((Row, ()), T, R)>>>;

    /// A layout for updates with `Row` keys and `Row` values.
    ///
    /// Carries no data; it only selects container types via its `Layout` impl.
    pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
    /// A layout for updates with `Row` keys and arbitrary values.
    pub struct RowValLayout<U: Update<Key = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
    /// A layout for updates with `Row` keys and unit values.
    pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
        phantom: std::marker::PhantomData<U>,
    }

    impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        // Both keys and values are rows, so both use the byte-packed container.
        type KeyContainer = DatumContainer;
        type ValContainer = DatumContainer;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row>> Layout for RowValLayout<U>
    where
        U::Val: Columnation,
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = ColumnationStack<U::Val>;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = ColumnationStack<()>;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
}
99
mod container {
    //! `DatumContainer`: a `BatchContainer` that stores each `Row` as its raw
    //! byte encoding, packed end-to-end, avoiding a per-row allocation.

    use std::cmp::Ordering;

    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_repr::{Datum, Row, RowPacker, read_datum};

    use super::bytes_container::BytesContainer;

    /// A container for the byte encodings of `Row`s.
    ///
    /// Rows are read back as [`DatumSeq`], a borrowed view that iterates the
    /// encoded datums without materializing an owned `Row`.
    pub struct DatumContainer {
        /// Encoded row bytes, stored end-to-end in a paged byte container.
        bytes: BytesContainer,
    }

    impl DatumContainer {
        /// Visits contained allocations to determine their size and capacity.
        #[inline]
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.bytes.heap_size(callback)
        }
    }

    impl BatchContainer for DatumContainer {
        type Owned = Row;
        type ReadItem<'a> = DatumSeq<'a>;

        #[inline(always)]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item.to_row()
        }

        #[inline]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            // Repack into the existing row so its allocation can be reused.
            let mut packer = other.packer();
            item.copy_into(&mut packer);
        }

        #[inline(always)]
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.bytes.push_into(item.bytes);
        }

        #[inline(always)]
        fn push_own(&mut self, item: &Self::Owned) {
            // `Row::data()` yields exactly the byte encoding we store.
            self.bytes.push_into(item.data());
        }

        #[inline(always)]
        fn clear(&mut self) {
            self.bytes.clear();
        }

        #[inline(always)]
        fn with_capacity(size: usize) -> Self {
            Self {
                bytes: BytesContainer::with_capacity(size),
            }
        }

        #[inline(always)]
        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            Self {
                bytes: BytesContainer::merge_capacity(&cont1.bytes, &cont2.bytes),
            }
        }

        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        #[inline(always)]
        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            DatumSeq {
                bytes: self.bytes.index(index),
            }
        }

        #[inline(always)]
        fn len(&self) -> usize {
            self.bytes.len()
        }
    }

    impl PushInto<Row> for DatumContainer {
        fn push_into(&mut self, item: Row) {
            // Delegate to the by-reference impl below.
            self.push_into(&item);
        }
    }

    impl PushInto<&Row> for DatumContainer {
        fn push_into(&mut self, item: &Row) {
            self.push_own(item);
        }
    }

    impl PushInto<DatumSeq<'_>> for DatumContainer {
        fn push_into(&mut self, item: DatumSeq<'_>) {
            self.bytes.push_into(item.as_bytes())
        }
    }

    /// A borrowed view of one row's encoded bytes, iterable as `Datum`s.
    #[derive(Debug)]
    pub struct DatumSeq<'a> {
        /// The complete byte encoding of exactly one row.
        bytes: &'a [u8],
    }

    impl<'a> DatumSeq<'a> {
        /// Appends this sequence's datums to `row`.
        #[inline]
        pub fn copy_into(&self, row: &mut RowPacker) {
            // SAFETY: `bytes` were copied verbatim from a valid `Row`
            // encoding (see `push_own`/`push_ref`), so they form a valid
            // datum byte slice.
            unsafe { row.extend_by_slice_unchecked(self.bytes) }
        }
        /// The raw encoded bytes backing this sequence.
        #[inline]
        fn as_bytes(&self) -> &'a [u8] {
            self.bytes
        }
        /// Materializes an owned `Row` from these bytes.
        #[inline]
        pub fn to_row(&self) -> Row {
            // SAFETY: as in `copy_into`, `bytes` are a valid `Row` encoding.
            unsafe { Row::from_bytes_unchecked(self.bytes) }
        }
    }

    impl<'a> Copy for DatumSeq<'a> {}
    impl<'a> Clone for DatumSeq<'a> {
        #[inline(always)]
        fn clone(&self) -> Self {
            *self
        }
    }

    impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
        #[inline]
        fn eq(&self, other: &DatumSeq<'a>) -> bool {
            // Equal rows have identical byte encodings.
            self.bytes.eq(other.bytes)
        }
    }
    impl<'a> PartialEq<&Row> for DatumSeq<'a> {
        #[inline]
        fn eq(&self, other: &&Row) -> bool {
            self.bytes.eq(other.data())
        }
    }
    impl<'a> Eq for DatumSeq<'a> {}
    impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
        #[inline]
        fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl<'a> Ord for DatumSeq<'a> {
        #[inline]
        fn cmp(&self, other: &Self) -> Ordering {
            // Order by byte length first, then lexicographically by bytes.
            // NOTE(review): this is assumed to agree with `Row`'s own `Ord`
            // over its encoded bytes — confirm against `mz_repr::Row`.
            match self.bytes.len().cmp(&other.bytes.len()) {
                std::cmp::Ordering::Less => std::cmp::Ordering::Less,
                std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
                std::cmp::Ordering::Equal => self.bytes.cmp(other.bytes),
            }
        }
    }
    impl<'a> Iterator for DatumSeq<'a> {
        type Item = Datum<'a>;
        #[inline]
        fn next(&mut self) -> Option<Self::Item> {
            if self.bytes.is_empty() {
                None
            } else {
                // SAFETY: `bytes` is a valid datum encoding; `read_datum`
                // decodes one datum and advances the slice past it.
                let result = unsafe { read_datum(&mut self.bytes) };
                Some(result)
            }
        }
    }

    use mz_repr::fixed_length::ToDatumIter;
    impl<'long> ToDatumIter for DatumSeq<'long> {
        type DatumIter<'short>
        = DatumSeq<'short>
        where
            Self: 'short;
        #[inline]
        fn to_datum_iter<'short>(&'short self) -> Self::DatumIter<'short> {
            // `DatumSeq` is `Copy`, so the iterator is just a copy of self.
            *self
        }
    }

    #[cfg(test)]
    mod tests {
        use crate::row_spine::DatumContainer;
        use differential_dataflow::trace::implementations::BatchContainer;
        use mz_repr::adt::date::Date;
        use mz_repr::adt::interval::Interval;
        use mz_repr::{Datum, Row, SqlScalarType};

        /// Packs datums into a `Row`, round-trips them through a
        /// `DatumContainer`, and checks the decoded datums match.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // presumably too slow under miri — confirm
        fn test_round_trip() {
            fn round_trip(datums: Vec<Datum>) {
                let row = Row::pack(datums.clone());

                let mut container = DatumContainer::with_capacity(row.byte_len());
                container.push_own(&row);

                println!("{:?}", container.index(0).bytes);

                let datums2 = container.index(0).collect::<Vec<_>>();
                assert_eq!(datums, datums2);
            }

            round_trip(vec![]);
            round_trip(
                SqlScalarType::enumerate()
                    .iter()
                    .flat_map(|r#type| r#type.interesting_datums())
                    .collect(),
            );
            round_trip(vec![
                Datum::Null,
                Datum::Null,
                Datum::False,
                Datum::True,
                Datum::Int16(-21),
                Datum::Int32(-42),
                Datum::Int64(-2_147_483_648 - 42),
                Datum::UInt8(0),
                Datum::UInt8(1),
                Datum::UInt16(0),
                Datum::UInt16(1),
                Datum::UInt16(1 << 8),
                Datum::UInt32(0),
                Datum::UInt32(1),
                Datum::UInt32(1 << 8),
                Datum::UInt32(1 << 16),
                Datum::UInt32(1 << 24),
                Datum::UInt64(0),
                Datum::UInt64(1),
                Datum::UInt64(1 << 8),
                Datum::UInt64(1 << 16),
                Datum::UInt64(1 << 24),
                Datum::UInt64(1 << 32),
                Datum::UInt64(1 << 40),
                Datum::UInt64(1 << 48),
                Datum::UInt64(1 << 56),
                Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
                Datum::Interval(Interval {
                    months: 312,
                    ..Default::default()
                }),
                Datum::Interval(Interval::new(0, 0, 1_012_312)),
                Datum::Bytes(&[]),
                Datum::Bytes(&[0, 2, 1, 255]),
                Datum::String(""),
                Datum::String("العَرَبِيَّة"),
            ]);
        }
    }
}
365
mod bytes_container {
    //! `BytesContainer`: a paged byte-slice container. Slices are appended to
    //! a list of fixed-capacity batches; when the last batch fills up, a new
    //! batch with geometrically larger capacity is started, so bytes already
    //! written are never moved.

    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_ore::region::Region;

    /// A slice container backed by a list of `BytesBatch`es.
    pub struct BytesContainer {
        /// Total number of slices stored, across all batches.
        length: usize,
        /// Storage batches. Invariant: non-empty after construction; only the
        /// last batch accepts new pushes.
        batches: Vec<BytesBatch>,
    }

    impl BytesContainer {
        /// Visits contained allocations to determine their size and capacity.
        #[inline]
        pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            // Account for the batch list itself ...
            callback(
                self.batches.len() * std::mem::size_of::<BytesBatch>(),
                self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
            );
            // ... then each batch's offsets and byte storage.
            for batch in self.batches.iter() {
                batch.offsets.heap_size(&mut callback);
                callback(batch.storage.len(), batch.storage.capacity());
            }
        }
    }

    impl BatchContainer for BytesContainer {
        type Owned = Vec<u8>;
        type ReadItem<'a> = &'a [u8];

        #[inline]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item.to_vec()
        }

        #[inline]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            // Reuse `other`'s allocation where possible.
            other.clear();
            other.extend_from_slice(item);
        }

        #[inline(always)]
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.push_into(item);
        }

        #[inline(always)]
        fn push_own(&mut self, item: &Self::Owned) {
            self.push_into(item.as_slice())
        }

        fn clear(&mut self) {
            // Drop all storage and restart with one empty batch, preserving
            // the invariant that `batches` is non-empty.
            self.batches.clear();
            self.batches.push(BytesBatch::with_capacities(0, 0));
            self.length = 0;
        }

        fn with_capacity(size: usize) -> Self {
            Self {
                length: 0,
                batches: vec![BytesBatch::with_capacities(size, size)],
            }
        }

        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            // Size a single batch to hold the merged contents of both inputs.
            let mut item_cap = 1;
            let mut byte_cap = 0;
            for batch in cont1.batches.iter() {
                // `offsets` holds one more entry than there are items.
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            for batch in cont2.batches.iter() {
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            Self {
                length: 0,
                batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
            }
        }

        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        #[inline]
        fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
            // Linear scan over batches; the batch count stays small because
            // capacities grow geometrically.
            for batch in self.batches.iter() {
                if index < batch.len() {
                    return batch.index(index);
                }
                index -= batch.len();
            }
            panic!("Index out of bounds");
        }

        #[inline(always)]
        fn len(&self) -> usize {
            self.length
        }
    }

    impl PushInto<&[u8]> for BytesContainer {
        #[inline]
        fn push_into(&mut self, item: &[u8]) {
            self.length += 1;
            // `batches` is never empty (see constructors), so this always runs.
            if let Some(batch) = self.batches.last_mut() {
                let success = batch.try_push(item);
                if !success {
                    // Last batch is full: open a new one with doubled
                    // capacities, and at least enough bytes for `item`.
                    let item_cap = 2 * batch.offsets.len();
                    let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
                    let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
                    assert!(new_batch.try_push(item));
                    self.batches.push(new_batch);
                }
            }
        }
    }

    /// A fixed-capacity run of concatenated byte slices plus their offsets.
    pub struct BytesBatch {
        /// Slice boundaries into `storage`: `len + 1` entries starting at 0,
        /// so slice `i` spans `offsets[i]..offsets[i + 1]`.
        offsets: crate::row_spine::OffsetOptimized,
        /// Concatenated slice bytes; filled but never reallocated.
        storage: Region<u8>,
        /// Number of slices stored.
        len: usize,
    }

    impl BytesBatch {
        /// Appends `slice` if it fits in the remaining capacity; returns
        /// whether the push succeeded.
        fn try_push(&mut self, slice: &[u8]) -> bool {
            if self.storage.len() + slice.len() <= self.storage.capacity() {
                self.storage.extend_from_slice(slice);
                self.offsets.push_into(self.storage.len());
                self.len += 1;
                true
            } else {
                false
            }
        }
        /// Returns the `index`-th slice.
        #[inline]
        fn index(&self, index: usize) -> &[u8] {
            let lower = self.offsets.index(index);
            let upper = self.offsets.index(index + 1);
            &self.storage[lower..upper]
        }
        /// Number of slices stored.
        #[inline(always)]
        fn len(&self) -> usize {
            debug_assert_eq!(self.len, self.offsets.len() - 1);
            self.len
        }

        /// Creates a batch with room for `item_cap` slices and `byte_cap`
        /// bytes (rounded up to a power of two).
        fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
            let mut offsets = crate::row_spine::OffsetOptimized::with_capacity(item_cap + 1);
            // Seed the leading 0 so the first slice starts at offset 0.
            offsets.push_into(0);
            Self {
                offsets,
                storage: Region::new_auto(byte_cap.next_power_of_two()),
                len: 0,
            }
        }
    }
}
537
mod offset_opt {
    //! `OffsetOptimized`: an offset container that represents regular offset
    //! sequences (fixed-stride ramps, optionally ending in repeats) in O(1)
    //! space, spilling to an `OffsetList` only once the pattern breaks.

    use differential_dataflow::trace::implementations::BatchContainer;
    use differential_dataflow::trace::implementations::OffsetList;
    use timely::container::PushInto;

    /// Compact representation of offset sequences of the form
    /// `0, stride, 2 * stride, ..`, optionally followed by repetitions of the
    /// final value.
    enum OffsetStride {
        /// No offsets recorded.
        Empty,
        /// Exactly one offset, `0`.
        Zero,
        /// `Striding(stride, steps)`: the `steps` offsets
        /// `0, stride, .., (steps - 1) * stride`.
        Striding(usize, usize),
        /// `Saturated(stride, steps, reps)`: a striding prefix followed by
        /// `reps` repetitions of its last value, `(steps - 1) * stride`.
        Saturated(usize, usize, usize),
    }

    impl OffsetStride {
        /// Attempts to absorb `item`; returns `false` if it does not extend
        /// the representable pattern (the caller must then spill).
        #[inline]
        fn push(&mut self, item: usize) -> bool {
            match self {
                OffsetStride::Empty => {
                    if item == 0 {
                        *self = OffsetStride::Zero;
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Zero => {
                    // Any second offset fixes the stride.
                    *self = OffsetStride::Striding(item, 2);
                    true
                }
                OffsetStride::Striding(stride, count) => {
                    if item == *stride * *count {
                        // The next multiple: extend the ramp.
                        *count += 1;
                        true
                    } else if item == *stride * (*count - 1) {
                        // A repeat of the last value: saturate.
                        *self = OffsetStride::Saturated(*stride, *count, 1);
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Saturated(stride, count, reps) => {
                    if item == *stride * (*count - 1) {
                        // Another repeat of the saturated value.
                        *reps += 1;
                        true
                    } else {
                        false
                    }
                }
            }
        }

        /// Returns the offset at `index`.
        ///
        /// Panics on `Empty`; callers bound `index` by `len()` first.
        #[inline]
        fn index(&self, index: usize) -> usize {
            match self {
                OffsetStride::Empty => {
                    panic!("Empty OffsetStride")
                }
                OffsetStride::Zero => 0,
                OffsetStride::Striding(stride, _steps) => *stride * index,
                OffsetStride::Saturated(stride, steps, _reps) => {
                    if index < *steps {
                        *stride * index
                    } else {
                        // Past the ramp, every entry equals the last ramp value.
                        *stride * (*steps - 1)
                    }
                }
            }
        }

        /// Number of offsets represented.
        #[inline]
        fn len(&self) -> usize {
            match self {
                OffsetStride::Empty => 0,
                OffsetStride::Zero => 1,
                OffsetStride::Striding(_stride, steps) => *steps,
                OffsetStride::Saturated(_stride, steps, reps) => *steps + *reps,
            }
        }
    }

    /// An offset container that keeps a strided prefix inline and spills the
    /// remainder to an `OffsetList`.
    pub struct OffsetOptimized {
        /// The leading, pattern-conforming prefix.
        strided: OffsetStride,
        /// Offsets pushed after the pattern broke; empty until a spill occurs.
        spilled: OffsetList,
    }

    impl BatchContainer for OffsetOptimized {
        type Owned = usize;
        type ReadItem<'a> = usize;

        #[inline]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item
        }

        #[inline]
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.push_into(item)
        }

        #[inline]
        fn push_own(&mut self, item: &Self::Owned) {
            self.push_into(*item)
        }

        fn clear(&mut self) {
            self.strided = OffsetStride::Empty;
            self.spilled.clear();
        }

        fn with_capacity(_size: usize) -> Self {
            // Capacity is ignored: the strided prefix needs no storage, and
            // spills are expected to be rare.
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        #[inline]
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        #[inline]
        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            // Indices land in the strided prefix first, then the spill list.
            if index < self.strided.len() {
                self.strided.index(index)
            } else {
                self.spilled.index(index - self.strided.len())
            }
        }

        #[inline]
        fn len(&self) -> usize {
            self.strided.len() + self.spilled.len()
        }
    }

    impl PushInto<usize> for OffsetOptimized {
        #[inline]
        fn push_into(&mut self, item: usize) {
            // Once spilled, stay spilled: later offsets always go to the list
            // so that indexing remains consistent.
            if !self.spilled.is_empty() {
                self.spilled.push(item);
            } else {
                let inserted = self.strided.push(item);
                if !inserted {
                    self.spilled.push(item);
                }
            }
        }
    }

    impl OffsetOptimized {
        /// Visits contained allocations to determine their size and capacity.
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            // Only the spill list heap-allocates; the strided prefix is inline.
            crate::row_spine::offset_list_size(&self.spilled, callback);
        }
    }
}
701
702#[inline]
704pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
705 #[inline(always)]
708 fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
709 let size_of_t = std::mem::size_of::<T>();
710 callback(data.len() * size_of_t, data.capacity() * size_of_t);
711 }
712
713 vec_size(&data.smol, &mut callback);
714 vec_size(&data.chonk, callback);
715}