differential_dataflow/trace/wrappers/
frontier.rs

//! Wrapper for frontiered trace.
//!
//! Wraps a trace with `since` and `until` frontiers so that all exposed timestamps are first advanced
//! by the `since` frontier and restricted by the `until` frontier. This presents a deterministic trace
//! on the interval `[since, until)`, presenting only accumulations up to `since` (rather than partially
//! accumulated updates) and no updates at times greater or equal to `until` (even as parts of batches
//! that span that time).
8
9use timely::progress::{Antichain, frontier::AntichainRef};
10
11use crate::trace::{TraceReader, BatchReader, Description};
12use crate::trace::cursor::Cursor;
13use crate::lattice::Lattice;
14
15/// Wrapper to provide trace to nested scope.
16pub struct TraceFrontier<Tr: TraceReader> {
17    trace: Tr,
18    /// Frontier to which all update times will be advanced.
19    since: Antichain<Tr::Time>,
20    /// Frontier after which all update times will be suppressed.
21    until: Antichain<Tr::Time>,
22}
23
24impl<Tr: TraceReader + Clone> Clone for TraceFrontier<Tr> {
25    fn clone(&self) -> Self {
26        TraceFrontier {
27            trace: self.trace.clone(),
28            since: self.since.clone(),
29            until: self.until.clone(),
30        }
31    }
32}
33
34impl<Tr: TraceReader> TraceReader for TraceFrontier<Tr> {
35    type Key<'a> = Tr::Key<'a>;
36    type Val<'a> = Tr::Val<'a>;
37    type Time = Tr::Time;
38    type TimeGat<'a> = &'a Tr::Time;
39    type Diff = Tr::Diff;
40    type DiffGat<'a> = Tr::DiffGat<'a>;
41
42    type Batch = BatchFrontier<Tr::Batch>;
43    type Storage = Tr::Storage;
44    type Cursor = CursorFrontier<Tr::Cursor, Tr::Time>;
45
46    fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
47        let since = self.since.borrow();
48        let until = self.until.borrow();
49        self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), since, until)))
50    }
51
52    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
53    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
54
55    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
56    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
57
58    fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
59        let since = self.since.borrow();
60        let until = self.until.borrow();
61        self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, since, until), y))
62    }
63}
64
65impl<Tr: TraceReader> TraceFrontier<Tr> {
66    /// Makes a new trace wrapper
67    pub fn make_from(trace: Tr, since: AntichainRef<Tr::Time>, until: AntichainRef<Tr::Time>) -> Self {
68        TraceFrontier {
69            trace,
70            since: since.to_owned(),
71            until: until.to_owned(),
72        }
73    }
74}
75
76
77/// Wrapper to provide batch to nested scope.
78#[derive(Clone)]
79pub struct BatchFrontier<B: BatchReader> {
80    batch: B,
81    since: Antichain<B::Time>,
82    until: Antichain<B::Time>,
83}
84
85impl<B: BatchReader> BatchReader for BatchFrontier<B> {
86    type Key<'a> = B::Key<'a>;
87    type Val<'a> = B::Val<'a>;
88    type Time = B::Time;
89    type TimeGat<'a> = &'a B::Time;
90    type Diff = B::Diff;
91    type DiffGat<'a> = B::DiffGat<'a>;
92
93    type Cursor = BatchCursorFrontier<B::Cursor>;
94
95    fn cursor(&self) -> Self::Cursor {
96        BatchCursorFrontier::new(self.batch.cursor(), self.since.borrow(), self.until.borrow())
97    }
98    fn len(&self) -> usize { self.batch.len() }
99    fn description(&self) -> &Description<B::Time> { self.batch.description() }
100}
101
102impl<B: BatchReader> BatchFrontier<B> {
103    /// Makes a new batch wrapper
104    pub fn make_from(batch: B, since: AntichainRef<B::Time>, until: AntichainRef<B::Time>) -> Self {
105        BatchFrontier {
106            batch,
107            since: since.to_owned(),
108            until: until.to_owned(),
109        }
110    }
111}
112
113/// Wrapper to provide cursor to nested scope.
114pub struct CursorFrontier<C, T> {
115    cursor: C,
116    since: Antichain<T>,
117    until: Antichain<T>
118}
119
120impl<C, T: Clone> CursorFrontier<C, T> {
121    fn new(cursor: C, since: AntichainRef<T>, until: AntichainRef<T>) -> Self {
122        CursorFrontier {
123            cursor,
124            since: since.to_owned(),
125            until: until.to_owned(),
126        }
127    }
128}
129
130impl<C: Cursor> Cursor for CursorFrontier<C, C::Time> {
131    type Key<'a> = C::Key<'a>;
132    type Val<'a> = C::Val<'a>;
133    type Time = C::Time;
134    type TimeGat<'a> = &'a C::Time;
135    type Diff = C::Diff;
136    type DiffGat<'a> = C::DiffGat<'a>;
137
138    type Storage = C::Storage;
139
140    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
141    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
142
143    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
144    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
145
146    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
147    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
148
149    #[inline]
150    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
151        let since = self.since.borrow();
152        let until = self.until.borrow();
153        let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
154        self.cursor.map_times(storage, |time, diff| {
155            use crate::IntoOwned;
156            time.clone_onto(&mut temp);
157            temp.advance_by(since);
158            if !until.less_equal(&temp) {
159                logic(&temp, diff);
160            }
161        })
162    }
163
164    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
165    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
166
167    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
168    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
169
170    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
171    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
172}
173
174
175
176/// Wrapper to provide cursor to nested scope.
177pub struct BatchCursorFrontier<C: Cursor> {
178    cursor: C,
179    since: Antichain<C::Time>,
180    until: Antichain<C::Time>,
181}
182
183impl<C: Cursor> BatchCursorFrontier<C> {
184    fn new(cursor: C, since: AntichainRef<C::Time>, until: AntichainRef<C::Time>) -> Self {
185        BatchCursorFrontier {
186            cursor,
187            since: since.to_owned(),
188            until: until.to_owned(),
189        }
190    }
191}
192
193impl<C: Cursor<Storage: BatchReader>> Cursor for BatchCursorFrontier<C> {
194    type Key<'a> = C::Key<'a>;
195    type Val<'a> = C::Val<'a>;
196    type Time = C::Time;
197    type TimeGat<'a> = &'a C::Time;
198    type Diff = C::Diff;
199    type DiffGat<'a> = C::DiffGat<'a>;
200
201    type Storage = BatchFrontier<C::Storage>;
202
203    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
204    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
205
206    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
207    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
208
209    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
210    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
211
212    #[inline]
213    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
214        let since = self.since.borrow();
215        let until = self.until.borrow();
216        let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
217        self.cursor.map_times(&storage.batch, |time, diff| {
218            use crate::IntoOwned;
219            time.clone_onto(&mut temp);
220            temp.advance_by(since);
221            if !until.less_equal(&temp) {
222                logic(&temp, diff);
223            }
224        })
225    }
226
227    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
228    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
229
230    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
231    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
232
233    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
234    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
235}