differential_dataflow/trace/wrappers/
freeze.rs

1//! Wrappers to transform the timestamps of updates.
2//!
3//! These wrappers are primarily intended to support the re-use of a multi-version index
4//! as if it were frozen at a particular (nested) timestamp. For example, if one wants to
5//! re-use an index multiple times with minor edits, and only observe the edits at one
//! logical time (meaning: observing all edits less than or equal to that time, advanced to
//! that time), this should allow that behavior.
8//!
9//! Informally, this wrapper is parameterized by a function `F: Fn(&T)->Option<T>` which
10//! provides the opportunity to alter the time at which an update happens and to suppress
11//! that update, if appropriate. For example, the function
12//!
13//! ```ignore
14//! |t| if t.inner <= 10 { let mut t = t.clone(); t.inner = 10; Some(t) } else { None }
15//! ```
16//!
17//! could be used to present all updates through inner iteration 10, but advanced to inner
18//! iteration 10, as if they all occurred exactly at that moment.
19
20use std::rc::Rc;
21
22use timely::dataflow::Scope;
23use timely::dataflow::operators::Map;
24use timely::progress::frontier::AntichainRef;
25
26use crate::operators::arrange::Arranged;
27use crate::trace::{TraceReader, BatchReader, Description};
28use crate::trace::cursor::Cursor;
29
30/// Freezes updates to an arrangement using a supplied function.
31///
32/// This method is experimental, and should be used with care. The intent is that the function
33/// `func` can be used to restrict and lock in updates at a particular time, as suggested in the
34/// module-level documentation.
35pub fn freeze<G, T, F>(arranged: &Arranged<G, T>, func: F) -> Arranged<G, TraceFreeze<T, F>>
36where
37    G: Scope<Timestamp=T::Time>,
38    T: TraceReader+Clone,
39    F: Fn(T::TimeGat<'_>)->Option<T::Time>+'static,
40{
41    let func1 = Rc::new(func);
42    let func2 = func1.clone();
43    Arranged {
44        stream: arranged.stream.map(move |bw| BatchFreeze::make_from(bw, func1.clone())),
45        trace: TraceFreeze::make_from(arranged.trace.clone(), func2),
46    }
47}
48
/// Wrapper around a trace whose updates are presented with times transformed by a function.
///
/// The function is stored behind an `Rc` so that the batch and cursor wrappers produced
/// from this trace can share it without requiring the function itself to be `Clone`.
///
/// The struct carries no trait bounds of its own, matching the other wrappers in this
/// module; each `impl` block states the bounds it needs.
pub struct TraceFreeze<Tr, F> {
    /// The wrapped trace.
    trace: Tr,
    /// Shared function applied to update times; returning `None` suppresses the update.
    func: Rc<F>,
}
58
59impl<Tr,F> Clone for TraceFreeze<Tr, F>
60where
61    Tr: TraceReader+Clone,
62    F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
63{
64    fn clone(&self) -> Self {
65        TraceFreeze {
66            trace: self.trace.clone(),
67            func: self.func.clone(),
68        }
69    }
70}
71
72impl<Tr, F> WithLayout for TraceFreeze<Tr, F>
73where
74    Tr: TraceReader,
75    F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
76{
77    type Layout = (
78        <Tr::Layout as Layout>::KeyContainer,
79        <Tr::Layout as Layout>::ValContainer,
80        Vec<Tr::Time>,
81        <Tr::Layout as Layout>::DiffContainer,
82        <Tr::Layout as Layout>::OffsetContainer,
83    );
84}
85
86impl<Tr, F> TraceReader for TraceFreeze<Tr, F>
87where
88    Tr: TraceReader<Batch: Clone>,
89    F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>+'static,
90{
91    type Batch = BatchFreeze<Tr::Batch, F>;
92    type Storage = Tr::Storage;
93    type Cursor = CursorFreeze<Tr::Cursor, F>;
94
95    fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
96        let func = &self.func;
97        self.trace.map_batches(|batch| {
98            f(&Self::Batch::make_from(batch.clone(), func.clone()));
99        })
100    }
101
102    fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) }
103    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() }
104
105    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) }
106    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() }
107
108    fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
109        let func = &self.func;
110        self.trace.cursor_through(upper)
111            .map(|(cursor, storage)| (CursorFreeze::new(cursor, func.clone()), storage))
112    }
113}
114
115impl<Tr, F> TraceFreeze<Tr, F>
116where
117    Tr: TraceReader<Batch: Clone>,
118    F: Fn(Tr::TimeGat<'_>)->Option<Tr::Time>,
119{
120    /// Makes a new trace wrapper
121    pub fn make_from(trace: Tr, func: Rc<F>) -> Self {
122        Self { trace, func }
123    }
124}
125
126
/// Wrapper around a batch whose updates are presented with times transformed by a function.
pub struct BatchFreeze<B, F> {
    /// The wrapped batch.
    batch: B,
    /// Shared function applied to update times.
    func: Rc<F>,
}

/// Cloning duplicates the wrapped batch and shares the function with the original.
impl<B: Clone, F> Clone for BatchFreeze<B, F> {
    fn clone(&self) -> Self {
        let batch = self.batch.clone();
        // `F` need not be `Clone`: only the `Rc`'s reference count is incremented.
        let func = Rc::clone(&self.func);
        Self { batch, func }
    }
}
141
142impl<B, F> WithLayout for BatchFreeze<B, F>
143where
144    B: BatchReader,
145    F: Fn(B::TimeGat<'_>)->Option<B::Time>,
146{
147    type Layout = (
148        <B::Layout as Layout>::KeyContainer,
149        <B::Layout as Layout>::ValContainer,
150        Vec<B::Time>,
151        <B::Layout as Layout>::DiffContainer,
152        <B::Layout as Layout>::OffsetContainer,
153    );
154}
155
156impl<B, F> BatchReader for BatchFreeze<B, F>
157where
158    B: BatchReader,
159    F: Fn(B::TimeGat<'_>)->Option<B::Time>,
160{
161    type Cursor = BatchCursorFreeze<B::Cursor, F>;
162
163    fn cursor(&self) -> Self::Cursor {
164        BatchCursorFreeze::new(self.batch.cursor(), self.func.clone())
165    }
166    fn len(&self) -> usize { self.batch.len() }
167    fn description(&self) -> &Description<B::Time> { self.batch.description() }
168}
169
170impl<B, F> BatchFreeze<B, F>
171where
172    B: BatchReader,
173    F: Fn(B::TimeGat<'_>)->Option<B::Time>
174{
175    /// Makes a new batch wrapper
176    pub fn make_from(batch: B, func: Rc<F>) -> Self {
177        Self { batch, func }
178    }
179}
180
/// Cursor wrapper that passes the time of each update through a shared function.
///
/// Its `Cursor` implementation forwards navigation to the wrapped cursor; in `map_times`
/// an update is reported at the time the function returns, or skipped when it returns `None`.
pub struct CursorFreeze<C, F> {
    /// The wrapped cursor.
    cursor: C,
    /// Shared function applied to update times.
    func: Rc<F>,
}
186
187use crate::trace::implementations::{Layout, WithLayout};
188impl<C: Cursor, F> WithLayout for CursorFreeze<C, F> {
189    type Layout = (
190        <C::Layout as Layout>::KeyContainer,
191        <C::Layout as Layout>::ValContainer,
192        Vec<C::Time>,
193        <C::Layout as Layout>::DiffContainer,
194        <C::Layout as Layout>::OffsetContainer,
195    );
196}
197
198impl<C, F> CursorFreeze<C, F> {
199    fn new(cursor: C, func: Rc<F>) -> Self {
200        Self { cursor, func }
201    }
202}
203
204impl<C, F> Cursor for CursorFreeze<C, F>
205where
206    C: Cursor,
207    F: Fn(C::TimeGat<'_>)->Option<C::Time>,
208{
209    type Storage = C::Storage;
210
211    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
212    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
213
214    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
215    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
216
217    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
218    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
219
220    #[inline] fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
221        let func = &self.func;
222        self.cursor.map_times(storage, |time, diff| {
223            if let Some(time) = func(time) {
224                logic(&time, diff);
225            }
226        })
227    }
228
229    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
230    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
231
232    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
233    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
234
235    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
236    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
237}
238
239
/// Cursor wrapper for navigating a `BatchFreeze`, passing update times through a shared function.
///
/// Its `Cursor` implementation uses `BatchFreeze` as storage and forwards navigation to the
/// wrapped cursor over the underlying batch; in `map_times` an update is reported at the
/// time the function returns, or skipped when it returns `None`.
pub struct BatchCursorFreeze<C, F> {
    /// The wrapped cursor.
    cursor: C,
    /// Shared function applied to update times.
    func: Rc<F>,
}
245
246impl<C: Cursor, F> WithLayout for BatchCursorFreeze<C, F> {
247    type Layout = (
248        <C::Layout as Layout>::KeyContainer,
249        <C::Layout as Layout>::ValContainer,
250        Vec<C::Time>,
251        <C::Layout as Layout>::DiffContainer,
252        <C::Layout as Layout>::OffsetContainer,
253    );
254}
255
256impl<C, F> BatchCursorFreeze<C, F> {
257    fn new(cursor: C, func: Rc<F>) -> Self {
258        Self { cursor, func }
259    }
260}
261
262// impl<C: Cursor<Storage=B, Time=B::Time>, B: BatchReader<Cursor=C>, F> Cursor for BatchCursorFreeze<B, F>
263impl<C: Cursor, F> Cursor for BatchCursorFreeze<C, F>
264where
265    F: Fn(C::TimeGat<'_>)->Option<C::Time>,
266{
267    type Storage = BatchFreeze<C::Storage, F>;
268
269    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
270    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
271
272    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
273    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
274
275    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
276    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
277
278    #[inline] fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
279        let func = &self.func;
280        self.cursor.map_times(&storage.batch, |time, diff| {
281            if let Some(time) = func(time) {
282                logic(&time, diff);
283            }
284        })
285    }
286
287    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
288    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
289
290    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
291    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
292
293    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
294    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
295}