differential_dataflow/operators/
mod.rs1pub use self::reduce::{Reduce, Threshold, Count};
8pub use self::iterate::Iterate;
9pub use self::join::{Join, JoinCore};
10pub use self::count::CountTotal;
11pub use self::threshold::ThresholdTotal;
12
13pub mod arrange;
14pub mod reduce;
15pub mod consolidate;
16pub mod iterate;
17pub mod join;
18pub mod count;
19pub mod threshold;
20
21use crate::lattice::Lattice;
22use crate::trace::Cursor;
23
24struct EditList<'a, C: Cursor> {
26 values: Vec<(C::Val<'a>, usize)>,
27 edits: Vec<(C::Time, C::Diff)>,
28}
29
30impl<'a, C: Cursor> EditList<'a, C> {
31 #[inline]
33 fn new() -> Self {
34 EditList {
35 values: Vec::new(),
36 edits: Vec::new(),
37 }
38 }
39 fn load<L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
41 where
42 L: Fn(C::TimeGat<'_>)->C::Time,
43 {
44 self.clear();
45 while let Some(val) = cursor.get_val(storage) {
46 cursor.map_times(storage, |time1, diff1| self.push(logic(time1), C::owned_diff(diff1)));
47 self.seal(val);
48 cursor.step_val(storage);
49 }
50 }
51 #[inline]
53 fn clear(&mut self) {
54 self.values.clear();
55 self.edits.clear();
56 }
57 fn len(&self) -> usize { self.edits.len() }
58 #[inline]
60 fn push(&mut self, time: C::Time, diff: C::Diff) {
61 self.edits.push((time, diff));
63 }
64 #[inline]
66 fn seal(&mut self, value: C::Val<'a>) {
67 let prev = self.values.last().map(|x| x.1).unwrap_or(0);
68 crate::consolidation::consolidate_from(&mut self.edits, prev);
69 if self.edits.len() > prev {
70 self.values.push((value, self.edits.len()));
71 }
72 }
73 fn map<F: FnMut(C::Val<'a>, &C::Time, &C::Diff)>(&self, mut logic: F) {
74 for index in 0 .. self.values.len() {
75 let lower = if index == 0 { 0 } else { self.values[index-1].1 };
76 let upper = self.values[index].1;
77 for edit in lower .. upper {
78 logic(self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
79 }
80 }
81 }
82}
83
84struct ValueHistory<'storage, C: Cursor> {
85 edits: EditList<'storage, C>,
86 history: Vec<(C::Time, C::Time, usize, usize)>, buffer: Vec<((C::Val<'storage>, C::Time), C::Diff)>, }
89
90impl<'storage, C: Cursor> ValueHistory<'storage, C> {
91 fn new() -> Self {
92 ValueHistory {
93 edits: EditList::new(),
94 history: Vec::new(),
95 buffer: Vec::new(),
96 }
97 }
98 fn clear(&mut self) {
99 self.edits.clear();
100 self.history.clear();
101 self.buffer.clear();
102 }
103 fn load<L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
104 where
105 L: Fn(C::TimeGat<'_>)->C::Time,
106 {
107 self.edits.load(cursor, storage, logic);
108 }
109
110 fn replay_key<'history, L>(
114 &'history mut self,
115 cursor: &mut C,
116 storage: &'storage C::Storage,
117 key: C::Key<'storage>,
118 logic: L
119 ) -> HistoryReplay<'storage, 'history, C>
120 where
121 L: Fn(C::TimeGat<'_>)->C::Time,
122 {
123 self.clear();
124 cursor.seek_key(storage, key);
125 if cursor.get_key(storage) == Some(key) {
126 self.load(cursor, storage, logic);
127 }
128 self.replay()
129 }
130
131 fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {
133
134 self.buffer.clear();
135 self.history.clear();
136 for value_index in 0 .. self.edits.values.len() {
137 let lower = if value_index > 0 { self.edits.values[value_index-1].1 } else { 0 };
138 let upper = self.edits.values[value_index].1;
139 for edit_index in lower .. upper {
140 let time = self.edits.edits[edit_index].0.clone();
141 self.history.push((time.clone(), time, value_index, edit_index));
142 }
143 }
144
145 self.history.sort_by(|x,y| y.cmp(x));
146 for index in 1 .. self.history.len() {
147 self.history[index].1 = self.history[index].1.meet(&self.history[index-1].1);
148 }
149
150 HistoryReplay {
151 replay: self
152 }
153 }
154}
155
156struct HistoryReplay<'storage, 'history, C: Cursor> {
157 replay: &'history mut ValueHistory<'storage, C>
158}
159
160impl<'storage, 'history, C: Cursor> HistoryReplay<'storage, 'history, C> {
161 fn time(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.0) }
162 fn meet(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.1) }
163 fn edit(&self) -> Option<(C::Val<'storage>, &C::Time, &C::Diff)> {
164 self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
165 }
166
167 fn buffer(&self) -> &[((C::Val<'storage>, C::Time), C::Diff)] {
168 &self.replay.buffer[..]
169 }
170
171 fn step(&mut self) {
172 let (time, _, value_index, edit_offset) = self.replay.history.pop().unwrap();
173 self.replay.buffer.push(((self.replay.edits.values[value_index].0, time), self.replay.edits.edits[edit_offset].1.clone()));
174 }
175 fn step_while_time_is(&mut self, time: &C::Time) -> bool {
176 let mut found = false;
177 while self.time() == Some(time) {
178 found = true;
179 self.step();
180 }
181 found
182 }
183 fn advance_buffer_by(&mut self, meet: &C::Time) {
184 for element in self.replay.buffer.iter_mut() {
185 (element.0).1 = (element.0).1.join(meet);
186 }
187 crate::consolidation::consolidate(&mut self.replay.buffer);
188 }
189 fn is_done(&self) -> bool { self.replay.history.is_empty() }
190}