Skip to main content

timely/dataflow/operators/core/
inspect.rs

//! Extension trait and implementation for observing and acting on streamed data.
2
3use crate::Container;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Scope, Stream};
6use crate::dataflow::operators::generic::Operator;
7
8/// Methods to inspect records and batches of records on a stream.
9pub trait Inspect<G: Scope, C>: InspectCore<G, C> + Sized
10where
11    for<'a> &'a C: IntoIterator,
12{
13    /// Runs a supplied closure on each observed data element.
14    ///
15    /// # Examples
16    /// ```
17    /// use timely::dataflow::operators::{ToStream, Inspect};
18    ///
19    /// timely::example(|scope| {
20    ///     (0..10).to_stream(scope)
21    ///            .container::<Vec<_>>()
22    ///            .inspect(|x| println!("seen: {:?}", x));
23    /// });
24    /// ```
25    fn inspect<F>(self, mut func: F) -> Self
26    where
27        F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static,
28    {
29        self.inspect_batch(move |_, data| {
30            for datum in data.into_iter() { func(datum); }
31        })
32    }
33
34    /// Runs a supplied closure on each observed data element and associated time.
35    ///
36    /// # Examples
37    /// ```
38    /// use timely::dataflow::operators::{ToStream, Inspect};
39    ///
40    /// timely::example(|scope| {
41    ///     (0..10).to_stream(scope)
42    ///            .container::<Vec<_>>()
43    ///            .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
44    /// });
45    /// ```
46    fn inspect_time<F>(self, mut func: F) -> Self
47    where
48        F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static,
49    {
50        self.inspect_batch(move |time, data| {
51            for datum in data.into_iter() {
52                func(time, datum);
53            }
54        })
55    }
56
57    /// Runs a supplied closure on each observed data batch (time and data slice).
58    ///
59    /// # Examples
60    /// ```
61    /// use timely::dataflow::operators::{ToStream, Inspect};
62    ///
63    /// timely::example(|scope| {
64    ///     (0..10).to_stream(scope)
65    ///            .container::<Vec<_>>()
66    ///            .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
67    /// });
68    /// ```
69    fn inspect_batch(self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self {
70        self.inspect_core(move |event| {
71            if let Ok((time, data)) = event {
72                func(time, data);
73            }
74        })
75    }
76
77    /// Runs a supplied closure on each observed data batch, and each frontier advancement.
78    ///
79    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
80    /// and `Err` for frontiers. Frontiers are only presented when they change.
81    ///
82    /// # Examples
83    /// ```
84    /// use timely::dataflow::operators::{ToStream, Inspect};
85    ///
86    /// timely::example(|scope| {
87    ///     (0..10).to_stream(scope)
88    ///            .container::<Vec<_>>()
89    ///            .inspect_core(|event| {
90    ///                match event {
91    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
92    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
93    ///                }
94    ///             });
95    /// });
96    /// ```
97    fn inspect_core<F>(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
98}
99
100impl<G: Scope, C: Container> Inspect<G, C> for Stream<G, C>
101where
102    for<'a> &'a C: IntoIterator,
103{
104    fn inspect_core<F>(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static {
105        self.inspect_container(func)
106    }
107}
108
109/// Inspect containers
110pub trait InspectCore<G: Scope, C> {
111    /// Runs a supplied closure on each observed container, and each frontier advancement.
112    ///
113    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
114    /// and `Err` for frontiers. Frontiers are only presented when they change.
115    ///
116    /// # Examples
117    /// ```
118    /// use timely::dataflow::operators::{ToStream, InspectCore};
119    ///
120    /// timely::example(|scope| {
121    ///     (0..10).to_stream(scope)
122    ///            .container::<Vec<_>>()
123    ///            .inspect_container(|event| {
124    ///                match event {
125    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
126    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
127    ///                }
128    ///             });
129    /// });
130    /// ```
131    fn inspect_container<F>(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
132}
133
134impl<G: Scope, C: Container> InspectCore<G, C> for Stream<G, C> {
135
136    fn inspect_container<F>(self, mut func: F) -> Self
137        where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static
138    {
139        use crate::progress::timestamp::Timestamp;
140        let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum());
141        self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |(input, chain), output| {
142            if chain.frontier() != frontier.borrow() {
143                frontier.clear();
144                frontier.extend(chain.frontier().iter().cloned());
145                func(Err(frontier.elements()));
146            }
147            input.for_each_time(|time, data| {
148                let mut session = output.session(&time);
149                for data in data {
150                    func(Ok((&time, &*data)));
151                    session.give_container(data);
152                }
153            });
154        })
155    }
156}