1use std::rc::Rc;
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15use crate::trace::implementations::spine_fueled::Spine;
16use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
17use crate::trace::rc_blanket_impls::RcBuilder;
18
19use super::{Update, Layout, Vector, TStack, Preferred};
20
21pub use self::val_batch::{OrdValBatch, OrdValBuilder};
22pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
23
/// A `Spine` of reference-counted `OrdValBatch`es holding `((K, V), T, R)` updates in the `Vector` layout.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A `MergeBatcher` over `Vec<((K, V), T, R)>` input, chunking with `VecChunker` and merging with `VecMerger`.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// An `RcBuilder` around `OrdValBuilder` for the `Vector` layout, fed with `Vec<((K, V), T, R)>` chunks.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

/// A `Spine` of reference-counted `OrdValBatch`es holding `((K, V), T, R)` updates in the `TStack` layout.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A `MergeBatcher` over `Vec<((K, V), T, R)>` input, chunking with `ColumnationChunker` and merging with `ColMerger`.
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// An `RcBuilder` around `OrdValBuilder` for the `TStack` layout, fed with `TimelyStack<((K, V), T, R)>` chunks.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

/// A `Spine` of reference-counted `OrdKeyBatch`es holding `((K, ()), T, R)` updates in the `Vector` layout.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A `MergeBatcher` over `Vec<((K, ()), T, R)>` input, chunking with `VecChunker` and merging with `VecMerger`.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
/// An `RcBuilder` around `OrdKeyBuilder` for the `Vector` layout, fed with `Vec<((K, ()), T, R)>` chunks.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;

/// A `Spine` of reference-counted `OrdKeyBatch`es holding `((K, ()), T, R)` updates in the `TStack` layout.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A `MergeBatcher` over `Vec<((K, ()), T, R)>` input, chunking with `ColumnationChunker` and merging with `ColMerger`.
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
/// An `RcBuilder` around `OrdKeyBuilder` for the `TStack` layout, fed with `TimelyStack<((K, ()), T, R)>` chunks.
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;

/// A `Spine` of reference-counted `OrdValBatch`es using the `Preferred<K, V, T, R>` layout.
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
/// A `MergeBatcher` over updates whose key and value are the `ToOwned::Owned` forms of `K` and `V`,
/// chunking with `ColumnationChunker` and merging with `ColMerger`.
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColMerger<(<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R>>;
/// An `RcBuilder` around `OrdValBuilder` for the `Preferred` layout, fed with `TimelyStack` chunks of owned updates.
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;
64
65pub mod val_batch {
71
72 use std::marker::PhantomData;
73 use serde::{Deserialize, Serialize};
74 use timely::container::PushInto;
75 use timely::progress::{Antichain, frontier::AntichainRef};
76
77 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
78 use crate::trace::implementations::{BatchContainer, BuilderInput};
79 use crate::IntoOwned;
80
81 use super::{Layout, Update};
82
/// Flat, offset-indexed storage for `(key, val, time, diff)` updates.
///
/// Keys index into values through `keys_offs`, and values index into
/// `(time, diff)` pairs through `vals_offs`.
#[derive(Debug, Serialize, Deserialize)]
pub struct OrdValStorage<L: Layout> {
    /// The keys; the key at position `i` owns the values bracketed by
    /// `keys_offs[i]` and `keys_offs[i + 1]`.
    pub keys: L::KeyContainer,
    /// Offsets into `vals` delimiting each key's values.
    ///
    /// `values_for_key(i)` reads entries `i` and `i + 1`, so this list must
    /// hold one more entry than `keys`.
    pub keys_offs: L::OffsetContainer,
    /// Concatenated values for all keys, bracketed by `keys_offs`.
    pub vals: L::ValContainer,
    /// Offsets into `times` / `diffs` delimiting each value's updates.
    ///
    /// An empty range (`lower == upper`) is a special encoding: it denotes
    /// the single update stored immediately before the range, and is decoded
    /// by `updates_for_value`. As with `keys_offs`, entries `i` and `i + 1`
    /// are read for value `i`.
    pub vals_offs: L::OffsetContainer,
    /// Concatenated update times, bracketed by `vals_offs`.
    pub times: L::TimeContainer,
    /// Concatenated update diffs, stored in lockstep with `times`.
    pub diffs: L::DiffContainer,
}
108
109 impl<L: Layout> OrdValStorage<L> {
110 fn values_for_key(&self, index: usize) -> (usize, usize) {
112 (self.keys_offs.index(index), self.keys_offs.index(index+1))
113 }
114 fn updates_for_value(&self, index: usize) -> (usize, usize) {
116 let mut lower = self.vals_offs.index(index);
117 let upper = self.vals_offs.index(index+1);
118 if lower == upper {
121 assert!(lower > 0);
122 lower -= 1;
123 }
124 (lower, upper)
125 }
126 }
127
/// An immutable batch of `(key, val, time, diff)` updates with a description of its time bounds.
///
/// The serde bound is spelled out by hand so that (de)serialization is
/// required of the layout's containers rather than of `L` itself.
#[derive(Serialize, Deserialize)]
#[serde(bound = "
    L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
    L::ValContainer: Serialize + for<'a> Deserialize<'a>,
    L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
    L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
    L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct OrdValBatch<L: Layout> {
    /// The updates themselves.
    pub storage: OrdValStorage<L>,
    /// The lower, upper, and since frontiers describing the batch.
    pub description: Description<<L::Target as Update>::Time>,
    /// The number of updates reported by `len()`.
    ///
    /// The builder and merger in this module set it to the number of stored
    /// times plus the number of updates elided through the singleton
    /// encoding, so it can exceed `storage.times.len()`.
    pub updates: usize,
}
152
153 impl<L: Layout> BatchReader for OrdValBatch<L> {
154 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
155 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
156 type Time = <L::Target as Update>::Time;
157 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
158 type Diff = <L::Target as Update>::Diff;
159 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
160
161 type Cursor = OrdValCursor<L>;
162 fn cursor(&self) -> Self::Cursor {
163 OrdValCursor {
164 key_cursor: 0,
165 val_cursor: 0,
166 phantom: PhantomData,
167 }
168 }
169 fn len(&self) -> usize {
170 self.updates
173 }
174 fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
175 }
176
177 impl<L: Layout> Batch for OrdValBatch<L> {
178 type Merger = OrdValMerger<L>;
179
180 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
181 OrdValMerger::new(self, other, compaction_frontier)
182 }
183
184 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
185 use timely::progress::Timestamp;
186 Self {
187 storage: OrdValStorage {
188 keys: L::KeyContainer::with_capacity(0),
189 keys_offs: L::OffsetContainer::with_capacity(0),
190 vals: L::ValContainer::with_capacity(0),
191 vals_offs: L::OffsetContainer::with_capacity(0),
192 times: L::TimeContainer::with_capacity(0),
193 diffs: L::DiffContainer::with_capacity(0),
194 },
195 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
196 updates: 0,
197 }
198 }
199 }
200
/// State for an in-progress, fuel-bounded merge of two `OrdValBatch`es.
pub struct OrdValMerger<L: Layout> {
    /// Position of the next unmerged key in the first batch.
    key_cursor1: usize,
    /// Position of the next unmerged key in the second batch.
    key_cursor2: usize,
    /// The merged storage being assembled.
    result: OrdValStorage<L>,
    /// Description of the batch that will be produced.
    description: Description<<L::Target as Update>::Time>,

    /// Scratch space where one value's updates are gathered and consolidated
    /// before being written to `result`; it is emptied after each consolidation.
    update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
    /// Count of updates elided through the singleton encoding, i.e. represented
    /// in `result` without their own entry in `times` / `diffs`.
    singletons: usize,
}
220
221 impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
222 where
223 OrdValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
224 for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
225 for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
226 {
227 fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
228
229 assert!(batch1.upper() == batch2.lower());
230 use crate::lattice::Lattice;
231 let mut since = batch1.description().since().join(batch2.description().since());
232 since = since.join(&compaction_frontier.to_owned());
233
234 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
235
236 let batch1 = &batch1.storage;
237 let batch2 = &batch2.storage;
238
239 let mut storage = OrdValStorage {
240 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
241 keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
242 vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
243 vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
244 times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
245 diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
246 };
247
248 let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
250 keys_offs.push(0);
251 let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
252 vals_offs.push(0);
253
254 OrdValMerger {
255 key_cursor1: 0,
256 key_cursor2: 0,
257 result: storage,
258 description,
259 update_stash: Vec::new(),
260 singletons: 0,
261 }
262 }
263 fn done(self) -> OrdValBatch<L> {
264 OrdValBatch {
265 updates: self.result.times.len() + self.singletons,
266 storage: self.result,
267 description: self.description,
268 }
269 }
270 fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
271
272 let starting_updates = self.result.times.len();
274 let mut effort = 0isize;
275
276 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
278 self.merge_key(&source1.storage, &source2.storage);
279 effort = (self.result.times.len() - starting_updates) as isize;
281 }
282
283 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
286 self.copy_key(&source1.storage, self.key_cursor1);
287 self.key_cursor1 += 1;
288 effort = (self.result.times.len() - starting_updates) as isize;
289 }
290 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
291 self.copy_key(&source2.storage, self.key_cursor2);
292 self.key_cursor2 += 1;
293 effort = (self.result.times.len() - starting_updates) as isize;
294 }
295
296 *fuel -= effort;
297 }
298 }
299
300 impl<L: Layout> OrdValMerger<L> {
302 fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
310 let init_vals = self.result.vals.len();
312 let (mut lower, upper) = source.values_for_key(cursor);
313 while lower < upper {
314 self.stash_updates_for_val(source, lower);
315 if let Some(off) = self.consolidate_updates() {
316 self.result.vals_offs.push(off);
317 self.result.vals.push(source.vals.index(lower));
318 }
319 lower += 1;
320 }
321
322 if self.result.vals.len() > init_vals {
324 self.result.keys.push(source.keys.index(cursor));
325 self.result.keys_offs.push(self.result.vals.len());
326 }
327 }
328 fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
333 use ::std::cmp::Ordering;
334 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
335 Ordering::Less => {
336 self.copy_key(source1, self.key_cursor1);
337 self.key_cursor1 += 1;
338 },
339 Ordering::Equal => {
340 let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
342 let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
343 if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
344 self.result.keys.push(source1.keys.index(self.key_cursor1));
345 self.result.keys_offs.push(off);
346 }
347 self.key_cursor1 += 1;
349 self.key_cursor2 += 1;
350 },
351 Ordering::Greater => {
352 self.copy_key(source2, self.key_cursor2);
353 self.key_cursor2 += 1;
354 },
355 }
356 }
357 fn merge_vals(
362 &mut self,
363 (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
364 (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
365 ) -> Option<usize> {
366 let init_vals = self.result.vals.len();
368 while lower1 < upper1 && lower2 < upper2 {
369 use ::std::cmp::Ordering;
373 match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
374 Ordering::Less => {
375 self.stash_updates_for_val(source1, lower1);
377 if let Some(off) = self.consolidate_updates() {
378 self.result.vals_offs.push(off);
379 self.result.vals.push(source1.vals.index(lower1));
380 }
381 lower1 += 1;
382 },
383 Ordering::Equal => {
384 self.stash_updates_for_val(source1, lower1);
385 self.stash_updates_for_val(source2, lower2);
386 if let Some(off) = self.consolidate_updates() {
387 self.result.vals_offs.push(off);
388 self.result.vals.push(source1.vals.index(lower1));
389 }
390 lower1 += 1;
391 lower2 += 1;
392 },
393 Ordering::Greater => {
394 self.stash_updates_for_val(source2, lower2);
396 if let Some(off) = self.consolidate_updates() {
397 self.result.vals_offs.push(off);
398 self.result.vals.push(source2.vals.index(lower2));
399 }
400 lower2 += 1;
401 },
402 }
403 }
404 while lower1 < upper1 {
406 self.stash_updates_for_val(source1, lower1);
407 if let Some(off) = self.consolidate_updates() {
408 self.result.vals_offs.push(off);
409 self.result.vals.push(source1.vals.index(lower1));
410 }
411 lower1 += 1;
412 }
413 while lower2 < upper2 {
414 self.stash_updates_for_val(source2, lower2);
415 if let Some(off) = self.consolidate_updates() {
416 self.result.vals_offs.push(off);
417 self.result.vals.push(source2.vals.index(lower2));
418 }
419 lower2 += 1;
420 }
421
422 if self.result.vals.len() > init_vals {
424 Some(self.result.vals.len())
425 } else {
426 None
427 }
428 }
429
430 fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
432 let (lower, upper) = source.updates_for_value(index);
433 for i in lower .. upper {
434 let time = source.times.index(i);
436 let diff = source.diffs.index(i);
437 use crate::lattice::Lattice;
438 let mut new_time: <L::Target as Update>::Time = time.into_owned();
439 new_time.advance_by(self.description.since().borrow());
440 self.update_stash.push((new_time, diff.into_owned()));
441 }
442 }
443
444 fn consolidate_updates(&mut self) -> Option<usize> {
446 use crate::consolidation;
447 consolidation::consolidate(&mut self.update_stash);
448 if !self.update_stash.is_empty() {
449 let time_diff = self.result.times.last().zip(self.result.diffs.last());
452 let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
453 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
454 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
455 t1.eq(&t2) && d1.eq(&d2)
456 });
457 if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
458 self.update_stash.clear();
460 self.singletons += 1;
461 }
462 else {
463 for (time, diff) in self.update_stash.drain(..) {
465 self.result.times.push(time);
466 self.result.diffs.push(diff);
467 }
468 }
469 Some(self.result.times.len())
470 } else {
471 None
472 }
473 }
474 }
475
/// A cursor over an `OrdValBatch`; it holds only positions, and the batch is
/// passed to each method as `storage`.
pub struct OrdValCursor<L: Layout> {
    /// Absolute position of the current key in `storage.keys`.
    key_cursor: usize,
    /// Absolute position of the current value in `storage.vals`.
    val_cursor: usize,
    /// Ties the cursor to its layout without storing any data.
    phantom: PhantomData<L>,
}
485
486 impl<L: Layout> Cursor for OrdValCursor<L> {
487
488 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
489 type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
490 type Time = <L::Target as Update>::Time;
491 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
492 type Diff = <L::Target as Update>::Diff;
493 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
494
495 type Storage = OrdValBatch<L>;
496
497 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
498 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
499
500 fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
501 fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
502 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
503 let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
504 for index in lower .. upper {
505 let time = storage.storage.times.index(index);
506 let diff = storage.storage.diffs.index(index);
507 logic(time, diff);
508 }
509 }
510 fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
511 fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
512 fn step_key(&mut self, storage: &OrdValBatch<L>){
513 self.key_cursor += 1;
514 if self.key_valid(storage) {
515 self.rewind_vals(storage);
516 }
517 else {
518 self.key_cursor = storage.storage.keys.len();
519 }
520 }
521 fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
522 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
523 if self.key_valid(storage) {
524 self.rewind_vals(storage);
525 }
526 }
527 fn step_val(&mut self, storage: &OrdValBatch<L>) {
528 self.val_cursor += 1;
529 if !self.val_valid(storage) {
530 self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
531 }
532 }
533 fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
534 self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
535 }
536 fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
537 self.key_cursor = 0;
538 if self.key_valid(storage) {
539 self.rewind_vals(storage)
540 }
541 }
542 fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
543 self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
544 }
545 }
546
/// A builder that assembles an `OrdValBatch` from input chunks of type `CI`.
pub struct OrdValBuilder<L: Layout, CI> {
    /// The storage being assembled.
    pub result: OrdValStorage<L>,
    /// An update withheld by `push_update` because it equals the last stored
    /// `(time, diff)`. It is either written out when another update follows
    /// it, or dropped and counted in `singletons` when its value is closed.
    singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
    /// Count of updates elided through the singleton encoding; added to the
    /// stored update count in `done`.
    singletons: usize,
    /// Marks the input chunk type without storing one.
    _marker: PhantomData<CI>,
}
561
562 impl<L: Layout, CI> OrdValBuilder<L, CI> {
563 fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
575 if self.result.times.last().map(|t| t == <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time)) == Some(true) &&
577 self.result.diffs.last().map(|d| d == <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff)) == Some(true)
578 {
579 assert!(self.singleton.is_none());
580 self.singleton = Some((time, diff));
581 }
582 else {
583 if let Some((time, diff)) = self.singleton.take() {
585 self.result.times.push(time);
586 self.result.diffs.push(diff);
587 }
588 self.result.times.push(time);
589 self.result.diffs.push(diff);
590 }
591 }
592 }
593
594 impl<L, CI> Builder for OrdValBuilder<L, CI>
595 where
596 L: for<'a> Layout<
597 KeyContainer: PushInto<CI::Key<'a>>,
598 ValContainer: PushInto<CI::Val<'a>>,
599 >,
600 CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
601 for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
602 for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
603 {
604
605 type Input = CI;
606 type Time = <L::Target as Update>::Time;
607 type Output = OrdValBatch<L>;
608
609 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
610 Self {
612 result: OrdValStorage {
613 keys: L::KeyContainer::with_capacity(keys),
614 keys_offs: L::OffsetContainer::with_capacity(keys + 1),
615 vals: L::ValContainer::with_capacity(vals),
616 vals_offs: L::OffsetContainer::with_capacity(vals + 1),
617 times: L::TimeContainer::with_capacity(upds),
618 diffs: L::DiffContainer::with_capacity(upds),
619 },
620 singleton: None,
621 singletons: 0,
622 _marker: PhantomData,
623 }
624 }
625
626 #[inline]
627 fn push(&mut self, chunk: &mut Self::Input) {
628 for item in chunk.drain() {
629 let (key, val, time, diff) = CI::into_parts(item);
630 if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
632 if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
634 self.push_update(time, diff);
635 } else {
636 self.result.vals_offs.push(self.result.times.len());
638 if self.singleton.take().is_some() { self.singletons += 1; }
639 self.push_update(time, diff);
640 self.result.vals.push(val);
641 }
642 } else {
643 self.result.vals_offs.push(self.result.times.len());
645 if self.singleton.take().is_some() { self.singletons += 1; }
646 self.result.keys_offs.push(self.result.vals.len());
647 self.push_update(time, diff);
648 self.result.vals.push(val);
649 self.result.keys.push(key);
650 }
651 }
652 }
653
654 #[inline(never)]
655 fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
656 self.result.vals_offs.push(self.result.times.len());
658 if self.singleton.take().is_some() { self.singletons += 1; }
660 self.result.keys_offs.push(self.result.vals.len());
661 OrdValBatch {
662 updates: self.result.times.len() + self.singletons,
663 storage: self.result,
664 description,
665 }
666 }
667
668 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
669 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
670 let mut builder = Self::with_capacity(keys, vals, upds);
671 for mut chunk in chain.drain(..) {
672 builder.push(&mut chunk);
673 }
674
675 builder.done(description)
676 }
677 }
678}
679
680mod key_batch {
681
682 use std::marker::PhantomData;
683 use serde::{Deserialize, Serialize};
684 use timely::container::PushInto;
685 use timely::progress::{Antichain, frontier::AntichainRef};
686
687 use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
688 use crate::trace::implementations::{BatchContainer, BuilderInput};
689 use crate::IntoOwned;
690
691 use super::{Layout, Update};
692
/// Flat, offset-indexed storage for `(key, time, diff)` updates (no values).
#[derive(Debug, Serialize, Deserialize)]
pub struct OrdKeyStorage<L: Layout> {
    /// The keys; the key at position `i` owns the updates bracketed by
    /// `keys_offs[i]` and `keys_offs[i + 1]`.
    pub keys: L::KeyContainer,
    /// Offsets into `times` / `diffs` delimiting each key's updates.
    ///
    /// An empty range (`lower == upper`) is a special encoding: it denotes
    /// the single update stored immediately before the range, and is decoded
    /// by `updates_for_key`. Entries `i` and `i + 1` are read for key `i`,
    /// so this list must hold one more entry than `keys`.
    pub keys_offs: L::OffsetContainer,
    /// Concatenated update times, bracketed by `keys_offs`.
    pub times: L::TimeContainer,
    /// Concatenated update diffs, stored in lockstep with `times`.
    pub diffs: L::DiffContainer,
}
712
713 impl<L: Layout> OrdKeyStorage<L> {
714 fn updates_for_key(&self, index: usize) -> (usize, usize) {
716 let mut lower = self.keys_offs.index(index);
717 let upper = self.keys_offs.index(index+1);
718 if lower == upper {
721 assert!(lower > 0);
722 lower -= 1;
723 }
724 (lower, upper)
725 }
726 }
727
/// An immutable batch of `(key, time, diff)` updates with a description of its time bounds.
///
/// The serde bound is spelled out by hand so that (de)serialization is
/// required of the layout's containers rather than of `L` itself.
#[derive(Serialize, Deserialize)]
#[serde(bound = "
    L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
    L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
    L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
    L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct OrdKeyBatch<L: Layout> {
    /// The updates themselves.
    pub storage: OrdKeyStorage<L>,
    /// The lower, upper, and since frontiers describing the batch.
    pub description: Description<<L::Target as Update>::Time>,
    /// The number of updates reported by `len()`.
    ///
    /// The builder and merger in this module set it to the number of stored
    /// times plus the number of updates elided through the singleton
    /// encoding, so it can exceed `storage.times.len()`.
    pub updates: usize,
}
751
752 impl<L: Layout> BatchReader for OrdKeyBatch<L> {
753
754 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
755 type Val<'a> = &'a ();
756 type Time = <L::Target as Update>::Time;
757 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
758 type Diff = <L::Target as Update>::Diff;
759 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
760
761 type Cursor = OrdKeyCursor<L>;
762 fn cursor(&self) -> Self::Cursor {
763 OrdKeyCursor {
764 key_cursor: 0,
765 val_stepped: false,
766 phantom: std::marker::PhantomData,
767 }
768 }
769 fn len(&self) -> usize {
770 self.updates
773 }
774 fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
775 }
776
777 impl<L: Layout> Batch for OrdKeyBatch<L> {
778 type Merger = OrdKeyMerger<L>;
779
780 fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
781 OrdKeyMerger::new(self, other, compaction_frontier)
782 }
783
784 fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
785 use timely::progress::Timestamp;
786 Self {
787 storage: OrdKeyStorage {
788 keys: L::KeyContainer::with_capacity(0),
789 keys_offs: L::OffsetContainer::with_capacity(0),
790 times: L::TimeContainer::with_capacity(0),
791 diffs: L::DiffContainer::with_capacity(0),
792 },
793 description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
794 updates: 0,
795 }
796 }
797 }
798
/// State for an in-progress, fuel-bounded merge of two `OrdKeyBatch`es.
pub struct OrdKeyMerger<L: Layout> {
    /// Position of the next unmerged key in the first batch.
    key_cursor1: usize,
    /// Position of the next unmerged key in the second batch.
    key_cursor2: usize,
    /// The merged storage being assembled.
    result: OrdKeyStorage<L>,
    /// Description of the batch that will be produced.
    description: Description<<L::Target as Update>::Time>,

    /// Scratch space where one key's updates are gathered and consolidated
    /// before being written to `result`; it is emptied after each consolidation.
    update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
    /// Count of updates elided through the singleton encoding, i.e. represented
    /// in `result` without their own entry in `times` / `diffs`.
    singletons: usize,
}
818
819 impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
820 where
821 OrdKeyBatch<L>: Batch<Time=<L::Target as Update>::Time>,
822 for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
823 for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
824 {
825 fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
826
827 assert!(batch1.upper() == batch2.lower());
828 use crate::lattice::Lattice;
829 let mut since = batch1.description().since().join(batch2.description().since());
830 since = since.join(&compaction_frontier.to_owned());
831
832 let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
833
834 let batch1 = &batch1.storage;
835 let batch2 = &batch2.storage;
836
837 let mut storage = OrdKeyStorage {
838 keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
839 keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
840 times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
841 diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
842 };
843
844 let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
845 keys_offs.push(0);
846
847 OrdKeyMerger {
848 key_cursor1: 0,
849 key_cursor2: 0,
850 result: storage,
851 description,
852 update_stash: Vec::new(),
853 singletons: 0,
854 }
855 }
856 fn done(self) -> OrdKeyBatch<L> {
857 OrdKeyBatch {
858 updates: self.result.times.len() + self.singletons,
859 storage: self.result,
860 description: self.description,
861 }
862 }
863 fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
864
865 let starting_updates = self.result.times.len();
867 let mut effort = 0isize;
868
869 while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
871 self.merge_key(&source1.storage, &source2.storage);
872 effort = (self.result.times.len() - starting_updates) as isize;
874 }
875
876 while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
879 self.copy_key(&source1.storage, self.key_cursor1);
880 self.key_cursor1 += 1;
881 effort = (self.result.times.len() - starting_updates) as isize;
882 }
883 while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
884 self.copy_key(&source2.storage, self.key_cursor2);
885 self.key_cursor2 += 1;
886 effort = (self.result.times.len() - starting_updates) as isize;
887 }
888
889 *fuel -= effort;
890 }
891 }
892
893 impl<L: Layout> OrdKeyMerger<L> {
895 fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
903 self.stash_updates_for_key(source, cursor);
904 if let Some(off) = self.consolidate_updates() {
905 self.result.keys_offs.push(off);
906 self.result.keys.push(source.keys.index(cursor));
907 }
908 }
909 fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
914 use ::std::cmp::Ordering;
915 match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
916 Ordering::Less => {
917 self.copy_key(source1, self.key_cursor1);
918 self.key_cursor1 += 1;
919 },
920 Ordering::Equal => {
921 self.stash_updates_for_key(source1, self.key_cursor1);
923 self.stash_updates_for_key(source2, self.key_cursor2);
924 if let Some(off) = self.consolidate_updates() {
925 self.result.keys_offs.push(off);
926 self.result.keys.push(source1.keys.index(self.key_cursor1));
927 }
928 self.key_cursor1 += 1;
930 self.key_cursor2 += 1;
931 },
932 Ordering::Greater => {
933 self.copy_key(source2, self.key_cursor2);
934 self.key_cursor2 += 1;
935 },
936 }
937 }
938
939 fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
941 let (lower, upper) = source.updates_for_key(index);
942 for i in lower .. upper {
943 let time = source.times.index(i);
945 let diff = source.diffs.index(i);
946 use crate::lattice::Lattice;
947 let mut new_time = time.into_owned();
948 new_time.advance_by(self.description.since().borrow());
949 self.update_stash.push((new_time, diff.into_owned()));
950 }
951 }
952
953 fn consolidate_updates(&mut self) -> Option<usize> {
955 use crate::consolidation;
956 consolidation::consolidate(&mut self.update_stash);
957 if !self.update_stash.is_empty() {
958 let time_diff = self.result.times.last().zip(self.result.diffs.last());
961 let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
962 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
963 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
964 t1.eq(&t2) && d1.eq(&d2)
965 });
966 if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
967 self.update_stash.clear();
969 self.singletons += 1;
970 }
971 else {
972 for (time, diff) in self.update_stash.drain(..) {
974 self.result.times.push(time);
975 self.result.diffs.push(diff);
976 }
977 }
978 Some(self.result.times.len())
979 } else {
980 None
981 }
982 }
983 }
984
/// A cursor over an `OrdKeyBatch`; it holds only positions, and the batch is
/// passed to each method as `storage`.
pub struct OrdKeyCursor<L: Layout> {
    /// Absolute position of the current key in `storage.keys`.
    key_cursor: usize,
    /// Whether the single `()` value of the current key has been stepped past.
    val_stepped: bool,
    /// Ties the cursor to its layout without storing any data.
    phantom: PhantomData<L>,
}
994
995 impl<L: Layout> Cursor for OrdKeyCursor<L> {
996 type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
997 type Val<'a> = &'a ();
998 type Time = <L::Target as Update>::Time;
999 type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
1000 type Diff = <L::Target as Update>::Diff;
1001 type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;
1002
1003 type Storage = OrdKeyBatch<L>;
1004
1005 fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
1006 fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a ()> { if self.val_valid(storage) { Some(&()) } else { None } }
1007
1008 fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
1009 fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
1010 fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
1011 let (lower, upper) = storage.storage.updates_for_key(self.key_cursor);
1012 for index in lower .. upper {
1013 let time = storage.storage.times.index(index);
1014 let diff = storage.storage.diffs.index(index);
1015 logic(time, diff);
1016 }
1017 }
1018 fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
1019 fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
1020 fn step_key(&mut self, storage: &Self::Storage){
1021 self.key_cursor += 1;
1022 if self.key_valid(storage) {
1023 self.rewind_vals(storage);
1024 }
1025 else {
1026 self.key_cursor = storage.storage.keys.len();
1027 }
1028 }
1029 fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
1030 self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
1031 if self.key_valid(storage) {
1032 self.rewind_vals(storage);
1033 }
1034 }
1035 fn step_val(&mut self, _storage: &Self::Storage) {
1036 self.val_stepped = true;
1037 }
1038 fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
1039 fn rewind_keys(&mut self, storage: &Self::Storage) {
1040 self.key_cursor = 0;
1041 if self.key_valid(storage) {
1042 self.rewind_vals(storage)
1043 }
1044 }
1045 fn rewind_vals(&mut self, _storage: &Self::Storage) {
1046 self.val_stepped = false;
1047 }
1048 }
1049
/// A builder that assembles an `OrdKeyBatch` from input chunks of type `CI`.
pub struct OrdKeyBuilder<L: Layout, CI> {
    /// The storage being assembled.
    pub result: OrdKeyStorage<L>,
    /// An update withheld by `push_update` because it equals the last stored
    /// `(time, diff)`. It is either written out when another update follows
    /// it, or dropped and counted in `singletons` when its key is closed.
    singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
    /// Count of updates elided through the singleton encoding; added to the
    /// stored update count in `done`.
    singletons: usize,
    /// Marks the input chunk type without storing one.
    _marker: PhantomData<CI>,
}
1064
1065 impl<L: Layout, CI> OrdKeyBuilder<L, CI> {
1066 fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
1078 let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time);
1080 let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff);
1081 if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
1082 assert!(self.singleton.is_none());
1083 self.singleton = Some((time, diff));
1084 }
1085 else {
1086 if let Some((time, diff)) = self.singleton.take() {
1088 self.result.times.push(time);
1089 self.result.diffs.push(diff);
1090 }
1091 self.result.times.push(time);
1092 self.result.diffs.push(diff);
1093 }
1094 }
1095 }
1096
1097 impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
1098 where
1099 L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
1100 for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
1101 for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
1102 CI: BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
1103 {
1104
1105 type Input = CI;
1106 type Time = <L::Target as Update>::Time;
1107 type Output = OrdKeyBatch<L>;
1108
1109 fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
1110 Self {
1112 result: OrdKeyStorage {
1113 keys: L::KeyContainer::with_capacity(keys),
1114 keys_offs: L::OffsetContainer::with_capacity(keys + 1),
1115 times: L::TimeContainer::with_capacity(upds),
1116 diffs: L::DiffContainer::with_capacity(upds),
1117 },
1118 singleton: None,
1119 singletons: 0,
1120 _marker: PhantomData,
1121 }
1122 }
1123
1124 #[inline]
1125 fn push(&mut self, chunk: &mut Self::Input) {
1126 for item in chunk.drain() {
1127 let (key, _val, time, diff) = CI::into_parts(item);
1128 if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
1130 self.push_update(time, diff);
1131 } else {
1132 self.result.keys_offs.push(self.result.times.len());
1134 if self.singleton.take().is_some() { self.singletons += 1; }
1136 self.push_update(time, diff);
1137 self.result.keys.push(key);
1138 }
1139 }
1140 }
1141
1142 #[inline(never)]
1143 fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
1144 self.result.keys_offs.push(self.result.times.len());
1146 if self.singleton.take().is_some() { self.singletons += 1; }
1148 OrdKeyBatch {
1149 updates: self.result.times.len() + self.singletons,
1150 storage: self.result,
1151 description,
1152 }
1153 }
1154
1155 fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
1156 let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1157 let mut builder = Self::with_capacity(keys, vals, upds);
1158 for mut chunk in chain.drain(..) {
1159 builder.push(&mut chunk);
1160 }
1161
1162 builder.done(description)
1163 }
1164 }
1165
1166}