timely/dataflow/operators/core/
inspect.rs

//! Extension traits and implementations for observing and acting on streamed data.
2
3use crate::{Container, Data};
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Scope, StreamCore};
6use crate::dataflow::operators::generic::Operator;
7
8/// Methods to inspect records and batches of records on a stream.
9pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
10    /// Runs a supplied closure on each observed data element.
11    ///
12    /// # Examples
13    /// ```
14    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
15    ///
16    /// timely::example(|scope| {
17    ///     (0..10).to_stream(scope)
18    ///            .inspect(|x| println!("seen: {:?}", x));
19    /// });
20    /// ```
21    fn inspect<F>(&self, mut func: F) -> Self
22    where
23        F: for<'a> FnMut(C::ItemRef<'a>) + 'static,
24    {
25        self.inspect_batch(move |_, data| {
26            for datum in data.iter() { func(datum); }
27        })
28    }
29
30    /// Runs a supplied closure on each observed data element and associated time.
31    ///
32    /// # Examples
33    /// ```
34    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
35    ///
36    /// timely::example(|scope| {
37    ///     (0..10).to_stream(scope)
38    ///            .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
39    /// });
40    /// ```
41    fn inspect_time<F>(&self, mut func: F) -> Self
42    where
43        F: for<'a> FnMut(&G::Timestamp, C::ItemRef<'a>) + 'static,
44    {
45        self.inspect_batch(move |time, data| {
46            for datum in data.iter() {
47                func(time, datum);
48            }
49        })
50    }
51
52    /// Runs a supplied closure on each observed data batch (time and data slice).
53    ///
54    /// # Examples
55    /// ```
56    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
57    ///
58    /// timely::example(|scope| {
59    ///     (0..10).to_stream(scope)
60    ///            .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
61    /// });
62    /// ```
63    fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self {
64        self.inspect_core(move |event| {
65            if let Ok((time, data)) = event {
66                func(time, data);
67            }
68        })
69    }
70
71    /// Runs a supplied closure on each observed data batch, and each frontier advancement.
72    ///
73    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
74    /// and `Err` for frontiers. Frontiers are only presented when they change.
75    ///
76    /// # Examples
77    /// ```
78    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
79    ///
80    /// timely::example(|scope| {
81    ///     (0..10).to_stream(scope)
82    ///            .inspect_core(|event| {
83    ///                match event {
84    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
85    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
86    ///                }
87    ///             });
88    /// });
89    /// ```
90    fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
91}
92
93impl<G: Scope, C: Container + Data> Inspect<G, C> for StreamCore<G, C> {
94    fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static {
95        self.inspect_container(func)
96    }
97}
98
99/// Inspect containers
100pub trait InspectCore<G: Scope, C: Container> {
101    /// Runs a supplied closure on each observed container, and each frontier advancement.
102    ///
103    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
104    /// and `Err` for frontiers. Frontiers are only presented when they change.
105    ///
106    /// # Examples
107    /// ```
108    /// use timely::dataflow::operators::{ToStream, Map, InspectCore};
109    ///
110    /// timely::example(|scope| {
111    ///     (0..10).to_stream(scope)
112    ///            .inspect_container(|event| {
113    ///                match event {
114    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
115    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
116    ///                }
117    ///             });
118    /// });
119    /// ```
120    fn inspect_container<F>(&self, func: F) -> StreamCore<G, C> where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
121}
122
123impl<G: Scope, C: Container + Data> InspectCore<G, C> for StreamCore<G, C> {
124
125    fn inspect_container<F>(&self, mut func: F) -> StreamCore<G, C>
126        where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static
127    {
128        use crate::progress::timestamp::Timestamp;
129        let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum());
130        self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| {
131            if input.frontier.frontier() != frontier.borrow() {
132                frontier.clear();
133                frontier.extend(input.frontier.frontier().iter().cloned());
134                func(Err(frontier.elements()));
135            }
136            input.for_each(|time, data| {
137                func(Ok((&time, &*data)));
138                output.session(&time).give_container(data);
139            });
140        })
141    }
142}