differential_dataflow/trace/wrappers/
filter.rs1use timely::progress::frontier::AntichainRef;
4
5use crate::trace::{TraceReader, BatchReader, Description};
6use crate::trace::cursor::Cursor;
7
8pub struct TraceFilter<Tr, F> {
10 trace: Tr,
11 logic: F,
12}
13
14impl<Tr,F> Clone for TraceFilter<Tr, F>
15where
16 Tr: TraceReader+Clone,
17 F: Clone,
18{
19 fn clone(&self) -> Self {
20 TraceFilter {
21 trace: self.trace.clone(),
22 logic: self.logic.clone(),
23 }
24 }
25}
26
27impl<Tr: TraceReader, F> WithLayout for TraceFilter<Tr, F> {
28 type Layout = Tr::Layout;
29}
30
31impl<Tr, F> TraceReader for TraceFilter<Tr, F>
32where
33 Tr: TraceReader<Batch: Clone>,
34 F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static,
35{
36 type Batch = BatchFilter<Tr::Batch, F>;
37 type Storage = Tr::Storage;
38 type Cursor = CursorFilter<Tr::Cursor, F>;
39
40 fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
41 let logic = self.logic.clone();
42 self.trace
43 .map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), logic.clone())))
44 }
45
46 fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) }
47 fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() }
48
49 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) }
50 fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() }
51
52 fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
53 self.trace.cursor_through(upper).map(|(x,y)| (CursorFilter::new(x, self.logic.clone()), y))
54 }
55}
56
57impl<Tr: TraceReader, F> TraceFilter<Tr, F> {
58 pub fn make_from(trace: Tr, logic: F) -> Self {
60 TraceFilter {
61 trace,
62 logic,
63 }
64 }
65}
66
67
68#[derive(Clone)]
70pub struct BatchFilter<B, F> {
71 batch: B,
72 logic: F,
73}
74
75impl<B: BatchReader, F> WithLayout for BatchFilter<B, F> {
76 type Layout = B::Layout;
77}
78
79impl<B, F> BatchReader for BatchFilter<B, F>
80where
81 B: BatchReader,
82 F: FnMut(B::Key<'_>, B::Val<'_>)->bool+Clone+'static
83{
84 type Cursor = BatchCursorFilter<B::Cursor, F>;
85
86 fn cursor(&self) -> Self::Cursor {
87 BatchCursorFilter::new(self.batch.cursor(), self.logic.clone())
88 }
89 fn len(&self) -> usize { self.batch.len() }
90 fn description(&self) -> &Description<B::Time> { self.batch.description() }
91}
92
93impl<B: BatchReader, F> BatchFilter<B, F> {
94 pub fn make_from(batch: B, logic: F) -> Self {
96 BatchFilter {
97 batch,
98 logic,
99 }
100 }
101}
102
103pub struct CursorFilter<C, F> {
105 cursor: C,
106 logic: F,
107}
108
109use crate::trace::implementations::WithLayout;
110impl<C: Cursor, F> WithLayout for CursorFilter<C, F> {
111 type Layout = C::Layout;
112}
113
114impl<C, F> CursorFilter<C, F> {
115 fn new(cursor: C, logic: F) -> Self {
116 CursorFilter {
117 cursor,
118 logic,
119 }
120 }
121}
122
123impl<C, F> Cursor for CursorFilter<C, F>
124where
125 C: Cursor,
126 F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static
127{
128 type Storage = C::Storage;
129
130 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
131 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
132
133 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
134 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
135
136 #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
137 #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }
138
139 #[inline]
140 fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
141 let key = self.key(storage);
142 let val = self.val(storage);
143 if (self.logic)(key, val) {
144 self.cursor.map_times(storage, logic)
145 }
146 }
147
148 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
149 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
150
151 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
152 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
153
154 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
155 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
156}
157
158
159
160pub struct BatchCursorFilter<C, F> {
162 cursor: C,
163 logic: F,
164}
165
166impl<C: Cursor, F> WithLayout for BatchCursorFilter<C, F> {
167 type Layout = C::Layout;
168}
169
170impl<C, F> BatchCursorFilter<C, F> {
171 fn new(cursor: C, logic: F) -> Self {
172 BatchCursorFilter {
173 cursor,
174 logic,
175 }
176 }
177}
178
179impl<C: Cursor, F> Cursor for BatchCursorFilter<C, F>
180where
181 F: FnMut(C::Key<'_>, C::Val<'_>)->bool+'static,
182{
183 type Storage = BatchFilter<C::Storage, F>;
184
185 #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) }
186 #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(&storage.batch) }
187
188 #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) }
189 #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) }
190
191 #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(&storage.batch) }
192 #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(&storage.batch) }
193
194 #[inline]
195 fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
196 let key = self.key(storage);
197 let val = self.val(storage);
198 if (self.logic)(key, val) {
199 self.cursor.map_times(&storage.batch, logic)
200 }
201 }
202
203 #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(&storage.batch) }
204 #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(&storage.batch, key) }
205
206 #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(&storage.batch) }
207 #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(&storage.batch, val) }
208
209 #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(&storage.batch) }
210 #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(&storage.batch) }
211}