differential_dataflow/operators/
mod.rs

//! Specialized differential dataflow operators.
//!
//! Differential dataflow introduces a small number of specialized operators on collections. These
//! operators have specialized implementations to make them work efficiently, and are in addition
//! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`).
6
7pub use self::negate::Negate;
8pub use self::reduce::{Reduce, Threshold, Count};
9pub use self::iterate::{Iterate, ResultsIn};
10pub use self::join::{Join, JoinCore};
11pub use self::count::CountTotal;
12pub use self::threshold::ThresholdTotal;
13
14pub mod arrange;
15pub mod negate;
16pub mod reduce;
17pub mod consolidate;
18pub mod iterate;
19pub mod join;
20pub mod count;
21pub mod threshold;
22
23use crate::lattice::Lattice;
24use crate::trace::Cursor;
25use crate::IntoOwned;
26
27/// An accumulation of (value, time, diff) updates.
28struct EditList<'a, C: Cursor> {
29    values: Vec<(C::Val<'a>, usize)>,
30    edits: Vec<(C::Time, C::Diff)>,
31}
32
33impl<'a, C: Cursor> EditList<'a, C> {
34    /// Creates an empty list of edits.
35    #[inline]
36    fn new() -> Self {
37        EditList {
38            values: Vec::new(),
39            edits: Vec::new(),
40        }
41    }
42    /// Loads the contents of a cursor.
43    fn load<L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
44    where
45        L: Fn(C::TimeGat<'_>)->C::Time,
46    {
47        self.clear();
48        while cursor.val_valid(storage) {
49            cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.into_owned()));
50            self.seal(cursor.val(storage));
51            cursor.step_val(storage);
52        }
53    }
54    /// Clears the list of edits.
55    #[inline]
56    fn clear(&mut self) {
57        self.values.clear();
58        self.edits.clear();
59    }
60    fn len(&self) -> usize { self.edits.len() }
61    /// Inserts a new edit for an as-yet undetermined value.
62    #[inline]
63    fn push(&mut self, time: C::Time, diff: C::Diff) {
64        // TODO: Could attempt "insertion-sort" like behavior here, where we collapse if possible.
65        self.edits.push((time, diff));
66    }
67    /// Associates all edits pushed since the previous `seal_value` call with `value`.
68    #[inline]
69    fn seal(&mut self, value: C::Val<'a>) {
70        let prev = self.values.last().map(|x| x.1).unwrap_or(0);
71        crate::consolidation::consolidate_from(&mut self.edits, prev);
72        if self.edits.len() > prev {
73            self.values.push((value, self.edits.len()));
74        }
75    }
76    fn map<F: FnMut(C::Val<'a>, &C::Time, &C::Diff)>(&self, mut logic: F) {
77        for index in 0 .. self.values.len() {
78            let lower = if index == 0 { 0 } else { self.values[index-1].1 };
79            let upper = self.values[index].1;
80            for edit in lower .. upper {
81                logic(self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
82            }
83        }
84    }
85}
86
87struct ValueHistory<'storage, C: Cursor> {
88    edits: EditList<'storage, C>,
89    history: Vec<(C::Time, C::Time, usize, usize)>,     // (time, meet, value_index, edit_offset)
90    buffer: Vec<((C::Val<'storage>, C::Time), C::Diff)>,   // where we accumulate / collapse updates.
91}
92
93impl<'storage, C: Cursor> ValueHistory<'storage, C> {
94    fn new() -> Self {
95        ValueHistory {
96            edits: EditList::new(),
97            history: Vec::new(),
98            buffer: Vec::new(),
99        }
100    }
101    fn clear(&mut self) {
102        self.edits.clear();
103        self.history.clear();
104        self.buffer.clear();
105    }
106    fn load<L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
107    where
108        L: Fn(C::TimeGat<'_>)->C::Time,
109    {
110        self.edits.load(cursor, storage, logic);
111    }
112
113    /// Loads and replays a specified key.
114    ///
115    /// If the key is absent, the replayed history will be empty.
116    fn replay_key<'history, L>(
117        &'history mut self,
118        cursor: &mut C,
119        storage: &'storage C::Storage,
120        key: C::Key<'storage>,
121        logic: L
122    ) -> HistoryReplay<'storage, 'history, C>
123    where
124        L: Fn(C::TimeGat<'_>)->C::Time,
125    {
126        self.clear();
127        cursor.seek_key(storage, key);
128        if cursor.get_key(storage) == Some(key) {
129            self.load(cursor, storage, logic);
130        }
131        self.replay()
132    }
133
134    /// Organizes history based on current contents of edits.
135    fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {
136
137        self.buffer.clear();
138        self.history.clear();
139        for value_index in 0 .. self.edits.values.len() {
140            let lower = if value_index > 0 { self.edits.values[value_index-1].1 } else { 0 };
141            let upper = self.edits.values[value_index].1;
142            for edit_index in lower .. upper {
143                let time = self.edits.edits[edit_index].0.clone();
144                self.history.push((time.clone(), time, value_index, edit_index));
145            }
146        }
147
148        self.history.sort_by(|x,y| y.cmp(x));
149        for index in 1 .. self.history.len() {
150            self.history[index].1 = self.history[index].1.meet(&self.history[index-1].1);
151        }
152
153        HistoryReplay {
154            replay: self
155        }
156    }
157}
158
159struct HistoryReplay<'storage, 'history, C: Cursor> {
160    replay: &'history mut ValueHistory<'storage, C>
161}
162
163impl<'storage, 'history, C: Cursor> HistoryReplay<'storage, 'history, C> {
164    fn time(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.0) }
165    fn meet(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.1) }
166    fn edit(&self) -> Option<(C::Val<'storage>, &C::Time, &C::Diff)> {
167        self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
168    }
169
170    fn buffer(&self) -> &[((C::Val<'storage>, C::Time), C::Diff)] {
171        &self.replay.buffer[..]
172    }
173
174    fn step(&mut self) {
175        let (time, _, value_index, edit_offset) = self.replay.history.pop().unwrap();
176        self.replay.buffer.push(((self.replay.edits.values[value_index].0, time), self.replay.edits.edits[edit_offset].1.clone()));
177    }
178    fn step_while_time_is(&mut self, time: &C::Time) -> bool {
179        let mut found = false;
180        while self.time() == Some(time) {
181            found = true;
182            self.step();
183        }
184        found
185    }
186    fn advance_buffer_by(&mut self, meet: &C::Time) {
187        for element in self.replay.buffer.iter_mut() {
188            (element.0).1 = (element.0).1.join(meet);
189        }
190        crate::consolidation::consolidate(&mut self.replay.buffer);
191    }
192    fn is_done(&self) -> bool { self.replay.history.is_empty() }
193}