pub use self::container::DatumContainer;
pub use self::container::DatumSeq;
pub use self::offset_opt::OffsetOptimized;
pub use self::spines::{
    RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowSpine, RowSpine, RowValBatcher,
    RowValBuilder, RowValSpine,
};
use differential_dataflow::trace::implementations::OffsetList;

mod spines {
    use std::rc::Rc;

    use differential_dataflow::containers::{Columnation, TimelyStack};
    use differential_dataflow::trace::implementations::Layout;
    use differential_dataflow::trace::implementations::Update;
    use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
    use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
    use differential_dataflow::trace::implementations::spine_fueled::Spine;
    use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
    use mz_repr::Row;

    use crate::row_spine::{DatumContainer, OffsetOptimized};
    use crate::typedefs::{KeyBatcher, KeyValBatcher};

    pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
    pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
    pub type RowRowBuilder<T, R> =
        RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, TimelyStack<((Row, Row), T, R)>>>;

    pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
    pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
    pub type RowValBuilder<V, T, R> =
        RcBuilder<OrdValBuilder<RowValLayout<((Row, V), T, R)>, TimelyStack<((Row, V), T, R)>>>;

    pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
    pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
    pub type RowBuilder<T, R> =
        RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, TimelyStack<((Row, ()), T, R)>>>;

    pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
    pub struct RowValLayout<U: Update<Key = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
    pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
        phantom: std::marker::PhantomData<U>,
    }

    impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = DatumContainer;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row>> Layout for RowValLayout<U>
    where
        U::Val: Columnation,
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = TimelyStack<U::Val>;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = TimelyStack<()>;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
}

mod container {

    use std::cmp::Ordering;

    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_repr::{Datum, Row, RowPacker, read_datum};

    use super::bytes_container::BytesContainer;

    /// A container for byte-encoded `Row` data.
    pub struct DatumContainer {
        /// The flat byte encoding of each pushed row, in order.
        bytes: BytesContainer,
    }

    impl DatumContainer {
        /// Visits contained allocations to determine their size and capacity.
        #[inline]
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.bytes.heap_size(callback)
        }
    }

    impl BatchContainer for DatumContainer {
        type Owned = Row;
        type ReadItem<'a> = DatumSeq<'a>;

        #[inline(always)]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item.to_row()
        }

        #[inline]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            let mut packer = other.packer();
            item.copy_into(&mut packer);
        }

        #[inline(always)]
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.bytes.push_into(item.bytes);
        }

        #[inline(always)]
        fn push_own(&mut self, item: &Self::Owned) {
            self.bytes.push_into(item.data());
        }

        #[inline(always)]
        fn clear(&mut self) {
            self.bytes.clear();
        }

        #[inline(always)]
        fn with_capacity(size: usize) -> Self {
            Self {
                bytes: BytesContainer::with_capacity(size),
            }
        }

        #[inline(always)]
        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            Self {
                bytes: BytesContainer::merge_capacity(&cont1.bytes, &cont2.bytes),
            }
        }

        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        #[inline(always)]
        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            DatumSeq {
                bytes: self.bytes.index(index),
            }
        }

        #[inline(always)]
        fn len(&self) -> usize {
            self.bytes.len()
        }
    }

    impl PushInto<Row> for DatumContainer {
        fn push_into(&mut self, item: Row) {
            self.push_into(&item);
        }
    }

    impl PushInto<&Row> for DatumContainer {
        fn push_into(&mut self, item: &Row) {
            self.push_own(item);
        }
    }

    impl PushInto<DatumSeq<'_>> for DatumContainer {
        fn push_into(&mut self, item: DatumSeq<'_>) {
            self.bytes.push_into(item.as_bytes())
        }
    }

    /// A borrowed view of a row's datums, backed by the row's byte encoding.
    #[derive(Debug)]
    pub struct DatumSeq<'a> {
        bytes: &'a [u8],
    }

    impl<'a> DatumSeq<'a> {
        #[inline]
        pub fn copy_into(&self, row: &mut RowPacker) {
            // SAFETY: The bytes are a valid `Row` encoding, as they were copied out of a `Row`.
            unsafe { row.extend_by_slice_unchecked(self.bytes) }
        }
        #[inline]
        fn as_bytes(&self) -> &'a [u8] {
            self.bytes
        }
        #[inline]
        pub fn to_row(&self) -> Row {
            // SAFETY: The bytes are a valid `Row` encoding, as they were copied out of a `Row`.
            unsafe { Row::from_bytes_unchecked(self.bytes) }
        }
    }

    impl<'a> Copy for DatumSeq<'a> {}
    impl<'a> Clone for DatumSeq<'a> {
        #[inline(always)]
        fn clone(&self) -> Self {
            *self
        }
    }

    impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
        #[inline]
        fn eq(&self, other: &DatumSeq<'a>) -> bool {
            self.bytes.eq(other.bytes)
        }
    }
    impl<'a> PartialEq<&Row> for DatumSeq<'a> {
        #[inline]
        fn eq(&self, other: &&Row) -> bool {
            self.bytes.eq(other.data())
        }
    }
    impl<'a> Eq for DatumSeq<'a> {}
    impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
        #[inline]
        fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl<'a> Ord for DatumSeq<'a> {
        #[inline]
        fn cmp(&self, other: &Self) -> Ordering {
            match self.bytes.len().cmp(&other.bytes.len()) {
                Ordering::Less => Ordering::Less,
                Ordering::Greater => Ordering::Greater,
                Ordering::Equal => self.bytes.cmp(other.bytes),
            }
        }
    }
    impl<'a> Iterator for DatumSeq<'a> {
        type Item = Datum<'a>;
        #[inline]
        fn next(&mut self) -> Option<Self::Item> {
            if self.bytes.is_empty() {
                None
            } else {
                let result = unsafe { read_datum(&mut self.bytes) };
                Some(result)
            }
        }
    }

    use mz_repr::fixed_length::ToDatumIter;
    impl<'long> ToDatumIter for DatumSeq<'long> {
        type DatumIter<'short>
            = DatumSeq<'short>
        where
            Self: 'short;
        #[inline]
        fn to_datum_iter<'short>(&'short self) -> Self::DatumIter<'short> {
            *self
        }
    }

    #[cfg(test)]
    mod tests {
        use crate::row_spine::DatumContainer;
        use differential_dataflow::trace::implementations::BatchContainer;
        use mz_repr::adt::date::Date;
        use mz_repr::adt::interval::Interval;
        use mz_repr::{Datum, Row, SqlScalarType};

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn test_round_trip() {
            fn round_trip(datums: Vec<Datum>) {
                let row = Row::pack(datums.clone());

                let mut container = DatumContainer::with_capacity(row.byte_len());
                container.push_own(&row);

                println!("{:?}", container.index(0).bytes);

                let datums2 = container.index(0).collect::<Vec<_>>();
                assert_eq!(datums, datums2);
            }

            round_trip(vec![]);
            round_trip(
                SqlScalarType::enumerate()
                    .iter()
                    .flat_map(|r#type| r#type.interesting_datums())
                    .collect(),
            );
            round_trip(vec![
                Datum::Null,
                Datum::Null,
                Datum::False,
                Datum::True,
                Datum::Int16(-21),
                Datum::Int32(-42),
                Datum::Int64(-2_147_483_648 - 42),
                Datum::UInt8(0),
                Datum::UInt8(1),
                Datum::UInt16(0),
                Datum::UInt16(1),
                Datum::UInt16(1 << 8),
                Datum::UInt32(0),
                Datum::UInt32(1),
                Datum::UInt32(1 << 8),
                Datum::UInt32(1 << 16),
                Datum::UInt32(1 << 24),
                Datum::UInt64(0),
                Datum::UInt64(1),
                Datum::UInt64(1 << 8),
                Datum::UInt64(1 << 16),
                Datum::UInt64(1 << 24),
                Datum::UInt64(1 << 32),
                Datum::UInt64(1 << 40),
                Datum::UInt64(1 << 48),
                Datum::UInt64(1 << 56),
                Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
                Datum::Interval(Interval {
                    months: 312,
                    ..Default::default()
                }),
                Datum::Interval(Interval::new(0, 0, 1_012_312)),
                Datum::Bytes(&[]),
                Datum::Bytes(&[0, 2, 1, 255]),
                Datum::String(""),
                Datum::String("العَرَبِيَّة"),
            ]);
        }
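
        // An illustrative sketch (not part of the original test suite): `into_owned`
        // and `clone_onto` should reproduce the packed `Row` exactly from the
        // borrowed `DatumSeq`.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn test_owned_round_trip() {
            let row = Row::pack(vec![Datum::Int64(7), Datum::String("seven"), Datum::Null]);

            let mut container = DatumContainer::with_capacity(row.byte_len());
            container.push_own(&row);

            // `into_owned` materializes a fresh `Row` from the borrowed datums.
            assert_eq!(DatumContainer::into_owned(container.index(0)), row);

            // `clone_onto` repacks the datums into an existing `Row` allocation.
            let mut target = Row::pack(vec![Datum::True]);
            DatumContainer::clone_onto(container.index(0), &mut target);
            assert_eq!(target, row);
        }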
    }
}

mod bytes_container {

    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_ore::region::Region;

    /// A container of `[u8]` slices, backed by a sequence of exponentially growing
    /// batches so that existing data never needs to be copied as the container grows.
    pub struct BytesContainer {
        /// The total number of slices stored across all batches.
        length: usize,
        /// The batches of appended byte slices.
        batches: Vec<BytesBatch>,
    }

    impl BytesContainer {
        /// Visits contained allocations to determine their size and capacity.
        #[inline]
        pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            // Account for the `batches` vector itself, then each batch's contents.
            callback(
                self.batches.len() * std::mem::size_of::<BytesBatch>(),
                self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
            );
            for batch in self.batches.iter() {
                batch.offsets.heap_size(&mut callback);
                callback(batch.storage.len(), batch.storage.capacity());
            }
        }
    }

    impl BatchContainer for BytesContainer {
        type Owned = Vec<u8>;
        type ReadItem<'a> = &'a [u8];

        #[inline]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item.to_vec()
        }

        #[inline]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            other.clear();
            other.extend_from_slice(item);
        }

        #[inline(always)]
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.push_into(item);
        }

        #[inline(always)]
        fn push_own(&mut self, item: &Self::Owned) {
            self.push_into(item.as_slice())
        }

        fn clear(&mut self) {
            self.batches.clear();
            self.batches.push(BytesBatch::with_capacities(0, 0));
            self.length = 0;
        }

        fn with_capacity(size: usize) -> Self {
            Self {
                length: 0,
                batches: vec![BytesBatch::with_capacities(size, size)],
            }
        }

        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            let mut item_cap = 1;
            let mut byte_cap = 0;
            for batch in cont1.batches.iter() {
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            for batch in cont2.batches.iter() {
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            Self {
                length: 0,
                batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
            }
        }

        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        #[inline]
        fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
            for batch in self.batches.iter() {
                if index < batch.len() {
                    return batch.index(index);
                }
                index -= batch.len();
            }
            panic!("Index out of bounds");
        }

        #[inline(always)]
        fn len(&self) -> usize {
            self.length
        }
    }

    impl PushInto<&[u8]> for BytesContainer {
        #[inline]
        fn push_into(&mut self, item: &[u8]) {
            self.length += 1;
            if let Some(batch) = self.batches.last_mut() {
                let success = batch.try_push(item);
                if !success {
                    // Allocate a new batch with roughly doubled capacities,
                    // large enough to hold `item` on its own.
                    let item_cap = 2 * batch.offsets.len();
                    let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
                    let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
                    assert!(new_batch.try_push(item));
                    self.batches.push(new_batch);
                }
            }
        }
    }

    /// A batch of byte slices: a contiguous byte region plus the offsets that
    /// delimit each slice within it.
    pub struct BytesBatch {
        offsets: crate::row_spine::OffsetOptimized,
        storage: Region<u8>,
        len: usize,
    }

    impl BytesBatch {
        /// Appends `slice` if it fits within the batch's remaining capacity, and
        /// reports whether it did so.
        fn try_push(&mut self, slice: &[u8]) -> bool {
            if self.storage.len() + slice.len() <= self.storage.capacity() {
                self.storage.extend_from_slice(slice);
                self.offsets.push_into(self.storage.len());
                self.len += 1;
                true
            } else {
                false
            }
        }
        fn index(&self, index: usize) -> &[u8] {
            let lower = self.offsets.index(index);
            let upper = self.offsets.index(index + 1);
            &self.storage[lower..upper]
        }
        #[inline(always)]
        fn len(&self) -> usize {
            debug_assert_eq!(self.len, self.offsets.len() - 1);
            self.len
        }

        fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
            // Offsets record each slice's upper bound, preceded by an initial zero.
            let mut offsets = crate::row_spine::OffsetOptimized::with_capacity(item_cap + 1);
            offsets.push_into(0);
            Self {
                offsets,
                storage: Region::new_auto(byte_cap.next_power_of_two()),
                len: 0,
            }
        }
    }
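
    #[cfg(test)]
    mod tests {
        use super::BytesContainer;
        use differential_dataflow::trace::implementations::BatchContainer;
        use timely::container::PushInto;

        // An illustrative sketch (not part of the original test suite): once a batch
        // fills up, further pushes should spill into new, larger batches while
        // indexing continues to work across batch boundaries.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn test_spills_into_new_batches() {
            let mut container = BytesContainer::with_capacity(8);
            for i in 0..100u8 {
                container.push_into(&[i; 16][..]);
            }
            assert_eq!(container.len(), 100);
            // Growth happens by appending batches rather than copying existing data.
            assert!(container.batches.len() > 1);
            for i in 0..100u8 {
                assert_eq!(container.index(i as usize), &[i; 16][..]);
            }
        }
    }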
}

mod offset_opt {
    use differential_dataflow::trace::implementations::BatchContainer;
    use differential_dataflow::trace::implementations::OffsetList;
    use timely::container::PushInto;

    /// A compact encoding of a sequence of offsets that starts at zero, advances by
    /// a constant stride, and may end with repetitions of its final value.
    enum OffsetStride {
        Empty,
        Zero,
        Striding(usize, usize),
        Saturated(usize, usize, usize),
    }

    impl OffsetStride {
        /// Attempts to absorb `item`, reporting whether it fit the stride pattern.
        #[inline]
        fn push(&mut self, item: usize) -> bool {
            match self {
                OffsetStride::Empty => {
                    if item == 0 {
                        *self = OffsetStride::Zero;
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Zero => {
                    *self = OffsetStride::Striding(item, 2);
                    true
                }
                OffsetStride::Striding(stride, count) => {
                    if item == *stride * *count {
                        *count += 1;
                        true
                    } else if item == *stride * (*count - 1) {
                        *self = OffsetStride::Saturated(*stride, *count, 1);
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Saturated(stride, count, reps) => {
                    if item == *stride * (*count - 1) {
                        *reps += 1;
                        true
                    } else {
                        false
                    }
                }
            }
        }

        #[inline]
        fn index(&self, index: usize) -> usize {
            match self {
                OffsetStride::Empty => {
                    panic!("Empty OffsetStride")
                }
                OffsetStride::Zero => 0,
                OffsetStride::Striding(stride, _steps) => *stride * index,
                OffsetStride::Saturated(stride, steps, _reps) => {
                    if index < *steps {
                        *stride * index
                    } else {
                        *stride * (*steps - 1)
                    }
                }
            }
        }

        #[inline]
        fn len(&self) -> usize {
            match self {
                OffsetStride::Empty => 0,
                OffsetStride::Zero => 1,
                OffsetStride::Striding(_stride, steps) => *steps,
                OffsetStride::Saturated(_stride, steps, reps) => *steps + *reps,
            }
        }
    }

    /// An offset container that stores an initial strided run inline and spills
    /// any subsequent irregular offsets into an `OffsetList`.
    pub struct OffsetOptimized {
        strided: OffsetStride,
        spilled: OffsetList,
    }

    impl BatchContainer for OffsetOptimized {
        type Owned = usize;
        type ReadItem<'a> = usize;

        #[inline]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item
        }

        #[inline]
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.push_into(item)
        }

        #[inline]
        fn push_own(&mut self, item: &Self::Owned) {
            self.push_into(*item)
        }

        fn clear(&mut self) {
            self.strided = OffsetStride::Empty;
            self.spilled.clear();
        }

        fn with_capacity(_size: usize) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        #[inline]
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        #[inline]
        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            if index < self.strided.len() {
                self.strided.index(index)
            } else {
                self.spilled.index(index - self.strided.len())
            }
        }

        #[inline]
        fn len(&self) -> usize {
            self.strided.len() + self.spilled.len()
        }
    }

    impl PushInto<usize> for OffsetOptimized {
        #[inline]
        fn push_into(&mut self, item: usize) {
            if !self.spilled.is_empty() {
                self.spilled.push(item);
            } else {
                let inserted = self.strided.push(item);
                if !inserted {
                    self.spilled.push(item);
                }
            }
        }
    }

    impl OffsetOptimized {
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            crate::row_spine::offset_list_size(&self.spilled, callback);
        }
    }
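
    #[cfg(test)]
    mod tests {
        use super::OffsetOptimized;
        use differential_dataflow::trace::implementations::BatchContainer;
        use timely::container::PushInto;

        // An illustrative sketch (not part of the original test suite): offsets that
        // advance by a constant stride are absorbed by the strided representation,
        // and the first irregular offset spills into the backing `OffsetList`.
        #[mz_ore::test]
        fn test_offsets_compress_until_irregular() {
            let mut offsets = OffsetOptimized::with_capacity(0);
            for offset in [0, 8, 16, 24] {
                offsets.push_into(offset);
            }
            assert_eq!(offsets.len(), 4);
            assert_eq!(offsets.index(3), 24);
            assert!(offsets.spilled.is_empty());

            // An offset off the stride is rejected by `OffsetStride` and spilled.
            offsets.push_into(100);
            assert_eq!(offsets.len(), 5);
            assert_eq!(offsets.index(4), 100);
            assert!(!offsets.spilled.is_empty());
        }
    }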
}

/// Reports the heap size of an `OffsetList` through `callback`.
#[inline]
pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
    /// Reports the heap size of a `Vec` of `Copy` elements through `callback`.
    #[inline(always)]
    fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
        let size_of_t = std::mem::size_of::<T>();
        callback(data.len() * size_of_t, data.capacity() * size_of_t);
    }

    vec_size(&data.smol, &mut callback);
    vec_size(&data.chonk, callback);
}