1use timely::progress::timestamp::Refines;
4use timely::progress::{Antichain, frontier::AntichainRef};
5
6use crate::lattice::Lattice;
7use crate::trace::{TraceReader, BatchReader, Description};
8use crate::trace::cursor::Cursor;
9
10pub struct TraceEnter<Tr: TraceReader, TInner, F, G> {
19 trace: Tr,
20 stash1: Antichain<Tr::Time>,
21 stash2: Antichain<TInner>,
22 logic: F,
23 prior: G,
24}
25
26impl<Tr,TInner,F,G> Clone for TraceEnter<Tr, TInner, F, G>
27where
28 Tr: TraceReader+Clone,
29 F: Clone,
30 G: Clone,
31{
32 fn clone(&self) -> Self {
33 TraceEnter {
34 trace: self.trace.clone(),
35 stash1: Antichain::new(),
36 stash2: Antichain::new(),
37 logic: self.logic.clone(),
38 prior: self.prior.clone(),
39 }
40 }
41}
42
43impl<Tr, TInner, F, G> WithLayout for TraceEnter<Tr, TInner, F, G>
44where
45 Tr: TraceReader<Batch: Clone>,
46 TInner: Refines<Tr::Time>+Lattice,
47 F: Clone,
48 G: Clone,
49{
50 type Layout = (
51 <Tr::Layout as Layout>::KeyContainer,
52 <Tr::Layout as Layout>::ValContainer,
53 Vec<TInner>,
54 <Tr::Layout as Layout>::DiffContainer,
55 <Tr::Layout as Layout>::OffsetContainer,
56 );
57}
58
59impl<Tr, TInner, F, G> TraceReader for TraceEnter<Tr, TInner, F, G>
60where
61 Tr: TraceReader<Batch: Clone>,
62 TInner: Refines<Tr::Time>+Lattice,
63 F: 'static,
64 F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone,
65 G: FnMut(&TInner)->Tr::Time+Clone+'static,
66{
67 type Batch = BatchEnter<Tr::Batch, TInner,F>;
68 type Storage = Tr::Storage;
69 type Cursor = CursorEnter<Tr::Cursor, TInner,F>;
70
71 fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
72 let logic = self.logic.clone();
73 self.trace.map_batches(|batch| {
74 f(&Self::Batch::make_from(batch.clone(), logic.clone()));
75 })
76 }
77
78 fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
79 self.stash1.clear();
80 for time in frontier.iter() {
81 self.stash1.insert((self.prior)(time));
82 }
83 self.trace.set_logical_compaction(self.stash1.borrow());
84 }
85 fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> {
86 self.stash2.clear();
87 for time in self.trace.get_logical_compaction().iter() {
88 self.stash2.insert(TInner::to_inner(time.clone()));
89 }
90 self.stash2.borrow()
91 }
92
93 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
94 self.stash1.clear();
95 for time in frontier.iter() {
96 self.stash1.insert((self.prior)(time));
97 }
98 self.trace.set_physical_compaction(self.stash1.borrow());
99 }
100 fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> {
101 self.stash2.clear();
102 for time in self.trace.get_physical_compaction().iter() {
103 self.stash2.insert(TInner::to_inner(time.clone()));
104 }
105 self.stash2.borrow()
106 }
107
108 fn cursor_through(&mut self, upper: AntichainRef<TInner>) -> Option<(Self::Cursor, Self::Storage)> {
109 self.stash1.clear();
110 for time in upper.iter() {
111 self.stash1.insert(time.clone().to_outer());
112 }
113 self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x, self.logic.clone()), y))
114 }
115}
116
117impl<Tr, TInner, F, G> TraceEnter<Tr, TInner, F, G>
118where
119 Tr: TraceReader,
120 TInner: Refines<Tr::Time>+Lattice,
121{
122 pub fn make_from(trace: Tr, logic: F, prior: G) -> Self {
124 TraceEnter {
125 trace,
126 stash1: Antichain::new(),
127 stash2: Antichain::new(),
128 logic,
129 prior,
130 }
131 }
132}
133
134
135#[derive(Clone)]
137pub struct BatchEnter<B, TInner, F> {
138 batch: B,
139 description: Description<TInner>,
140 logic: F,
141}
142
143impl<B, TInner, F> WithLayout for BatchEnter<B, TInner, F>
144where
145 B: BatchReader,
146 TInner: Refines<B::Time>+Lattice,
147{
148 type Layout = (
149 <B::Layout as Layout>::KeyContainer,
150 <B::Layout as Layout>::ValContainer,
151 Vec<TInner>,
152 <B::Layout as Layout>::DiffContainer,
153 <B::Layout as Layout>::OffsetContainer,
154 );
155}
156
157use crate::trace::implementations::LayoutExt;
158impl<B, TInner, F> BatchReader for BatchEnter<B, TInner, F>
159where
160 B: BatchReader,
161 TInner: Refines<B::Time>+Lattice,
162 F: FnMut(B::Key<'_>, <B::Cursor as LayoutExt>::Val<'_>, B::TimeGat<'_>)->TInner+Clone,
163{
164 type Cursor = BatchCursorEnter<B::Cursor, TInner, F>;
165
166 fn cursor(&self) -> Self::Cursor {
167 BatchCursorEnter::new(self.batch.cursor(), self.logic.clone())
168 }
169 fn len(&self) -> usize { self.batch.len() }
170 fn description(&self) -> &Description<TInner> { &self.description }
171}
172
173impl<B, TInner, F> BatchEnter<B, TInner, F>
174where
175 B: BatchReader,
176 TInner: Refines<B::Time>+Lattice,
177{
178 pub fn make_from(batch: B, logic: F) -> Self {
180 let lower: Vec<_> = batch.description().lower().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
181 let upper: Vec<_> = batch.description().upper().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
182 let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
183
184 BatchEnter {
185 batch,
186 description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since)),
187 logic,
188 }
189 }
190}
191
192pub struct CursorEnter<C, TInner, F> {
194 phantom: ::std::marker::PhantomData<TInner>,
195 cursor: C,
196 logic: F,
197}
198
199use crate::trace::implementations::{Layout, WithLayout};
200impl<C, TInner, F> WithLayout for CursorEnter<C, TInner, F>
201where
202 C: Cursor,
203 TInner: Refines<C::Time>+Lattice,
204{
205 type Layout = (
206 <C::Layout as Layout>::KeyContainer,
207 <C::Layout as Layout>::ValContainer,
208 Vec<TInner>,
209 <C::Layout as Layout>::DiffContainer,
210 <C::Layout as Layout>::OffsetContainer,
211 );
212}
213
214impl<C, TInner, F> CursorEnter<C, TInner, F> {
215 fn new(cursor: C, logic: F) -> Self {
216 CursorEnter {
217 phantom: ::std::marker::PhantomData,
218 cursor,
219 logic,
220 }
221 }
222}
223
224impl<C, TInner, F> Cursor for CursorEnter<C, TInner, F>
225where
226 C: Cursor,
227 TInner: Refines<C::Time>+Lattice,
228 F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner,
229{
230 type Storage = C::Storage;
231
232 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
233 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
234
235 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
236 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
237
238 #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
239 #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
240
241 #[inline]
242 fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
243 let key = self.key(storage);
244 let val = self.val(storage);
245 let logic2 = &mut self.logic;
246 self.cursor.map_times(storage, |time, diff| {
247 logic(&logic2(key, val, time), diff)
248 })
249 }
250
251 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
252 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
253
254 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
255 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
256
257 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
258 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
259}
260
261
262
263pub struct BatchCursorEnter<C, TInner, F> {
265 phantom: ::std::marker::PhantomData<TInner>,
266 cursor: C,
267 logic: F,
268}
269
270impl<C, TInner, F> WithLayout for BatchCursorEnter<C, TInner, F>
271where
272 C: Cursor,
273 TInner: Refines<C::Time>+Lattice,
274{
275 type Layout = (
276 <C::Layout as Layout>::KeyContainer,
277 <C::Layout as Layout>::ValContainer,
278 Vec<TInner>,
279 <C::Layout as Layout>::DiffContainer,
280 <C::Layout as Layout>::OffsetContainer,
281 );
282}
283
284impl<C, TInner, F> BatchCursorEnter<C, TInner, F> {
285 fn new(cursor: C, logic: F) -> Self {
286 BatchCursorEnter {
287 phantom: ::std::marker::PhantomData,
288 cursor,
289 logic,
290 }
291 }
292}
293
294impl<TInner, C: Cursor, F> Cursor for BatchCursorEnter<C, TInner, F>
295where
296 TInner: Refines<C::Time>+Lattice,
297 F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner,
298{
299 type Storage = BatchEnter<C::Storage, TInner, F>;
300
301 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
302 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
303
304 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
305 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
306
307 #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
308 #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
309
310 #[inline]
311 fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
312 let key = self.key(storage);
313 let val = self.val(storage);
314 let logic2 = &mut self.logic;
315 self.cursor.map_times(&storage.batch, |time, diff| {
316 logic(&logic2(key, val, time), diff)
317 })
318 }
319
320 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
321 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
322
323 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
324 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
325
326 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
327 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
328}