differential_dataflow/trace/wrappers/
frontier.rs1use timely::progress::{Antichain, frontier::AntichainRef};
10
11use crate::trace::{TraceReader, BatchReader, Description};
12use crate::trace::cursor::Cursor;
13use crate::lattice::Lattice;
14
15pub struct TraceFrontier<Tr: TraceReader> {
17    trace: Tr,
18    since: Antichain<Tr::Time>,
20    until: Antichain<Tr::Time>,
22}
23
24impl<Tr: TraceReader + Clone> Clone for TraceFrontier<Tr> {
25    fn clone(&self) -> Self {
26        TraceFrontier {
27            trace: self.trace.clone(),
28            since: self.since.clone(),
29            until: self.until.clone(),
30        }
31    }
32}
33
34impl<Tr: TraceReader> WithLayout for TraceFrontier<Tr> {
35    type Layout = (
36        <Tr::Layout as Layout>::KeyContainer,
37        <Tr::Layout as Layout>::ValContainer,
38        Vec<Tr::Time>,
39        <Tr::Layout as Layout>::DiffContainer,
40        <Tr::Layout as Layout>::OffsetContainer,
41    );
42}
43
44impl<Tr: TraceReader> TraceReader for TraceFrontier<Tr> {
45
46    type Batch = BatchFrontier<Tr::Batch>;
47    type Storage = Tr::Storage;
48    type Cursor = CursorFrontier<Tr::Cursor, Tr::Time>;
49
50    fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
51        let since = self.since.borrow();
52        let until = self.until.borrow();
53        self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), since, until)))
54    }
55
56    fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) }
57    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() }
58
59    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) }
60    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() }
61
62    fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
63        let since = self.since.borrow();
64        let until = self.until.borrow();
65        self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, since, until), y))
66    }
67}
68
69impl<Tr: TraceReader> TraceFrontier<Tr> {
70    pub fn make_from(trace: Tr, since: AntichainRef<'_, Tr::Time>, until: AntichainRef<'_, Tr::Time>) -> Self {
72        TraceFrontier {
73            trace,
74            since: since.to_owned(),
75            until: until.to_owned(),
76        }
77    }
78}
79
80
81#[derive(Clone)]
83pub struct BatchFrontier<B: BatchReader> {
84    batch: B,
85    since: Antichain<B::Time>,
86    until: Antichain<B::Time>,
87}
88
89impl<B: BatchReader> WithLayout for BatchFrontier<B> {
90    type Layout = (
91        <B::Layout as Layout>::KeyContainer,
92        <B::Layout as Layout>::ValContainer,
93        Vec<B::Time>,
94        <B::Layout as Layout>::DiffContainer,
95        <B::Layout as Layout>::OffsetContainer,
96    );
97}
98
99impl<B: BatchReader> BatchReader for BatchFrontier<B> {
100
101    type Cursor = BatchCursorFrontier<B::Cursor>;
102
103    fn cursor(&self) -> Self::Cursor {
104        BatchCursorFrontier::new(self.batch.cursor(), self.since.borrow(), self.until.borrow())
105    }
106    fn len(&self) -> usize { self.batch.len() }
107    fn description(&self) -> &Description<B::Time> { self.batch.description() }
108}
109
110impl<B: BatchReader> BatchFrontier<B> {
111    pub fn make_from(batch: B, since: AntichainRef<B::Time>, until: AntichainRef<B::Time>) -> Self {
113        BatchFrontier {
114            batch,
115            since: since.to_owned(),
116            until: until.to_owned(),
117        }
118    }
119}
120
121pub struct CursorFrontier<C, T> {
123    cursor: C,
124    since: Antichain<T>,
125    until: Antichain<T>
126}
127
128use crate::trace::implementations::{Layout, WithLayout};
129impl<C: Cursor> WithLayout for CursorFrontier<C, C::Time> {
130    type Layout = (
131        <C::Layout as Layout>::KeyContainer,
132        <C::Layout as Layout>::ValContainer,
133        Vec<C::Time>,
134        <C::Layout as Layout>::DiffContainer,
135        <C::Layout as Layout>::OffsetContainer,
136    );
137}
138
139impl<C, T: Clone> CursorFrontier<C, T> {
140    fn new(cursor: C, since: AntichainRef<T>, until: AntichainRef<T>) -> Self {
141        CursorFrontier {
142            cursor,
143            since: since.to_owned(),
144            until: until.to_owned(),
145        }
146    }
147}
148
149impl<C: Cursor> Cursor for CursorFrontier<C, C::Time> {
150
151    type Storage = C::Storage;
152
153    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
154    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
155
156    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
157    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
158
159    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
160    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
161
162    #[inline]
163    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
164        let since = self.since.borrow();
165        let until = self.until.borrow();
166        let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
167        self.cursor.map_times(storage, |time, diff| {
168            C::clone_time_onto(time, &mut temp);
169            temp.advance_by(since);
170            if !until.less_equal(&temp) {
171                logic(&temp, diff);
172            }
173        })
174    }
175
176    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
177    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
178
179    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
180    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
181
182    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
183    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
184}
185
186
187
188pub struct BatchCursorFrontier<C: Cursor> {
190    cursor: C,
191    since: Antichain<C::Time>,
192    until: Antichain<C::Time>,
193}
194
195impl<C: Cursor> WithLayout for BatchCursorFrontier<C> {
196    type Layout = (
197        <C::Layout as Layout>::KeyContainer,
198        <C::Layout as Layout>::ValContainer,
199        Vec<C::Time>,
200        <C::Layout as Layout>::DiffContainer,
201        <C::Layout as Layout>::OffsetContainer,
202    );
203}
204
205impl<C: Cursor> BatchCursorFrontier<C> {
206    fn new(cursor: C, since: AntichainRef<C::Time>, until: AntichainRef<C::Time>) -> Self {
207        BatchCursorFrontier {
208            cursor,
209            since: since.to_owned(),
210            until: until.to_owned(),
211        }
212    }
213}
214
215impl<C: Cursor<Storage: BatchReader>> Cursor for BatchCursorFrontier<C> {
216
217    type Storage = BatchFrontier<C::Storage>;
218
219    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
220    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
221
222    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
223    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
224
225    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
226    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
227
228    #[inline]
229    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
230        let since = self.since.borrow();
231        let until = self.until.borrow();
232        let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
233        self.cursor.map_times(&storage.batch, |time, diff| {
234            C::clone_time_onto(time, &mut temp);
235            temp.advance_by(since);
236            if !until.less_equal(&temp) {
237                logic(&temp, diff);
238            }
239        })
240    }
241
242    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
243    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
244
245    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
246    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
247
248    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
249    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
250}