timely/dataflow/operators/core/
inspect.rs

//! Extension traits and implementations for observing and acting on streamed data.
2
3use crate::Container;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Scope, StreamCore};
6use crate::dataflow::operators::generic::Operator;
7
8/// Methods to inspect records and batches of records on a stream.
9pub trait Inspect<G: Scope, C>: InspectCore<G, C> + Sized
10where
11    for<'a> &'a C: IntoIterator,
12{
13    /// Runs a supplied closure on each observed data element.
14    ///
15    /// # Examples
16    /// ```
17    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
18    ///
19    /// timely::example(|scope| {
20    ///     (0..10).to_stream(scope)
21    ///            .inspect(|x| println!("seen: {:?}", x));
22    /// });
23    /// ```
24    fn inspect<F>(&self, mut func: F) -> Self
25    where
26        F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static,
27    {
28        self.inspect_batch(move |_, data| {
29            for datum in data.into_iter() { func(datum); }
30        })
31    }
32
33    /// Runs a supplied closure on each observed data element and associated time.
34    ///
35    /// # Examples
36    /// ```
37    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
38    ///
39    /// timely::example(|scope| {
40    ///     (0..10).to_stream(scope)
41    ///            .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
42    /// });
43    /// ```
44    fn inspect_time<F>(&self, mut func: F) -> Self
45    where
46        F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static,
47    {
48        self.inspect_batch(move |time, data| {
49            for datum in data.into_iter() {
50                func(time, datum);
51            }
52        })
53    }
54
55    /// Runs a supplied closure on each observed data batch (time and data slice).
56    ///
57    /// # Examples
58    /// ```
59    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
60    ///
61    /// timely::example(|scope| {
62    ///     (0..10).to_stream(scope)
63    ///            .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
64    /// });
65    /// ```
66    fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self {
67        self.inspect_core(move |event| {
68            if let Ok((time, data)) = event {
69                func(time, data);
70            }
71        })
72    }
73
74    /// Runs a supplied closure on each observed data batch, and each frontier advancement.
75    ///
76    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
77    /// and `Err` for frontiers. Frontiers are only presented when they change.
78    ///
79    /// # Examples
80    /// ```
81    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
82    ///
83    /// timely::example(|scope| {
84    ///     (0..10).to_stream(scope)
85    ///            .inspect_core(|event| {
86    ///                match event {
87    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
88    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
89    ///                }
90    ///             });
91    /// });
92    /// ```
93    fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
94}
95
96impl<G: Scope, C: Container> Inspect<G, C> for StreamCore<G, C>
97where
98    for<'a> &'a C: IntoIterator,
99{
100    fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static {
101        self.inspect_container(func)
102    }
103}
104
105/// Inspect containers
106pub trait InspectCore<G: Scope, C> {
107    /// Runs a supplied closure on each observed container, and each frontier advancement.
108    ///
109    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
110    /// and `Err` for frontiers. Frontiers are only presented when they change.
111    ///
112    /// # Examples
113    /// ```
114    /// use timely::dataflow::operators::{ToStream, Map, InspectCore};
115    ///
116    /// timely::example(|scope| {
117    ///     (0..10).to_stream(scope)
118    ///            .inspect_container(|event| {
119    ///                match event {
120    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
121    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
122    ///                }
123    ///             });
124    /// });
125    /// ```
126    fn inspect_container<F>(&self, func: F) -> StreamCore<G, C> where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
127}
128
129impl<G: Scope, C: Container> InspectCore<G, C> for StreamCore<G, C> {
130
131    fn inspect_container<F>(&self, mut func: F) -> StreamCore<G, C>
132        where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static
133    {
134        use crate::progress::timestamp::Timestamp;
135        let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum());
136        self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |(input, chain), output| {
137            if chain.frontier() != frontier.borrow() {
138                frontier.clear();
139                frontier.extend(chain.frontier().iter().cloned());
140                func(Err(frontier.elements()));
141            }
142            input.for_each_time(|time, data| {
143                let mut session = output.session(&time);
144                for data in data {
145                    func(Ok((&time, &*data)));
146                    session.give_container(data);
147                }
148            });
149        })
150    }
151}