differential_dataflow/trace/wrappers/
filter.rs

1//! Wrapper for filtered trace.
2
3use timely::progress::frontier::AntichainRef;
4
5use crate::trace::{TraceReader, BatchReader, Description};
6use crate::trace::cursor::Cursor;
7
8/// Wrapper to provide trace to nested scope.
9pub struct TraceFilter<Tr, F> {
10    trace: Tr,
11    logic: F,
12}
13
14impl<Tr,F> Clone for TraceFilter<Tr, F>
15where
16    Tr: TraceReader+Clone,
17    F: Clone,
18{
19    fn clone(&self) -> Self {
20        TraceFilter {
21            trace: self.trace.clone(),
22            logic: self.logic.clone(),
23        }
24    }
25}
26
27impl<Tr: TraceReader, F> WithLayout for TraceFilter<Tr, F> {
28    type Layout = Tr::Layout;
29}
30
31impl<Tr, F> TraceReader for TraceFilter<Tr, F>
32where
33    Tr: TraceReader<Batch: Clone>,
34    F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static,
35{
36    type Batch = BatchFilter<Tr::Batch, F>;
37    type Storage = Tr::Storage;
38    type Cursor = CursorFilter<Tr::Cursor, F>;
39
40    fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
41        let logic = self.logic.clone();
42        self.trace
43            .map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), logic.clone())))
44    }
45
46    fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) }
47    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() }
48
49    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) }
50    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() }
51
52    fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
53        self.trace.cursor_through(upper).map(|(x,y)| (CursorFilter::new(x, self.logic.clone()), y))
54    }
55}
56
57impl<Tr: TraceReader, F> TraceFilter<Tr, F> {
58    /// Makes a new trace wrapper
59    pub fn make_from(trace: Tr, logic: F) -> Self {
60        TraceFilter {
61            trace,
62            logic,
63        }
64    }
65}
66
67
68/// Wrapper to provide batch to nested scope.
69#[derive(Clone)]
70pub struct BatchFilter<B, F> {
71    batch: B,
72    logic: F,
73}
74
75impl<B: BatchReader, F> WithLayout for BatchFilter<B, F> {
76    type Layout = B::Layout;
77}
78
79impl<B, F> BatchReader for BatchFilter<B, F>
80where
81    B: BatchReader,
82    F: FnMut(B::Key<'_>, B::Val<'_>)->bool+Clone+'static
83{
84    type Cursor = BatchCursorFilter<B::Cursor, F>;
85
86    fn cursor(&self) -> Self::Cursor {
87        BatchCursorFilter::new(self.batch.cursor(), self.logic.clone())
88    }
89    fn len(&self) -> usize { self.batch.len() }
90    fn description(&self) -> &Description<B::Time> { self.batch.description() }
91}
92
93impl<B: BatchReader, F> BatchFilter<B, F> {
94    /// Makes a new batch wrapper
95    pub fn make_from(batch: B, logic: F) -> Self {
96        BatchFilter {
97            batch,
98            logic,
99        }
100    }
101}
102
103/// Wrapper to provide cursor to nested scope.
104pub struct CursorFilter<C, F> {
105    cursor: C,
106    logic: F,
107}
108
109use crate::trace::implementations::WithLayout;
110impl<C: Cursor, F> WithLayout for CursorFilter<C, F> {
111    type Layout = C::Layout;
112}
113
114impl<C, F> CursorFilter<C, F> {
115    fn new(cursor: C, logic: F) -> Self {
116        CursorFilter {
117            cursor,
118            logic,
119        }
120    }
121}
122
123impl<C, F> Cursor for CursorFilter<C, F>
124where
125    C: Cursor,
126    F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static
127{
128    type Storage = C::Storage;
129
130    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
131    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
132
133    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
134    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
135
136    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
137    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
138
139    #[inline]
140    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
141        let key = self.key(storage);
142        let val = self.val(storage);
143        if (self.logic)(key, val) {
144            self.cursor.map_times(storage, logic)
145        }
146    }
147
148    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
149    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
150
151    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
152    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
153
154    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
155    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
156}
157
158
159
160/// Wrapper to provide cursor to nested scope.
161pub struct BatchCursorFilter<C, F> {
162    cursor: C,
163    logic: F,
164}
165
166impl<C: Cursor, F> WithLayout for BatchCursorFilter<C, F> {
167    type Layout = C::Layout;
168}
169
170impl<C, F> BatchCursorFilter<C, F> {
171    fn new(cursor: C, logic: F) -> Self {
172        BatchCursorFilter {
173            cursor,
174            logic,
175        }
176    }
177}
178
179impl<C: Cursor, F> Cursor for BatchCursorFilter<C, F>
180where
181    F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static,
182{
183    type Storage = BatchFilter<C::Storage, F>;
184
185    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
186    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
187
188    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
189    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
190
191    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
192    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
193
194    #[inline]
195    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
196        let key = self.key(storage);
197        let val = self.val(storage);
198        if (self.logic)(key, val) {
199            self.cursor.map_times(&storage.batch, logic)
200        }
201    }
202
203    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
204    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
205
206    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
207    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
208
209    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
210    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
211}