differential_dataflow/trace/wrappers/
enter.rs
1use timely::progress::timestamp::Refines;
5use timely::progress::{Antichain, frontier::AntichainRef};
6
7use crate::lattice::Lattice;
8use crate::trace::{TraceReader, BatchReader, Description};
9use crate::trace::cursor::Cursor;
10
11pub struct TraceEnter<Tr: TraceReader, TInner> {
13 trace: Tr,
14 stash1: Antichain<Tr::Time>,
15 stash2: Antichain<TInner>,
16}
17
18impl<Tr: TraceReader + Clone, TInner> Clone for TraceEnter<Tr, TInner> {
19 fn clone(&self) -> Self {
20 TraceEnter {
21 trace: self.trace.clone(),
22 stash1: Antichain::new(),
23 stash2: Antichain::new(),
24 }
25 }
26}
27
28impl<Tr, TInner> TraceReader for TraceEnter<Tr, TInner>
29where
30 Tr: TraceReader,
31 Tr::Batch: Clone,
32 TInner: Refines<Tr::Time>+Lattice,
33{
34 type Key<'a> = Tr::Key<'a>;
35 type Val<'a> = Tr::Val<'a>;
36 type Time = TInner;
37 type TimeGat<'a> = &'a TInner;
38 type Diff = Tr::Diff;
39 type DiffGat<'a> = Tr::DiffGat<'a>;
40
41 type Batch = BatchEnter<Tr::Batch, TInner>;
42 type Storage = Tr::Storage;
43 type Cursor = CursorEnter<Tr::Cursor, TInner>;
44
45 fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
46 self.trace.map_batches(|batch| {
47 f(&Self::Batch::make_from(batch.clone()));
48 })
49 }
50
51 fn set_logical_compaction(&mut self, frontier: AntichainRef<TInner>) {
52 self.stash1.clear();
53 for time in frontier.iter() {
54 self.stash1.insert(time.clone().to_outer());
55 }
56 self.trace.set_logical_compaction(self.stash1.borrow());
57 }
58 fn get_logical_compaction(&mut self) -> AntichainRef<TInner> {
59 self.stash2.clear();
60 for time in self.trace.get_logical_compaction().iter() {
61 self.stash2.insert(TInner::to_inner(time.clone()));
62 }
63 self.stash2.borrow()
64 }
65
66 fn set_physical_compaction(&mut self, frontier: AntichainRef<TInner>) {
67 self.stash1.clear();
68 for time in frontier.iter() {
69 self.stash1.insert(time.clone().to_outer());
70 }
71 self.trace.set_physical_compaction(self.stash1.borrow());
72 }
73 fn get_physical_compaction(&mut self) -> AntichainRef<TInner> {
74 self.stash2.clear();
75 for time in self.trace.get_physical_compaction().iter() {
76 self.stash2.insert(TInner::to_inner(time.clone()));
77 }
78 self.stash2.borrow()
79 }
80
81 fn cursor_through(&mut self, upper: AntichainRef<TInner>) -> Option<(Self::Cursor, Self::Storage)> {
82 self.stash1.clear();
83 for time in upper.iter() {
84 self.stash1.insert(time.clone().to_outer());
85 }
86 self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x), y))
87 }
88}
89
90impl<Tr, TInner> TraceEnter<Tr, TInner>
91where
92 Tr: TraceReader,
93 TInner: Refines<Tr::Time>+Lattice,
94{
95 pub fn make_from(trace: Tr) -> Self {
97 TraceEnter {
98 trace,
99 stash1: Antichain::new(),
100 stash2: Antichain::new(),
101 }
102 }
103}
104
105
106#[derive(Clone)]
108pub struct BatchEnter<B, TInner> {
109 batch: B,
110 description: Description<TInner>,
111}
112
113impl<B, TInner> BatchReader for BatchEnter<B, TInner>
114where
115 B: BatchReader,
116 TInner: Refines<B::Time>+Lattice,
117{
118 type Key<'a> = B::Key<'a>;
119 type Val<'a> = B::Val<'a>;
120 type Time = TInner;
121 type TimeGat<'a> = &'a TInner;
122 type Diff = B::Diff;
123 type DiffGat<'a> = B::DiffGat<'a>;
124
125 type Cursor = BatchCursorEnter<B::Cursor, TInner>;
126
127 fn cursor(&self) -> Self::Cursor {
128 BatchCursorEnter::new(self.batch.cursor())
129 }
130 fn len(&self) -> usize { self.batch.len() }
131 fn description(&self) -> &Description<TInner> { &self.description }
132}
133
134impl<B, TInner> BatchEnter<B, TInner>
135where
136 B: BatchReader,
137 TInner: Refines<B::Time>+Lattice,
138{
139 pub fn make_from(batch: B) -> Self {
141 let lower: Vec<_> = batch.description().lower().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
142 let upper: Vec<_> = batch.description().upper().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
143 let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
144
145 BatchEnter {
146 batch,
147 description: Description::new(Antichain::from(lower), Antichain::from(upper), Antichain::from(since))
148 }
149 }
150}
151
/// Wraps a cursor and presents its times as `TInner`.
pub struct CursorEnter<C, TInner> {
    /// Records the `TInner` type parameter; no `TInner` value is stored.
    phantom: std::marker::PhantomData<TInner>,
    /// The wrapped cursor.
    cursor: C,
}
157
158impl<C, TInner> CursorEnter<C, TInner> {
159 fn new(cursor: C) -> Self {
160 CursorEnter {
161 phantom: ::std::marker::PhantomData,
162 cursor,
163 }
164 }
165}
166
167impl<C, TInner> Cursor for CursorEnter<C, TInner>
168where
169 C: Cursor,
170 TInner: Refines<C::Time>+Lattice,
171{
172 type Key<'a> = C::Key<'a>;
173 type Val<'a> = C::Val<'a>;
174 type Time = TInner;
175 type TimeGat<'a> = &'a TInner;
176 type Diff = C::Diff;
177 type DiffGat<'a> = C::DiffGat<'a>;
178
179 type Storage = C::Storage;
180
181 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
182 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
183
184 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
185 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
186
187 #[inline]
188 fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
189 use crate::IntoOwned;
190 self.cursor.map_times(storage, |time, diff| {
191 logic(&TInner::to_inner(time.into_owned()), diff)
192 })
193 }
194
195 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
196 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
197
198 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
199 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
200
201 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
202 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
203}
204
205
206
/// Wraps a batch cursor and presents its times as `TInner`.
pub struct BatchCursorEnter<C, TInner> {
    /// Records the `TInner` type parameter; no `TInner` value is stored.
    phantom: std::marker::PhantomData<TInner>,
    /// The wrapped cursor.
    cursor: C,
}
212
213impl<C, TInner> BatchCursorEnter<C, TInner> {
214 fn new(cursor: C) -> Self {
215 BatchCursorEnter {
216 phantom: ::std::marker::PhantomData,
217 cursor,
218 }
219 }
220}
221
222impl<TInner, C: Cursor> Cursor for BatchCursorEnter<C, TInner>
223where
224 TInner: Refines<C::Time>+Lattice,
225{
226 type Key<'a> = C::Key<'a>;
227 type Val<'a> = C::Val<'a>;
228 type Time = TInner;
229 type TimeGat<'a> = &'a TInner;
230 type Diff = C::Diff;
231 type DiffGat<'a> = C::DiffGat<'a>;
232
233 type Storage = BatchEnter<C::Storage, TInner>;
234
235 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
236 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
237
238 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
239 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
240
241 #[inline]
242 fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
243 use crate::IntoOwned;
244 self.cursor.map_times(&storage.batch, |time, diff| {
245 logic(&TInner::to_inner(time.into_owned()), diff)
246 })
247 }
248
249 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
250 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
251
252 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
253 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
254
255 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
256 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
257}