differential_dataflow/trace/wrappers/
enter.rs1use timely::progress::timestamp::Refines;
5use timely::progress::{Antichain, frontier::AntichainRef};
6
7use crate::lattice::Lattice;
8use crate::trace::{TraceReader, BatchReader, Description};
9use crate::trace::cursor::Cursor;
10
11pub struct TraceEnter<Tr: TraceReader, TInner> {
13 trace: Tr,
14 stash1: Antichain<Tr::Time>,
15 stash2: Antichain<TInner>,
16}
17
18impl<Tr: TraceReader + Clone, TInner> Clone for TraceEnter<Tr, TInner> {
19 fn clone(&self) -> Self {
20 TraceEnter {
21 trace: self.trace.clone(),
22 stash1: Antichain::new(),
23 stash2: Antichain::new(),
24 }
25 }
26}
27
28impl<Tr, TInner> WithLayout for TraceEnter<Tr, TInner>
29where
30 Tr: TraceReader<Batch: Clone>,
31 TInner: Refines<Tr::Time>+Lattice,
32{
33 type Layout = (
34 <Tr::Layout as Layout>::KeyContainer,
35 <Tr::Layout as Layout>::ValContainer,
36 Vec<TInner>,
37 <Tr::Layout as Layout>::DiffContainer,
38 <Tr::Layout as Layout>::OffsetContainer,
39 );
40}
41
42impl<Tr, TInner> TraceReader for TraceEnter<Tr, TInner>
43where
44 Tr: TraceReader<Batch: Clone>,
45 TInner: Refines<Tr::Time>+Lattice,
46{
47 type Batch = BatchEnter<Tr::Batch, TInner>;
48 type Storage = Tr::Storage;
49 type Cursor = CursorEnter<Tr::Cursor, TInner>;
50
51 fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
52 self.trace.map_batches(|batch| {
53 f(&Self::Batch::make_from(batch.clone()));
54 })
55 }
56
57 fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
58 self.stash1.clear();
59 for time in frontier.iter() {
60 self.stash1.insert(time.clone().to_outer());
61 }
62 self.trace.set_logical_compaction(self.stash1.borrow());
63 }
64 fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> {
65 self.stash2.clear();
66 for time in self.trace.get_logical_compaction().iter() {
67 self.stash2.insert(TInner::to_inner(time.clone()));
68 }
69 self.stash2.borrow()
70 }
71
72 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
73 self.stash1.clear();
74 for time in frontier.iter() {
75 self.stash1.insert(time.clone().to_outer());
76 }
77 self.trace.set_physical_compaction(self.stash1.borrow());
78 }
79 fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> {
80 self.stash2.clear();
81 for time in self.trace.get_physical_compaction().iter() {
82 self.stash2.insert(TInner::to_inner(time.clone()));
83 }
84 self.stash2.borrow()
85 }
86
87 fn cursor_through(&mut self, upper: AntichainRef<TInner>) -> Option<(Self::Cursor, Self::Storage)> {
88 self.stash1.clear();
89 for time in upper.iter() {
90 self.stash1.insert(time.clone().to_outer());
91 }
92 self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x), y))
93 }
94}
95
96impl<Tr, TInner> TraceEnter<Tr, TInner>
97where
98 Tr: TraceReader,
99 TInner: Refines<Tr::Time>+Lattice,
100{
101 pub fn make_from(trace: Tr) -> Self {
103 TraceEnter {
104 trace,
105 stash1: Antichain::new(),
106 stash2: Antichain::new(),
107 }
108 }
109}
110
111
112#[derive(Clone)]
114pub struct BatchEnter<B, TInner> {
115 batch: B,
116 description: Description<TInner>,
117}
118
119impl<B, TInner> WithLayout for BatchEnter<B, TInner>
120where
121 B: BatchReader,
122 TInner: Refines<B::Time>+Lattice,
123{
124 type Layout = (
125 <B::Layout as Layout>::KeyContainer,
126 <B::Layout as Layout>::ValContainer,
127 Vec<TInner>,
128 <B::Layout as Layout>::DiffContainer,
129 <B::Layout as Layout>::OffsetContainer,
130 );
131}
132
133impl<B, TInner> BatchReader for BatchEnter<B, TInner>
134where
135 B: BatchReader,
136 TInner: Refines<B::Time>+Lattice,
137{
138 type Cursor = BatchCursorEnter<B::Cursor, TInner>;
139
140 fn cursor(&self) -> Self::Cursor {
141 BatchCursorEnter::new(self.batch.cursor())
142 }
143 fn len(&self) -> usize { self.batch.len() }
144 fn description(&self) -> &Description<TInner> { &self.description }
145}
146
147impl<B, TInner> BatchEnter<B, TInner>
148where
149 B: BatchReader,
150 TInner: Refines<B::Time>+Lattice,
151{
152 pub fn make_from(batch: B) -> Self {
154 let lower: Vec<_> = batch.description().lower().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
155 let upper: Vec<_> = batch.description().upper().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
156 let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
157
158 BatchEnter {
159 batch,
160 description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since))
161 }
162 }
163}
164
165pub struct CursorEnter<C, TInner> {
167 phantom: ::std::marker::PhantomData<TInner>,
168 cursor: C,
169}
170
171use crate::trace::implementations::{Layout, WithLayout};
172impl<C, TInner> WithLayout for CursorEnter<C, TInner>
173where
174 C: Cursor,
175 TInner: Refines<C::Time>+Lattice,
176{
177 type Layout = (
178 <C::Layout as Layout>::KeyContainer,
179 <C::Layout as Layout>::ValContainer,
180 Vec<TInner>,
181 <C::Layout as Layout>::DiffContainer,
182 <C::Layout as Layout>::OffsetContainer,
183 );
184}
185
186impl<C, TInner> CursorEnter<C, TInner> {
187 fn new(cursor: C) -> Self {
188 CursorEnter {
189 phantom: ::std::marker::PhantomData,
190 cursor,
191 }
192 }
193}
194
195impl<C, TInner> Cursor for CursorEnter<C, TInner>
196where
197 C: Cursor,
198 TInner: Refines<C::Time>+Lattice,
199{
200 type Storage = C::Storage;
201
202 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
203 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
204
205 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
206 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
207
208 #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
209 #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
210
211 #[inline]
212 fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
213 self.cursor.map_times(storage, |time, diff| {
214 logic(&TInner::to_inner(C::owned_time(time)), diff)
215 })
216 }
217
218 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
219 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
220
221 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
222 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
223
224 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
225 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
226}
227
228
229
230pub struct BatchCursorEnter<C, TInner> {
232 phantom: ::std::marker::PhantomData<TInner>,
233 cursor: C,
234}
235
236impl<C, TInner> BatchCursorEnter<C, TInner> {
237 fn new(cursor: C) -> Self {
238 BatchCursorEnter {
239 phantom: ::std::marker::PhantomData,
240 cursor,
241 }
242 }
243}
244
245impl<C, TInner> WithLayout for BatchCursorEnter<C, TInner>
246where
247 C: Cursor,
248 TInner: Refines<C::Time>+Lattice,
249{
250 type Layout = (
251 <C::Layout as Layout>::KeyContainer,
252 <C::Layout as Layout>::ValContainer,
253 Vec<TInner>,
254 <C::Layout as Layout>::DiffContainer,
255 <C::Layout as Layout>::OffsetContainer,
256 );
257}
258
259impl<TInner, C: Cursor> Cursor for BatchCursorEnter<C, TInner>
260where
261 TInner: Refines<C::Time>+Lattice,
262{
263 type Storage = BatchEnter<C::Storage, TInner>;
264
265 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
266 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
267
268 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
269 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
270
271 #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
272 #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
273
274 #[inline]
275 fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
276 self.cursor.map_times(&storage.batch, |time, diff| {
277 logic(&TInner::to_inner(C::owned_time(time)), diff)
278 })
279 }
280
281 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
282 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
283
284 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
285 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
286
287 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
288 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
289}