pub use self::container::DatumContainer;
pub use self::container::DatumSeq;
pub use self::offset_opt::OffsetOptimized;
pub use self::spines::{
    RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowSpine, RowSpine, RowValBatcher,
    RowValBuilder, RowValSpine,
};
use differential_dataflow::trace::implementations::OffsetList;

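/// Spine, batcher, and builder type aliases for arrangements keyed (and
/// optionally valued) by `Row`, backed by the byte-oriented containers
/// defined later in this file.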
mod spines {
    use std::rc::Rc;

    use differential_dataflow::containers::{Columnation, TimelyStack};
    use differential_dataflow::trace::implementations::Layout;
    use differential_dataflow::trace::implementations::Update;
    use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
    use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
    use differential_dataflow::trace::implementations::spine_fueled::Spine;
    use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
    use mz_repr::Row;

    use crate::row_spine::{DatumContainer, OffsetOptimized};
    use crate::typedefs::{KeyBatcher, KeyValBatcher};

    pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
    pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
    pub type RowRowBuilder<T, R> =
        RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, TimelyStack<((Row, Row), T, R)>>>;

    pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
    pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
    pub type RowValBuilder<V, T, R> =
        RcBuilder<OrdValBuilder<RowValLayout<((Row, V), T, R)>, TimelyStack<((Row, V), T, R)>>>;

    pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
    pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
    pub type RowBuilder<T, R> =
        RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, TimelyStack<((Row, ()), T, R)>>>;

    pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
    pub struct RowValLayout<U: Update<Key = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
    pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
        phantom: std::marker::PhantomData<U>,
    }

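    // The layouts below route keys (and `Row` values) through `DatumContainer`
    // and offsets through `OffsetOptimized`, both defined later in this file.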
    impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type Target = U;
        type KeyContainer = DatumContainer;
        type ValContainer = DatumContainer;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row>> Layout for RowValLayout<U>
    where
        U::Val: Columnation,
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type Target = U;
        type KeyContainer = DatumContainer;
        type ValContainer = TimelyStack<U::Val>;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type Target = U;
        type KeyContainer = DatumContainer;
        type ValContainer = TimelyStack<()>;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
}

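/// A `BatchContainer` that stores `Row`s as their flat byte encoding and
/// reads them back as `DatumSeq`s, decoding datums directly from the stored
/// bytes.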
mod container {
    use std::cmp::Ordering;

    use differential_dataflow::IntoOwned;
    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_repr::{Datum, Row, RowPacker, read_datum};

    use super::bytes_container::BytesContainer;

    /// A container of `Row`-encoded data, backed by a `BytesContainer`.
    ///
    /// Rows are appended as their raw byte encoding; indexing yields a
    /// `DatumSeq` that decodes datums on demand without materializing a
    /// `Row`.
    pub struct DatumContainer {
        bytes: BytesContainer,
    }

    impl DatumContainer {
        /// Visits contained allocations to determine their size and capacity.
        #[inline]
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.bytes.heap_size(callback)
        }
    }

    impl BatchContainer for DatumContainer {
        type Owned = Row;
        type ReadItem<'a> = DatumSeq<'a>;

        fn with_capacity(size: usize) -> Self {
            Self {
                bytes: BytesContainer::with_capacity(size),
            }
        }

        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            Self {
                bytes: BytesContainer::merge_capacity(&cont1.bytes, &cont2.bytes),
            }
        }

        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            DatumSeq {
                bytes: self.bytes.index(index),
            }
        }

        fn len(&self) -> usize {
            self.bytes.len()
        }
    }

    impl PushInto<Row> for DatumContainer {
        fn push_into(&mut self, item: Row) {
            self.push_into(&item);
        }
    }

    impl PushInto<&Row> for DatumContainer {
        fn push_into(&mut self, item: &Row) {
            let item: DatumSeq<'_> = IntoOwned::borrow_as(item);
            self.push_into(item);
        }
    }

    impl PushInto<DatumSeq<'_>> for DatumContainer {
        fn push_into(&mut self, item: DatumSeq<'_>) {
            self.bytes.push_into(item.as_bytes())
        }
    }

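    /// A borrowed view of a row's byte encoding that can be decoded as a
    /// sequence of `Datum`s.
    ///
    /// `DatumSeq` implements `Iterator<Item = Datum>` by decoding one datum
    /// at a time from the front of its byte slice, and `IntoOwned` to convert
    /// to and from an owned `Row`.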
    #[derive(Debug)]
    pub struct DatumSeq<'a> {
        bytes: &'a [u8],
    }

    impl<'a> DatumSeq<'a> {
        pub fn copy_into(&self, row: &mut RowPacker) {
            // SAFETY: `bytes` originates from an encoded `Row` (see the
            // `PushInto` impls above), so it is a valid datum encoding.
            unsafe { row.extend_by_slice_unchecked(self.bytes) }
        }
        fn as_bytes(&self) -> &'a [u8] {
            self.bytes
        }
    }

    impl<'a> Copy for DatumSeq<'a> {}
    impl<'a> Clone for DatumSeq<'a> {
        fn clone(&self) -> Self {
            *self
        }
    }

    impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
        fn eq(&self, other: &DatumSeq<'a>) -> bool {
            self.bytes.eq(other.bytes)
        }
    }
    impl<'a> PartialEq<&Row> for DatumSeq<'a> {
        fn eq(&self, other: &&Row) -> bool {
            self.bytes.eq(other.data())
        }
    }
    impl<'a> Eq for DatumSeq<'a> {}
    impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
        fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl<'a> Ord for DatumSeq<'a> {
        fn cmp(&self, other: &Self) -> Ordering {
            // Order by byte length first, and compare contents only for
            // slices of equal length.
            match self.bytes.len().cmp(&other.bytes.len()) {
                Ordering::Equal => self.bytes.cmp(other.bytes),
                unequal => unequal,
            }
        }
    }
    impl<'a> IntoOwned<'a> for DatumSeq<'a> {
        type Owned = Row;
        fn into_owned(self) -> Self::Owned {
            // SAFETY: `bytes` originates from an encoded `Row`, so it is a
            // valid `Row` encoding.
            unsafe { Row::from_bytes_unchecked(self.bytes) }
        }
        fn clone_onto(self, other: &mut Self::Owned) {
            let mut packer = other.packer();
            self.copy_into(&mut packer);
        }
        fn borrow_as(other: &'a Self::Owned) -> Self {
            Self {
                bytes: other.data(),
            }
        }
    }

    impl<'a> Iterator for DatumSeq<'a> {
        type Item = Datum<'a>;
        fn next(&mut self) -> Option<Self::Item> {
            if self.bytes.is_empty() {
                None
            } else {
                // SAFETY: `bytes` holds a valid sequence of encoded datums;
                // `read_datum` decodes the first one and advances the slice
                // past it.
                let result = unsafe { read_datum(&mut self.bytes) };
                Some(result)
            }
        }
    }

    use mz_repr::fixed_length::ToDatumIter;
    impl<'long> ToDatumIter for DatumSeq<'long> {
        type DatumIter<'short>
            = DatumSeq<'short>
        where
            Self: 'short;
        fn to_datum_iter<'short>(&'short self) -> Self::DatumIter<'short> {
            *self
        }
    }

    #[cfg(test)]
    mod tests {
        use crate::row_spine::DatumContainer;
        use differential_dataflow::trace::implementations::BatchContainer;
        use mz_repr::adt::date::Date;
        use mz_repr::adt::interval::Interval;
        use mz_repr::{Datum, Row, ScalarType};

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn test_round_trip() {
            fn round_trip(datums: Vec<Datum>) {
                let row = Row::pack(datums.clone());

                let mut container = DatumContainer::with_capacity(row.byte_len());
                container.push(&row);

                println!("{:?}", container.index(0).bytes);

                let datums2 = container.index(0).collect::<Vec<_>>();
                assert_eq!(datums, datums2);
            }

            round_trip(vec![]);
            round_trip(
                ScalarType::enumerate()
                    .iter()
                    .flat_map(|r#type| r#type.interesting_datums())
                    .collect(),
            );
            round_trip(vec![
                Datum::Null,
                Datum::Null,
                Datum::False,
                Datum::True,
                Datum::Int16(-21),
                Datum::Int32(-42),
                Datum::Int64(-2_147_483_648 - 42),
                Datum::UInt8(0),
                Datum::UInt8(1),
                Datum::UInt16(0),
                Datum::UInt16(1),
                Datum::UInt16(1 << 8),
                Datum::UInt32(0),
                Datum::UInt32(1),
                Datum::UInt32(1 << 8),
                Datum::UInt32(1 << 16),
                Datum::UInt32(1 << 24),
                Datum::UInt64(0),
                Datum::UInt64(1),
                Datum::UInt64(1 << 8),
                Datum::UInt64(1 << 16),
                Datum::UInt64(1 << 24),
                Datum::UInt64(1 << 32),
                Datum::UInt64(1 << 40),
                Datum::UInt64(1 << 48),
                Datum::UInt64(1 << 56),
                Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
                Datum::Interval(Interval {
                    months: 312,
                    ..Default::default()
                }),
                Datum::Interval(Interval::new(0, 0, 1_012_312)),
                Datum::Bytes(&[]),
                Datum::Bytes(&[0, 2, 1, 255]),
                Datum::String(""),
                Datum::String("العَرَبِيَّة"),
            ]);
        }
    }
}

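/// Byte-slice storage used to back `DatumContainer`: slices are appended to
/// large regions of memory and read back by index.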
mod bytes_container {
    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_ore::region::Region;

    /// A container for byte slices, backed by a sequence of geometrically
    /// growing `BytesBatch`es.
    pub struct BytesContainer {
        /// Total number of slices stored across all batches.
        length: usize,
        /// Batches of stored slices; only the last batch accepts new pushes.
        batches: Vec<BytesBatch>,
    }

    impl BytesContainer {
        /// Visits contained allocations to determine their size and capacity.
        #[inline]
        pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            // The `batches` vector itself, excluding the batch contents.
            callback(
                self.batches.len() * std::mem::size_of::<BytesBatch>(),
                self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
            );
            // Each batch's offsets and byte storage.
            for batch in self.batches.iter() {
                batch.offsets.heap_size(&mut callback);
                callback(batch.storage.len(), batch.storage.capacity());
            }
        }
    }

    impl BatchContainer for BytesContainer {
        type Owned = Vec<u8>;
        type ReadItem<'a> = &'a [u8];

        fn with_capacity(size: usize) -> Self {
            Self {
                length: 0,
                batches: vec![BytesBatch::with_capacities(size, size)],
            }
        }

        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            let mut item_cap = 1;
            let mut byte_cap = 0;
            for batch in cont1.batches.iter() {
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            for batch in cont2.batches.iter() {
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            Self {
                length: 0,
                batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
            }
        }

        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
            for batch in self.batches.iter() {
                if index < batch.len() {
                    return batch.index(index);
                }
                index -= batch.len();
            }
            panic!("Index out of bounds");
        }

        #[inline(always)]
        fn len(&self) -> usize {
            self.length
        }
    }

    impl PushInto<&[u8]> for BytesContainer {
        fn push_into(&mut self, item: &[u8]) {
            self.length += 1;
            if let Some(batch) = self.batches.last_mut() {
                let success = batch.try_push(item);
                if !success {
                    // The current batch is full: seal it and start a new one
                    // with doubled capacity, and at least room for `item`.
                    let item_cap = 2 * batch.offsets.len();
                    let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
                    let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
                    assert!(new_batch.try_push(item));
                    self.batches.push(new_batch);
                }
            }
        }
    }

    /// A batch of byte slices: a fixed-capacity region of bytes, plus offsets
    /// delimiting the individual slices within it.
    pub struct BytesBatch {
        offsets: crate::row_spine::OffsetOptimized,
        storage: Region<u8>,
        len: usize,
    }

    impl BytesBatch {
        /// Appends `slice` if it fits within the remaining capacity, and
        /// returns whether the push succeeded.
        fn try_push(&mut self, slice: &[u8]) -> bool {
            if self.storage.len() + slice.len() <= self.storage.capacity() {
                self.storage.extend_from_slice(slice);
                self.offsets.push(self.storage.len());
                self.len += 1;
                true
            } else {
                false
            }
        }
        fn index(&self, index: usize) -> &[u8] {
            let lower = self.offsets.index(index);
            let upper = self.offsets.index(index + 1);
            &self.storage[lower..upper]
        }
        #[inline(always)]
        fn len(&self) -> usize {
            debug_assert_eq!(self.len, self.offsets.len() - 1);
            self.len
        }

        fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
            let mut offsets = crate::row_spine::OffsetOptimized::with_capacity(item_cap + 1);
            // Seed with a zero offset so that slice `i` spans `offsets[i] .. offsets[i + 1]`.
            offsets.push(0);
            Self {
                offsets,
                storage: Region::new_auto(byte_cap.next_power_of_two()),
                len: 0,
            }
        }
    }
}

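/// An offset container that stores regular (strided) offset sequences in
/// constant space, spilling to an `OffsetList` only once the offsets become
/// irregular.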
mod offset_opt {
    use differential_dataflow::trace::implementations::BatchContainer;
    use differential_dataflow::trace::implementations::OffsetList;
    use timely::container::PushInto;

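    /// A compact description of a run of offsets: empty, a single zero, a
    /// regular stride, or a regular stride whose final value repeats
    /// ("saturated").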
    enum OffsetStride {
        Empty,
        Zero,
        Striding(usize, usize),
        Saturated(usize, usize, usize),
    }

    impl OffsetStride {
        /// Attempts to absorb `item` into the strided representation,
        /// returning false if it does not fit the pattern.
        fn push(&mut self, item: usize) -> bool {
            match self {
                OffsetStride::Empty => {
                    if item == 0 {
                        *self = OffsetStride::Zero;
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Zero => {
                    *self = OffsetStride::Striding(item, 2);
                    true
                }
                OffsetStride::Striding(stride, count) => {
                    if item == *stride * *count {
                        *count += 1;
                        true
                    } else if item == *stride * (*count - 1) {
                        *self = OffsetStride::Saturated(*stride, *count, 1);
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Saturated(stride, count, reps) => {
                    if item == *stride * (*count - 1) {
                        *reps += 1;
                        true
                    } else {
                        false
                    }
                }
            }
        }

        fn index(&self, index: usize) -> usize {
            match self {
                OffsetStride::Empty => {
                    panic!("Empty OffsetStride")
                }
                OffsetStride::Zero => 0,
                OffsetStride::Striding(stride, _steps) => *stride * index,
                OffsetStride::Saturated(stride, steps, _reps) => {
                    if index < *steps {
                        *stride * index
                    } else {
                        *stride * (*steps - 1)
                    }
                }
            }
        }

        fn len(&self) -> usize {
            match self {
                OffsetStride::Empty => 0,
                OffsetStride::Zero => 1,
                OffsetStride::Striding(_stride, steps) => *steps,
                OffsetStride::Saturated(_stride, steps, reps) => *steps + *reps,
            }
        }
    }

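    /// Offsets kept in the compact `OffsetStride` form for as long as they
    /// fit its pattern, with any remaining offsets spilled into an
    /// `OffsetList`.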
    pub struct OffsetOptimized {
        strided: OffsetStride,
        spilled: OffsetList,
    }

    impl BatchContainer for OffsetOptimized {
        type Owned = usize;
        type ReadItem<'a> = usize;

        fn with_capacity(_size: usize) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            if index < self.strided.len() {
                self.strided.index(index)
            } else {
                self.spilled.index(index - self.strided.len())
            }
        }

        fn len(&self) -> usize {
            self.strided.len() + self.spilled.len()
        }
    }

    impl PushInto<usize> for OffsetOptimized {
        fn push_into(&mut self, item: usize) {
            if !self.spilled.is_empty() {
                self.spilled.push(item);
            } else {
                let inserted = self.strided.push(item);
                if !inserted {
                    self.spilled.push(item);
                }
            }
        }
    }

    impl OffsetOptimized {
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            crate::row_spine::offset_list_size(&self.spilled, callback);
        }
    }
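
    // Illustrative test (added): offsets following a regular stride are
    // absorbed by `OffsetStride`, and the first irregular offset spills into
    // the `OffsetList`.
    #[cfg(test)]
    mod tests {
        use differential_dataflow::trace::implementations::BatchContainer;
        use timely::container::PushInto;

        use super::OffsetOptimized;

        #[mz_ore::test]
        fn offsets_stride_then_spill() {
            let mut offsets = OffsetOptimized::with_capacity(0);
            // A regular sequence is captured by the strided representation.
            for offset in [0, 3, 6, 9] {
                offsets.push_into(offset);
            }
            assert_eq!(offsets.len(), 4);
            assert_eq!(offsets.index(2), 6);
            // An offset that breaks the stride is spilled.
            offsets.push_into(11);
            assert_eq!(offsets.len(), 5);
            assert_eq!(offsets.index(4), 11);
        }
    }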
}

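/// Reports the size and capacity, in bytes, of the allocations backing an
/// `OffsetList` via `callback`, mirroring the `heap_size` methods above.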
#[inline]
pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
    #[inline(always)]
    fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
        // Report the in-use and allocated bytes of the vector's backing buffer.
        let size_of_t = std::mem::size_of::<T>();
        callback(data.len() * size_of_t, data.capacity() * size_of_t);
    }

    vec_size(&data.smol, &mut callback);
    vec_size(&data.chonk, callback);
}