1pub mod spine_fueled;
42
43pub mod merge_batcher;
44pub mod ord_neu;
45pub mod rhh;
46pub mod huffman_container;
47pub mod chunker;
48
49pub use self::ord_neu::OrdValSpine as ValSpine;
51pub use self::ord_neu::OrdValBatcher as ValBatcher;
52pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
53pub use self::ord_neu::OrdKeySpine as KeySpine;
54pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
55pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
56
57use std::convert::TryInto;
58
59use columnation::Columnation;
60use serde::{Deserialize, Serialize};
61use timely::Container;
62use timely::container::PushInto;
63use timely::progress::Timestamp;
64
65use crate::containers::TimelyStack;
66use crate::lattice::Lattice;
67use crate::difference::Semigroup;
68
69pub trait Update {
71 type Key: Ord + Clone + 'static;
73 type Val: Ord + Clone + 'static;
75 type Time: Ord + Clone + Lattice + timely::progress::Timestamp;
77 type Diff: Ord + Semigroup + 'static;
79}
80
81impl<K,V,T,R> Update for ((K, V), T, R)
82where
83 K: Ord+Clone+'static,
84 V: Ord+Clone+'static,
85 T: Ord+Clone+Lattice+timely::progress::Timestamp,
86 R: Ord+Semigroup+'static,
87{
88 type Key = K;
89 type Val = V;
90 type Time = T;
91 type Diff = R;
92}
93
94pub trait Layout {
96 type KeyContainer: BatchContainer;
98 type ValContainer: BatchContainer;
100 type TimeContainer: BatchContainer<Owned: Lattice + timely::progress::Timestamp>;
102 type DiffContainer: BatchContainer<Owned: Semigroup + 'static>;
104 type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
106}
107
108pub trait WithLayout {
110 type Layout: Layout;
112}
113
114pub trait LayoutExt : WithLayout<Layout: Layout<KeyContainer = Self::KeyContainer, ValContainer = Self::ValContainer, TimeContainer = Self::TimeContainer, DiffContainer = Self::DiffContainer>> {
116 type KeyOwn;
118 type Key<'a>: Copy + Ord;
120 type ValOwn: Clone + Ord;
122 type Val<'a>: Copy + Ord;
124 type Time: Lattice + timely::progress::Timestamp;
126 type TimeGat<'a>: Copy + Ord;
128 type Diff: Semigroup + 'static;
130 type DiffGat<'a>: Copy + Ord;
132
133 type KeyContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Key<'a>, Owned = Self::KeyOwn>;
135 type ValContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Val<'a>, Owned = Self::ValOwn>;
137 type TimeContainer: for<'a> BatchContainer<ReadItem<'a> = Self::TimeGat<'a>, Owned = Self::Time>;
139 type DiffContainer: for<'a> BatchContainer<ReadItem<'a> = Self::DiffGat<'a>, Owned = Self::Diff>;
141
142 fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn;
144 fn owned_val(val: Self::Val<'_>) -> Self::ValOwn;
146 fn owned_time(time: Self::TimeGat<'_>) -> Self::Time;
148 fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff;
150
151 fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time);
153}
154
155impl<L: WithLayout> LayoutExt for L {
156 type KeyOwn = <<L::Layout as Layout>::KeyContainer as BatchContainer>::Owned;
157 type Key<'a> = <<L::Layout as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
158 type ValOwn = <<L::Layout as Layout>::ValContainer as BatchContainer>::Owned;
159 type Val<'a> = <<L::Layout as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
160 type Time = <<L::Layout as Layout>::TimeContainer as BatchContainer>::Owned;
161 type TimeGat<'a> = <<L::Layout as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
162 type Diff = <<L::Layout as Layout>::DiffContainer as BatchContainer>::Owned;
163 type DiffGat<'a> = <<L::Layout as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
164
165 type KeyContainer = <L::Layout as Layout>::KeyContainer;
166 type ValContainer = <L::Layout as Layout>::ValContainer;
167 type TimeContainer = <L::Layout as Layout>::TimeContainer;
168 type DiffContainer = <L::Layout as Layout>::DiffContainer;
169
170 #[inline(always)] fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn { <Self::Layout as Layout>::KeyContainer::into_owned(key) }
171 #[inline(always)] fn owned_val(val: Self::Val<'_>) -> Self::ValOwn { <Self::Layout as Layout>::ValContainer::into_owned(val) }
172 #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { <Self::Layout as Layout>::TimeContainer::into_owned(time) }
173 #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { <Self::Layout as Layout>::DiffContainer::into_owned(diff) }
174 #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { <Self::Layout as Layout>::TimeContainer::clone_onto(time, onto) }
175
176}
177
178impl<KC, VC, TC, DC, OC> Layout for (KC, VC, TC, DC, OC)
181where
182 KC: BatchContainer,
183 VC: BatchContainer,
184 TC: BatchContainer<Owned: Lattice + timely::progress::Timestamp>,
185 DC: BatchContainer<Owned: Semigroup>,
186 OC: for<'a> BatchContainer<ReadItem<'a> = usize>,
187{
188 type KeyContainer = KC;
189 type ValContainer = VC;
190 type TimeContainer = TC;
191 type DiffContainer = DC;
192 type OffsetContainer = OC;
193}
194
195pub mod layout {
199 use crate::trace::implementations::{BatchContainer, Layout};
200
201 pub type Key<L> = <<L as Layout>::KeyContainer as BatchContainer>::Owned;
203 pub type KeyRef<'a, L> = <<L as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
205 pub type Val<L> = <<L as Layout>::ValContainer as BatchContainer>::Owned;
207 pub type ValRef<'a, L> = <<L as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
209 pub type Time<L> = <<L as Layout>::TimeContainer as BatchContainer>::Owned;
211 pub type TimeRef<'a, L> = <<L as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
213 pub type Diff<L> = <<L as Layout>::DiffContainer as BatchContainer>::Owned;
215 pub type DiffRef<'a, L> = <<L as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
217}
218
219pub struct Vector<U: Update> {
221 phantom: std::marker::PhantomData<U>,
222}
223
224impl<U: Update<Diff: Ord>> Layout for Vector<U> {
225 type KeyContainer = Vec<U::Key>;
226 type ValContainer = Vec<U::Val>;
227 type TimeContainer = Vec<U::Time>;
228 type DiffContainer = Vec<U::Diff>;
229 type OffsetContainer = OffsetList;
230}
231
232pub struct TStack<U: Update> {
234 phantom: std::marker::PhantomData<U>,
235}
236
237impl<U> Layout for TStack<U>
238where
239 U: Update<
240 Key: Columnation,
241 Val: Columnation,
242 Time: Columnation,
243 Diff: Columnation + Ord,
244 >,
245{
246 type KeyContainer = TimelyStack<U::Key>;
247 type ValContainer = TimelyStack<U::Val>;
248 type TimeContainer = TimelyStack<U::Time>;
249 type DiffContainer = TimelyStack<U::Diff>;
250 type OffsetContainer = OffsetList;
251}
252
253#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
255pub struct OffsetList {
256 pub zero_prefix: usize,
258 pub smol: Vec<u32>,
260 pub chonk: Vec<u64>,
262}
263
264impl std::fmt::Debug for OffsetList {
265 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266 f.debug_list().entries(self.into_iter()).finish()
267 }
268}
269
270impl OffsetList {
271 pub fn with_capacity(cap: usize) -> Self {
273 Self {
274 zero_prefix: 0,
275 smol: Vec::with_capacity(cap),
276 chonk: Vec::new(),
277 }
278 }
279 pub fn push(&mut self, offset: usize) {
281 if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
282 self.zero_prefix += 1;
283 }
284 else if self.chonk.is_empty() {
285 if let Ok(smol) = offset.try_into() {
286 self.smol.push(smol);
287 }
288 else {
289 self.chonk.push(offset.try_into().unwrap())
290 }
291 }
292 else {
293 self.chonk.push(offset.try_into().unwrap())
294 }
295 }
296 pub fn index(&self, index: usize) -> usize {
298 if index < self.zero_prefix {
299 0
300 }
301 else if index - self.zero_prefix < self.smol.len() {
302 self.smol[index - self.zero_prefix].try_into().unwrap()
303 }
304 else {
305 self.chonk[index - self.zero_prefix - self.smol.len()].try_into().unwrap()
306 }
307 }
308 pub fn len(&self) -> usize {
310 self.zero_prefix + self.smol.len() + self.chonk.len()
311 }
312}
313
314impl<'a> IntoIterator for &'a OffsetList {
315 type Item = usize;
316 type IntoIter = OffsetListIter<'a>;
317
318 fn into_iter(self) -> Self::IntoIter {
319 OffsetListIter {list: self, index: 0 }
320 }
321}
322
323pub struct OffsetListIter<'a> {
325 list: &'a OffsetList,
326 index: usize,
327}
328
329impl<'a> Iterator for OffsetListIter<'a> {
330 type Item = usize;
331
332 fn next(&mut self) -> Option<Self::Item> {
333 if self.index < self.list.len() {
334 let res = Some(self.list.index(self.index));
335 self.index += 1;
336 res
337 } else {
338 None
339 }
340 }
341}
342
343impl PushInto<usize> for OffsetList {
344 fn push_into(&mut self, item: usize) {
345 self.push(item);
346 }
347}
348
349impl BatchContainer for OffsetList {
350 type Owned = usize;
351 type ReadItem<'a> = usize;
352
353 #[inline(always)]
354 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item }
355 #[inline(always)]
356 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
357
358 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
359 fn push_own(&mut self, item: &Self::Owned) { self.push_into(*item) }
360
361 fn clear(&mut self) { self.zero_prefix = 0; self.smol.clear(); self.chonk.clear(); }
362
363 fn with_capacity(size: usize) -> Self {
364 Self::with_capacity(size)
365 }
366
367 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
368 Self::with_capacity(cont1.len() + cont2.len())
369 }
370
371 fn index(&self, index: usize) -> Self::ReadItem<'_> {
372 self.index(index)
373 }
374
375 fn len(&self) -> usize {
376 self.len()
377 }
378}
379
380pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: Container {
382 type Key<'a>: Ord;
384 type Val<'a>: Ord;
386 type Time;
388 type Diff;
390
391 fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);
393
394 fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;
396
397 fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;
399
400 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
402}
403
404impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
405where
406 K: Ord + Clone + 'static,
407 KBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a K>>,
408 V: Ord + Clone + 'static,
409 VBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a V>>,
410 T: Timestamp + Lattice + Clone + 'static,
411 R: Ord + Semigroup + 'static,
412{
413 type Key<'a> = K;
414 type Val<'a> = V;
415 type Time = T;
416 type Diff = R;
417
418 fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
419 (key, val, time, diff)
420 }
421
422 fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
423 KBC::reborrow(other) == this
424 }
425
426 fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
427 VBC::reborrow(other) == this
428 }
429
430 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
431 let mut keys = 0;
432 let mut vals = 0;
433 let mut upds = 0;
434 let mut prev_keyval = None;
435 for link in chain.iter() {
436 for ((key, val), _, _) in link.iter() {
437 if let Some((p_key, p_val)) = prev_keyval {
438 if p_key != key {
439 keys += 1;
440 vals += 1;
441 } else if p_val != val {
442 vals += 1;
443 }
444 } else {
445 keys += 1;
446 vals += 1;
447 }
448 upds += 1;
449 prev_keyval = Some((key, val));
450 }
451 }
452 (keys, vals, upds)
453 }
454}
455
456impl<K,V,T,R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
457where
458 K: for<'a> BatchContainer<
459 ReadItem<'a>: PartialEq<&'a K::Owned>,
460 Owned: Ord + Columnation + Clone + 'static,
461 >,
462 V: for<'a> BatchContainer<
463 ReadItem<'a>: PartialEq<&'a V::Owned>,
464 Owned: Ord + Columnation + Clone + 'static,
465 >,
466 T: Timestamp + Lattice + Columnation + Clone + 'static,
467 R: Ord + Clone + Semigroup + Columnation + 'static,
468{
469 type Key<'a> = &'a K::Owned;
470 type Val<'a> = &'a V::Owned;
471 type Time = T;
472 type Diff = R;
473
474 fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
475 (key, val, time.clone(), diff.clone())
476 }
477
478 fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
479 K::reborrow(other) == *this
480 }
481
482 fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
483 V::reborrow(other) == *this
484 }
485
486 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
487 let mut keys = 0;
488 let mut vals = 0;
489 let mut upds = 0;
490 let mut prev_keyval = None;
491 for link in chain.iter() {
492 for ((key, val), _, _) in link.iter() {
493 if let Some((p_key, p_val)) = prev_keyval {
494 if p_key != key {
495 keys += 1;
496 vals += 1;
497 } else if p_val != val {
498 vals += 1;
499 }
500 } else {
501 keys += 1;
502 vals += 1;
503 }
504 upds += 1;
505 prev_keyval = Some((key, val));
506 }
507 }
508 (keys, vals, upds)
509 }
510}
511
512pub use self::containers::{BatchContainer, SliceContainer};
513
514pub mod containers {
516
517 use columnation::Columnation;
518 use timely::container::PushInto;
519
520 use crate::containers::TimelyStack;
521
/// A container of items that can be appended to, indexed by position, and
/// cleared. Reads produce `ReadItem` values, which convert to `Owned` values.
pub trait BatchContainer: 'static {
    /// Owned form of a contained item.
    type Owned: Clone + Ord;

    /// Form in which items are read out of the container.
    type ReadItem<'a>: Copy + Ord;

    /// Converts a read item into its owned form.
    #[must_use]
    fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned;

    /// Overwrites `other` with the owned form of `item`.
    ///
    /// The default assigns the result of `into_owned`.
    #[inline(always)]
    fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
        *other = Self::into_owned(item);
    }

    /// Appends an item given in its read form.
    fn push_ref(&mut self, item: Self::ReadItem<'_>);
    /// Appends an item given as a reference to its owned form.
    fn push_own(&mut self, item: &Self::Owned);

    /// Removes all items from the container.
    fn clear(&mut self);

    /// Creates a container sized by the `size` hint.
    fn with_capacity(size: usize) -> Self;
    /// Creates a container sized from two existing containers.
    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;

    /// Converts a read item into one with a shorter lifetime.
    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;

    /// Reads the item at position `index`.
    fn index(&self, index: usize) -> Self::ReadItem<'_>;

    /// Reads the item at position `index`, or `None` if `index` is not less than `len()`.
    fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
        if index >= self.len() {
            return None;
        }
        Some(self.index(index))
    }

    /// Number of items in the container.
    fn len(&self) -> usize;

    /// Reads the final item, or `None` if the container is empty.
    fn last(&self) -> Option<Self::ReadItem<'_>> {
        match self.len() {
            0 => None,
            count => Some(self.index(count - 1)),
        }
    }

    /// Whether the container holds no items.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Counts items in positions `start..end` for which `function` returns true.
    ///
    /// When the range is longer than eight items and the item at `start + 8`
    /// satisfies `function`, the count is found by probing in doubling and
    /// then halving steps instead of testing every position. That result is
    /// only a true count if `function` holds on a prefix of the range and
    /// fails afterwards. Otherwise the first (at most eight) positions are
    /// tested one by one.
    fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {
        let small_limit = 8;

        let beyond_small = end > start + small_limit && function(self.index(start + small_limit));
        if !beyond_small {
            // Short range, or the answer lies within the first `small_limit` items.
            let limit = std::cmp::min(end, start + small_limit);
            return (start..limit).filter(|&pos| function(self.index(pos))).count();
        }

        // Positions `start .. start + small_limit` inclusive are taken as satisfied.
        let mut index = small_limit + 1;
        if start + index < end && function(self.index(start + index)) {
            // Probe forward with doubling steps while the predicate holds.
            let mut step = 1;
            while start + index + step < end && function(self.index(start + index + step)) {
                index += step;
                step <<= 1;
            }

            // Refine with halving steps.
            step >>= 1;
            while step > 0 {
                if start + index + step < end && function(self.index(start + index + step)) {
                    index += step;
                }
                step >>= 1;
            }

            // `index` is the last offset known to satisfy; the count is one more.
            index += 1;
        }

        index
    }
}
625
626 impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
629 type Owned = T;
630 type ReadItem<'a> = &'a T;
631
632 #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
633 #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
634
635 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
636
637 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
638 fn push_own(&mut self, item: &Self::Owned) { self.push_into(item.clone()) }
639
640 fn clear(&mut self) { self.clear() }
641
642 fn with_capacity(size: usize) -> Self {
643 Vec::with_capacity(size)
644 }
645 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
646 Vec::with_capacity(cont1.len() + cont2.len())
647 }
648 fn index(&self, index: usize) -> Self::ReadItem<'_> {
649 &self[index]
650 }
651 fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
652 <[T]>::get(&self, index)
653 }
654 fn len(&self) -> usize {
655 self[..].len()
656 }
657 }
658
659 impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
662 type Owned = T;
663 type ReadItem<'a> = &'a T;
664
665 #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
666 #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
667
668 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
669
670 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
671 fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
672
673 fn clear(&mut self) { self.clear() }
674
675 fn with_capacity(size: usize) -> Self {
676 Self::with_capacity(size)
677 }
678 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
679 let mut new = Self::default();
680 new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
681 new
682 }
683 fn index(&self, index: usize) -> Self::ReadItem<'_> {
684 &self[index]
685 }
686 fn len(&self) -> usize {
687 self[..].len()
688 }
689 }
690
/// A container of slices of `B`, stored back to back in a single vector.
pub struct SliceContainer<B> {
    /// Boundaries of the stored slices within `inner`: slice `i` spans
    /// `offsets[i] .. offsets[i + 1]`, so there is one more offset than slices.
    offsets: Vec<usize>,
    /// The concatenated elements of all stored slices.
    inner: Vec<B>,
}
701
702 impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
703 fn push_into(&mut self, item: &[B]) {
704 for x in item.iter() {
705 self.inner.push_into(x);
706 }
707 self.offsets.push(self.inner.len());
708 }
709 }
710
711 impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
712 fn push_into(&mut self, item: &Vec<B>) {
713 self.push_into(&item[..]);
714 }
715 }
716
717 impl<B> BatchContainer for SliceContainer<B>
718 where
719 B: Ord + Clone + Sized + 'static,
720 {
721 type Owned = Vec<B>;
722 type ReadItem<'a> = &'a [B];
723
724 #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.to_vec() }
725 #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from_slice(item); }
726
727 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
728
729 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
730 fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
731
732 fn clear(&mut self) {
733 self.offsets.clear();
734 self.offsets.push(0);
735 self.inner.clear();
736 }
737
738 fn with_capacity(size: usize) -> Self {
739 let mut offsets = Vec::with_capacity(size + 1);
740 offsets.push(0);
741 Self {
742 offsets,
743 inner: Vec::with_capacity(size),
744 }
745 }
746 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
747 let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
748 offsets.push(0);
749 Self {
750 offsets,
751 inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
752 }
753 }
754 fn index(&self, index: usize) -> Self::ReadItem<'_> {
755 let lower = self.offsets[index];
756 let upper = self.offsets[index+1];
757 &self.inner[lower .. upper]
758 }
759 fn len(&self) -> usize {
760 self.offsets.len() - 1
761 }
762 }
763
764 impl<B> Default for SliceContainer<B> {
766 fn default() -> Self {
767 Self {
768 offsets: vec![0],
769 inner: Default::default(),
770 }
771 }
772 }
773}