1use std::rc::Rc;
21
22use timely::dataflow::Scope;
23use timely::dataflow::operators::Map;
24use timely::progress::frontier::AntichainRef;
25
26use crate::operators::arrange::Arranged;
27use crate::trace::{TraceReader, BatchReader, Description};
28use crate::trace::cursor::Cursor;
29
30pub fn freeze<G, T, F>(arranged: &Arranged<G, T>, func: F) -> Arranged<G, TraceFreeze<T, F>>
36where
37 G: Scope<Timestamp=T::Time>,
38 T: TraceReader+Clone,
39 F: Fn(T::TimeGat<'_>)->Option<T::Time>+'static,
40{
41 let func1 = Rc::new(func);
42 let func2 = func1.clone();
43 Arranged {
44 stream: arranged.stream.map(move |bw| BatchFreeze::make_from(bw, func1.clone())),
45 trace: TraceFreeze::make_from(arranged.trace.clone(), func2),
46 }
47}
48
49pub struct TraceFreeze<Tr, F>
51where
52 Tr: TraceReader,
53 F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
54{
55 trace: Tr,
56 func: Rc<F>,
57}
58
59impl<Tr,F> Clone for TraceFreeze<Tr, F>
60where
61 Tr: TraceReader+Clone,
62 F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
63{
64 fn clone(&self) -> Self {
65 TraceFreeze {
66 trace: self.trace.clone(),
67 func: self.func.clone(),
68 }
69 }
70}
71
72impl<Tr, F> WithLayout for TraceFreeze<Tr, F>
73where
74 Tr: TraceReader,
75 F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
76{
77 type Layout = (
78 <Tr::Layout as Layout>::KeyContainer,
79 <Tr::Layout as Layout>::ValContainer,
80 Vec<Tr::Time>,
81 <Tr::Layout as Layout>::DiffContainer,
82 <Tr::Layout as Layout>::OffsetContainer,
83 );
84}
85
86impl<Tr, F> TraceReader for TraceFreeze<Tr, F>
87where
88 Tr: TraceReader<Batch: Clone>,
89 F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>+'static,
90{
91 type Batch = BatchFreeze<Tr::Batch, F>;
92 type Storage = Tr::Storage;
93 type Cursor = CursorFreeze<Tr::Cursor, F>;
94
95 fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
96 let func = &self.func;
97 self.trace.map_batches(|batch| {
98 f(&Self::Batch::make_from(batch.clone(), func.clone()));
99 })
100 }
101
102 fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) }
103 fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() }
104
105 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) }
106 fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() }
107
108 fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
109 let func = &self.func;
110 self.trace.cursor_through(upper)
111 .map(|(cursor, storage)| (CursorFreeze::new(cursor, func.clone()), storage))
112 }
113}
114
115impl<Tr, F> TraceFreeze<Tr, F>
116where
117 Tr: TraceReader<Batch: Clone>,
118 F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
119{
120 pub fn make_from(trace: Tr, func: Rc<F>) -> Self {
122 Self { trace, func }
123 }
124}
125
126
127pub struct BatchFreeze<B, F> {
129 batch: B,
130 func: Rc<F>,
131}
132
133impl<B: Clone, F> Clone for BatchFreeze<B, F> {
134 fn clone(&self) -> Self {
135 BatchFreeze {
136 batch: self.batch.clone(),
137 func: self.func.clone(),
138 }
139 }
140}
141
142impl<B, F> WithLayout for BatchFreeze<B, F>
143where
144 B: BatchReader,
145 F: Fn(B::TimeGat<'_>)->Option<B::Time>,
146{
147 type Layout = (
148 <B::Layout as Layout>::KeyContainer,
149 <B::Layout as Layout>::ValContainer,
150 Vec<B::Time>,
151 <B::Layout as Layout>::DiffContainer,
152 <B::Layout as Layout>::OffsetContainer,
153 );
154}
155
156impl<B, F> BatchReader for BatchFreeze<B, F>
157where
158 B: BatchReader,
159 F: Fn(B::TimeGat<'_>)->Option<B::Time>,
160{
161 type Cursor = BatchCursorFreeze<B::Cursor, F>;
162
163 fn cursor(&self) -> Self::Cursor {
164 BatchCursorFreeze::new(self.batch.cursor(), self.func.clone())
165 }
166 fn len(&self) -> usize { self.batch.len() }
167 fn description(&self) -> &Description<B::Time> { self.batch.description() }
168}
169
170impl<B, F> BatchFreeze<B, F>
171where
172 B: BatchReader,
173 F: Fn(B::TimeGat<'_>)->Option<B::Time>
174{
175 pub fn make_from(batch: B, func: Rc<F>) -> Self {
177 Self { batch, func }
178 }
179}
180
181pub struct CursorFreeze<C, F> {
183 cursor: C,
184 func: Rc<F>,
185}
186
187use crate::trace::implementations::{Layout, WithLayout};
188impl<C: Cursor, F> WithLayout for CursorFreeze<C, F> {
189 type Layout = (
190 <C::Layout as Layout>::KeyContainer,
191 <C::Layout as Layout>::ValContainer,
192 Vec<C::Time>,
193 <C::Layout as Layout>::DiffContainer,
194 <C::Layout as Layout>::OffsetContainer,
195 );
196}
197
198impl<C, F> CursorFreeze<C, F> {
199 fn new(cursor: C, func: Rc<F>) -> Self {
200 Self { cursor, func }
201 }
202}
203
204impl<C, F> Cursor for CursorFreeze<C, F>
205where
206 C: Cursor,
207 F: Fn(C::TimeGat<'_>)->Option<C::Time>,
208{
209 type Storage = C::Storage;
210
211 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
212 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
213
214 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
215 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
216
217 #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
218 #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
219
220 #[inline] fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
221 let func = &self.func;
222 self.cursor.map_times(storage, |time, diff| {
223 if let Some(time) = func(time) {
224 logic(&time, diff);
225 }
226 })
227 }
228
229 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
230 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
231
232 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
233 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
234
235 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
236 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
237}
238
239
240pub struct BatchCursorFreeze<C, F> {
242 cursor: C,
243 func: Rc<F>,
244}
245
246impl<C: Cursor, F> WithLayout for BatchCursorFreeze<C, F> {
247 type Layout = (
248 <C::Layout as Layout>::KeyContainer,
249 <C::Layout as Layout>::ValContainer,
250 Vec<C::Time>,
251 <C::Layout as Layout>::DiffContainer,
252 <C::Layout as Layout>::OffsetContainer,
253 );
254}
255
256impl<C, F> BatchCursorFreeze<C, F> {
257 fn new(cursor: C, func: Rc<F>) -> Self {
258 Self { cursor, func }
259 }
260}
261
262impl<C: Cursor, F> Cursor for BatchCursorFreeze<C, F>
264where
265 F: Fn(C::TimeGat<'_>)->Option<C::Time>,
266{
267 type Storage = BatchFreeze<C::Storage, F>;
268
269 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
270 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
271
272 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
273 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
274
275 #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
276 #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
277
278 #[inline] fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
279 let func = &self.func;
280 self.cursor.map_times(&storage.batch, |time, diff| {
281 if let Some(time) = func(time) {
282 logic(&time, diff);
283 }
284 })
285 }
286
287 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
288 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
289
290 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
291 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
292
293 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
294 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
295}