differential_dataflow/operators/
mod.rs1pub use self::negate::Negate;
8pub use self::reduce::{Reduce, Threshold, Count};
9pub use self::iterate::{Iterate, ResultsIn};
10pub use self::join::{Join, JoinCore};
11pub use self::count::CountTotal;
12pub use self::threshold::ThresholdTotal;
13
14pub mod arrange;
15pub mod negate;
16pub mod reduce;
17pub mod consolidate;
18pub mod iterate;
19pub mod join;
20pub mod count;
21pub mod threshold;
22
23use crate::lattice::Lattice;
24use crate::trace::Cursor;
25
26struct EditList<'a, C: Cursor> {
28 values: Vec<(C::Val<'a>, usize)>,
29 edits: Vec<(C::Time, C::Diff)>,
30}
31
32impl<'a, C: Cursor> EditList<'a, C> {
33 #[inline]
35 fn new() -> Self {
36 EditList {
37 values: Vec::new(),
38 edits: Vec::new(),
39 }
40 }
41 fn load<L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
43 where
44 L: Fn(C::TimeGat<'_>)->C::Time,
45 {
46 self.clear();
47 while let Some(val) = cursor.get_val(storage) {
48 cursor.map_times(storage, |time1, diff1| self.push(logic(time1), C::owned_diff(diff1)));
49 self.seal(val);
50 cursor.step_val(storage);
51 }
52 }
53 #[inline]
55 fn clear(&mut self) {
56 self.values.clear();
57 self.edits.clear();
58 }
59 fn len(&self) -> usize { self.edits.len() }
60 #[inline]
62 fn push(&mut self, time: C::Time, diff: C::Diff) {
63 self.edits.push((time, diff));
65 }
66 #[inline]
68 fn seal(&mut self, value: C::Val<'a>) {
69 let prev = self.values.last().map(|x| x.1).unwrap_or(0);
70 crate::consolidation::consolidate_from(&mut self.edits, prev);
71 if self.edits.len() > prev {
72 self.values.push((value, self.edits.len()));
73 }
74 }
75 fn map<F: FnMut(C::Val<'a>, &C::Time, &C::Diff)>(&self, mut logic: F) {
76 for index in 0 .. self.values.len() {
77 let lower = if index == 0 { 0 } else { self.values[index-1].1 };
78 let upper = self.values[index].1;
79 for edit in lower .. upper {
80 logic(self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
81 }
82 }
83 }
84}
85
86struct ValueHistory<'storage, C: Cursor> {
87 edits: EditList<'storage, C>,
88 history: Vec<(C::Time, C::Time, usize, usize)>, buffer: Vec<((C::Val<'storage>, C::Time), C::Diff)>, }
91
92impl<'storage, C: Cursor> ValueHistory<'storage, C> {
93 fn new() -> Self {
94 ValueHistory {
95 edits: EditList::new(),
96 history: Vec::new(),
97 buffer: Vec::new(),
98 }
99 }
100 fn clear(&mut self) {
101 self.edits.clear();
102 self.history.clear();
103 self.buffer.clear();
104 }
105 fn load<L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
106 where
107 L: Fn(C::TimeGat<'_>)->C::Time,
108 {
109 self.edits.load(cursor, storage, logic);
110 }
111
112 fn replay_key<'history, L>(
116 &'history mut self,
117 cursor: &mut C,
118 storage: &'storage C::Storage,
119 key: C::Key<'storage>,
120 logic: L
121 ) -> HistoryReplay<'storage, 'history, C>
122 where
123 L: Fn(C::TimeGat<'_>)->C::Time,
124 {
125 self.clear();
126 cursor.seek_key(storage, key);
127 if cursor.get_key(storage) == Some(key) {
128 self.load(cursor, storage, logic);
129 }
130 self.replay()
131 }
132
133 fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {
135
136 self.buffer.clear();
137 self.history.clear();
138 for value_index in 0 .. self.edits.values.len() {
139 let lower = if value_index > 0 { self.edits.values[value_index-1].1 } else { 0 };
140 let upper = self.edits.values[value_index].1;
141 for edit_index in lower .. upper {
142 let time = self.edits.edits[edit_index].0.clone();
143 self.history.push((time.clone(), time, value_index, edit_index));
144 }
145 }
146
147 self.history.sort_by(|x,y| y.cmp(x));
148 for index in 1 .. self.history.len() {
149 self.history[index].1 = self.history[index].1.meet(&self.history[index-1].1);
150 }
151
152 HistoryReplay {
153 replay: self
154 }
155 }
156}
157
158struct HistoryReplay<'storage, 'history, C: Cursor> {
159 replay: &'history mut ValueHistory<'storage, C>
160}
161
162impl<'storage, 'history, C: Cursor> HistoryReplay<'storage, 'history, C> {
163 fn time(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.0) }
164 fn meet(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.1) }
165 fn edit(&self) -> Option<(C::Val<'storage>, &C::Time, &C::Diff)> {
166 self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
167 }
168
169 fn buffer(&self) -> &[((C::Val<'storage>, C::Time), C::Diff)] {
170 &self.replay.buffer[..]
171 }
172
173 fn step(&mut self) {
174 let (time, _, value_index, edit_offset) = self.replay.history.pop().unwrap();
175 self.replay.buffer.push(((self.replay.edits.values[value_index].0, time), self.replay.edits.edits[edit_offset].1.clone()));
176 }
177 fn step_while_time_is(&mut self, time: &C::Time) -> bool {
178 let mut found = false;
179 while self.time() == Some(time) {
180 found = true;
181 self.step();
182 }
183 found
184 }
185 fn advance_buffer_by(&mut self, meet: &C::Time) {
186 for element in self.replay.buffer.iter_mut() {
187 (element.0).1 = (element.0).1.join(meet);
188 }
189 crate::consolidation::consolidate(&mut self.replay.buffer);
190 }
191 fn is_done(&self) -> bool { self.replay.history.is_empty() }
192}