differential_dataflow/operators/
mod.rs
1pub use self::negate::Negate;
8pub use self::reduce::{Reduce, Threshold, Count};
9pub use self::iterate::{Iterate, ResultsIn};
10pub use self::join::{Join, JoinCore};
11pub use self::count::CountTotal;
12pub use self::threshold::ThresholdTotal;
13
14pub mod arrange;
15pub mod negate;
16pub mod reduce;
17pub mod consolidate;
18pub mod iterate;
19pub mod join;
20pub mod count;
21pub mod threshold;
22
23use crate::lattice::Lattice;
24use crate::trace::Cursor;
25use crate::IntoOwned;
26
27struct EditList<'a, C: Cursor> {
29 values: Vec<(C::Val<'a>, usize)>,
30 edits: Vec<(C::Time, C::Diff)>,
31}
32
33impl<'a, C: Cursor> EditList<'a, C> {
34 #[inline]
36 fn new() -> Self {
37 EditList {
38 values: Vec::new(),
39 edits: Vec::new(),
40 }
41 }
42 fn load<L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
44 where
45 L: Fn(C::TimeGat<'_>)->C::Time,
46 {
47 self.clear();
48 while cursor.val_valid(storage) {
49 cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.into_owned()));
50 self.seal(cursor.val(storage));
51 cursor.step_val(storage);
52 }
53 }
54 #[inline]
56 fn clear(&mut self) {
57 self.values.clear();
58 self.edits.clear();
59 }
60 fn len(&self) -> usize { self.edits.len() }
61 #[inline]
63 fn push(&mut self, time: C::Time, diff: C::Diff) {
64 self.edits.push((time, diff));
66 }
67 #[inline]
69 fn seal(&mut self, value: C::Val<'a>) {
70 let prev = self.values.last().map(|x| x.1).unwrap_or(0);
71 crate::consolidation::consolidate_from(&mut self.edits, prev);
72 if self.edits.len() > prev {
73 self.values.push((value, self.edits.len()));
74 }
75 }
76 fn map<F: FnMut(C::Val<'a>, &C::Time, &C::Diff)>(&self, mut logic: F) {
77 for index in 0 .. self.values.len() {
78 let lower = if index == 0 { 0 } else { self.values[index-1].1 };
79 let upper = self.values[index].1;
80 for edit in lower .. upper {
81 logic(self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
82 }
83 }
84 }
85}
86
87struct ValueHistory<'storage, C: Cursor> {
88 edits: EditList<'storage, C>,
89 history: Vec<(C::Time, C::Time, usize, usize)>, buffer: Vec<((C::Val<'storage>, C::Time), C::Diff)>, }
92
93impl<'storage, C: Cursor> ValueHistory<'storage, C> {
94 fn new() -> Self {
95 ValueHistory {
96 edits: EditList::new(),
97 history: Vec::new(),
98 buffer: Vec::new(),
99 }
100 }
101 fn clear(&mut self) {
102 self.edits.clear();
103 self.history.clear();
104 self.buffer.clear();
105 }
106 fn load<L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
107 where
108 L: Fn(C::TimeGat<'_>)->C::Time,
109 {
110 self.edits.load(cursor, storage, logic);
111 }
112
113 fn replay_key<'history, L>(
117 &'history mut self,
118 cursor: &mut C,
119 storage: &'storage C::Storage,
120 key: C::Key<'storage>,
121 logic: L
122 ) -> HistoryReplay<'storage, 'history, C>
123 where
124 L: Fn(C::TimeGat<'_>)->C::Time,
125 {
126 self.clear();
127 cursor.seek_key(storage, key);
128 if cursor.get_key(storage) == Some(key) {
129 self.load(cursor, storage, logic);
130 }
131 self.replay()
132 }
133
134 fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {
136
137 self.buffer.clear();
138 self.history.clear();
139 for value_index in 0 .. self.edits.values.len() {
140 let lower = if value_index > 0 { self.edits.values[value_index-1].1 } else { 0 };
141 let upper = self.edits.values[value_index].1;
142 for edit_index in lower .. upper {
143 let time = self.edits.edits[edit_index].0.clone();
144 self.history.push((time.clone(), time, value_index, edit_index));
145 }
146 }
147
148 self.history.sort_by(|x,y| y.cmp(x));
149 for index in 1 .. self.history.len() {
150 self.history[index].1 = self.history[index].1.meet(&self.history[index-1].1);
151 }
152
153 HistoryReplay {
154 replay: self
155 }
156 }
157}
158
159struct HistoryReplay<'storage, 'history, C: Cursor> {
160 replay: &'history mut ValueHistory<'storage, C>
161}
162
163impl<'storage, 'history, C: Cursor> HistoryReplay<'storage, 'history, C> {
164 fn time(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.0) }
165 fn meet(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.1) }
166 fn edit(&self) -> Option<(C::Val<'storage>, &C::Time, &C::Diff)> {
167 self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
168 }
169
170 fn buffer(&self) -> &[((C::Val<'storage>, C::Time), C::Diff)] {
171 &self.replay.buffer[..]
172 }
173
174 fn step(&mut self) {
175 let (time, _, value_index, edit_offset) = self.replay.history.pop().unwrap();
176 self.replay.buffer.push(((self.replay.edits.values[value_index].0, time), self.replay.edits.edits[edit_offset].1.clone()));
177 }
178 fn step_while_time_is(&mut self, time: &C::Time) -> bool {
179 let mut found = false;
180 while self.time() == Some(time) {
181 found = true;
182 self.step();
183 }
184 found
185 }
186 fn advance_buffer_by(&mut self, meet: &C::Time) {
187 for element in self.replay.buffer.iter_mut() {
188 (element.0).1 = (element.0).1.join(meet);
189 }
190 crate::consolidation::consolidate(&mut self.replay.buffer);
191 }
192 fn is_done(&self) -> bool { self.replay.history.is_empty() }
193}