differential_dataflow/trace/wrappers/
frontier.rs1use timely::progress::{Antichain, frontier::AntichainRef};
10
11use crate::trace::{TraceReader, BatchReader, Description};
12use crate::trace::cursor::Cursor;
13use crate::lattice::Lattice;
14
15pub struct TraceFrontier<Tr: TraceReader> {
17 trace: Tr,
18 since: Antichain<Tr::Time>,
20 until: Antichain<Tr::Time>,
22}
23
24impl<Tr: TraceReader + Clone> Clone for TraceFrontier<Tr> {
25 fn clone(&self) -> Self {
26 TraceFrontier {
27 trace: self.trace.clone(),
28 since: self.since.clone(),
29 until: self.until.clone(),
30 }
31 }
32}
33
34impl<Tr: TraceReader> WithLayout for TraceFrontier<Tr> {
35 type Layout = (
36 <Tr::Layout as Layout>::KeyContainer,
37 <Tr::Layout as Layout>::ValContainer,
38 Vec<Tr::Time>,
39 <Tr::Layout as Layout>::DiffContainer,
40 <Tr::Layout as Layout>::OffsetContainer,
41 );
42}
43
44impl<Tr: TraceReader> TraceReader for TraceFrontier<Tr> {
45
46 type Batch = BatchFrontier<Tr::Batch>;
47 type Storage = Tr::Storage;
48 type Cursor = CursorFrontier<Tr::Cursor, Tr::Time>;
49
50 fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
51 let since = self.since.borrow();
52 let until = self.until.borrow();
53 self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), since, until)))
54 }
55
56 fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) }
57 fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() }
58
59 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) }
60 fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() }
61
62 fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
63 let since = self.since.borrow();
64 let until = self.until.borrow();
65 self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, since, until), y))
66 }
67}
68
69impl<Tr: TraceReader> TraceFrontier<Tr> {
70 pub fn make_from(trace: Tr, since: AntichainRef<'_, Tr::Time>, until: AntichainRef<'_, Tr::Time>) -> Self {
72 TraceFrontier {
73 trace,
74 since: since.to_owned(),
75 until: until.to_owned(),
76 }
77 }
78}
79
80
81#[derive(Clone)]
83pub struct BatchFrontier<B: BatchReader> {
84 batch: B,
85 since: Antichain<B::Time>,
86 until: Antichain<B::Time>,
87}
88
89impl<B: BatchReader> WithLayout for BatchFrontier<B> {
90 type Layout = (
91 <B::Layout as Layout>::KeyContainer,
92 <B::Layout as Layout>::ValContainer,
93 Vec<B::Time>,
94 <B::Layout as Layout>::DiffContainer,
95 <B::Layout as Layout>::OffsetContainer,
96 );
97}
98
99impl<B: BatchReader> BatchReader for BatchFrontier<B> {
100
101 type Cursor = BatchCursorFrontier<B::Cursor>;
102
103 fn cursor(&self) -> Self::Cursor {
104 BatchCursorFrontier::new(self.batch.cursor(), self.since.borrow(), self.until.borrow())
105 }
106 fn len(&self) -> usize { self.batch.len() }
107 fn description(&self) -> &Description<B::Time> { self.batch.description() }
108}
109
110impl<B: BatchReader> BatchFrontier<B> {
111 pub fn make_from(batch: B, since: AntichainRef<B::Time>, until: AntichainRef<B::Time>) -> Self {
113 BatchFrontier {
114 batch,
115 since: since.to_owned(),
116 until: until.to_owned(),
117 }
118 }
119}
120
121pub struct CursorFrontier<C, T> {
123 cursor: C,
124 since: Antichain<T>,
125 until: Antichain<T>
126}
127
128use crate::trace::implementations::{Layout, WithLayout};
129impl<C: Cursor> WithLayout for CursorFrontier<C, C::Time> {
130 type Layout = (
131 <C::Layout as Layout>::KeyContainer,
132 <C::Layout as Layout>::ValContainer,
133 Vec<C::Time>,
134 <C::Layout as Layout>::DiffContainer,
135 <C::Layout as Layout>::OffsetContainer,
136 );
137}
138
139impl<C, T: Clone> CursorFrontier<C, T> {
140 fn new(cursor: C, since: AntichainRef<T>, until: AntichainRef<T>) -> Self {
141 CursorFrontier {
142 cursor,
143 since: since.to_owned(),
144 until: until.to_owned(),
145 }
146 }
147}
148
149impl<C: Cursor> Cursor for CursorFrontier<C, C::Time> {
150
151 type Storage = C::Storage;
152
153 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
154 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
155
156 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
157 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
158
159 #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
160 #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
161
162 #[inline]
163 fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
164 let since = self.since.borrow();
165 let until = self.until.borrow();
166 let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
167 self.cursor.map_times(storage, |time, diff| {
168 C::clone_time_onto(time, &mut temp);
169 temp.advance_by(since);
170 if !until.less_equal(&temp) {
171 logic(&temp, diff);
172 }
173 })
174 }
175
176 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
177 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
178
179 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
180 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
181
182 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
183 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
184}
185
186
187
188pub struct BatchCursorFrontier<C: Cursor> {
190 cursor: C,
191 since: Antichain<C::Time>,
192 until: Antichain<C::Time>,
193}
194
195impl<C: Cursor> WithLayout for BatchCursorFrontier<C> {
196 type Layout = (
197 <C::Layout as Layout>::KeyContainer,
198 <C::Layout as Layout>::ValContainer,
199 Vec<C::Time>,
200 <C::Layout as Layout>::DiffContainer,
201 <C::Layout as Layout>::OffsetContainer,
202 );
203}
204
205impl<C: Cursor> BatchCursorFrontier<C> {
206 fn new(cursor: C, since: AntichainRef<C::Time>, until: AntichainRef<C::Time>) -> Self {
207 BatchCursorFrontier {
208 cursor,
209 since: since.to_owned(),
210 until: until.to_owned(),
211 }
212 }
213}
214
215impl<C: Cursor<Storage: BatchReader>> Cursor for BatchCursorFrontier<C> {
216
217 type Storage = BatchFrontier<C::Storage>;
218
219 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
220 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
221
222 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
223 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
224
225 #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
226 #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
227
228 #[inline]
229 fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
230 let since = self.since.borrow();
231 let until = self.until.borrow();
232 let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
233 self.cursor.map_times(&storage.batch, |time, diff| {
234 C::clone_time_onto(time, &mut temp);
235 temp.advance_by(since);
236 if !until.less_equal(&temp) {
237 logic(&temp, diff);
238 }
239 })
240 }
241
242 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
243 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
244
245 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
246 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
247
248 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
249 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
250}