differential_dataflow/trace/implementations/
mod.rs1pub mod spine_fueled;
42
43pub mod merge_batcher;
44pub mod chainless_batcher;
45pub mod ord_neu;
46pub mod rhh;
47pub mod huffman_container;
48pub mod chunker;
49
50pub use self::ord_neu::OrdValSpine as ValSpine;
52pub use self::ord_neu::OrdValBatcher as ValBatcher;
53pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
54pub use self::ord_neu::OrdKeySpine as KeySpine;
55pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
56pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
57
58use std::convert::TryInto;
59
60use serde::{Deserialize, Serialize};
61use timely::container::{DrainContainer, PushInto};
62use timely::progress::Timestamp;
63
64use crate::lattice::Lattice;
65use crate::difference::Semigroup;
66
67pub trait Update {
69 type Key: Ord + Clone + 'static;
71 type Val: Ord + Clone + 'static;
73 type Time: Ord + Clone + Lattice + timely::progress::Timestamp;
75 type Diff: Ord + Semigroup + 'static;
77}
78
79impl<K,V,T,R> Update for ((K, V), T, R)
80where
81 K: Ord+Clone+'static,
82 V: Ord+Clone+'static,
83 T: Ord+Clone+Lattice+timely::progress::Timestamp,
84 R: Ord+Semigroup+'static,
85{
86 type Key = K;
87 type Val = V;
88 type Time = T;
89 type Diff = R;
90}
91
92pub trait Layout {
94 type KeyContainer: BatchContainer;
96 type ValContainer: BatchContainer;
98 type TimeContainer: BatchContainer<Owned: Lattice + timely::progress::Timestamp>;
100 type DiffContainer: BatchContainer<Owned: Semigroup + 'static>;
102 type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
104}
105
106pub trait WithLayout {
108 type Layout: Layout;
110}
111
112pub trait LayoutExt : WithLayout<Layout: Layout<KeyContainer = Self::KeyContainer, ValContainer = Self::ValContainer, TimeContainer = Self::TimeContainer, DiffContainer = Self::DiffContainer>> {
114 type Key<'a>: Copy + Ord;
116 type ValOwn: Clone + Ord;
118 type Val<'a>: Copy + Ord;
120 type Time: Lattice + timely::progress::Timestamp;
122 type TimeGat<'a>: Copy + Ord;
124 type Diff: Semigroup + 'static;
126 type DiffGat<'a>: Copy + Ord;
128
129 type KeyContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Key<'a>>;
131 type ValContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Val<'a>, Owned = Self::ValOwn>;
133 type TimeContainer: for<'a> BatchContainer<ReadItem<'a> = Self::TimeGat<'a>, Owned = Self::Time>;
135 type DiffContainer: for<'a> BatchContainer<ReadItem<'a> = Self::DiffGat<'a>, Owned = Self::Diff>;
137
138 fn owned_val(val: Self::Val<'_>) -> Self::ValOwn;
140 fn owned_time(time: Self::TimeGat<'_>) -> Self::Time;
142 fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff;
144
145 fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time);
147}
148
149impl<L: WithLayout> LayoutExt for L {
150 type Key<'a> = <<L::Layout as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
151 type ValOwn = <<L::Layout as Layout>::ValContainer as BatchContainer>::Owned;
152 type Val<'a> = <<L::Layout as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
153 type Time = <<L::Layout as Layout>::TimeContainer as BatchContainer>::Owned;
154 type TimeGat<'a> = <<L::Layout as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
155 type Diff = <<L::Layout as Layout>::DiffContainer as BatchContainer>::Owned;
156 type DiffGat<'a> = <<L::Layout as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
157
158 type KeyContainer = <L::Layout as Layout>::KeyContainer;
159 type ValContainer = <L::Layout as Layout>::ValContainer;
160 type TimeContainer = <L::Layout as Layout>::TimeContainer;
161 type DiffContainer = <L::Layout as Layout>::DiffContainer;
162
163 #[inline(always)] fn owned_val(val: Self::Val<'_>) -> Self::ValOwn { <Self::Layout as Layout>::ValContainer::into_owned(val) }
164 #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { <Self::Layout as Layout>::TimeContainer::into_owned(time) }
165 #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { <Self::Layout as Layout>::DiffContainer::into_owned(diff) }
166 #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { <Self::Layout as Layout>::TimeContainer::clone_onto(time, onto) }
167
168}
169
170impl<KC, VC, TC, DC, OC> Layout for (KC, VC, TC, DC, OC)
173where
174 KC: BatchContainer,
175 VC: BatchContainer,
176 TC: BatchContainer<Owned: Lattice + timely::progress::Timestamp>,
177 DC: BatchContainer<Owned: Semigroup>,
178 OC: for<'a> BatchContainer<ReadItem<'a> = usize>,
179{
180 type KeyContainer = KC;
181 type ValContainer = VC;
182 type TimeContainer = TC;
183 type DiffContainer = DC;
184 type OffsetContainer = OC;
185}
186
187pub mod layout {
191 use crate::trace::implementations::{BatchContainer, Layout};
192
193 pub type Key<L> = <<L as Layout>::KeyContainer as BatchContainer>::Owned;
195 pub type KeyRef<'a, L> = <<L as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
197 pub type Val<L> = <<L as Layout>::ValContainer as BatchContainer>::Owned;
199 pub type ValRef<'a, L> = <<L as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
201 pub type Time<L> = <<L as Layout>::TimeContainer as BatchContainer>::Owned;
203 pub type TimeRef<'a, L> = <<L as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
205 pub type Diff<L> = <<L as Layout>::DiffContainer as BatchContainer>::Owned;
207 pub type DiffRef<'a, L> = <<L as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
209}
210
211pub struct Vector<U: Update> {
213 phantom: std::marker::PhantomData<U>,
214}
215
216impl<U: Update<Diff: Ord>> Layout for Vector<U> {
217 type KeyContainer = Vec<U::Key>;
218 type ValContainer = Vec<U::Val>;
219 type TimeContainer = Vec<U::Time>;
220 type DiffContainer = Vec<U::Diff>;
221 type OffsetContainer = OffsetList;
222}
223
224#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
226pub struct OffsetList {
227 pub zero_prefix: usize,
229 pub smol: Vec<u32>,
231 pub chonk: Vec<u64>,
233}
234
235impl std::fmt::Debug for OffsetList {
236 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 f.debug_list().entries(self.into_iter()).finish()
238 }
239}
240
241impl OffsetList {
242 pub fn with_capacity(cap: usize) -> Self {
244 Self {
245 zero_prefix: 0,
246 smol: Vec::with_capacity(cap),
247 chonk: Vec::new(),
248 }
249 }
250 pub fn push(&mut self, offset: usize) {
252 if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
253 self.zero_prefix += 1;
254 }
255 else if self.chonk.is_empty() {
256 if let Ok(smol) = offset.try_into() {
257 self.smol.push(smol);
258 }
259 else {
260 self.chonk.push(offset.try_into().unwrap())
261 }
262 }
263 else {
264 self.chonk.push(offset.try_into().unwrap())
265 }
266 }
267 pub fn index(&self, index: usize) -> usize {
269 if index < self.zero_prefix {
270 0
271 }
272 else if index - self.zero_prefix < self.smol.len() {
273 self.smol[index - self.zero_prefix].try_into().unwrap()
274 }
275 else {
276 self.chonk[index - self.zero_prefix - self.smol.len()].try_into().unwrap()
277 }
278 }
279 pub fn len(&self) -> usize {
281 self.zero_prefix + self.smol.len() + self.chonk.len()
282 }
283}
284
285impl<'a> IntoIterator for &'a OffsetList {
286 type Item = usize;
287 type IntoIter = OffsetListIter<'a>;
288
289 fn into_iter(self) -> Self::IntoIter {
290 OffsetListIter {list: self, index: 0 }
291 }
292}
293
294pub struct OffsetListIter<'a> {
296 list: &'a OffsetList,
297 index: usize,
298}
299
300impl<'a> Iterator for OffsetListIter<'a> {
301 type Item = usize;
302
303 fn next(&mut self) -> Option<Self::Item> {
304 if self.index < self.list.len() {
305 let res = Some(self.list.index(self.index));
306 self.index += 1;
307 res
308 } else {
309 None
310 }
311 }
312}
313
314impl PushInto<usize> for OffsetList {
315 fn push_into(&mut self, item: usize) {
316 self.push(item);
317 }
318}
319
320impl BatchContainer for OffsetList {
321 type Owned = usize;
322 type ReadItem<'a> = usize;
323
324 #[inline(always)]
325 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item }
326 #[inline(always)]
327 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
328
329 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
330 fn push_own(&mut self, item: &Self::Owned) { self.push_into(*item) }
331
332 fn clear(&mut self) { self.zero_prefix = 0; self.smol.clear(); self.chonk.clear(); }
333
334 fn with_capacity(size: usize) -> Self {
335 Self::with_capacity(size)
336 }
337
338 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
339 Self::with_capacity(cont1.len() + cont2.len())
340 }
341
342 fn index(&self, index: usize) -> Self::ReadItem<'_> {
343 self.index(index)
344 }
345
346 fn len(&self) -> usize {
347 self.len()
348 }
349}
350
351pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: DrainContainer + Sized {
353 type Key<'a>: Ord;
355 type Val<'a>: Ord;
357 type Time;
359 type Diff;
361
362 fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);
364
365 fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;
367
368 fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;
370
371 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
373}
374
375impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
376where
377 K: Ord + Clone + 'static,
378 KBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a K>>,
379 V: Ord + Clone + 'static,
380 VBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a V>>,
381 T: Timestamp + Lattice + Clone + 'static,
382 R: Ord + Semigroup + 'static,
383{
384 type Key<'a> = K;
385 type Val<'a> = V;
386 type Time = T;
387 type Diff = R;
388
389 fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
390 (key, val, time, diff)
391 }
392
393 fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
394 KBC::reborrow(other) == this
395 }
396
397 fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
398 VBC::reborrow(other) == this
399 }
400
401 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
402 let mut keys = 0;
403 let mut vals = 0;
404 let mut upds = 0;
405 let mut prev_keyval = None;
406 for link in chain.iter() {
407 for ((key, val), _, _) in link.iter() {
408 if let Some((p_key, p_val)) = prev_keyval {
409 if p_key != key {
410 keys += 1;
411 vals += 1;
412 } else if p_val != val {
413 vals += 1;
414 }
415 } else {
416 keys += 1;
417 vals += 1;
418 }
419 upds += 1;
420 prev_keyval = Some((key, val));
421 }
422 }
423 (keys, vals, upds)
424 }
425}
426
427pub use self::containers::{BatchContainer, SliceContainer};
428
429pub mod containers {
431
432 use timely::container::PushInto;
433
434 pub trait BatchContainer: 'static {
436 type Owned: Clone + Ord;
438
439 type ReadItem<'a>: Copy + Ord;
441
442
443 #[must_use]
445 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned;
446 #[inline(always)]
448 fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
449 *other = Self::into_owned(item);
450 }
451
452 fn push_ref(&mut self, item: Self::ReadItem<'_>);
454 fn push_own(&mut self, item: &Self::Owned);
456
457 fn clear(&mut self);
459
460 fn with_capacity(size: usize) -> Self;
462 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;
464
465 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;
467
468 fn index(&self, index: usize) -> Self::ReadItem<'_>;
470
471 fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
473 if index < self.len() {
474 Some(self.index(index))
475 }
476 else { None }
477 }
478
479 fn len(&self) -> usize;
481 fn last(&self) -> Option<Self::ReadItem<'_>> {
483 if self.len() > 0 {
484 Some(self.index(self.len()-1))
485 }
486 else {
487 None
488 }
489 }
490 fn is_empty(&self) -> bool { self.len() == 0 }
492
493 fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {
500
501 let small_limit = 8;
502
503 if end > start + small_limit && function(self.index(start + small_limit)) {
505
506 let mut index = small_limit + 1;
508 if start + index < end && function(self.index(start + index)) {
509
510 let mut step = 1;
512 while start + index + step < end && function(self.index(start + index + step)) {
513 index += step;
514 step <<= 1;
515 }
516
517 step >>= 1;
519 while step > 0 {
520 if start + index + step < end && function(self.index(start + index + step)) {
521 index += step;
522 }
523 step >>= 1;
524 }
525
526 index += 1;
527 }
528
529 index
530 }
531 else {
532 let limit = std::cmp::min(end, start + small_limit);
533 (start .. limit).filter(|x| function(self.index(*x))).count()
534 }
535 }
536 }
537
538 impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
541 type Owned = T;
542 type ReadItem<'a> = &'a T;
543
544 #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
545 #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
546
547 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
548
549 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
550 fn push_own(&mut self, item: &Self::Owned) { self.push_into(item.clone()) }
551
552 fn clear(&mut self) { self.clear() }
553
554 fn with_capacity(size: usize) -> Self {
555 Vec::with_capacity(size)
556 }
557 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
558 Vec::with_capacity(cont1.len() + cont2.len())
559 }
560 fn index(&self, index: usize) -> Self::ReadItem<'_> {
561 &self[index]
562 }
563 fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
564 <[T]>::get(&self, index)
565 }
566 fn len(&self) -> usize {
567 self[..].len()
568 }
569 }
570
571 pub struct SliceContainer<B> {
573 offsets: Vec<usize>,
578 inner: Vec<B>,
580 }
581
582 impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
583 fn push_into(&mut self, item: &[B]) {
584 for x in item.iter() {
585 self.inner.push_into(x);
586 }
587 self.offsets.push(self.inner.len());
588 }
589 }
590
591 impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
592 fn push_into(&mut self, item: &Vec<B>) {
593 self.push_into(&item[..]);
594 }
595 }
596
597 impl<B> BatchContainer for SliceContainer<B>
598 where
599 B: Ord + Clone + Sized + 'static,
600 {
601 type Owned = Vec<B>;
602 type ReadItem<'a> = &'a [B];
603
604 #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.to_vec() }
605 #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from_slice(item); }
606
607 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
608
609 fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
610 fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
611
612 fn clear(&mut self) {
613 self.offsets.clear();
614 self.offsets.push(0);
615 self.inner.clear();
616 }
617
618 fn with_capacity(size: usize) -> Self {
619 let mut offsets = Vec::with_capacity(size + 1);
620 offsets.push(0);
621 Self {
622 offsets,
623 inner: Vec::with_capacity(size),
624 }
625 }
626 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
627 let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
628 offsets.push(0);
629 Self {
630 offsets,
631 inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
632 }
633 }
634 fn index(&self, index: usize) -> Self::ReadItem<'_> {
635 let lower = self.offsets[index];
636 let upper = self.offsets[index+1];
637 &self.inner[lower .. upper]
638 }
639 fn len(&self) -> usize {
640 self.offsets.len() - 1
641 }
642 }
643
644 impl<B> Default for SliceContainer<B> {
646 fn default() -> Self {
647 Self {
648 offsets: vec![0],
649 inner: Default::default(),
650 }
651 }
652 }
653}