timely/dataflow/operators/
filter.rs

1//! Filters a stream by a predicate.
2
3use crate::Data;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Stream, Scope};
6use crate::dataflow::operators::generic::operator::Operator;
7
8/// Extension trait for filtering.
9pub trait Filter<D: Data> {
10    /// Returns a new instance of `self` containing only records satisfying `predicate`.
11    ///
12    /// # Examples
13    /// ```
14    /// use timely::dataflow::operators::{ToStream, Filter, Inspect};
15    ///
16    /// timely::example(|scope| {
17    ///     (0..10).to_stream(scope)
18    ///            .filter(|x| *x % 2 == 0)
19    ///            .inspect(|x| println!("seen: {:?}", x));
20    /// });
21    /// ```
22    fn filter<P: FnMut(&D)->bool+'static>(&self, predicate: P) -> Self;
23}
24
25impl<G: Scope, D: Data> Filter<D> for Stream<G, D> {
26    fn filter<P: FnMut(&D)->bool+'static>(&self, mut predicate: P) -> Stream<G, D> {
27        self.unary(Pipeline, "Filter", move |_,_| move |input, output| {
28            input.for_each_time(|time, data| {
29                let mut session = output.session(&time);
30                for data in data {
31                    data.retain(&mut predicate);
32                    session.give_container(data);
33                }
34            });
35        })
36    }
37}