timely/dataflow/operators/
filter.rs
1use crate::Data;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Stream, Scope};
6use crate::dataflow::operators::generic::operator::Operator;
7
8pub trait Filter<D: Data> {
10 fn filter<P: FnMut(&D)->bool+'static>(&self, predicate: P) -> Self;
23}
24
25impl<G: Scope, D: Data> Filter<D> for Stream<G, D> {
26 fn filter<P: FnMut(&D)->bool+'static>(&self, mut predicate: P) -> Stream<G, D> {
27 self.unary(Pipeline, "Filter", move |_,_| move |input, output| {
28 input.for_each(|time, data| {
29 data.retain(|x| predicate(x));
30 if !data.is_empty() {
31 output.session(&time).give_container(data);
32 }
33 });
34 })
35 }
36}