differential_dataflow/trace/wrappers/
enter_at.rs

1//! Wrappers to provide trace access to nested scopes.
2
3use timely::progress::timestamp::Refines;
4use timely::progress::{Antichain, frontier::AntichainRef};
5
6use crate::lattice::Lattice;
7use crate::trace::{TraceReader, BatchReader, Description};
8use crate::trace::cursor::Cursor;
9
10/// Wrapper to provide trace to nested scope.
11///
12/// Each wrapped update is presented with a timestamp determined by `logic`.
13///
14/// At the same time, we require a method `prior` that can "invert" timestamps,
15/// and which will be applied to compaction frontiers as they are communicated
16/// back to the wrapped traces. A better explanation is pending, and until that
17/// happens use this construct at your own peril!
18pub struct TraceEnter<Tr: TraceReader, TInner, F, G> {
19    trace: Tr,
20    stash1: Antichain<Tr::Time>,
21    stash2: Antichain<TInner>,
22    logic: F,
23    prior: G,
24}
25
26impl<Tr,TInner,F,G> Clone for TraceEnter<Tr, TInner, F, G>
27where
28    Tr: TraceReader+Clone,
29    F: Clone,
30    G: Clone,
31{
32    fn clone(&self) -> Self {
33        TraceEnter {
34            trace: self.trace.clone(),
35            stash1: Antichain::new(),
36            stash2: Antichain::new(),
37            logic: self.logic.clone(),
38            prior: self.prior.clone(),
39        }
40    }
41}
42
43impl<Tr, TInner, F, G> WithLayout for TraceEnter<Tr, TInner, F, G>
44where
45    Tr: TraceReader<Batch: Clone>,
46    TInner: Refines<Tr::Time>+Lattice,
47    F: Clone,
48    G: Clone,
49{
50    type Layout = (
51        <Tr::Layout as Layout>::KeyContainer,
52        <Tr::Layout as Layout>::ValContainer,
53        Vec<TInner>,
54        <Tr::Layout as Layout>::DiffContainer,
55        <Tr::Layout as Layout>::OffsetContainer,
56    );
57}
58
59impl<Tr, TInner, F, G> TraceReader for TraceEnter<Tr, TInner, F, G>
60where
61    Tr: TraceReader<Batch: Clone>,
62    TInner: Refines<Tr::Time>+Lattice,
63    F: 'static,
64    F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone,
65    G: FnMut(&TInner)->Tr::Time+Clone+'static,
66{
67    type Batch = BatchEnter<Tr::Batch, TInner,F>;
68    type Storage = Tr::Storage;
69    type Cursor = CursorEnter<Tr::Cursor, TInner,F>;
70
71    fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
72        let logic = self.logic.clone();
73        self.trace.map_batches(|batch| {
74            f(&Self::Batch::make_from(batch.clone(), logic.clone()));
75        })
76    }
77
78    fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
79        self.stash1.clear();
80        for time in frontier.iter() {
81            self.stash1.insert((self.prior)(time));
82        }
83        self.trace.set_logical_compaction(self.stash1.borrow());
84    }
85    fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> {
86        self.stash2.clear();
87        for time in self.trace.get_logical_compaction().iter() {
88            self.stash2.insert(TInner::to_inner(time.clone()));
89        }
90        self.stash2.borrow()
91    }
92
93    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
94        self.stash1.clear();
95        for time in frontier.iter() {
96            self.stash1.insert((self.prior)(time));
97        }
98        self.trace.set_physical_compaction(self.stash1.borrow());
99    }
100    fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> {
101        self.stash2.clear();
102        for time in self.trace.get_physical_compaction().iter() {
103            self.stash2.insert(TInner::to_inner(time.clone()));
104        }
105        self.stash2.borrow()
106    }
107
108    fn cursor_through(&mut self, upper: AntichainRef<TInner>) -> Option<(Self::Cursor, Self::Storage)> {
109        self.stash1.clear();
110        for time in upper.iter() {
111            self.stash1.insert(time.clone().to_outer());
112        }
113        self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x, self.logic.clone()), y))
114    }
115}
116
117impl<Tr, TInner, F, G> TraceEnter<Tr, TInner, F, G>
118where
119    Tr: TraceReader,
120    TInner: Refines<Tr::Time>+Lattice,
121{
122    /// Makes a new trace wrapper
123    pub fn make_from(trace: Tr, logic: F, prior: G) -> Self {
124        TraceEnter {
125            trace,
126            stash1: Antichain::new(),
127            stash2: Antichain::new(),
128            logic,
129            prior,
130        }
131    }
132}
133
134
135/// Wrapper to provide batch to nested scope.
136#[derive(Clone)]
137pub struct BatchEnter<B, TInner, F> {
138    batch: B,
139    description: Description<TInner>,
140    logic: F,
141}
142
143impl<B, TInner, F> WithLayout for BatchEnter<B, TInner, F>
144where
145    B: BatchReader,
146    TInner: Refines<B::Time>+Lattice,
147{
148    type Layout = (
149        <B::Layout as Layout>::KeyContainer,
150        <B::Layout as Layout>::ValContainer,
151        Vec<TInner>,
152        <B::Layout as Layout>::DiffContainer,
153        <B::Layout as Layout>::OffsetContainer,
154    );
155}
156
157use crate::trace::implementations::LayoutExt;
158impl<B, TInner, F> BatchReader for BatchEnter<B, TInner, F>
159where
160    B: BatchReader,
161    TInner: Refines<B::Time>+Lattice,
162    F: FnMut(B::Key<'_>, <B::Cursor as LayoutExt>::Val<'_>, B::TimeGat<'_>)->TInner+Clone,
163{
164    type Cursor = BatchCursorEnter<B::Cursor, TInner, F>;
165
166    fn cursor(&self) -> Self::Cursor {
167        BatchCursorEnter::new(self.batch.cursor(), self.logic.clone())
168    }
169    fn len(&self) -> usize { self.batch.len() }
170    fn description(&self) -> &Description<TInner> { &self.description }
171}
172
173impl<B, TInner, F> BatchEnter<B, TInner, F>
174where
175    B: BatchReader,
176    TInner: Refines<B::Time>+Lattice,
177{
178    /// Makes a new batch wrapper
179    pub fn make_from(batch: B, logic: F) -> Self {
180        let lower: Vec<_> = batch.description().lower().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
181        let upper: Vec<_> = batch.description().upper().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
182        let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
183
184        BatchEnter {
185            batch,
186            description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since)),
187            logic,
188        }
189    }
190}
191
/// Wrapper that exposes a trace cursor to a nested scope.
///
/// Navigation is delegated to the wrapped cursor; the times it yields are
/// replaced by whatever `logic` computes for them.
pub struct CursorEnter<C, TInner, F> {
    /// Marker for the nested-scope timestamp type, which is not stored.
    phantom: std::marker::PhantomData<TInner>,
    /// The wrapped cursor.
    cursor: C,
    /// Computes the timestamp presented for each wrapped update.
    logic: F,
}
198
199use crate::trace::implementations::{Layout, WithLayout};
200impl<C, TInner, F> WithLayout for CursorEnter<C, TInner, F>
201where
202    C: Cursor,
203    TInner: Refines<C::Time>+Lattice,
204{
205    type Layout = (
206        <C::Layout as Layout>::KeyContainer,
207        <C::Layout as Layout>::ValContainer,
208        Vec<TInner>,
209        <C::Layout as Layout>::DiffContainer,
210        <C::Layout as Layout>::OffsetContainer,
211    );
212}
213
214impl<C, TInner, F> CursorEnter<C, TInner, F> {
215    fn new(cursor: C, logic: F) -> Self {
216        CursorEnter {
217            phantom: ::std::marker::PhantomData,
218            cursor,
219            logic,
220        }
221    }
222}
223
224impl<C, TInner, F> Cursor for CursorEnter<C, TInner, F>
225where
226    C: Cursor,
227    TInner: Refines<C::Time>+Lattice,
228    F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner,
229{
230    type Storage = C::Storage;
231
232    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
233    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
234
235    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
236    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
237
238    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
239    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
240
241    #[inline]
242    fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
243        let key = self.key(storage);
244        let val = self.val(storage);
245        let logic2 = &mut self.logic;
246        self.cursor.map_times(storage, |time, diff| {
247            logic(&logic2(key, val, time), diff)
248        })
249    }
250
251    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
252    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
253
254    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
255    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
256
257    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
258    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
259}
260
261
262
/// Wrapper that exposes a batch cursor to a nested scope.
///
/// Unlike `CursorEnter`, its storage is a `BatchEnter`, and every call is
/// forwarded to the wrapped cursor using the batch held inside that wrapper.
pub struct BatchCursorEnter<C, TInner, F> {
    /// Marker for the nested-scope timestamp type, which is not stored.
    phantom: std::marker::PhantomData<TInner>,
    /// The wrapped cursor.
    cursor: C,
    /// Computes the timestamp presented for each wrapped update.
    logic: F,
}
269
270impl<C, TInner, F> WithLayout for BatchCursorEnter<C, TInner, F>
271where
272    C: Cursor,
273    TInner: Refines<C::Time>+Lattice,
274{
275    type Layout = (
276        <C::Layout as Layout>::KeyContainer,
277        <C::Layout as Layout>::ValContainer,
278        Vec<TInner>,
279        <C::Layout as Layout>::DiffContainer,
280        <C::Layout as Layout>::OffsetContainer,
281    );
282}
283
284impl<C, TInner, F> BatchCursorEnter<C, TInner, F> {
285    fn new(cursor: C, logic: F) -> Self {
286        BatchCursorEnter {
287            phantom: ::std::marker::PhantomData,
288            cursor,
289            logic,
290        }
291    }
292}
293
294impl<TInner, C: Cursor, F> Cursor for BatchCursorEnter<C, TInner, F>
295where
296    TInner: Refines<C::Time>+Lattice,
297    F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner,
298{
299    type Storage = BatchEnter<C::Storage, TInner, F>;
300
301    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
302    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
303
304    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
305    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
306
307    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
308    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
309
310    #[inline]
311    fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
312        let key = self.key(storage);
313        let val = self.val(storage);
314        let logic2 = &mut self.logic;
315        self.cursor.map_times(&storage.batch, |time, diff| {
316            logic(&logic2(key, val, time), diff)
317        })
318    }
319
320    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
321    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
322
323    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
324    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
325
326    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
327    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
328}