1pub mod spine_fueled;
42
43pub mod merge_batcher;
44pub mod ord_neu;
45pub mod rhh;
46pub mod huffman_container;
47pub mod chunker;
48
49pub use self::ord_neu::OrdValSpine as ValSpine;
51pub use self::ord_neu::OrdValBatcher as ValBatcher;
52pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
53pub use self::ord_neu::OrdKeySpine as KeySpine;
54pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
55pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
56
57use std::borrow::{ToOwned};
58use std::convert::TryInto;
59
60use columnation::Columnation;
61use serde::{Deserialize, Serialize};
62use timely::Container;
63use timely::container::PushInto;
64use timely::progress::Timestamp;
65
66use crate::containers::TimelyStack;
67use crate::lattice::Lattice;
68use crate::difference::Semigroup;
69
70pub trait Update {
72 type Key: Ord + Clone + 'static;
74 type Val: Ord + Clone + 'static;
76 type Time: Ord + Clone + Lattice + timely::progress::Timestamp;
78 type Diff: Ord + Semigroup + 'static;
80}
81
82impl<K,V,T,R> Update for ((K, V), T, R)
83where
84 K: Ord+Clone+'static,
85 V: Ord+Clone+'static,
86 T: Ord+Clone+Lattice+timely::progress::Timestamp,
87 R: Ord+Semigroup+'static,
88{
89 type Key = K;
90 type Val = V;
91 type Time = T;
92 type Diff = R;
93}
94
95pub trait Layout {
97 type Target: Update + ?Sized;
99 type KeyContainer: BatchContainer + PushInto<<Self::Target as Update>::Key>;
102 type ValContainer: BatchContainer;
104 type TimeContainer: BatchContainer<Owned = <Self::Target as Update>::Time> + PushInto<<Self::Target as Update>::Time>;
106 type DiffContainer: BatchContainer<Owned = <Self::Target as Update>::Diff> + PushInto<<Self::Target as Update>::Diff>;
108 type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
110}
111
112pub struct Vector<U: Update> {
114 phantom: std::marker::PhantomData<U>,
115}
116
117impl<U: Update> Layout for Vector<U>
118where
119 U::Diff: Ord,
120{
121 type Target = U;
122 type KeyContainer = Vec<U::Key>;
123 type ValContainer = Vec<U::Val>;
124 type TimeContainer = Vec<U::Time>;
125 type DiffContainer = Vec<U::Diff>;
126 type OffsetContainer = OffsetList;
127}
128
129pub struct TStack<U: Update> {
131 phantom: std::marker::PhantomData<U>,
132}
133
134impl<U: Update> Layout for TStack<U>
135where
136 U::Key: Columnation,
137 U::Val: Columnation,
138 U::Time: Columnation,
139 U::Diff: Columnation + Ord,
140{
141 type Target = U;
142 type KeyContainer = TimelyStack<U::Key>;
143 type ValContainer = TimelyStack<U::Val>;
144 type TimeContainer = TimelyStack<U::Time>;
145 type DiffContainer = TimelyStack<U::Diff>;
146 type OffsetContainer = OffsetList;
147}
148
149pub trait PreferredContainer : ToOwned {
153 type Container: BatchContainer + PushInto<Self::Owned>;
155}
156
157impl<T: Ord + Clone + 'static> PreferredContainer for T {
158 type Container = Vec<T>;
159}
160
161impl<T: Ord + Clone + 'static> PreferredContainer for [T] {
162 type Container = SliceContainer<T>;
163}
164
/// A marker type that serves as both an [`Update`] and a [`Layout`], choosing key and
/// value containers through [`PreferredContainer`].
///
/// `K` and `V` may be unsized (for example slices).
pub struct Preferred<K, V, T, D>
where
    K: ?Sized,
    V: ?Sized,
{
    /// Ties the parameters to the type; the possibly unsized `K` and `V` are boxed so the
    /// tuple itself is sized.
    phantom: std::marker::PhantomData<(Box<K>, Box<V>, T, D)>,
}
169
170impl<K,V,T,R> Update for Preferred<K, V, T, R>
171where
172 K: ToOwned + ?Sized,
173 K::Owned: Ord+Clone+'static,
174 V: ToOwned + ?Sized,
175 V::Owned: Ord+Clone+'static,
176 T: Ord+Clone+Lattice+timely::progress::Timestamp,
177 R: Ord+Clone+Semigroup+'static,
178{
179 type Key = K::Owned;
180 type Val = V::Owned;
181 type Time = T;
182 type Diff = R;
183}
184
185impl<K, V, T, D> Layout for Preferred<K, V, T, D>
186where
187 K: Ord+ToOwned+PreferredContainer + ?Sized,
188 K::Owned: Ord+Clone+'static,
189 V: Ord+ToOwned+PreferredContainer + ?Sized,
190 V::Owned: Ord+Clone+'static,
191 T: Ord+Clone+Lattice+timely::progress::Timestamp,
192 D: Ord+Clone+Semigroup+'static,
193{
194 type Target = Preferred<K, V, T, D>;
195 type KeyContainer = K::Container;
196 type ValContainer = V::Container;
197 type TimeContainer = Vec<T>;
198 type DiffContainer = Vec<D>;
199 type OffsetContainer = OffsetList;
200}
201
202#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
204pub struct OffsetList {
205 pub zero_prefix: usize,
207 pub smol: Vec<u32>,
209 pub chonk: Vec<u64>,
211}
212
213impl std::fmt::Debug for OffsetList {
214 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215 f.debug_list().entries(self.into_iter()).finish()
216 }
217}
218
219impl OffsetList {
220 pub fn with_capacity(cap: usize) -> Self {
222 Self {
223 zero_prefix: 0,
224 smol: Vec::with_capacity(cap),
225 chonk: Vec::new(),
226 }
227 }
228 pub fn push(&mut self, offset: usize) {
230 if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
231 self.zero_prefix += 1;
232 }
233 else if self.chonk.is_empty() {
234 if let Ok(smol) = offset.try_into() {
235 self.smol.push(smol);
236 }
237 else {
238 self.chonk.push(offset.try_into().unwrap())
239 }
240 }
241 else {
242 self.chonk.push(offset.try_into().unwrap())
243 }
244 }
245 pub fn index(&self, index: usize) -> usize {
247 if index < self.zero_prefix {
248 0
249 }
250 else if index - self.zero_prefix < self.smol.len() {
251 self.smol[index - self.zero_prefix].try_into().unwrap()
252 }
253 else {
254 self.chonk[index - self.zero_prefix - self.smol.len()].try_into().unwrap()
255 }
256 }
257 pub fn len(&self) -> usize {
259 self.zero_prefix + self.smol.len() + self.chonk.len()
260 }
261}
262
263impl<'a> IntoIterator for &'a OffsetList {
264 type Item = usize;
265 type IntoIter = OffsetListIter<'a>;
266
267 fn into_iter(self) -> Self::IntoIter {
268 OffsetListIter {list: self, index: 0 }
269 }
270}
271
272pub struct OffsetListIter<'a> {
274 list: &'a OffsetList,
275 index: usize,
276}
277
278impl<'a> Iterator for OffsetListIter<'a> {
279 type Item = usize;
280
281 fn next(&mut self) -> Option<Self::Item> {
282 if self.index < self.list.len() {
283 let res = Some(self.list.index(self.index));
284 self.index += 1;
285 res
286 } else {
287 None
288 }
289 }
290}
291
292impl PushInto<usize> for OffsetList {
293 fn push_into(&mut self, item: usize) {
294 self.push(item);
295 }
296}
297
298impl BatchContainer for OffsetList {
299 type Owned = usize;
300 type ReadItem<'a> = usize;
301
302 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
303
304 fn with_capacity(size: usize) -> Self {
305 Self::with_capacity(size)
306 }
307
308 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
309 Self::with_capacity(cont1.len() + cont2.len())
310 }
311
312 fn index(&self, index: usize) -> Self::ReadItem<'_> {
313 self.index(index)
314 }
315
316 fn len(&self) -> usize {
317 self.len()
318 }
319}
320
321pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: Container {
323 type Key<'a>: Ord;
325 type Val<'a>: Ord;
327 type Time;
329 type Diff;
331
332 fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);
334
335 fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;
337
338 fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;
340
341 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
343}
344
345impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
346where
347 K: Ord + Clone + 'static,
348 KBC: BatchContainer,
349 for<'a> KBC::ReadItem<'a>: PartialEq<&'a K>,
350 V: Ord + Clone + 'static,
351 VBC: BatchContainer,
352 for<'a> VBC::ReadItem<'a>: PartialEq<&'a V>,
353 T: Timestamp + Lattice + Clone + 'static,
354 R: Ord + Semigroup + 'static,
355{
356 type Key<'a> = K;
357 type Val<'a> = V;
358 type Time = T;
359 type Diff = R;
360
361 fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
362 (key, val, time, diff)
363 }
364
365 fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
366 KBC::reborrow(other) == this
367 }
368
369 fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
370 VBC::reborrow(other) == this
371 }
372
373 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
374 let mut keys = 0;
375 let mut vals = 0;
376 let mut upds = 0;
377 let mut prev_keyval = None;
378 for link in chain.iter() {
379 for ((key, val), _, _) in link.iter() {
380 if let Some((p_key, p_val)) = prev_keyval {
381 if p_key != key {
382 keys += 1;
383 vals += 1;
384 } else if p_val != val {
385 vals += 1;
386 }
387 } else {
388 keys += 1;
389 vals += 1;
390 }
391 upds += 1;
392 prev_keyval = Some((key, val));
393 }
394 }
395 (keys, vals, upds)
396 }
397}
398
399impl<K,V,T,R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
400where
401 K: BatchContainer,
402 for<'a> K::ReadItem<'a>: PartialEq<&'a K::Owned>,
403 K::Owned: Ord + Columnation + Clone + 'static,
404 V: BatchContainer,
405 for<'a> V::ReadItem<'a>: PartialEq<&'a V::Owned>,
406 V::Owned: Ord + Columnation + Clone + 'static,
407 T: Timestamp + Lattice + Columnation + Clone + 'static,
408 R: Ord + Clone + Semigroup + Columnation + 'static,
409{
410 type Key<'a> = &'a K::Owned;
411 type Val<'a> = &'a V::Owned;
412 type Time = T;
413 type Diff = R;
414
415 fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
416 (key, val, time.clone(), diff.clone())
417 }
418
419 fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
420 K::reborrow(other) == *this
421 }
422
423 fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
424 V::reborrow(other) == *this
425 }
426
427 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
428 let mut keys = 0;
429 let mut vals = 0;
430 let mut upds = 0;
431 let mut prev_keyval = None;
432 for link in chain.iter() {
433 for ((key, val), _, _) in link.iter() {
434 if let Some((p_key, p_val)) = prev_keyval {
435 if p_key != key {
436 keys += 1;
437 vals += 1;
438 } else if p_val != val {
439 vals += 1;
440 }
441 } else {
442 keys += 1;
443 vals += 1;
444 }
445 upds += 1;
446 prev_keyval = Some((key, val));
447 }
448 }
449 (keys, vals, upds)
450 }
451}
452
453pub use self::containers::{BatchContainer, SliceContainer};
454
455pub mod containers {
457
458 use columnation::Columnation;
459 use timely::container::PushInto;
460
461 use crate::containers::TimelyStack;
462 use crate::IntoOwned;
463
464 pub trait BatchContainer: for<'a> PushInto<Self::ReadItem<'a>> + 'static {
466 type Owned;
468
469 type ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned = Self::Owned>;
471
472 fn push<D>(&mut self, item: D) where Self: PushInto<D> {
474 self.push_into(item);
475 }
476 fn with_capacity(size: usize) -> Self;
478 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;
480
481 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;
483
484 fn index(&self, index: usize) -> Self::ReadItem<'_>;
486 fn len(&self) -> usize;
488 fn last(&self) -> Option<Self::ReadItem<'_>> {
490 if self.len() > 0 {
491 Some(self.index(self.len()-1))
492 }
493 else {
494 None
495 }
496 }
497 fn is_empty(&self) -> bool { self.len() == 0 }
499
500 fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {
507
508 let small_limit = 8;
509
510 if end > start + small_limit && function(self.index(start + small_limit)) {
512
513 let mut index = small_limit + 1;
515 if start + index < end && function(self.index(start + index)) {
516
517 let mut step = 1;
519 while start + index + step < end && function(self.index(start + index + step)) {
520 index += step;
521 step <<= 1;
522 }
523
524 step >>= 1;
526 while step > 0 {
527 if start + index + step < end && function(self.index(start + index + step)) {
528 index += step;
529 }
530 step >>= 1;
531 }
532
533 index += 1;
534 }
535
536 index
537 }
538 else {
539 let limit = std::cmp::min(end, start + small_limit);
540 (start .. limit).filter(|x| function(self.index(*x))).count()
541 }
542 }
543 }
544
545 impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
548 type Owned = T;
549 type ReadItem<'a> = &'a T;
550
551 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
552
553 fn with_capacity(size: usize) -> Self {
554 Vec::with_capacity(size)
555 }
556 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
557 Vec::with_capacity(cont1.len() + cont2.len())
558 }
559 fn index(&self, index: usize) -> Self::ReadItem<'_> {
560 &self[index]
561 }
562 fn len(&self) -> usize {
563 self[..].len()
564 }
565 }
566
567 impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
570 type Owned = T;
571 type ReadItem<'a> = &'a T;
572
573 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
574
575 fn with_capacity(size: usize) -> Self {
576 Self::with_capacity(size)
577 }
578 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
579 let mut new = Self::default();
580 new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
581 new
582 }
583 fn index(&self, index: usize) -> Self::ReadItem<'_> {
584 &self[index]
585 }
586 fn len(&self) -> usize {
587 self[..].len()
588 }
589 }
590
/// A container that stores slices `[B]` as items, back to back in a single vector.
pub struct SliceContainer<B> {
    /// Boundaries of the stored slices: slice `i` occupies `offsets[i] .. offsets[i + 1]`
    /// of `inner`. The constructors in this module start the vector with a single `0`.
    offsets: Vec<usize>,
    /// The elements of all stored slices, concatenated.
    inner: Vec<B>,
}
601
602 impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
603 fn push_into(&mut self, item: &[B]) {
604 for x in item.iter() {
605 self.inner.push_into(x);
606 }
607 self.offsets.push(self.inner.len());
608 }
609 }
610
611 impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
612 fn push_into(&mut self, item: &Vec<B>) {
613 self.push_into(&item[..]);
614 }
615 }
616
617 impl<B> PushInto<Vec<B>> for SliceContainer<B> {
618 fn push_into(&mut self, item: Vec<B>) {
619 for x in item.into_iter() {
620 self.inner.push(x);
621 }
622 self.offsets.push(self.inner.len());
623 }
624 }
625
626 impl<B> BatchContainer for SliceContainer<B>
627 where
628 B: Ord + Clone + Sized + 'static,
629 {
630 type Owned = Vec<B>;
631 type ReadItem<'a> = &'a [B];
632
633 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
634
635 fn with_capacity(size: usize) -> Self {
636 let mut offsets = Vec::with_capacity(size + 1);
637 offsets.push(0);
638 Self {
639 offsets,
640 inner: Vec::with_capacity(size),
641 }
642 }
643 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
644 let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
645 offsets.push(0);
646 Self {
647 offsets,
648 inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
649 }
650 }
651 fn index(&self, index: usize) -> Self::ReadItem<'_> {
652 let lower = self.offsets[index];
653 let upper = self.offsets[index+1];
654 &self.inner[lower .. upper]
655 }
656 fn len(&self) -> usize {
657 self.offsets.len() - 1
658 }
659 }
660
661 impl<B> Default for SliceContainer<B> {
663 fn default() -> Self {
664 Self {
665 offsets: vec![0],
666 inner: Default::default(),
667 }
668 }
669 }
670}