timely/dataflow/operators/
filter.rs1use crate::Data;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Stream, Scope};
6use crate::dataflow::operators::generic::operator::Operator;
7
8pub trait Filter<D: Data> {
10 fn filter<P: FnMut(&D)->bool+'static>(&self, predicate: P) -> Self;
23}
24
25impl<G: Scope, D: Data> Filter<D> for Stream<G, D> {
26 fn filter<P: FnMut(&D)->bool+'static>(&self, mut predicate: P) -> Stream<G, D> {
27 self.unary(Pipeline, "Filter", move |_,_| move |input, output| {
28 input.for_each_time(|time, data| {
29 let mut session = output.session(&time);
30 for data in data {
31 data.retain(&mut predicate);
32 session.give_container(data);
33 }
34 });
35 })
36 }
37}