timely/dataflow/operators/
filter.rs

1//! Filters a stream by a predicate.
2
3use crate::Data;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Stream, Scope};
6use crate::dataflow::operators::generic::operator::Operator;
7
8/// Extension trait for filtering.
9pub trait Filter<D: Data> {
10    /// Returns a new instance of `self` containing only records satisfying `predicate`.
11    ///
12    /// # Examples
13    /// ```
14    /// use timely::dataflow::operators::{ToStream, Filter, Inspect};
15    ///
16    /// timely::example(|scope| {
17    ///     (0..10).to_stream(scope)
18    ///            .filter(|x| *x % 2 == 0)
19    ///            .inspect(|x| println!("seen: {:?}", x));
20    /// });
21    /// ```
22    fn filter<P: FnMut(&D)->bool+'static>(&self, predicate: P) -> Self;
23}
24
25impl<G: Scope, D: Data> Filter<D> for Stream<G, D> {
26    fn filter<P: FnMut(&D)->bool+'static>(&self, mut predicate: P) -> Stream<G, D> {
27        self.unary(Pipeline, "Filter", move |_,_| move |input, output| {
28            input.for_each(|time, data| {
29                data.retain(|x| predicate(x));
30                if !data.is_empty() {
31                    output.session(&time).give_container(data);
32                }
33            });
34        })
35    }
36}