//! Batch implementation based on Robin Hood Hashing ("Rhh").
//!
//! Keys are required to be ordered by their hash (see `HashWrapper` below), and are stored at or
//! shortly after the location their hash indicates, padded by "dummy" keys where necessary.
//! Cursors can then seek a key by jumping to its hashed location rather than binary searching.

use std::rc::Rc;
use std::cmp::Ordering;

use serde::{Deserialize, Serialize};

use crate::Hashable;
use crate::containers::TimelyStack;
use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Layout, Vector, TStack};

use self::val_batch::{RhhValBatch, RhhValBuilder};

/// A trace implementation using a spine of hash-ordered batches with `Vec` storage.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher for update tuples destined for `Vec`-backed batches.
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// A builder for `Vec`-backed hash-ordered batches, wrapped in `Rc`.
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

/// A trace implementation using a spine of hash-ordered batches with columnar (`TimelyStack`) storage.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for update tuples destined for columnar batches.
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// A builder for columnar hash-ordered batches, wrapped in `Rc`.
pub type ColBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

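// Note: the key type `K` used with these aliases is expected to order by `(hash(key), key)`,
// for example by wrapping it in `HashWrapper` below; a plainly-ordered key would defeat the
// hash-based placement and seek logic in `val_batch`.
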
/// A marker trait for types whose `Ord` implementation orders first by hash.
///
/// Implementors promise that their ordering is consistent with the order of their hashes,
/// which the hash-based placement logic in `val_batch` relies on.
pub trait HashOrdered: Hashable { }

impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T { }

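/// A key wrapper that orders by `(hash(inner), inner)` rather than by `inner` alone.
///
/// A minimal sketch of the intended ordering, assuming the blanket `Hashable` implementation
/// for `Hash` types (illustrative only; actual hash values are unspecified):
///
/// ```ignore
/// let a = HashWrapper { inner: 10u64 };
/// let b = HashWrapper { inner: 3u64 };
/// // Comparison first consults the hashes, then the wrapped values.
/// assert_eq!(a.cmp(&b), (a.inner.hashed(), &a.inner).cmp(&(b.inner.hashed(), &b.inner)));
/// ```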
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct HashWrapper<T: std::hash::Hash + Hashable> {
    /// The wrapped value, whose hash leads all comparisons.
    pub inner: T
}

impl<T: PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> PartialOrd for HashWrapper<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let this_hash = self.inner.hashed();
        let that_hash = other.inner.hashed();
        (this_hash, &self.inner).partial_cmp(&(that_hash, &other.inner))
    }
}

impl<T: Ord + PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> Ord for HashWrapper<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.partial_cmp(other).unwrap()
    }
}

impl<T: std::hash::Hash + Hashable> HashOrdered for HashWrapper<T> { }

impl<T: std::hash::Hash + Hashable> Hashable for HashWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> Self::Output { self.inner.hashed() }
}

impl<T: std::hash::Hash + Hashable> HashOrdered for &HashWrapper<T> { }

impl<T: std::hash::Hash + Hashable> Hashable for &HashWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> Self::Output { self.inner.hashed() }
}

/// Batch, merger, cursor, and builder implementations for the hash-ordered value batch.
mod val_batch {

    use std::convert::TryInto;
    use std::marker::PhantomData;
    use serde::{Deserialize, Serialize};
    use timely::container::PushInto;
    use timely::progress::{Antichain, frontier::AntichainRef};

    use crate::hashable::Hashable;
    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
    use crate::trace::implementations::{BatchContainer, BuilderInput};
    use crate::trace::implementations::layout;

    use super::{Layout, HashOrdered};

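    /// Storage for a batch of updates, keyed by hash-ordered keys.
    ///
    /// Keys are stored in `(hash, key)` order, padded with default "dummy" keys so that each live
    /// key sits at or after the location its hash indicates (`desired_location`). A key slot is
    /// live exactly when its adjacent `keys_offs` entries differ; dummy slots repeat the previous
    /// offset and hold no values.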
    #[derive(Debug, Serialize, Deserialize)]
    pub struct RhhValStorage<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// The number of key slots the hash placement was sized for.
        pub key_capacity: usize,
        /// Divisor used to map a key's 64-bit hash to its desired location; roughly `2^64 / key_capacity`.
        pub divisor: u64,
        /// The number of live (non-dummy) keys.
        pub key_count: usize,

        /// Key storage, including dummy padding keys.
        pub keys: L::KeyContainer,
        /// Offsets into `vals` delimiting each key's values; equal adjacent offsets mark a dummy slot.
        pub keys_offs: L::OffsetContainer,
        /// Concatenated values, grouped by key.
        pub vals: L::ValContainer,
        /// Offsets into `times`/`diffs` delimiting each value's updates; equal adjacent offsets
        /// indicate a repeated ("singleton") update borrowed from the previous value.
        pub vals_offs: L::OffsetContainer,
        /// Update times, indexed by `vals_offs`.
        pub times: L::TimeContainer,
        /// Update diffs, indexed by `vals_offs`.
        pub diffs: L::DiffContainer,
    }

    impl<L: Layout> RhhValStorage<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        /// Lower and upper value offsets for the key at `index`.
        fn values_for_key(&self, index: usize) -> (usize, usize) {
            let lower = self.keys_offs.index(index);
            let upper = self.keys_offs.index(index+1);
            assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
            (lower, upper)
        }
        /// Lower and upper update offsets for the value at `index`.
        fn updates_for_value(&self, index: usize) -> (usize, usize) {
            let mut lower = self.vals_offs.index(index);
            let upper = self.vals_offs.index(index+1);
            // An empty range signals a "singleton" update, identical to the last update recorded
            // for the preceding value; borrow that update instead.
            if lower == upper {
                assert!(lower > 0);
                lower -= 1;
            }
            (lower, upper)
        }

        /// Inserts a key, padding with dummy keys as needed so that the key lands at or after the
        /// location indicated by its hash.
        fn insert_key(&mut self, key: layout::KeyRef<'_, L>, offset: Option<usize>) {
            let desired = self.desired_location(&key);
            // Pad with dummy keys (repeating the current offset, so they hold no values) until the
            // key can be placed at or after its desired location.
            while self.keys.len() < desired {
                let current_offset = self.keys_offs.index(self.keys.len());
                self.keys.push_own(&<layout::Key<L> as Default>::default());
                self.keys_offs.push_ref(current_offset);
            }

            self.keys.push_ref(key);
            if let Some(offset) = offset {
                self.keys_offs.push_ref(offset);
            }
            self.key_count += 1;
        }

        /// As `insert_key`, but starting from an owned key.
        fn insert_key_own(&mut self, key: &layout::Key<L>, offset: Option<usize>) {
            let mut key_con = L::KeyContainer::with_capacity(1);
            key_con.push_own(&key);
            self.insert_key(key_con.index(0), offset)
        }

        /// The location the key's hash indicates it should live at.
        fn desired_location<K: Hashable>(&self, key: &K) -> usize {
            if self.divisor == 0 { 0 }
            else {
                (key.hashed().into() / self.divisor).try_into().expect("divisor not large enough to force u64 into usize")
            }
        }

        /// Returns true if the key at `index` is a dummy, or orders strictly before `key`.
        fn advance_key(&self, index: usize, key: layout::KeyRef<'_, L>) -> bool {
            !self.live_key(index) || self.keys.index(index).lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
        }

        /// Returns true if the key at `index` is live (has associated values) rather than a dummy.
        fn live_key(&self, index: usize) -> bool {
            self.keys_offs.index(index) != self.keys_offs.index(index+1)
        }

        /// Advances `index` past any dummy keys.
        fn advance_to_live_key(&self, index: &mut usize) {
            while *index < self.keys.len() && !self.live_key(*index) {
                *index += 1;
            }
        }

        /// A divisor such that `hash / divisor` is approximately `hash * capacity / 2^64`.
        /// For example, with `capacity == 4` the divisor is `2^62`, sending hashes to locations
        /// `0 .. 4`. Computed as `((1 << 63) / capacity) << 1` to avoid forming `1 << 64`.
        fn divisor_for_capacity(capacity: usize) -> u64 {
            let capacity: u64 = capacity.try_into().expect("usize exceeds u64");
            if capacity == 0 || capacity == 1 { 0 }
            else {
                ((1 << 63) / capacity) << 1
            }
        }
    }

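    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter names the containers used to store each of the batch's columns.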
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct RhhValBatch<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// The updates themselves.
        pub storage: RhhValStorage<L>,
        /// Description of the update times this batch represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// This may exceed `storage.times.len()`, since the singleton optimization can elide
        /// updates that duplicate their predecessor.
        pub updates: usize,
    }

    impl<L: Layout> WithLayout for RhhValBatch<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Layout = L;
    }

    impl<L: Layout> BatchReader for RhhValBatch<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Cursor = RhhValCursor<L>;
        fn cursor(&self) -> Self::Cursor {
            let mut cursor = RhhValCursor {
                key_cursor: 0,
                val_cursor: 0,
                phantom: std::marker::PhantomData,
            };
            // Position the cursor at the first live key (skipping any leading dummy keys),
            // rather than stepping, which would skip a live key in slot zero.
            cursor.rewind_keys(self);
            cursor
        }
        fn len(&self) -> usize {
            // `updates` counts both the stored updates and the "singleton" updates elided in
            // favor of equal adjacent offsets.
            self.updates
        }
        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
    }

    impl<L: Layout> Batch for RhhValBatch<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Merger = RhhValMerger<L>;

        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
            RhhValMerger::new(self, other, compaction_frontier)
        }

        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
            use timely::progress::Timestamp;
            Self {
                storage: RhhValStorage {
                    keys: L::KeyContainer::with_capacity(0),
                    keys_offs: L::OffsetContainer::with_capacity(0),
                    vals: L::ValContainer::with_capacity(0),
                    vals_offs: L::OffsetContainer::with_capacity(0),
                    times: L::TimeContainer::with_capacity(0),
                    diffs: L::DiffContainer::with_capacity(0),
                    key_count: 0,
                    key_capacity: 0,
                    divisor: 0,
                },
                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
                updates: 0,
            }
        }
    }

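    /// A merger in the process of merging two hash-ordered batches into one.
    ///
    /// Work is metered by "fuel": each call to `work` merges keys until the fuel is exhausted or
    /// both inputs are drained, so that large merges can be spread across many calls.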
    pub struct RhhValMerger<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// The result batch under assembly.
        result: RhhValStorage<L>,
        /// Description of the result batch.
        description: Description<layout::Time<L>>,

        /// Local stash of updates, used for consolidation.
        update_stash: Vec<(layout::Time<L>, layout::Diff<L>)>,
        /// Counts the singleton-optimized entries, so the update count can be reported correctly.
        singletons: usize,
    }

    impl<L: Layout> Merger<RhhValBatch<L>> for RhhValMerger<L>
    where
        layout::Key<L>: Default + HashOrdered,
        RhhValBatch<L>: Batch<Time=layout::Time<L>>,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        fn new(batch1: &RhhValBatch<L>, batch2: &RhhValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {

            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            // The total number of updates bounds the number of distinct keys; double it to leave
            // the hash-based placement some slack.
            let max_cap = batch1.len() + batch2.len();
            let rhh_cap = 2 * max_cap;

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            let mut storage = RhhValStorage {
                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
                vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
                key_count: 0,
                key_capacity: rhh_cap,
                divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
            };

            // Seed both offset containers with an explicit zero lower bound.
            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
            keys_offs.push_ref(0);
            let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
            vals_offs.push_ref(0);

            RhhValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: storage,
                description,
                update_stash: Vec::new(),
                singletons: 0,
            }
        }
        fn done(self) -> RhhValBatch<L> {
            RhhValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description: self.description,
            }
        }
        fn work(&mut self, source1: &RhhValBatch<L>, source2: &RhhValBatch<L>, fuel: &mut isize) {

            // Meter effort by the number of updates produced.
            let starting_updates = self.result.times.len();
            let mut effort = 0isize;

            source1.storage.advance_to_live_key(&mut self.key_cursor1);
            source2.storage.advance_to_live_key(&mut self.key_cursor2);

            // Merge keys from both inputs while both remain and fuel is available.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                source1.storage.advance_to_live_key(&mut self.key_cursor1);
                source2.storage.advance_to_live_key(&mut self.key_cursor2);
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            // Once either input is exhausted, copy the remainder of the other.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                source1.storage.advance_to_live_key(&mut self.key_cursor1);
                effort = (self.result.times.len() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                source2.storage.advance_to_live_key(&mut self.key_cursor2);
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }

    impl<L: Layout> RhhValMerger<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        /// Copies the key at `cursor` in `source`, along with its values and consolidated updates.
        ///
        /// The key is only inserted if at least one of its values survives consolidation to `since`.
        fn copy_key(&mut self, source: &RhhValStorage<L>, cursor: usize) {
            let init_vals = self.result.vals.len();
            let (mut lower, upper) = source.values_for_key(cursor);
            while lower < upper {
                self.stash_updates_for_val(source, lower);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push_ref(off);
                    self.result.vals.push_ref(source.vals.index(lower));
                }
                lower += 1;
            }

            if self.result.vals.len() > init_vals {
                self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len()));
            }
        }
        /// Merges the keys at the current cursors, copying from whichever input orders first and
        /// merging their values when the keys are equal.
        fn merge_key(&mut self, source1: &RhhValStorage<L>, source2: &RhhValStorage<L>) {

            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
                        self.result.insert_key(source1.keys.index(self.key_cursor1), Some(off));
                    }
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }
        /// Merges the value ranges of a key common to both inputs, returning the new upper value
        /// offset if any values were produced.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&RhhValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&RhhValStorage<L>, usize, usize),
        ) -> Option<usize> {
            let init_vals = self.result.vals.len();
            while lower1 < upper1 && lower2 < upper2 {
                use ::std::cmp::Ordering;
                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
                    Ordering::Less => {
                        self.stash_updates_for_val(source1, lower1);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push_ref(off);
                            self.result.vals.push_ref(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                    },
                    Ordering::Equal => {
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push_ref(off);
                            self.result.vals.push_ref(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    },
                    Ordering::Greater => {
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push_ref(off);
                            self.result.vals.push_ref(source2.vals.index(lower2));
                        }
                        lower2 += 1;
                    },
                }
            }
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push_ref(off);
                    self.result.vals.push_ref(source1.vals.index(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push_ref(off);
                    self.result.vals.push_ref(source2.vals.index(lower2));
                }
                lower2 += 1;
            }

            if self.result.vals.len() > init_vals {
                Some(self.result.vals.len())
            } else {
                None
            }
        }

        /// Stages the updates for the value at `index`, advancing their times to `since`.
        fn stash_updates_for_val(&mut self, source: &RhhValStorage<L>, index: usize) {
            let (lower, upper) = source.updates_for_value(index);
            for i in lower .. upper {
                let time = source.times.index(i);
                let diff = source.diffs.index(i);
                let mut new_time = L::TimeContainer::into_owned(time);
                use crate::lattice::Lattice;
                new_time.advance_by(self.description.since().borrow());
                self.update_stash.push((new_time, L::DiffContainer::into_owned(diff)));
            }
        }

        /// Consolidates the staged updates and pushes any survivors into the result, returning the
        /// new upper update offset, or `None` if all updates cancel. A single surviving update that
        /// equals the most recently pushed one is elided and counted as a singleton instead.
        fn consolidate_updates(&mut self) -> Option<usize> {
            use crate::consolidation;
            consolidation::consolidate(&mut self.update_stash);
            if !self.update_stash.is_empty() {
                // If the stash is a single update identical to the last one written, elide it and
                // record a singleton; readers recover it through equal adjacent `vals_offs` entries.
                let time_diff = self.result.times.last().zip(self.result.diffs.last());
                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
                    *t1 == L::TimeContainer::into_owned(t2) && *d1 == L::DiffContainer::into_owned(d2)
                });
                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
                    self.update_stash.clear();
                    self.singletons += 1;
                }
                else {
                    for (time, diff) in self.update_stash.drain(..) {
                        self.result.times.push_own(&time);
                        self.result.diffs.push_own(&diff);
                    }
                }
                Some(self.result.times.len())
            } else {
                None
            }
        }
    }

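    /// A cursor for navigating a single layer of a hash-ordered batch.
    ///
    /// The cursor tracks positions in the key and value containers, and skips over the dummy keys
    /// introduced by the hash-based placement.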
    pub struct RhhValCursor<L: Layout>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// Absolute position of the current key among all key slots (live and dummy).
        key_cursor: usize,
        /// Absolute position of the current value.
        val_cursor: usize,
        /// Phantom marker for the layout type.
        phantom: PhantomData<L>,
    }

    use crate::trace::implementations::WithLayout;
    impl<L: Layout> WithLayout for RhhValCursor<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Layout = L;
    }

    impl<L: Layout> Cursor for RhhValCursor<L>
    where
        layout::Key<L>: Default + HashOrdered,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Storage = RhhValBatch<L>;

        fn get_key<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
        fn get_val<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Val<'a>> { if self.val_valid(storage) { storage.storage.vals.get(self.val_cursor) } else { None } }
        fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
        fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
            let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
            for index in lower .. upper {
                let time = storage.storage.times.index(index);
                let diff = storage.storage.diffs.index(index);
                logic(time, diff);
            }
        }
        fn key_valid(&self, storage: &RhhValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
        fn val_valid(&self, storage: &RhhValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
        fn step_key(&mut self, storage: &RhhValBatch<L>) {
            // Advance by one, then continue past any dummy keys.
            self.key_cursor += 1;
            storage.storage.advance_to_live_key(&mut self.key_cursor);

            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
            else {
                self.key_cursor = storage.storage.keys.len();
            }
        }
        fn seek_key(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'_>) {
            // Leap directly to the key's hashed (desired) location, then scan forward past dummies
            // and smaller keys; Robin Hood placement should keep this scan short.
            let desired = storage.storage.desired_location(&key);
            if self.key_cursor < desired {
                self.key_cursor = desired;
            }
            while self.key_valid(storage) && storage.storage.advance_key(self.key_cursor, key) {
                self.key_cursor += 1;
            }

            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        fn step_val(&mut self, storage: &RhhValBatch<L>) {
            self.val_cursor += 1;
            if !self.val_valid(storage) {
                self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
            }
        }
        fn seek_val(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'_>) {
            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
        }
        fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
            self.key_cursor = 0;
            storage.storage.advance_to_live_key(&mut self.key_cursor);

            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        fn rewind_vals(&mut self, storage: &RhhValBatch<L>) {
            self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
        }
    }

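    /// A builder for creating hash-ordered batches from update tuples.
    ///
    /// The builder expects its input tuples to arrive grouped by key and value, with keys ordered
    /// by their hashes, and applies the singleton-update optimization as it goes.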
    pub struct RhhValBuilder<L: Layout, CI>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// The in-progress storage for the batch.
        result: RhhValStorage<L>,
        /// A staged update equal to the most recently pushed one; it is either pushed later or
        /// elided and counted in `singletons`.
        singleton: Option<(layout::Time<L>, layout::Diff<L>)>,
        /// Counts the number of elided singleton updates, so that the batch can report its
        /// logical update count correctly.
        singletons: usize,
        /// Phantom marker for the chunk (input) type.
        _marker: PhantomData<CI>,
    }

    impl<L: Layout, CI> RhhValBuilder<L, CI>
    where
        layout::Key<L>: Default + HashOrdered,
    {
        /// Pushes an update, possibly staging it as a "singleton".
        ///
        /// If `(time, diff)` equals the most recently recorded update, it is stashed rather than
        /// pushed. A subsequent differing update flushes the stash first; otherwise the stash is
        /// dropped when the value is sealed and counted in `singletons`, and readers recover it
        /// through equal adjacent `vals_offs` entries (see `RhhValStorage::updates_for_value`).
        fn push_update(&mut self, time: layout::Time<L>, diff: layout::Diff<L>) {
            if self.result.times.last().map(|t| L::TimeContainer::into_owned(t) == time).unwrap_or(false) && self.result.diffs.last().map(|d| L::DiffContainer::into_owned(d) == diff).unwrap_or(false) {
                assert!(self.singleton.is_none());
                self.singleton = Some((time, diff));
            }
            else {
                // Flush any staged singleton before pushing the new update.
                if let Some((time, diff)) = self.singleton.take() {
                    self.result.times.push_own(&time);
                    self.result.diffs.push_own(&diff);
                }
                self.result.times.push_own(&time);
                self.result.diffs.push_own(&diff);
            }
        }
    }

    impl<L: Layout, CI> Builder for RhhValBuilder<L, CI>
    where
        layout::Key<L>: Default + HashOrdered,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Key<'a> = layout::Key<L>, Time=layout::Time<L>, Diff=layout::Diff<L>>,
        for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
        for<'a> layout::KeyRef<'a, L>: HashOrdered,
    {
        type Input = CI;
        type Time = layout::Time<L>;
        type Output = RhhValBatch<L>;

        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {

            // Double the number of keys for hash-placement slack, plus a little headroom for keys
            // displaced past their desired location.
            let rhh_capacity = 2 * keys;
            let divisor = RhhValStorage::<L>::divisor_for_capacity(rhh_capacity);
            let keys = rhh_capacity + 10;

            Self {
                result: RhhValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    vals: L::ValContainer::with_capacity(vals),
                    vals_offs: L::OffsetContainer::with_capacity(vals + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                    key_count: 0,
                    key_capacity: rhh_capacity,
                    divisor,
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);
                // Perhaps this continues the most recently pushed key.
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Perhaps this continues the most recently pushed value.
                    if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.push_update(time, diff);
                    } else {
                        // New value: close the previous value's update range, then start the new one.
                        self.result.vals_offs.push_ref(self.result.times.len());
                        if self.singleton.take().is_some() { self.singletons += 1; }
                        self.push_update(time, diff);
                        self.result.vals.push_into(val);
                    }
                } else {
                    // New key: push offsets bounding the previous value and key, then start the new ones.
                    self.result.vals_offs.push_ref(self.result.times.len());
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    self.result.keys_offs.push_ref(self.result.vals.len());
                    self.push_update(time, diff);
                    self.result.vals.push_into(val);
                    self.result.insert_key_own(&key, None);
                }
            }
        }

        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> RhhValBatch<L> {
            // Close out the final value and key before assembling the batch.
            self.result.vals_offs.push_ref(self.result.times.len());
            if self.singleton.take().is_some() { self.singletons += 1; }
            self.result.keys_offs.push_ref(self.result.vals.len());
            RhhValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }

}

mod key_batch {

}