pub use self::container::DatumContainer;
pub use self::container::DatumSeq;
pub use self::offset_opt::OffsetOptimized;
pub use self::spines::{
    RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowSpine, RowSpine, RowValBatcher,
    RowValBuilder, RowValSpine,
};
use differential_dataflow::trace::implementations::OffsetList;

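/// Spine, batcher, and builder type aliases specialized to `Row` keys and values.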
mod spines {
    use std::rc::Rc;

    use differential_dataflow::containers::{Columnation, TimelyStack};
    use differential_dataflow::trace::implementations::Layout;
    use differential_dataflow::trace::implementations::Update;
    use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
    use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
    use differential_dataflow::trace::implementations::spine_fueled::Spine;
    use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
    use mz_repr::Row;

    use crate::row_spine::{DatumContainer, OffsetOptimized};
    use crate::typedefs::{KeyBatcher, KeyValBatcher};

    pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
    pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
    pub type RowRowBuilder<T, R> =
        RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, TimelyStack<((Row, Row), T, R)>>>;

    pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
    pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
    pub type RowValBuilder<V, T, R> =
        RcBuilder<OrdValBuilder<RowValLayout<((Row, V), T, R)>, TimelyStack<((Row, V), T, R)>>>;

    pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
    pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
    pub type RowBuilder<T, R> =
        RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, TimelyStack<((Row, ()), T, R)>>>;

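    /// A layout for updates with a `Row` key and a `Row` value.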
    pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
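    /// A layout for updates with a `Row` key and an arbitrary value.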
    pub struct RowValLayout<U: Update<Key = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
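    /// A layout for updates with a `Row` key and no value.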
    pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
        phantom: std::marker::PhantomData<U>,
    }

    impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type Target = U;
        type KeyContainer = DatumContainer;
        type ValContainer = DatumContainer;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row>> Layout for RowValLayout<U>
    where
        U::Val: Columnation,
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type Target = U;
        type KeyContainer = DatumContainer;
        type ValContainer = TimelyStack<U::Val>;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type Target = U;
        type KeyContainer = DatumContainer;
        type ValContainer = TimelyStack<()>;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
}

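/// A `BatchContainer` for sequences of `Datum`s, stored as `Row`-encoded bytes.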
mod container {

    use std::cmp::Ordering;

    use differential_dataflow::IntoOwned;
    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_repr::{Datum, Row, RowPacker, read_datum};

    use super::bytes_container::BytesContainer;

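    /// A `BatchContainer` of `Row`-encoded datum sequences.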
    pub struct DatumContainer {
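        /// The row bytes, one entry per pushed row.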
        bytes: BytesContainer,
    }

    impl DatumContainer {
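        /// Visits the allocations backing this container, reporting their size and capacity.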
        #[inline]
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.bytes.heap_size(callback)
        }
    }

    impl BatchContainer for DatumContainer {
        type Owned = Row;
        type ReadItem<'a> = DatumSeq<'a>;

        fn with_capacity(size: usize) -> Self {
            Self {
                bytes: BytesContainer::with_capacity(size),
            }
        }

        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            Self {
                bytes: BytesContainer::merge_capacity(&cont1.bytes, &cont2.bytes),
            }
        }

        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            DatumSeq {
                bytes: self.bytes.index(index),
            }
        }

        fn len(&self) -> usize {
            self.bytes.len()
        }
    }

    impl PushInto<Row> for DatumContainer {
        fn push_into(&mut self, item: Row) {
            self.push_into(&item);
        }
    }

    impl PushInto<&Row> for DatumContainer {
        fn push_into(&mut self, item: &Row) {
            let item: DatumSeq<'_> = IntoOwned::borrow_as(item);
            self.push_into(item);
        }
    }

    impl PushInto<DatumSeq<'_>> for DatumContainer {
        fn push_into(&mut self, item: DatumSeq<'_>) {
            self.bytes.push_into(item.as_bytes())
        }
    }

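    /// A borrowed sequence of `Datum`s, backed by the `Row` byte encoding.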
    #[derive(Debug)]
    pub struct DatumSeq<'a> {
        bytes: &'a [u8],
    }

    impl<'a> DatumSeq<'a> {
        pub fn copy_into(&self, row: &mut RowPacker) {
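            // SAFETY: `self.bytes` is a valid `Row` byte encoding, as it can only be
            // obtained from a `Row` or from bytes previously copied out of one.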
            unsafe { row.extend_by_slice_unchecked(self.bytes) }
        }
        fn as_bytes(&self) -> &'a [u8] {
            self.bytes
        }
    }

    impl<'a> Copy for DatumSeq<'a> {}
    impl<'a> Clone for DatumSeq<'a> {
        fn clone(&self) -> Self {
            *self
        }
    }

    impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
        fn eq(&self, other: &DatumSeq<'a>) -> bool {
            self.bytes.eq(other.bytes)
        }
    }
    impl<'a> PartialEq<&Row> for DatumSeq<'a> {
        fn eq(&self, other: &&Row) -> bool {
            self.bytes.eq(other.data())
        }
    }
    impl<'a> Eq for DatumSeq<'a> {}
    impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
        fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
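    // Note: this ordering compares lengths before content; it is meant to agree with
    // the ordering of owned `Row`s so that sorted batches remain consistent.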
    impl<'a> Ord for DatumSeq<'a> {
        fn cmp(&self, other: &Self) -> Ordering {
            match self.bytes.len().cmp(&other.bytes.len()) {
                std::cmp::Ordering::Less => std::cmp::Ordering::Less,
                std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
                std::cmp::Ordering::Equal => self.bytes.cmp(other.bytes),
            }
        }
    }
    impl<'a> IntoOwned<'a> for DatumSeq<'a> {
        type Owned = Row;
        fn into_owned(self) -> Self::Owned {
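            // SAFETY: `self.bytes` is a valid `Row` byte encoding (see `copy_into`).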
            unsafe { Row::from_bytes_unchecked(self.bytes) }
        }
        fn clone_onto(self, other: &mut Self::Owned) {
            let mut packer = other.packer();
            self.copy_into(&mut packer);
        }
        fn borrow_as(other: &'a Self::Owned) -> Self {
            Self {
                bytes: other.data(),
            }
        }
    }

    impl<'a> Iterator for DatumSeq<'a> {
        type Item = Datum<'a>;
        fn next(&mut self) -> Option<Self::Item> {
            if self.bytes.is_empty() {
                None
            } else {
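                // SAFETY: The remaining bytes are a valid datum encoding; `read_datum`
                // decodes one datum and advances `self.bytes` past it.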
                let result = unsafe { read_datum(&mut self.bytes) };
                Some(result)
            }
        }
    }

    use mz_repr::fixed_length::ToDatumIter;
    impl<'long> ToDatumIter for DatumSeq<'long> {
        type DatumIter<'short>
            = DatumSeq<'short>
        where
            Self: 'short;
        fn to_datum_iter<'short>(&'short self) -> Self::DatumIter<'short> {
            *self
        }
    }

    #[cfg(test)]
    mod tests {
        use crate::row_spine::DatumContainer;
        use differential_dataflow::trace::implementations::BatchContainer;
        use mz_repr::adt::date::Date;
        use mz_repr::adt::interval::Interval;
        use mz_repr::{Datum, Row, ScalarType};

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn test_round_trip() {
            fn round_trip(datums: Vec<Datum>) {
                let row = Row::pack(datums.clone());

                let mut container = DatumContainer::with_capacity(row.byte_len());
                container.push(&row);

                println!("{:?}", container.index(0).bytes);

                let datums2 = container.index(0).collect::<Vec<_>>();
                assert_eq!(datums, datums2);
            }

            round_trip(vec![]);
            round_trip(
                ScalarType::enumerate()
                    .iter()
                    .flat_map(|r#type| r#type.interesting_datums())
                    .collect(),
            );
            round_trip(vec![
                Datum::Null,
                Datum::Null,
                Datum::False,
                Datum::True,
                Datum::Int16(-21),
                Datum::Int32(-42),
                Datum::Int64(-2_147_483_648 - 42),
                Datum::UInt8(0),
                Datum::UInt8(1),
                Datum::UInt16(0),
                Datum::UInt16(1),
                Datum::UInt16(1 << 8),
                Datum::UInt32(0),
                Datum::UInt32(1),
                Datum::UInt32(1 << 8),
                Datum::UInt32(1 << 16),
                Datum::UInt32(1 << 24),
                Datum::UInt64(0),
                Datum::UInt64(1),
                Datum::UInt64(1 << 8),
                Datum::UInt64(1 << 16),
                Datum::UInt64(1 << 24),
                Datum::UInt64(1 << 32),
                Datum::UInt64(1 << 40),
                Datum::UInt64(1 << 48),
                Datum::UInt64(1 << 56),
                Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
                Datum::Interval(Interval {
                    months: 312,
                    ..Default::default()
                }),
                Datum::Interval(Interval::new(0, 0, 1_012_312)),
                Datum::Bytes(&[]),
                Datum::Bytes(&[0, 2, 1, 255]),
                Datum::String(""),
                Datum::String("العَرَبِيَّة"),
            ]);
        }
    }
}

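/// A container of byte slices, stored in geometrically growing batches of flat storage.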
mod bytes_container {

    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_ore::region::Region;

    pub struct BytesContainer {
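        /// Complete batches of byte storage; only the final batch accepts new items.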
        batches: Vec<BytesBatch>,
    }

    impl BytesContainer {
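        /// Reports the size and capacity of each backing allocation to `callback`.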
        #[inline]
        pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            callback(
                self.batches.len() * std::mem::size_of::<BytesBatch>(),
                self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
            );
            for batch in self.batches.iter() {
                batch.offsets.heap_size(&mut callback);
                callback(batch.storage.len(), batch.storage.capacity());
            }
        }
    }

    impl BatchContainer for BytesContainer {
        type Owned = Vec<u8>;
        type ReadItem<'a> = &'a [u8];

        fn with_capacity(size: usize) -> Self {
            Self {
                batches: vec![BytesBatch::with_capacities(size, size)],
            }
        }

        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            let mut item_cap = 1;
            let mut byte_cap = 0;
            for batch in cont1.batches.iter() {
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            for batch in cont2.batches.iter() {
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            Self {
                batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
            }
        }

        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
            for batch in self.batches.iter() {
                if index < batch.len() {
                    return batch.index(index);
                }
                index -= batch.len();
            }
            panic!("Index out of bounds");
        }

        fn len(&self) -> usize {
            let mut result = 0;
            for batch in self.batches.iter() {
                result += batch.len();
            }
            result
        }
    }

    impl PushInto<&[u8]> for BytesContainer {
        fn push_into(&mut self, item: &[u8]) {
            if let Some(batch) = self.batches.last_mut() {
                let success = batch.try_push(item);
                if !success {
                    let item_cap = 2 * batch.offsets.len();
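                    // Double the byte capacity, but ensure the new batch can hold `item`.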
                    let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
                    let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
                    assert!(new_batch.try_push(item));
                    self.batches.push(new_batch);
                }
            }
        }
    }

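    /// A batch of byte storage, with offsets delimiting each pushed slice.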
    pub struct BytesBatch {
        offsets: crate::row_spine::OffsetOptimized,
        storage: Region<u8>,
    }

    impl BytesBatch {
        fn try_push(&mut self, slice: &[u8]) -> bool {
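            // Accept the slice only if it fits in the batch's remaining capacity.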
            if self.storage.len() + slice.len() <= self.storage.capacity() {
                self.storage.extend_from_slice(slice);
                self.offsets.push(self.storage.len());
                true
            } else {
                false
            }
        }
        fn index(&self, index: usize) -> &[u8] {
            let lower = self.offsets.index(index);
            let upper = self.offsets.index(index + 1);
            &self.storage[lower..upper]
        }
        fn len(&self) -> usize {
            self.offsets.len() - 1
        }

        fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
            let mut offsets = crate::row_spine::OffsetOptimized::with_capacity(item_cap + 1);
            offsets.push(0);
            Self {
                offsets,
                storage: Region::new_auto(byte_cap.next_power_of_two()),
            }
        }
    }
}

mod offset_opt {
    use differential_dataflow::trace::implementations::BatchContainer;
    use differential_dataflow::trace::implementations::OffsetList;
    use timely::container::PushInto;

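    /// A compact representation of a sequence of offsets that increase by a fixed stride,
    /// optionally ending with repetitions of the final offset.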
    enum OffsetStride {
        Empty,
        Zero,
        Striding(usize, usize),
        Saturated(usize, usize, usize),
    }

    impl OffsetStride {
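        /// Attempts to absorb `item` into the stride pattern, returning `false` if it does not fit.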
        fn push(&mut self, item: usize) -> bool {
            match self {
                OffsetStride::Empty => {
                    if item == 0 {
                        *self = OffsetStride::Zero;
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Zero => {
                    *self = OffsetStride::Striding(item, 2);
                    true
                }
                OffsetStride::Striding(stride, count) => {
                    if item == *stride * *count {
                        *count += 1;
                        true
                    } else if item == *stride * (*count - 1) {
                        *self = OffsetStride::Saturated(*stride, *count, 1);
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Saturated(stride, count, reps) => {
                    if item == *stride * (*count - 1) {
                        *reps += 1;
                        true
                    } else {
                        false
                    }
                }
            }
        }

        fn index(&self, index: usize) -> usize {
            match self {
                OffsetStride::Empty => {
                    panic!("Empty OffsetStride")
                }
                OffsetStride::Zero => 0,
                OffsetStride::Striding(stride, _steps) => *stride * index,
                OffsetStride::Saturated(stride, steps, _reps) => {
                    if index < *steps {
                        *stride * index
                    } else {
                        *stride * (*steps - 1)
                    }
                }
            }
        }

        fn len(&self) -> usize {
            match self {
                OffsetStride::Empty => 0,
                OffsetStride::Zero => 1,
                OffsetStride::Striding(_stride, steps) => *steps,
                OffsetStride::Saturated(_stride, steps, reps) => *steps + *reps,
            }
        }
    }

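    /// An offset container that keeps a regular prefix in an `OffsetStride` and spills
    /// irregular offsets into an `OffsetList`.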
    pub struct OffsetOptimized {
        strided: OffsetStride,
        spilled: OffsetList,
    }

    impl BatchContainer for OffsetOptimized {
        type Owned = usize;
        type ReadItem<'a> = usize;

        fn with_capacity(_size: usize) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            if index < self.strided.len() {
                self.strided.index(index)
            } else {
                self.spilled.index(index - self.strided.len())
            }
        }

        fn len(&self) -> usize {
            self.strided.len() + self.spilled.len()
        }
    }

    impl PushInto<usize> for OffsetOptimized {
        fn push_into(&mut self, item: usize) {
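            // Once any offset has spilled, all subsequent offsets must also spill to
            // keep the sequence in order.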
            if !self.spilled.is_empty() {
                self.spilled.push(item);
            } else {
                let inserted = self.strided.push(item);
                if !inserted {
                    self.spilled.push(item);
                }
            }
        }
    }

    impl OffsetOptimized {
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            crate::row_spine::offset_list_size(&self.spilled, callback);
        }
    }
}

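/// Reports the size and capacity of the allocations backing an `OffsetList` via `callback`.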
#[inline]
pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
    #[inline(always)]
    fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
        let size_of_t = std::mem::size_of::<T>();
        callback(data.len() * size_of_t, data.capacity() * size_of_t);
    }

    vec_size(&data.smol, &mut callback);
    vec_size(&data.chonk, callback);
}