1use std::rc::Rc;
9use std::cmp::Ordering;
10
11use serde::{Deserialize, Serialize};
12
13use crate::Hashable;
14use crate::containers::TimelyStack;
15use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
16use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
17use crate::trace::implementations::spine_fueled::Spine;
18use crate::trace::rc_blanket_impls::RcBuilder;
19
20use super::{Update, Layout, Vector, TStack};
21
22use self::val_batch::{RhhValBatch, RhhValBuilder};
23
/// A `Spine` of reference-counted `RhhValBatch`es using the `Vector` layout over `((K, V), T, R)` updates.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
/// A `MergeBatcher` over `Vec<((K, V), T, R)>` input, chunked by `VecChunker` and merged by `VecMerger`.
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// An `RcBuilder` around `RhhValBuilder` for the `Vector` layout, consuming `Vec<((K, V), T, R)>` chunks.
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
30
/// A `Spine` of reference-counted `RhhValBatch`es using the `TStack` layout over `((K, V), T, R)` updates.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K,V),T,R)>>>>;
/// A `MergeBatcher` over `Vec<((K, V), T, R)>` input, chunked by `ColumnationChunker` and merged by `ColMerger`.
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// An `RcBuilder` around `RhhValBuilder` for the `TStack` layout, consuming `TimelyStack<((K, V), T, R)>` chunks.
pub type ColBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
40
/// A marker trait (it declares no methods) layered on top of `Hashable`.
///
/// The batch types in this file require their key types to implement it.
/// NOTE(review): presumably implementors promise that their `Ord`/`PartialOrd`
/// order by `hashed()` first, the way `HashWrapper` does — confirm the intended contract.
pub trait HashOrdered: Hashable { }
46
/// A shared reference to a hashable `HashOrdered` type is itself `HashOrdered`.
impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T { }
48
/// A wrapper around a hashable value.
///
/// Its `PartialOrd` and `Ord` implementations in this file compare `(hashed(), inner)`
/// pairs, so wrapped values order by hash first and by the inner value second.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct HashWrapper<T: std::hash::Hash + Hashable> {
    /// The wrapped value.
    pub inner: T
}
55
56impl<T: PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> PartialOrd for HashWrapper<T> {
57 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
58 let this_hash = self.inner.hashed();
59 let that_hash = other.inner.hashed();
60 (this_hash, &self.inner).partial_cmp(&(that_hash, &other.inner))
61 }
62}
63
64impl<T: Ord + PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> Ord for HashWrapper<T> {
65 fn cmp(&self, other: &Self) -> Ordering {
66 self.partial_cmp(other).unwrap()
67 }
68}
69
/// Marks `HashWrapper` as hash-ordered; its comparison impls in this file compare hashes first.
impl<T: std::hash::Hash + Hashable> HashOrdered for HashWrapper<T> { }
71
72impl<T: std::hash::Hash + Hashable> Hashable for HashWrapper<T> {
73 type Output = T::Output;
74 fn hashed(&self) -> Self::Output { self.inner.hashed() }
75}
76
/// Marks shared references to `HashWrapper` as hash-ordered as well.
impl<T: std::hash::Hash + Hashable> HashOrdered for &HashWrapper<T> { }
78
79impl<T: std::hash::Hash + Hashable> Hashable for &HashWrapper<T> {
80 type Output = T::Output;
81 fn hashed(&self) -> Self::Output { self.inner.hashed() }
82}
83
84mod val_batch {
85
86 use std::convert::TryInto;
87 use std::marker::PhantomData;
88 use serde::{Deserialize, Serialize};
89 use timely::container::PushInto;
90 use timely::progress::{Antichain, frontier::AntichainRef};
91
92 use crate::hashable::Hashable;
93 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
94 use crate::trace::implementations::{BatchContainer, BuilderInput};
95 use crate::IntoOwned;
96
97 use super::{Layout, Update, HashOrdered};
98
    /// Column-oriented storage for a hash-ordered batch of `(key, val, time, diff)` updates.
    ///
    /// Keys live in `keys`, each placed at or after the slot computed by `desired_location`
    /// (its hash divided by `divisor`). Skipped slots are filled with `Default` keys whose
    /// `keys_offs` range is empty, which is how `live_key` tells padding from real keys.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct RhhValStorage<L: Layout>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {

        /// The key capacity this storage was sized for. Where this module constructs
        /// storage, `divisor` is derived from the same value.
        pub key_capacity: usize,
        /// The value a key's hash is divided by to obtain its desired slot in `keys`.
        /// Zero is a sentinel: `desired_location` then answers slot zero for every key.
        pub divisor: u64,
        /// The number of keys added through `insert_key`; padding entries are not counted.
        pub key_count: usize,

        /// Keys, interleaved with `Default` padding entries.
        pub keys: L::KeyContainer,
        /// Offsets into `vals`: the values of the key at `i` occupy
        /// `keys_offs[i] .. keys_offs[i + 1]`. An empty range marks a padding entry.
        pub keys_offs: L::OffsetContainer,
        /// The values of all keys, concatenated and delimited by `keys_offs`.
        pub vals: L::ValContainer,
        /// Offsets into `times` / `diffs`: the updates of the value at `i` occupy
        /// `vals_offs[i] .. vals_offs[i + 1]`. An empty range instead denotes the single
        /// update immediately before that position (see `updates_for_value`).
        pub vals_offs: L::OffsetContainer,
        /// Update times, delimited by `vals_offs`.
        pub times: L::TimeContainer,
        /// Update diffs, stored in parallel with `times`.
        pub diffs: L::DiffContainer,
    }
151
152 impl<L: Layout> RhhValStorage<L>
153 where
154 <L::Target as Update>::Key: Default + HashOrdered,
155 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
156 {
157 fn values_for_key(&self, index: usize) -> (usize, usize) {
159 let lower = self.keys_offs.index(index);
160 let upper = self.keys_offs.index(index+1);
161 assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
163 (lower, upper)
164 }
165 fn updates_for_value(&self, index: usize) -> (usize, usize) {
167 let mut lower = self.vals_offs.index(index);
168 let upper = self.vals_offs.index(index+1);
169 if lower == upper {
172 assert!(lower > 0);
173 lower -= 1;
174 }
175 (lower, upper)
176 }
177
178 fn insert_key(&mut self, key: <L::KeyContainer as BatchContainer>::ReadItem<'_>, offset: Option<usize>) {
188 let desired = self.desired_location(&key);
189 while self.keys.len() < desired {
192 let current_offset = self.keys_offs.index(self.keys.len());
194 self.keys.push(<<L::Target as Update>::Key as Default>::default());
195 self.keys_offs.push(current_offset);
196 }
197
198 self.keys.push(key);
201 if let Some(offset) = offset {
202 self.keys_offs.push(offset);
203 }
204 self.key_count += 1;
205 }
206
207 fn desired_location<K: Hashable>(&self, key: &K) -> usize {
209 if self.divisor == 0 { 0 }
210 else {
211 (key.hashed().into() / self.divisor).try_into().expect("divisor not large enough to force u64 into uisze")
212 }
213 }
214
215 fn advance_key(&self, index: usize, key: <L::KeyContainer as BatchContainer>::ReadItem<'_>) -> bool {
217 !self.live_key(index) || self.keys.index(index).lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
219 }
220
221 fn live_key(&self, index: usize) -> bool {
223 self.keys_offs.index(index) != self.keys_offs.index(index+1)
224 }
225
226 fn advance_to_live_key(&self, index: &mut usize) {
228 while *index < self.keys.len() && !self.live_key(*index) {
229 *index += 1;
230 }
231 }
232
233 fn divisor_for_capacity(capacity: usize) -> u64 {
240 let capacity: u64 = capacity.try_into().expect("usize exceeds u64");
241 if capacity == 0 || capacity == 1 { 0 }
242 else {
243 ((1 << 63) / capacity) << 1
244 }
245 }
246 }
247
    /// A batch of updates: hash-ordered storage together with its description and update count.
    ///
    /// The explicit serde bound places the `Serialize` / `Deserialize` requirements on the
    /// layout's containers.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct RhhValBatch<L: Layout>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        /// The keys, values, times and diffs of the batch.
        pub storage: RhhValStorage<L>,
        /// The description (lower, upper and since frontiers) of the batch.
        pub description: Description<<L::Target as Update>::Time>,
        /// The number of updates in the batch, as reported by `len()`.
        ///
        /// The merger and builder in this module set it to `storage.times.len()` plus the
        /// number of updates recorded through the singleton encoding, which are not stored
        /// in `times`.
        pub updates: usize,
    }
275
276 impl<L: Layout> BatchReader for RhhValBatch<L>
277 where
278 <L::Target as Update>::Key: Default + HashOrdered,
279 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
280 {
281 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
282 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
283 type Time = <L::Target as Update>::Time;
284 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
285 type Diff = <L::Target as Update>::Diff;
286 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
287
288 type Cursor = RhhValCursor<L>;
289 fn cursor(&self) -> Self::Cursor {
290 let mut cursor = RhhValCursor {
291 key_cursor: 0,
292 val_cursor: 0,
293 phantom: std::marker::PhantomData,
294 };
295 cursor.step_key(self);
296 cursor
297 }
298 fn len(&self) -> usize {
299 self.updates
302 }
303 fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
304 }
305
306 impl<L: Layout> Batch for RhhValBatch<L>
307 where
308 <L::Target as Update>::Key: Default + HashOrdered,
309 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
310 {
311 type Merger = RhhValMerger<L>;
312
313 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
314 RhhValMerger::new(self, other, compaction_frontier)
315 }
316
317 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
318 use timely::progress::Timestamp;
319 Self {
320 storage: RhhValStorage {
321 keys: L::KeyContainer::with_capacity(0),
322 keys_offs: L::OffsetContainer::with_capacity(0),
323 vals: L::ValContainer::with_capacity(0),
324 vals_offs: L::OffsetContainer::with_capacity(0),
325 times: L::TimeContainer::with_capacity(0),
326 diffs: L::DiffContainer::with_capacity(0),
327 key_count: 0,
328 key_capacity: 0,
329 divisor: 0,
330 },
331 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
332 updates: 0,
333 }
334 }
335 }
336
    /// State for an in-progress, fuel-limited merge of two `RhhValBatch`es.
    pub struct RhhValMerger<L: Layout>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        /// Position of the next key to consider in the first input batch.
        key_cursor1: usize,
        /// Position of the next key to consider in the second input batch.
        key_cursor2: usize,
        /// The storage being assembled for the merged batch.
        result: RhhValStorage<L>,
        /// The description of the merged batch.
        description: Description<<L::Target as Update>::Time>,

        /// Scratch space in which the `(time, diff)` updates for one value are gathered
        /// and consolidated before being written to `result`.
        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// The number of updates recorded through the singleton encoding, and therefore
        /// not present in `result.times`.
        singletons: usize,
    }
359
360 impl<L: Layout> Merger<RhhValBatch<L>> for RhhValMerger<L>
361 where
362 <L::Target as Update>::Key: Default + HashOrdered,
363 RhhValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
364 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
365 {
366 fn new(batch1: &RhhValBatch<L>, batch2: &RhhValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
367
368 assert!(batch1.upper() == batch2.lower());
369 use crate::lattice::Lattice;
370 let mut since = batch1.description().since().join(batch2.description().since());
371 since = since.join(&compaction_frontier.to_owned());
372
373 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
374
375 let max_cap = batch1.len() + batch2.len();
378 let rhh_cap = 2 * max_cap;
379
380 let batch1 = &batch1.storage;
381 let batch2 = &batch2.storage;
382
383 let mut storage = RhhValStorage {
384 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
385 keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
386 vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
387 vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
388 times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
389 diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
390 key_count: 0,
391 key_capacity: rhh_cap,
392 divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
393 };
394
395 let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
397 keys_offs.push(0);
398 let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
399 vals_offs.push(0);
400
401 RhhValMerger {
402 key_cursor1: 0,
403 key_cursor2: 0,
404 result: storage,
405 description,
406 update_stash: Vec::new(),
407 singletons: 0,
408 }
409 }
410 fn done(self) -> RhhValBatch<L> {
411 RhhValBatch {
412 updates: self.result.times.len() + self.singletons,
413 storage: self.result,
414 description: self.description,
415 }
416 }
417 fn work(&mut self, source1: &RhhValBatch<L>, source2: &RhhValBatch<L>, fuel: &mut isize) {
418
419 let starting_updates = self.result.times.len();
421 let mut effort = 0isize;
422
423 source1.storage.advance_to_live_key(&mut self.key_cursor1);
424 source2.storage.advance_to_live_key(&mut self.key_cursor2);
425
426 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
428 self.merge_key(&source1.storage, &source2.storage);
429 source1.storage.advance_to_live_key(&mut self.key_cursor1);
430 source2.storage.advance_to_live_key(&mut self.key_cursor2);
431 effort = (self.result.times.len() - starting_updates) as isize;
433 }
434
435 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
438 self.copy_key(&source1.storage, self.key_cursor1);
439 self.key_cursor1 += 1;
440 source1.storage.advance_to_live_key(&mut self.key_cursor1);
441 effort = (self.result.times.len() - starting_updates) as isize;
442 }
443 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
444 self.copy_key(&source2.storage, self.key_cursor2);
445 self.key_cursor2 += 1;
446 source2.storage.advance_to_live_key(&mut self.key_cursor2);
447 effort = (self.result.times.len() - starting_updates) as isize;
448 }
449
450 *fuel -= effort;
451 }
452 }
453
454 impl<L: Layout> RhhValMerger<L>
456 where
457 <L::Target as Update>::Key: Default + HashOrdered,
458 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
459 {
460 fn copy_key(&mut self, source: &RhhValStorage<L>, cursor: usize) {
468 let init_vals = self.result.vals.len();
470 let (mut lower, upper) = source.values_for_key(cursor);
471 while lower < upper {
472 self.stash_updates_for_val(source, lower);
473 if let Some(off) = self.consolidate_updates() {
474 self.result.vals_offs.push(off);
475 self.result.vals.push(source.vals.index(lower));
476 }
477 lower += 1;
478 }
479
480 if self.result.vals.len() > init_vals {
482 self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len()));
483 }
484 }
485 fn merge_key(&mut self, source1: &RhhValStorage<L>, source2: &RhhValStorage<L>) {
490
491 use ::std::cmp::Ordering;
492 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
493 Ordering::Less => {
494 self.copy_key(source1, self.key_cursor1);
495 self.key_cursor1 += 1;
496 },
497 Ordering::Equal => {
498 let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
500 let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
501 if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
502 self.result.insert_key(source1.keys.index(self.key_cursor1), Some(off));
503 }
504 self.key_cursor1 += 1;
506 self.key_cursor2 += 1;
507 },
508 Ordering::Greater => {
509 self.copy_key(source2, self.key_cursor2);
510 self.key_cursor2 += 1;
511 },
512 }
513 }
        /// Merges two ranges of values, one from each source, into the result.
        ///
        /// Each argument is `(storage, lower, upper)`, the range of value positions to merge.
        /// For each distinct value the updates are stashed (from one or both sources),
        /// consolidated, and the value is pushed only if some update survives.
        ///
        /// Returns `Some(result.vals.len())` if at least one value was pushed, else `None`.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&RhhValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&RhhValStorage<L>, usize, usize),
        ) -> Option<usize> {
            // Remember the starting length so we can tell whether anything was produced.
            let init_vals = self.result.vals.len();
            // While both ranges are non-empty, take the lesser value, or both when they are equal.
            while lower1 < upper1 && lower2 < upper2 {
                use ::std::cmp::Ordering;
                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
                    Ordering::Less => {
                        // The value is present only in the first source.
                        self.stash_updates_for_val(source1, lower1);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                    },
                    Ordering::Equal => {
                        // The value is present in both sources: combine their updates before consolidating.
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    },
                    Ordering::Greater => {
                        // The value is present only in the second source.
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source2.vals.index(lower2));
                        }
                        lower2 += 1;
                    },
                }
            }
            // At most one of the two ranges still has values; drain whichever it is.
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source1.vals.index(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source2.vals.index(lower2));
                }
                lower2 += 1;
            }

            // Report the new upper bound for the key's values only if any value was pushed.
            if self.result.vals.len() > init_vals {
                Some(self.result.vals.len())
            } else {
                None
            }
        }
586
587 fn stash_updates_for_val(&mut self, source: &RhhValStorage<L>, index: usize) {
589 let (lower, upper) = source.updates_for_value(index);
590 for i in lower .. upper {
591 let time = source.times.index(i);
593 let diff = source.diffs.index(i);
594 let mut new_time = time.into_owned();
595 use crate::lattice::Lattice;
596 new_time.advance_by(self.description.since().borrow());
597 self.update_stash.push((new_time, diff.into_owned()));
598 }
599 }
600
        /// Consolidates `update_stash` and moves the surviving updates into the result.
        ///
        /// Returns `Some(result.times.len())`, the upper offset for the current value, if
        /// any update survived consolidation, and `None` otherwise. The stash is empty on return.
        fn consolidate_updates(&mut self) -> Option<usize> {
            use crate::consolidation;
            consolidation::consolidate(&mut self.update_stash);
            if !self.update_stash.is_empty() {
                // Compare the last stashed update with the most recently written `(time, diff)`, if any.
                let time_diff = self.result.times.last().zip(self.result.diffs.last());
                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
                    let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
                    let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
                    t1.eq(&t2) && d1.eq(&d2)
                });
                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
                    // Singleton encoding: a lone update equal to the previously written one is
                    // not stored again. The unchanged offset returned below yields an empty
                    // range, which `updates_for_value` reads as "the update just before".
                    self.update_stash.clear();
                    self.singletons += 1;
                }
                else {
                    // Ordinary case: append every stashed update to the result.
                    for (time, diff) in self.update_stash.drain(..) {
                        self.result.times.push(time);
                        self.result.diffs.push(diff);
                    }
                }
                Some(self.result.times.len())
            } else {
                None
            }
        }
631 }
632
633
    /// A cursor over an `RhhValBatch`, holding positions only; the batch is passed to each method.
    pub struct RhhValCursor<L: Layout>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        /// Position of the current key in `storage.keys`.
        key_cursor: usize,
        /// Position of the current value in `storage.vals`.
        val_cursor: usize,
        /// Ties the cursor to its layout type without storing a value of it.
        phantom: PhantomData<L>,
    }
652
653 impl<L: Layout> Cursor for RhhValCursor<L>
654 where
655 <L::Target as Update>::Key: Default + HashOrdered,
656 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
657 {
658 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
659 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
660 type Time = <L::Target as Update>::Time;
661 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
662 type Diff = <L::Target as Update>::Diff;
663 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
664
665 type Storage = RhhValBatch<L>;
666
667 fn get_key<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
668 fn get_val<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Val<'a>> { if self.val_valid(storage) { storage.storage.vals.get(self.val_cursor) } else { None } }
669 fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
670 fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
671 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
672 let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
673 for index in lower .. upper {
674 let time = storage.storage.times.index(index);
675 let diff = storage.storage.diffs.index(index);
676 logic(time, diff);
677 }
678 }
679 fn key_valid(&self, storage: &RhhValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
680 fn val_valid(&self, storage: &RhhValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
681 fn step_key(&mut self, storage: &RhhValBatch<L>){
682 self.key_cursor += 1;
684 storage.storage.advance_to_live_key(&mut self.key_cursor);
685
686 if self.key_valid(storage) {
687 self.rewind_vals(storage);
688 }
689 else {
690 self.key_cursor = storage.storage.keys.len();
691 }
692 }
693 fn seek_key(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'_>) {
694 let desired = storage.storage.desired_location(&key);
696 if self.key_cursor < desired {
698 self.key_cursor = desired;
699 }
700 while self.key_valid(storage) && storage.storage.advance_key(self.key_cursor, key) {
704 self.key_cursor += 1;
707 }
708
709 if self.key_valid(storage) {
710 self.rewind_vals(storage);
711 }
712 }
713 fn step_val(&mut self, storage: &RhhValBatch<L>) {
714 self.val_cursor += 1;
715 if !self.val_valid(storage) {
716 self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
717 }
718 }
719 fn seek_val(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'_>) {
720 self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
721 }
722 fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
723 self.key_cursor = 0;
724 storage.storage.advance_to_live_key(&mut self.key_cursor);
725
726 if self.key_valid(storage) {
727 self.rewind_vals(storage)
728 }
729 }
730 fn rewind_vals(&mut self, storage: &RhhValBatch<L>) {
731 self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
732 }
733 }
734
    /// A builder that assembles an `RhhValBatch` from chunks of type `CI`.
    pub struct RhhValBuilder<L: Layout, CI>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        /// The storage being assembled.
        result: RhhValStorage<L>,
        /// An update withheld from `result` because it equals the last `(time, diff)` written.
        /// It is either written out when another update follows for the same value, or
        /// counted in `singletons` when the value is completed.
        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// The number of updates recorded through the singleton encoding, and therefore
        /// not present in `result.times`.
        singletons: usize,
        /// Ties the builder to its input chunk type without storing a value of it.
        _marker: PhantomData<CI>,
    }
749
750 impl<L: Layout, CI> RhhValBuilder<L, CI>
751 where
752 <L::Target as Update>::Key: Default + HashOrdered,
753 {
754 fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
766 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time);
768 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff);
769 if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
770 assert!(self.singleton.is_none());
771 self.singleton = Some((time, diff));
772 }
773 else {
774 if let Some((time, diff)) = self.singleton.take() {
776 self.result.times.push(time);
777 self.result.diffs.push(diff);
778 }
779 self.result.times.push(time);
780 self.result.diffs.push(diff);
781 }
782 }
783 }
784
785 impl<L: Layout, CI> Builder for RhhValBuilder<L, CI>
786 where
787 <L::Target as Update>::Key: Default + HashOrdered,
788 CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Key<'a> = <L::Target as Update>::Key, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
789 for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
790 for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered + IntoOwned<'a, Owned = <L::Target as Update>::Key>,
791 {
792 type Input = CI;
793 type Time = <L::Target as Update>::Time;
794 type Output = RhhValBatch<L>;
795
796 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
797
798 let rhh_capacity = 2 * keys;
800 let divisor = RhhValStorage::<L>::divisor_for_capacity(rhh_capacity);
801 let keys = rhh_capacity + 10;
805
806 Self {
808 result: RhhValStorage {
809 keys: L::KeyContainer::with_capacity(keys),
810 keys_offs: L::OffsetContainer::with_capacity(keys + 1),
811 vals: L::ValContainer::with_capacity(vals),
812 vals_offs: L::OffsetContainer::with_capacity(vals + 1),
813 times: L::TimeContainer::with_capacity(upds),
814 diffs: L::DiffContainer::with_capacity(upds),
815 key_count: 0,
816 key_capacity: rhh_capacity,
817 divisor,
818 },
819 singleton: None,
820 singletons: 0,
821 _marker: PhantomData,
822 }
823 }
824
        /// Drains `chunk` and appends each `(key, val, time, diff)` item to the batch under construction.
        ///
        /// An item continues the current key or value when it compares equal (via
        /// `CI::key_eq` / `CI::val_eq`) to the last one stored; otherwise the previous value
        /// (and key) is closed by pushing its upper offset before the new one is started.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);
                // Same key as the last one stored?
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Same value as well: just another update for it.
                    if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.push_update(time, diff);
                    } else {
                        // New value for the same key: close the previous value's update range.
                        self.result.vals_offs.push(self.result.times.len());
                        // An update still withheld at this point is recorded via the singleton encoding.
                        if self.singleton.take().is_some() { self.singletons += 1; }
                        self.push_update(time, diff);
                        self.result.vals.push(val);
                    }
                } else {
                    // New key: close the previous value's update range ...
                    self.result.vals_offs.push(self.result.times.len());
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    // ... and the previous key's value range.
                    self.result.keys_offs.push(self.result.vals.len());
                    self.push_update(time, diff);
                    self.result.vals.push(val);
                    // No offset is supplied: this key's upper bound is pushed when the next
                    // key starts, or in `done`.
                    self.result.insert_key(IntoOwned::borrow_as(&key), None);
                }
            }
        }
853
854 #[inline(never)]
855 fn done(mut self, description: Description<Self::Time>) -> RhhValBatch<L> {
856 self.result.vals_offs.push(self.result.times.len());
858 if self.singleton.take().is_some() { self.singletons += 1; }
860 self.result.keys_offs.push(self.result.vals.len());
861 RhhValBatch {
862 updates: self.result.times.len() + self.singletons,
863 storage: self.result,
864 description,
865 }
866 }
867
868 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
869 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
870 let mut builder = Self::with_capacity(keys, vals, upds);
871 for mut chunk in chain.drain(..) {
872 builder.push(&mut chunk);
873 }
874
875 builder.done(description)
876 }
877 }
878
879}
880
// Placeholder module: it currently contains no items.
mod key_batch {

}