differential_dataflow/trace/wrappers/
enter.rs

1//! Wrappers to provide trace access to nested scopes.
2
3// use timely::progress::nested::product::Product;
4use timely::progress::timestamp::Refines;
5use timely::progress::{Antichain, frontier::AntichainRef};
6
7use crate::lattice::Lattice;
8use crate::trace::{TraceReader, BatchReader, Description};
9use crate::trace::cursor::Cursor;
10
11/// Wrapper to provide trace to nested scope.
12pub struct TraceEnter<Tr: TraceReader, TInner> {
13    trace: Tr,
14    stash1: Antichain<Tr::Time>,
15    stash2: Antichain<TInner>,
16}
17
18impl<Tr: TraceReader + Clone, TInner> Clone for TraceEnter<Tr, TInner> {
19    fn clone(&self) -> Self {
20        TraceEnter {
21            trace: self.trace.clone(),
22            stash1: Antichain::new(),
23            stash2: Antichain::new(),
24        }
25    }
26}
27
28impl<Tr, TInner> WithLayout for TraceEnter<Tr, TInner>
29where
30    Tr: TraceReader<Batch: Clone>,
31    TInner: Refines<Tr::Time>+Lattice,
32{
33    type Layout = (
34        <Tr::Layout as Layout>::KeyContainer,
35        <Tr::Layout as Layout>::ValContainer,
36        Vec<TInner>,
37        <Tr::Layout as Layout>::DiffContainer,
38        <Tr::Layout as Layout>::OffsetContainer,
39    );
40}
41
42impl<Tr, TInner> TraceReader for TraceEnter<Tr, TInner>
43where
44    Tr: TraceReader<Batch: Clone>,
45    TInner: Refines<Tr::Time>+Lattice,
46{
47    type Batch = BatchEnter<Tr::Batch, TInner>;
48    type Storage = Tr::Storage;
49    type Cursor = CursorEnter<Tr::Cursor, TInner>;
50
51    fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
52        self.trace.map_batches(|batch| {
53            f(&Self::Batch::make_from(batch.clone()));
54        })
55    }
56
57    fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
58        self.stash1.clear();
59        for time in frontier.iter() {
60            self.stash1.insert(time.clone().to_outer());
61        }
62        self.trace.set_logical_compaction(self.stash1.borrow());
63    }
64    fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> {
65        self.stash2.clear();
66        for time in self.trace.get_logical_compaction().iter() {
67            self.stash2.insert(TInner::to_inner(time.clone()));
68        }
69        self.stash2.borrow()
70    }
71
72    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
73        self.stash1.clear();
74        for time in frontier.iter() {
75            self.stash1.insert(time.clone().to_outer());
76        }
77        self.trace.set_physical_compaction(self.stash1.borrow());
78    }
79    fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> {
80        self.stash2.clear();
81        for time in self.trace.get_physical_compaction().iter() {
82            self.stash2.insert(TInner::to_inner(time.clone()));
83        }
84        self.stash2.borrow()
85    }
86
87    fn cursor_through(&mut self, upper: AntichainRef<TInner>) -> Option<(Self::Cursor, Self::Storage)> {
88        self.stash1.clear();
89        for time in upper.iter() {
90            self.stash1.insert(time.clone().to_outer());
91        }
92        self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x), y))
93    }
94}
95
96impl<Tr, TInner> TraceEnter<Tr, TInner>
97where
98    Tr: TraceReader,
99    TInner: Refines<Tr::Time>+Lattice,
100{
101    /// Makes a new trace wrapper
102    pub fn make_from(trace: Tr) -> Self {
103        TraceEnter {
104            trace,
105            stash1: Antichain::new(),
106            stash2: Antichain::new(),
107        }
108    }
109}
110
111
112/// Wrapper to provide batch to nested scope.
113#[derive(Clone)]
114pub struct BatchEnter<B, TInner> {
115    batch: B,
116    description: Description<TInner>,
117}
118
119impl<B, TInner> WithLayout for BatchEnter<B, TInner>
120where
121    B: BatchReader,
122    TInner: Refines<B::Time>+Lattice,
123{
124    type Layout = (
125        <B::Layout as Layout>::KeyContainer,
126        <B::Layout as Layout>::ValContainer,
127        Vec<TInner>,
128        <B::Layout as Layout>::DiffContainer,
129        <B::Layout as Layout>::OffsetContainer,
130    );
131}
132
133impl<B, TInner> BatchReader for BatchEnter<B, TInner>
134where
135    B: BatchReader,
136    TInner: Refines<B::Time>+Lattice,
137{
138    type Cursor = BatchCursorEnter<B::Cursor, TInner>;
139
140    fn cursor(&self) -> Self::Cursor {
141        BatchCursorEnter::new(self.batch.cursor())
142    }
143    fn len(&self) -> usize { self.batch.len() }
144    fn description(&self) -> &Description<TInner> { &self.description }
145}
146
147impl<B, TInner> BatchEnter<B, TInner>
148where
149    B: BatchReader,
150    TInner: Refines<B::Time>+Lattice,
151{
152    /// Makes a new batch wrapper
153    pub fn make_from(batch: B) -> Self {
154        let lower: Vec<_> = batch.description().lower().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
155        let upper: Vec<_> = batch.description().upper().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
156        let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
157
158        BatchEnter {
159            batch,
160            description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since))
161        }
162    }
163}
164
165/// Wrapper to provide cursor to nested scope.
166pub struct CursorEnter<C, TInner> {
167    phantom: ::std::marker::PhantomData<TInner>,
168    cursor: C,
169}
170
171use crate::trace::implementations::{Layout, WithLayout};
172impl<C, TInner> WithLayout for CursorEnter<C, TInner>
173where
174    C: Cursor,
175    TInner: Refines<C::Time>+Lattice,
176{
177    type Layout = (
178        <C::Layout as Layout>::KeyContainer,
179        <C::Layout as Layout>::ValContainer,
180        Vec<TInner>,
181        <C::Layout as Layout>::DiffContainer,
182        <C::Layout as Layout>::OffsetContainer,
183    );
184}
185
186impl<C, TInner> CursorEnter<C, TInner> {
187    fn new(cursor: C) -> Self {
188        CursorEnter {
189            phantom: ::std::marker::PhantomData,
190            cursor,
191        }
192    }
193}
194
195impl<C, TInner> Cursor for CursorEnter<C, TInner>
196where
197    C: Cursor,
198    TInner: Refines<C::Time>+Lattice,
199{
200    type Storage = C::Storage;
201
202    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
203    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
204
205    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
206    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
207
208    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
209    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
210
211    #[inline]
212    fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
213        self.cursor.map_times(storage, |time, diff| {
214            logic(&TInner::to_inner(C::owned_time(time)), diff)
215        })
216    }
217
218    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
219    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
220
221    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
222    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
223
224    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
225    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
226}
227
228
229
230/// Wrapper to provide cursor to nested scope.
231pub struct BatchCursorEnter<C, TInner> {
232    phantom: ::std::marker::PhantomData<TInner>,
233    cursor: C,
234}
235
236impl<C, TInner> BatchCursorEnter<C, TInner> {
237    fn new(cursor: C) -> Self {
238        BatchCursorEnter {
239            phantom: ::std::marker::PhantomData,
240            cursor,
241        }
242    }
243}
244
245impl<C, TInner> WithLayout for BatchCursorEnter<C, TInner>
246where
247    C: Cursor,
248    TInner: Refines<C::Time>+Lattice,
249{
250    type Layout = (
251        <C::Layout as Layout>::KeyContainer,
252        <C::Layout as Layout>::ValContainer,
253        Vec<TInner>,
254        <C::Layout as Layout>::DiffContainer,
255        <C::Layout as Layout>::OffsetContainer,
256    );
257}
258
259impl<TInner, C: Cursor> Cursor for BatchCursorEnter<C, TInner>
260where
261    TInner: Refines<C::Time>+Lattice,
262{
263    type Storage = BatchEnter<C::Storage, TInner>;
264
265    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
266    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
267
268    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
269    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
270
271    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
272    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
273
274    #[inline]
275    fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
276        self.cursor.map_times(&storage.batch, |time, diff| {
277            logic(&TInner::to_inner(C::owned_time(time)), diff)
278        })
279    }
280
281    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
282    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
283
284    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
285    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
286
287    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
288    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
289}