Skip to main content

timely/dataflow/operators/core/
inspect.rs

//! Extension trait and implementation for observing and acting on streamed data.
2
3use crate::Container;
4use crate::progress::Timestamp;
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::Stream;
7use crate::dataflow::operators::generic::Operator;
8
9/// Methods to inspect records and batches of records on a stream.
10pub trait Inspect<T: Timestamp, C>: InspectCore<T, C> + Sized
11where
12    for<'a> &'a C: IntoIterator,
13{
14    /// Runs a supplied closure on each observed data element.
15    ///
16    /// # Examples
17    /// ```
18    /// use timely::dataflow::operators::{ToStream, Inspect};
19    ///
20    /// timely::example(|scope| {
21    ///     (0..10).to_stream(scope)
22    ///            .container::<Vec<_>>()
23    ///            .inspect(|x| println!("seen: {:?}", x));
24    /// });
25    /// ```
26    fn inspect<F>(self, mut func: F) -> Self
27    where
28        F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static,
29    {
30        self.inspect_batch(move |_, data| {
31            for datum in data.into_iter() { func(datum); }
32        })
33    }
34
35    /// Runs a supplied closure on each observed data element and associated time.
36    ///
37    /// # Examples
38    /// ```
39    /// use timely::dataflow::operators::{ToStream, Inspect};
40    ///
41    /// timely::example(|scope| {
42    ///     (0..10).to_stream(scope)
43    ///            .container::<Vec<_>>()
44    ///            .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
45    /// });
46    /// ```
47    fn inspect_time<F>(self, mut func: F) -> Self
48    where
49        F: for<'a> FnMut(&T, <&'a C as IntoIterator>::Item) + 'static,
50    {
51        self.inspect_batch(move |time, data| {
52            for datum in data.into_iter() {
53                func(time, datum);
54            }
55        })
56    }
57
58    /// Runs a supplied closure on each observed data batch (time and data slice).
59    ///
60    /// # Examples
61    /// ```
62    /// use timely::dataflow::operators::{ToStream, Inspect};
63    ///
64    /// timely::example(|scope| {
65    ///     (0..10).to_stream(scope)
66    ///            .container::<Vec<_>>()
67    ///            .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
68    /// });
69    /// ```
70    fn inspect_batch(self, mut func: impl FnMut(&T, &C)+'static) -> Self {
71        self.inspect_core(move |event| {
72            if let Ok((time, data)) = event {
73                func(time, data);
74            }
75        })
76    }
77
78    /// Runs a supplied closure on each observed data batch, and each frontier advancement.
79    ///
80    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
81    /// and `Err` for frontiers. Frontiers are only presented when they change.
82    ///
83    /// # Examples
84    /// ```
85    /// use timely::dataflow::operators::{ToStream, Inspect};
86    ///
87    /// timely::example(|scope| {
88    ///     (0..10).to_stream(scope)
89    ///            .container::<Vec<_>>()
90    ///            .inspect_core(|event| {
91    ///                match event {
92    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
93    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
94    ///                }
95    ///             });
96    /// });
97    /// ```
98    fn inspect_core<F>(self, func: F) -> Self where F: FnMut(Result<(&T, &C), &[T]>)+'static;
99}
100
101impl<T: Timestamp, C: Container> Inspect<T, C> for Stream<'_, T, C>
102where
103    for<'a> &'a C: IntoIterator,
104{
105    fn inspect_core<F>(self, func: F) -> Self where F: FnMut(Result<(&T, &C), &[T]>) + 'static {
106        self.inspect_container(func)
107    }
108}
109
110/// Inspect containers
111pub trait InspectCore<T: Timestamp, C> {
112    /// Runs a supplied closure on each observed container, and each frontier advancement.
113    ///
114    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
115    /// and `Err` for frontiers. Frontiers are only presented when they change.
116    ///
117    /// # Examples
118    /// ```
119    /// use timely::dataflow::operators::{ToStream, InspectCore};
120    ///
121    /// timely::example(|scope| {
122    ///     (0..10).to_stream(scope)
123    ///            .container::<Vec<_>>()
124    ///            .inspect_container(|event| {
125    ///                match event {
126    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
127    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
128    ///                }
129    ///             });
130    /// });
131    /// ```
132    fn inspect_container<F>(self, func: F) -> Self where F: FnMut(Result<(&T, &C), &[T]>)+'static;
133}
134
135impl<T: Timestamp, C: Container> InspectCore<T, C> for Stream<'_, T, C> {
136
137    fn inspect_container<F>(self, mut func: F) -> Self
138        where F: FnMut(Result<(&T, &C), &[T]>)+'static
139    {
140        let mut frontier = crate::progress::Antichain::from_elem(T::minimum());
141        self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |(input, chain), output| {
142            if chain.frontier() != frontier.borrow() {
143                frontier.clear();
144                frontier.extend(chain.frontier().iter().cloned());
145                func(Err(frontier.elements()));
146            }
147            input.for_each_time(|time, data| {
148                let mut session = output.session(&time);
149                for data in data {
150                    func(Ok((&time, &*data)));
151                    session.give_container(data);
152                }
153            });
154        })
155    }
156}