differential_dataflow/trace/implementations/
chainless_batcher.rs1use timely::progress::frontier::AntichainRef;
4use timely::progress::{frontier::Antichain, Timestamp};
5
6use crate::logging::Logger;
7use crate::trace;
8
9pub trait BatcherStorage<T: Timestamp> : Default + Sized {
11 fn len(&self) -> usize;
13 fn merge(self, other: Self) -> Self;
17 fn split(&mut self, frontier: AntichainRef<T>) -> Self;
19 fn lower(&self, frontier: &mut Antichain<T>);
23}
24
25pub struct Batcher<T: Timestamp, S: BatcherStorage<T>> {
27 storages: Vec<S>,
29 lower: Antichain<T>,
31 prior: Antichain<T>,
33
34 _logger: Option<Logger>,
36 _operator_id: usize,
38}
39
40impl<T: Timestamp, S: BatcherStorage<T>> Batcher<T, S> {
41 fn tidy(&mut self) {
43 self.storages.retain(|x| x.len() > 0);
44 self.storages.sort_by_key(|x| x.len());
45 self.storages.reverse();
46 while let Some(pos) = (1..self.storages.len()).position(|i| self.storages[i-1].len() < 2 * self.storages[i].len()) {
47 while self.storages.len() > pos + 1 {
48 let x = self.storages.pop().unwrap();
49 let y = self.storages.pop().unwrap();
50 self.storages.push(x.merge(y));
51 self.storages.sort_by_key(|x| x.len());
52 self.storages.reverse();
53 }
54 }
55 }
56}
57
58impl<T: Timestamp, S: BatcherStorage<T>> trace::Batcher for Batcher<T, S> {
59 type Time = T;
60 type Input = S;
61 type Output = S;
62
63 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
64 Self {
65 storages: Vec::default(),
66 lower: Default::default(),
67 prior: Antichain::from_elem(T::minimum()),
68 _logger: logger,
69 _operator_id: operator_id,
70 }
71 }
72
73 fn push_container(&mut self, batch: &mut Self::Input) {
74 if batch.len() > 0 {
75 batch.lower(&mut self.lower);
76 self.storages.push(std::mem::take(batch));
77 self.tidy();
78 }
79 }
80
81 fn seal<B: trace::Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output {
82 let description = trace::Description::new(self.prior.clone(), upper.clone(), Antichain::new());
83 self.prior = upper.clone();
84 let mut stores = self.storages.iter_mut().rev();
85 if let Some(store) = stores.next() {
86 self.lower.clear();
87 let mut ship = store.split(upper.borrow());
88 store.lower(&mut self.lower);
89 for store in stores {
90 let split = store.split(upper.borrow());
91 ship = ship.merge(split);
92 store.lower(&mut self.lower);
93 }
94 self.tidy();
95 B::seal(&mut vec![ship], description)
96 }
97 else {
98 B::seal(&mut vec![], description)
99 }
100 }
101
102 fn frontier(&mut self) -> AntichainRef<'_, Self::Time> { self.lower.borrow() }
103}