1use std::rc::Rc;
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15use crate::trace::implementations::spine_fueled::Spine;
16use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
17use crate::trace::rc_blanket_impls::RcBuilder;
18
19use super::{Layout, Vector, TStack};
20
21pub use self::val_batch::{OrdValBatch, OrdValBuilder};
22pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
23
/// A `Spine` of reference-counted `OrdValBatch`es laid out with `Vector` for `((K, V), T, R)` updates.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A `MergeBatcher` over `Vec`s of `((K, V), T, R)` updates, using `VecChunker` and `VecMerger`.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// An `RcBuilder` around an `OrdValBuilder` that takes `Vec` input and uses the `Vector` layout.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
30
/// A `Spine` of reference-counted `OrdValBatch`es laid out with `TStack` for `((K, V), T, R)` updates.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A `MergeBatcher` over `Vec`s of `((K, V), T, R)` updates, using `ColumnationChunker` and `ColMerger`.
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// An `RcBuilder` around an `OrdValBuilder` that takes `TimelyStack` input and uses the `TStack` layout.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
40
/// A `Spine` of reference-counted `OrdKeyBatch`es laid out with `Vector` for `((K, ()), T, R)` updates.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A `MergeBatcher` over `Vec`s of `((K, ()), T, R)` updates, using `VecChunker` and `VecMerger`.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
/// An `RcBuilder` around an `OrdKeyBuilder` that takes `Vec` input and uses the `Vector` layout.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
47
/// A `Spine` of reference-counted `OrdKeyBatch`es laid out with `TStack` for `((K, ()), T, R)` updates.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A `MergeBatcher` over `Vec`s of `((K, ()), T, R)` updates, using `ColumnationChunker` and `ColMerger`.
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
/// An `RcBuilder` around an `OrdKeyBuilder` that takes `TimelyStack` input and uses the `TStack` layout.
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;
57
58pub use layers::{Vals, Upds};
62pub mod layers {
71
72 use serde::{Deserialize, Serialize};
73 use crate::trace::implementations::BatchContainer;
74
    /// Lists of values stored back to back in `vals` and delimited by `offs`.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Vals<O, V> {
        /// Offsets into `vals`: list `i` occupies positions `offs[i] .. offs[i+1]`.
        ///
        /// The constructors in this module always record an initial offset of zero.
        pub offs: O,
        /// The concatenated values of all lists.
        pub vals: V,
    }
85
86 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Default for Vals<O, V> {
87 fn default() -> Self { Self::with_capacity(0, 0) }
88 }
89
90 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
91 pub fn bounds(&self, index: usize) -> (usize, usize) {
93 (self.offs.index(index), self.offs.index(index+1))
94 }
95 pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> V::ReadItem<'_> {
102 self.get_abs(self.bounds(list_idx).0 + item_idx)
103 }
104
105 pub fn len(&self) -> usize { self.offs.len() - 1 }
107 pub fn get_abs(&self, index: usize) -> V::ReadItem<'_> {
109 self.vals.index(index)
110 }
111 pub fn with_capacity(o_size: usize, v_size: usize) -> Self {
113 let mut offs = <O as BatchContainer>::with_capacity(o_size);
114 offs.push_ref(0);
115 Self {
116 offs,
117 vals: <V as BatchContainer>::with_capacity(v_size),
118 }
119 }
120 pub fn merge_capacity(this: &Self, that: &Self) -> Self {
122 let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
123 offs.push_ref(0);
124 Self {
125 offs,
126 vals: <V as BatchContainer>::merge_capacity(&this.vals, &that.vals),
127 }
128 }
129 }
130
    /// Lists of `(time, diff)` updates stored back to back and delimited by `offs`.
    ///
    /// Two equal consecutive offsets do not denote an empty list: `bounds` reads them
    /// as the single update stored immediately before that offset.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Upds<O, T, D> {
        /// Offsets into `times` and `diffs`: list `i` is described by `offs[i]` and `offs[i+1]`.
        pub offs: O,
        /// The concatenated times of all lists.
        pub times: T,
        /// The concatenated diffs of all lists, position-aligned with `times`.
        pub diffs: D,
    }
144
145 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Default for Upds<O, T, D> {
146 fn default() -> Self { Self::with_capacity(0, 0) }
147 }
148 impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Upds<O, T, D> {
149 pub fn bounds(&self, index: usize) -> (usize, usize) {
151 let mut lower = self.offs.index(index);
152 let upper = self.offs.index(index+1);
153 if lower == upper {
156 assert!(lower > 0);
157 lower -= 1;
158 }
159 (lower, upper)
160 }
161 pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
168 self.get_abs(self.bounds(list_idx).0 + item_idx)
169 }
170
171 pub fn len(&self) -> usize { self.offs.len() - 1 }
173 pub fn get_abs(&self, index: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
175 (self.times.index(index), self.diffs.index(index))
176 }
177 pub fn with_capacity(o_size: usize, u_size: usize) -> Self {
179 let mut offs = <O as BatchContainer>::with_capacity(o_size);
180 offs.push_ref(0);
181 Self {
182 offs,
183 times: <T as BatchContainer>::with_capacity(u_size),
184 diffs: <D as BatchContainer>::with_capacity(u_size),
185 }
186 }
187 pub fn merge_capacity(this: &Self, that: &Self) -> Self {
189 let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
190 offs.push_ref(0);
191 Self {
192 offs,
193 times: <T as BatchContainer>::merge_capacity(&this.times, &that.times),
194 diffs: <D as BatchContainer>::merge_capacity(&this.diffs, &that.diffs),
195 }
196 }
197 }
198
    /// Stages `(time, diff)` updates, consolidates them, and appends them to an `Upds` as one list.
    pub struct UpdsBuilder<T: BatchContainer, D: BatchContainer> {
        /// Owned updates pushed since the last `seal`; consolidated and emptied by `seal`.
        stash: Vec<(T::Owned, D::Owned)>,
        /// Running count of updates accepted by `seal`.
        ///
        /// A singleton that repeats the previously stored update is counted even though
        /// nothing new is written to the time and diff containers.
        total: usize,

        /// Scratch container holding a singleton time so it can be compared with stored times.
        time_con: T,
        /// Scratch container holding a singleton diff so it can be compared with stored diffs.
        diff_con: D,
    }
216
217 impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
218 fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } }
219 }
220
221
222 impl<T, D> UpdsBuilder<T, D>
223 where
224 T: BatchContainer<Owned: Ord>,
225 D: BatchContainer<Owned: crate::difference::Semigroup>,
226 {
227 pub fn push(&mut self, time: T::Owned, diff: D::Owned) {
229 self.stash.push((time, diff));
230 }
231
232 pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
236 use crate::consolidation;
237 consolidation::consolidate(&mut self.stash);
238 if self.stash.is_empty() { return false; }
240 if self.stash.len() == 1 {
242 let (time, diff) = self.stash.last().unwrap();
243 self.time_con.clear(); self.time_con.push_own(time);
244 self.diff_con.clear(); self.diff_con.push_own(diff);
245 if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
246 self.total += 1;
247 self.stash.clear();
248 upds.offs.push_ref(upds.times.len());
249 return true;
250 }
251 }
252 self.total += self.stash.len();
254 for (time, diff) in self.stash.drain(..) {
255 upds.times.push_own(&time);
256 upds.diffs.push_own(&diff);
257 }
258 upds.offs.push_ref(upds.times.len());
259 true
260 }
261
262 pub fn total(&self) -> usize { self.total }
264 }
265}
266
267pub mod val_batch {
269
270 use std::marker::PhantomData;
271 use serde::{Deserialize, Serialize};
272 use timely::container::PushInto;
273 use timely::progress::{Antichain, frontier::AntichainRef};
274
275 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
276 use crate::trace::implementations::{BatchContainer, BuilderInput};
277 use crate::trace::implementations::layout;
278
279 use super::{Layout, Vals, Upds, layers::UpdsBuilder};
280
    /// The keys, per-key value lists, and per-value update lists that make up a batch.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValStorage<L: Layout> {
        /// The keys; the position of a key is also the index of its list in `vals`.
        pub keys: L::KeyContainer,
        /// For each key, a list of values; the position of a value is the index of its list in `upds`.
        pub vals: Vals<L::OffsetContainer, L::ValContainer>,
        /// For each value, a list of `(time, diff)` updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
298
    /// A batch of `(key, val, time, diff)` updates: storage, a description, and an update count.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValBatch<L: Layout> {
        /// The keys, values, and updates held by the batch.
        pub storage: OrdValStorage<L>,
        /// The description returned by `BatchReader::description`.
        pub description: Description<layout::Time<L>>,
        /// The number of updates, as returned by `BatchReader::len`.
        ///
        /// Builders and mergers in this module set it from `UpdsBuilder::total`, so it
        /// need not equal the number of entries physically stored in `storage.upds`.
        pub updates: usize,
    }
323
    /// Exposes the layout parameter `L` as the batch's associated `Layout`.
    impl<L: Layout> WithLayout for OrdValBatch<L> {
        type Layout = L;
    }
327
328 impl<L: Layout> BatchReader for OrdValBatch<L> {
329
330 type Cursor = OrdValCursor<L>;
331 fn cursor(&self) -> Self::Cursor {
332 OrdValCursor {
333 key_cursor: 0,
334 val_cursor: 0,
335 phantom: PhantomData,
336 }
337 }
338 fn len(&self) -> usize {
339 self.updates
342 }
343 fn description(&self) -> &Description<layout::Time<L>> { &self.description }
344 }
345
346 impl<L: Layout> Batch for OrdValBatch<L> {
347 type Merger = OrdValMerger<L>;
348
349 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
350 OrdValMerger::new(self, other, compaction_frontier)
351 }
352
353 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
354 use timely::progress::Timestamp;
355 Self {
356 storage: OrdValStorage {
357 keys: L::KeyContainer::with_capacity(0),
358 vals: Default::default(),
359 upds: Default::default(),
360 },
361 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
362 updates: 0,
363 }
364 }
365 }
366
    /// State for incrementally merging two `OrdValBatch`es into one.
    pub struct OrdValMerger<L: Layout> {
        /// Position of the next key to consume from the first source batch.
        key_cursor1: usize,
        /// Position of the next key to consume from the second source batch.
        key_cursor2: usize,
        /// The merged storage produced so far.
        result: OrdValStorage<L>,
        /// Description of the merged batch; its `since` frontier is used to advance times.
        description: Description<layout::Time<L>>,
        /// Staging area that consolidates the updates of one value before they are written to `result`.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
380
381 impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
382 where
383 OrdValBatch<L>: Batch<Time=layout::Time<L>>,
384 {
385 fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {
386
387 assert!(batch1.upper() == batch2.lower());
388 use crate::lattice::Lattice;
389 let mut since = batch1.description().since().join(batch2.description().since());
390 since = since.join(&compaction_frontier.to_owned());
391
392 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
393
394 let batch1 = &batch1.storage;
395 let batch2 = &batch2.storage;
396
397 OrdValMerger {
398 key_cursor1: 0,
399 key_cursor2: 0,
400 result: OrdValStorage {
401 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
402 vals: Vals::merge_capacity(&batch1.vals, &batch2.vals),
403 upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
404 },
405 description,
406 staging: UpdsBuilder::default(),
407 }
408 }
409 fn done(self) -> OrdValBatch<L> {
410 OrdValBatch {
411 updates: self.staging.total(),
412 storage: self.result,
413 description: self.description,
414 }
415 }
416 fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
417
418 let starting_updates = self.staging.total();
420 let mut effort = 0isize;
421
422 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
424 self.merge_key(&source1.storage, &source2.storage);
425 effort = (self.staging.total() - starting_updates) as isize;
427 }
428
429 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
432 self.copy_key(&source1.storage, self.key_cursor1);
433 self.key_cursor1 += 1;
434 effort = (self.staging.total() - starting_updates) as isize;
435 }
436 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
437 self.copy_key(&source2.storage, self.key_cursor2);
438 self.key_cursor2 += 1;
439 effort = (self.staging.total() - starting_updates) as isize;
440 }
441
442 *fuel -= effort;
443 }
444 }
445
446 impl<L: Layout> OrdValMerger<L> {
448 fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
456 let init_vals = self.result.vals.vals.len();
458 let (mut lower, upper) = source.vals.bounds(cursor);
459 while lower < upper {
460 self.stash_updates_for_val(source, lower);
461 if self.staging.seal(&mut self.result.upds) {
462 self.result.vals.vals.push_ref(source.vals.get_abs(lower));
463 }
464 lower += 1;
465 }
466
467 if self.result.vals.vals.len() > init_vals {
469 self.result.keys.push_ref(source.keys.index(cursor));
470 self.result.vals.offs.push_ref(self.result.vals.vals.len());
471 }
472 }
473 fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
478 use ::std::cmp::Ordering;
479 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
480 Ordering::Less => {
481 self.copy_key(source1, self.key_cursor1);
482 self.key_cursor1 += 1;
483 },
484 Ordering::Equal => {
485 let (lower1, upper1) = source1.vals.bounds(self.key_cursor1);
487 let (lower2, upper2) = source2.vals.bounds(self.key_cursor2);
488 if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
489 self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
490 self.result.vals.offs.push_ref(off);
491 }
492 self.key_cursor1 += 1;
494 self.key_cursor2 += 1;
495 },
496 Ordering::Greater => {
497 self.copy_key(source2, self.key_cursor2);
498 self.key_cursor2 += 1;
499 },
500 }
501 }
502 fn merge_vals(
507 &mut self,
508 (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
509 (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
510 ) -> Option<usize> {
511 let init_vals = self.result.vals.vals.len();
513 while lower1 < upper1 && lower2 < upper2 {
514 use ::std::cmp::Ordering;
518 match source1.vals.get_abs(lower1).cmp(&source2.vals.get_abs(lower2)) {
519 Ordering::Less => {
520 self.stash_updates_for_val(source1, lower1);
522 if self.staging.seal(&mut self.result.upds) {
523 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
524 }
525 lower1 += 1;
526 },
527 Ordering::Equal => {
528 self.stash_updates_for_val(source1, lower1);
529 self.stash_updates_for_val(source2, lower2);
530 if self.staging.seal(&mut self.result.upds) {
531 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
532 }
533 lower1 += 1;
534 lower2 += 1;
535 },
536 Ordering::Greater => {
537 self.stash_updates_for_val(source2, lower2);
539 if self.staging.seal(&mut self.result.upds) {
540 self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
541 }
542 lower2 += 1;
543 },
544 }
545 }
546 while lower1 < upper1 {
548 self.stash_updates_for_val(source1, lower1);
549 if self.staging.seal(&mut self.result.upds) {
550 self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
551 }
552 lower1 += 1;
553 }
554 while lower2 < upper2 {
555 self.stash_updates_for_val(source2, lower2);
556 if self.staging.seal(&mut self.result.upds) {
557 self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
558 }
559 lower2 += 1;
560 }
561
562 if self.result.vals.vals.len() > init_vals {
564 Some(self.result.vals.vals.len())
565 } else {
566 None
567 }
568 }
569
570 fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
572 let (lower, upper) = source.upds.bounds(index);
573 for i in lower .. upper {
574 let (time, diff) = source.upds.get_abs(i);
576 use crate::lattice::Lattice;
577 let mut new_time: layout::Time<L> = L::TimeContainer::into_owned(time);
578 new_time.advance_by(self.description.since().borrow());
579 self.staging.push(new_time, L::DiffContainer::into_owned(diff));
580 }
581 }
582 }
583
    /// A cursor over an `OrdValBatch`, tracking a key position and a value position.
    pub struct OrdValCursor<L: Layout> {
        /// Position of the current key in the batch's key container.
        key_cursor: usize,
        /// Absolute position of the current value in the batch's value container.
        val_cursor: usize,
        /// Ties the cursor to the layout `L` without storing any layout data.
        phantom: PhantomData<L>,
    }
593
594 use crate::trace::implementations::WithLayout;
    /// Exposes the layout parameter `L` as the cursor's associated `Layout`.
    impl<L: Layout> WithLayout for OrdValCursor<L> {
        type Layout = L;
    }
598
599 impl<L: Layout> Cursor for OrdValCursor<L> {
600
601 type Storage = OrdValBatch<L>;
602
603 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
604 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
605
606 fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
607 fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.get_abs(self.val_cursor) }
608 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
609 let (lower, upper) = storage.storage.upds.bounds(self.val_cursor);
610 for index in lower .. upper {
611 let (time, diff) = storage.storage.upds.get_abs(index);
612 logic(time, diff);
613 }
614 }
615 fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
616 fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.vals.bounds(self.key_cursor).1 }
617 fn step_key(&mut self, storage: &OrdValBatch<L>){
618 self.key_cursor += 1;
619 if self.key_valid(storage) {
620 self.rewind_vals(storage);
621 }
622 else {
623 self.key_cursor = storage.storage.keys.len();
624 }
625 }
626 fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
627 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
628 if self.key_valid(storage) {
629 self.rewind_vals(storage);
630 }
631 }
632 fn step_val(&mut self, storage: &OrdValBatch<L>) {
633 self.val_cursor += 1;
634 if !self.val_valid(storage) {
635 self.val_cursor = storage.storage.vals.bounds(self.key_cursor).1;
636 }
637 }
638 fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
639 self.val_cursor += storage.storage.vals.vals.advance(self.val_cursor, storage.storage.vals.bounds(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
640 }
641 fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
642 self.key_cursor = 0;
643 if self.key_valid(storage) {
644 self.rewind_vals(storage)
645 }
646 }
647 fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
648 self.val_cursor = storage.storage.vals.bounds(self.key_cursor).0;
649 }
650 }
651
    /// Builds an `OrdValBatch` from input containers of type `CI`.
    pub struct OrdValBuilder<L: Layout, CI> {
        /// The storage assembled so far; `done` moves it into the finished batch.
        pub result: OrdValStorage<L>,
        /// Staging area for the updates of the most recently pushed value.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
        /// Records the input container type without storing one.
        _marker: PhantomData<CI>,
    }
661
662 impl<L, CI> Builder for OrdValBuilder<L, CI>
663 where
664 L: for<'a> Layout<
665 KeyContainer: PushInto<CI::Key<'a>>,
666 ValContainer: PushInto<CI::Val<'a>>,
667 >,
668 CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
669 {
670
671 type Input = CI;
672 type Time = layout::Time<L>;
673 type Output = OrdValBatch<L>;
674
675 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
676 Self {
677 result: OrdValStorage {
678 keys: L::KeyContainer::with_capacity(keys),
679 vals: Vals::with_capacity(keys + 1, vals),
680 upds: Upds::with_capacity(vals + 1, upds),
681 },
682 staging: UpdsBuilder::default(),
683 _marker: PhantomData,
684 }
685 }
686
687 #[inline]
688 fn push(&mut self, chunk: &mut Self::Input) {
689 for item in chunk.drain() {
690 let (key, val, time, diff) = CI::into_parts(item);
691
692 if self.result.keys.is_empty() {
694 self.result.vals.vals.push_into(val);
695 self.result.keys.push_into(key);
696 self.staging.push(time, diff);
697 }
698 else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
700 if self.result.vals.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
702 self.staging.push(time, diff);
703 } else {
704 self.staging.seal(&mut self.result.upds);
706 self.staging.push(time, diff);
707 self.result.vals.vals.push_into(val);
708 }
709 } else {
710 self.staging.seal(&mut self.result.upds);
712 self.staging.push(time, diff);
713 self.result.vals.offs.push_ref(self.result.vals.vals.len());
714 self.result.vals.vals.push_into(val);
715 self.result.keys.push_into(key);
716 }
717 }
718 }
719
720 #[inline(never)]
721 fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
722 self.staging.seal(&mut self.result.upds);
723 self.result.vals.offs.push_ref(self.result.vals.vals.len());
724 OrdValBatch {
725 updates: self.staging.total(),
726 storage: self.result,
727 description,
728 }
729 }
730
731 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
732 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
733 let mut builder = Self::with_capacity(keys, vals, upds);
734 for mut chunk in chain.drain(..) {
735 builder.push(&mut chunk);
736 }
737
738 builder.done(description)
739 }
740 }
741}
742
743pub mod key_batch {
745
746 use std::marker::PhantomData;
747 use serde::{Deserialize, Serialize};
748 use timely::container::PushInto;
749 use timely::progress::{Antichain, frontier::AntichainRef};
750
751 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
752 use crate::trace::implementations::{BatchContainer, BuilderInput};
753 use crate::trace::implementations::layout;
754
755 use super::{Layout, Upds, layers::UpdsBuilder};
756
    /// The keys and per-key update lists that make up a batch.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyStorage<L: Layout> {
        /// The keys; the position of a key is also the index of its list in `upds`.
        pub keys: L::KeyContainer,
        /// For each key, a list of `(time, diff)` updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
771
    /// A batch of `(key, time, diff)` updates: storage, a description, and an update count.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyBatch<L: Layout> {
        /// The keys and updates held by the batch.
        pub storage: OrdKeyStorage<L>,
        /// The description returned by `BatchReader::description`.
        pub description: Description<layout::Time<L>>,
        /// The number of updates, as returned by `BatchReader::len`.
        ///
        /// Builders and mergers in this module set it from `UpdsBuilder::total`, so it
        /// need not equal the number of entries physically stored in `storage.upds`.
        pub updates: usize,
    }
795
    /// Exposes `L` as the batch's associated `Layout`, for layouts whose values read as `&()`.
    impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> WithLayout for OrdKeyBatch<L> {
        type Layout = L;
    }
799
800 impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> BatchReader for OrdKeyBatch<L> {
801
802 type Cursor = OrdKeyCursor<L>;
803 fn cursor(&self) -> Self::Cursor {
804 OrdKeyCursor {
805 key_cursor: 0,
806 val_stepped: false,
807 phantom: std::marker::PhantomData,
808 }
809 }
810 fn len(&self) -> usize {
811 self.updates
814 }
815 fn description(&self) -> &Description<layout::Time<L>> { &self.description }
816 }
817
818 impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> Batch for OrdKeyBatch<L> {
819 type Merger = OrdKeyMerger<L>;
820
821 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
822 OrdKeyMerger::new(self, other, compaction_frontier)
823 }
824
825 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
826 use timely::progress::Timestamp;
827 Self {
828 storage: OrdKeyStorage {
829 keys: L::KeyContainer::with_capacity(0),
830 upds: Upds::default(),
831 },
832 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
833 updates: 0,
834 }
835 }
836 }
837
    /// State for incrementally merging two `OrdKeyBatch`es into one.
    pub struct OrdKeyMerger<L: Layout> {
        /// Position of the next key to consume from the first source batch.
        key_cursor1: usize,
        /// Position of the next key to consume from the second source batch.
        key_cursor2: usize,
        /// The merged storage produced so far.
        result: OrdKeyStorage<L>,
        /// Description of the merged batch; its `since` frontier is used to advance times.
        description: Description<layout::Time<L>>,

        /// Staging area that consolidates the updates of one key before they are written to `result`.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
852
853 impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
854 where
855 OrdKeyBatch<L>: Batch<Time=layout::Time<L>>,
856 {
857 fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {
858
859 assert!(batch1.upper() == batch2.lower());
860 use crate::lattice::Lattice;
861 let mut since = batch1.description().since().join(batch2.description().since());
862 since = since.join(&compaction_frontier.to_owned());
863
864 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
865
866 let batch1 = &batch1.storage;
867 let batch2 = &batch2.storage;
868
869 OrdKeyMerger {
870 key_cursor1: 0,
871 key_cursor2: 0,
872 result: OrdKeyStorage {
873 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
874 upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
875 },
876 description,
877 staging: UpdsBuilder::default(),
878 }
879 }
880 fn done(self) -> OrdKeyBatch<L> {
881 OrdKeyBatch {
882 updates: self.staging.total(),
883 storage: self.result,
884 description: self.description,
885 }
886 }
887 fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
888
889 let starting_updates = self.staging.total();
891 let mut effort = 0isize;
892
893 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
895 self.merge_key(&source1.storage, &source2.storage);
896 effort = (self.staging.total() - starting_updates) as isize;
898 }
899
900 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
903 self.copy_key(&source1.storage, self.key_cursor1);
904 self.key_cursor1 += 1;
905 effort = (self.staging.total() - starting_updates) as isize;
906 }
907 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
908 self.copy_key(&source2.storage, self.key_cursor2);
909 self.key_cursor2 += 1;
910 effort = (self.staging.total() - starting_updates) as isize;
911 }
912
913 *fuel -= effort;
914 }
915 }
916
917 impl<L: Layout> OrdKeyMerger<L> {
919 fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
927 self.stash_updates_for_key(source, cursor);
928 if self.staging.seal(&mut self.result.upds) {
929 self.result.keys.push_ref(source.keys.index(cursor));
930 }
931 }
932 fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
937 use ::std::cmp::Ordering;
938 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
939 Ordering::Less => {
940 self.copy_key(source1, self.key_cursor1);
941 self.key_cursor1 += 1;
942 },
943 Ordering::Equal => {
944 self.stash_updates_for_key(source1, self.key_cursor1);
946 self.stash_updates_for_key(source2, self.key_cursor2);
947 if self.staging.seal(&mut self.result.upds) {
948 self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
949 }
950 self.key_cursor1 += 1;
952 self.key_cursor2 += 1;
953 },
954 Ordering::Greater => {
955 self.copy_key(source2, self.key_cursor2);
956 self.key_cursor2 += 1;
957 },
958 }
959 }
960
961 fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
963 let (lower, upper) = source.upds.bounds(index);
964 for i in lower .. upper {
965 let (time, diff) = source.upds.get_abs(i);
967 use crate::lattice::Lattice;
968 let mut new_time = L::TimeContainer::into_owned(time);
969 new_time.advance_by(self.description.since().borrow());
970 self.staging.push(new_time, L::DiffContainer::into_owned(diff));
971 }
972 }
973 }
974
    /// A cursor over an `OrdKeyBatch`, where each key has a single unit value.
    pub struct OrdKeyCursor<L: Layout> {
        /// Position of the current key in the batch's key container.
        key_cursor: usize,
        /// Whether the unit value has been stepped past; when set, `val_valid` reports `false`.
        val_stepped: bool,
        /// Ties the cursor to the layout `L` without storing any layout data.
        phantom: PhantomData<L>,
    }
984
985 use crate::trace::implementations::WithLayout;
    /// Exposes `L` as the cursor's associated `Layout`, for layouts whose values read as `&()`.
    impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> WithLayout for OrdKeyCursor<L> {
        type Layout = L;
    }
989
990 impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> Cursor for OrdKeyCursor<L> {
991
992 type Storage = OrdKeyBatch<L>;
993
994 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
995 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a ()> { if self.val_valid(storage) { Some(&()) } else { None } }
996
997 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
998 fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
999 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
1000 let (lower, upper) = storage.storage.upds.bounds(self.key_cursor);
1001 for index in lower .. upper {
1002 let (time, diff) = storage.storage.upds.get_abs(index);
1003 logic(time, diff);
1004 }
1005 }
1006 fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
1007 fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
1008 fn step_key(&mut self, storage: &Self::Storage){
1009 self.key_cursor += 1;
1010 if self.key_valid(storage) {
1011 self.rewind_vals(storage);
1012 }
1013 else {
1014 self.key_cursor = storage.storage.keys.len();
1015 }
1016 }
1017 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1018 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
1019 if self.key_valid(storage) {
1020 self.rewind_vals(storage);
1021 }
1022 }
1023 fn step_val(&mut self, _storage: &Self::Storage) {
1024 self.val_stepped = true;
1025 }
1026 fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
1027 fn rewind_keys(&mut self, storage: &Self::Storage) {
1028 self.key_cursor = 0;
1029 if self.key_valid(storage) {
1030 self.rewind_vals(storage)
1031 }
1032 }
1033 fn rewind_vals(&mut self, _storage: &Self::Storage) {
1034 self.val_stepped = false;
1035 }
1036 }
1037
    /// Builds an `OrdKeyBatch` from input containers of type `CI`.
    pub struct OrdKeyBuilder<L: Layout, CI> {
        /// The storage assembled so far; `done` moves it into the finished batch.
        pub result: OrdKeyStorage<L>,
        /// Staging area for the updates of the most recently pushed key.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
        /// Records the input container type without storing one.
        _marker: PhantomData<CI>,
    }
1047
1048 impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
1049 where
1050 L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
1051 CI: BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
1052 {
1053
1054 type Input = CI;
1055 type Time = layout::Time<L>;
1056 type Output = OrdKeyBatch<L>;
1057
1058 fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1059 Self {
1060 result: OrdKeyStorage {
1061 keys: L::KeyContainer::with_capacity(keys),
1062 upds: Upds::with_capacity(keys+1, upds),
1063 },
1064 staging: UpdsBuilder::default(),
1065 _marker: PhantomData,
1066 }
1067 }
1068
1069 #[inline]
1070 fn push(&mut self, chunk: &mut Self::Input) {
1071 for item in chunk.drain() {
1072 let (key, _val, time, diff) = CI::into_parts(item);
1073 if self.result.keys.is_empty() {
1074 self.result.keys.push_into(key);
1075 self.staging.push(time, diff);
1076 }
1077 else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
1079 self.staging.push(time, diff);
1080 } else {
1081 self.staging.seal(&mut self.result.upds);
1082 self.staging.push(time, diff);
1083 self.result.keys.push_into(key);
1084 }
1085 }
1086 }
1087
1088 #[inline(never)]
1089 fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
1090 self.staging.seal(&mut self.result.upds);
1091 OrdKeyBatch {
1092 updates: self.staging.total(),
1093 storage: self.result,
1094 description,
1095 }
1096 }
1097
1098 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
1099 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1100 let mut builder = Self::with_capacity(keys, vals, upds);
1101 for mut chunk in chain.drain(..) {
1102 builder.push(&mut chunk);
1103 }
1104
1105 builder.done(description)
1106 }
1107 }
1108
1109}