differential_dataflow/trace/wrappers/
filter.rs

1//! Wrapper for filtered trace.
2
3use timely::progress::frontier::AntichainRef;
4
5use crate::trace::{TraceReader, BatchReader, Description};
6use crate::trace::cursor::Cursor;
7
/// Wrapper that restricts a trace to the `(key, val)` pairs accepted by a predicate.
///
/// Cursors and batches handed out through the `TraceReader` implementation carry a
/// clone of `logic`; a cursor only reports times and differences for the pairs on
/// which the predicate returns `true`.
pub struct TraceFilter<Tr, F> {
    /// The wrapped trace that all operations are forwarded to.
    trace: Tr,
    /// Predicate over `(key, val)`; pairs it rejects yield no updates.
    logic: F,
}
13
14impl<Tr,F> Clone for TraceFilter<Tr, F>
15where
16    Tr: TraceReader+Clone,
17    F: Clone,
18{
19    fn clone(&self) -> Self {
20        TraceFilter {
21            trace: self.trace.clone(),
22            logic: self.logic.clone(),
23        }
24    }
25}
26
27impl<Tr, F> TraceReader for TraceFilter<Tr, F>
28where
29    Tr: TraceReader<Batch: Clone>,
30    F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static,
31{
32    type Key<'a> = Tr::Key<'a>;
33    type Val<'a> = Tr::Val<'a>;
34    type Time = Tr::Time;
35    type TimeGat<'a> = Tr::TimeGat<'a>;
36    type Diff = Tr::Diff;
37    type DiffGat<'a> = Tr::DiffGat<'a>;
38
39    type Batch = BatchFilter<Tr::Batch, F>;
40    type Storage = Tr::Storage;
41    type Cursor = CursorFilter<Tr::Cursor, F>;
42
43    fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
44        let logic = self.logic.clone();
45        self.trace
46            .map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), logic.clone())))
47    }
48
49    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
50    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
51
52    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
53    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
54
55    fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
56        self.trace.cursor_through(upper).map(|(x,y)| (CursorFilter::new(x, self.logic.clone()), y))
57    }
58}
59
60impl<Tr: TraceReader, F> TraceFilter<Tr, F> {
61    /// Makes a new trace wrapper
62    pub fn make_from(trace: Tr, logic: F) -> Self {
63        TraceFilter {
64            trace,
65            logic,
66        }
67    }
68}
69
70
/// Wrapper that restricts a batch to the `(key, val)` pairs accepted by a predicate.
///
/// Cursors created from it (`BatchCursorFilter`) receive a clone of `logic`, and use
/// this wrapper as their storage, reading the underlying data through `batch`.
#[derive(Clone)]
pub struct BatchFilter<B, F> {
    /// The wrapped batch.
    batch: B,
    /// Predicate over `(key, val)`; pairs it rejects yield no updates.
    logic: F,
}
77
78impl<B, F> BatchReader for BatchFilter<B, F>
79where
80    B: BatchReader,
81    F: FnMut(B::Key<'_>, B::Val<'_>)->bool+Clone+'static
82{
83    type Key<'a> = B::Key<'a>;
84    type Val<'a> = B::Val<'a>;
85    type Time = B::Time;
86    type TimeGat<'a> = B::TimeGat<'a>;
87    type Diff = B::Diff;
88    type DiffGat<'a> = B::DiffGat<'a>;
89
90    type Cursor = BatchCursorFilter<B::Cursor, F>;
91
92    fn cursor(&self) -> Self::Cursor {
93        BatchCursorFilter::new(self.batch.cursor(), self.logic.clone())
94    }
95    fn len(&self) -> usize { self.batch.len() }
96    fn description(&self) -> &Description<B::Time> { self.batch.description() }
97}
98
99impl<B: BatchReader, F> BatchFilter<B, F> {
100    /// Makes a new batch wrapper
101    pub fn make_from(batch: B, logic: F) -> Self {
102        BatchFilter {
103            batch,
104            logic,
105        }
106    }
107}
108
/// Cursor wrapper that suppresses updates for `(key, val)` pairs rejected by a predicate.
///
/// Navigation is forwarded to the wrapped cursor unchanged; only `map_times`
/// consults `logic`, and it reports nothing when the predicate returns `false`.
pub struct CursorFilter<C, F> {
    /// The wrapped cursor.
    cursor: C,
    /// Predicate over the current `(key, val)` pair.
    logic: F,
}

impl<C, F> CursorFilter<C, F> {
    /// Pairs a cursor with the predicate that gates its updates.
    fn new(cursor: C, logic: F) -> Self {
        Self { cursor, logic }
    }
}
123
124impl<C, F> Cursor for CursorFilter<C, F>
125where
126    C: Cursor,
127    F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static
128{
129    type Key<'a> = C::Key<'a>;
130    type Val<'a> = C::Val<'a>;
131    type Time = C::Time;
132    type TimeGat<'a> = C::TimeGat<'a>;
133    type Diff = C::Diff;
134    type DiffGat<'a> = C::DiffGat<'a>;
135
136    type Storage = C::Storage;
137
138    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
139    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
140
141    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
142    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
143
144    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
145    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
146
147    #[inline]
148    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
149        let key = self.key(storage);
150        let val = self.val(storage);
151        if (self.logic)(key, val) {
152            self.cursor.map_times(storage, logic)
153        }
154    }
155
156    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
157    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
158
159    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
160    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
161
162    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
163    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
164}
165
166
167
/// Cursor wrapper over a `BatchFilter` that suppresses updates for `(key, val)`
/// pairs rejected by a predicate.
///
/// Unlike `CursorFilter`, its storage type is the `BatchFilter` wrapper itself, so
/// every call reaches the underlying data through the wrapper's `batch` field.
pub struct BatchCursorFilter<C, F> {
    /// The wrapped cursor.
    cursor: C,
    /// Predicate over the current `(key, val)` pair.
    logic: F,
}

impl<C, F> BatchCursorFilter<C, F> {
    /// Pairs a batch cursor with the predicate that gates its updates.
    fn new(cursor: C, logic: F) -> Self {
        Self { cursor, logic }
    }
}
182
183impl<C: Cursor, F> Cursor for BatchCursorFilter<C, F>
184where
185    F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static,
186{
187    type Key<'a> = C::Key<'a>;
188    type Val<'a> = C::Val<'a>;
189    type Time = C::Time;
190    type TimeGat<'a> = C::TimeGat<'a>;
191    type Diff = C::Diff;
192    type DiffGat<'a> = C::DiffGat<'a>;
193
194    type Storage = BatchFilter<C::Storage, F>;
195
196    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
197    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
198
199    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
200    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
201
202    #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
203    #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
204
205    #[inline]
206    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
207        let key = self.key(storage);
208        let val = self.val(storage);
209        if (self.logic)(key, val) {
210            self.cursor.map_times(&storage.batch, logic)
211        }
212    }
213
214    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
215    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
216
217    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
218    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
219
220    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
221    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
222}