timely/dataflow/operators/core/
inspect.rs

//! Extension trait and implementation for observing and acting on streamed data.
2
3use crate::Container;
4use crate::container::IterContainer;
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::{Scope, StreamCore};
7use crate::dataflow::operators::generic::Operator;
8
9/// Methods to inspect records and batches of records on a stream.
10pub trait Inspect<G: Scope, C: IterContainer>: InspectCore<G, C> + Sized {
11    /// Runs a supplied closure on each observed data element.
12    ///
13    /// # Examples
14    /// ```
15    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
16    ///
17    /// timely::example(|scope| {
18    ///     (0..10).to_stream(scope)
19    ///            .inspect(|x| println!("seen: {:?}", x));
20    /// });
21    /// ```
22    fn inspect<F>(&self, mut func: F) -> Self
23    where
24        F: for<'a> FnMut(C::ItemRef<'a>) + 'static,
25    {
26        self.inspect_batch(move |_, data| {
27            for datum in data.iter() { func(datum); }
28        })
29    }
30
31    /// Runs a supplied closure on each observed data element and associated time.
32    ///
33    /// # Examples
34    /// ```
35    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
36    ///
37    /// timely::example(|scope| {
38    ///     (0..10).to_stream(scope)
39    ///            .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
40    /// });
41    /// ```
42    fn inspect_time<F>(&self, mut func: F) -> Self
43    where
44        F: for<'a> FnMut(&G::Timestamp, C::ItemRef<'a>) + 'static,
45    {
46        self.inspect_batch(move |time, data| {
47            for datum in data.iter() {
48                func(time, datum);
49            }
50        })
51    }
52
53    /// Runs a supplied closure on each observed data batch (time and data slice).
54    ///
55    /// # Examples
56    /// ```
57    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
58    ///
59    /// timely::example(|scope| {
60    ///     (0..10).to_stream(scope)
61    ///            .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
62    /// });
63    /// ```
64    fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self {
65        self.inspect_core(move |event| {
66            if let Ok((time, data)) = event {
67                func(time, data);
68            }
69        })
70    }
71
72    /// Runs a supplied closure on each observed data batch, and each frontier advancement.
73    ///
74    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
75    /// and `Err` for frontiers. Frontiers are only presented when they change.
76    ///
77    /// # Examples
78    /// ```
79    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
80    ///
81    /// timely::example(|scope| {
82    ///     (0..10).to_stream(scope)
83    ///            .inspect_core(|event| {
84    ///                match event {
85    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
86    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
87    ///                }
88    ///             });
89    /// });
90    /// ```
91    fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
92}
93
94impl<G: Scope, C: Container + IterContainer> Inspect<G, C> for StreamCore<G, C> {
95    fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static {
96        self.inspect_container(func)
97    }
98}
99
100/// Inspect containers
101pub trait InspectCore<G: Scope, C> {
102    /// Runs a supplied closure on each observed container, and each frontier advancement.
103    ///
104    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
105    /// and `Err` for frontiers. Frontiers are only presented when they change.
106    ///
107    /// # Examples
108    /// ```
109    /// use timely::dataflow::operators::{ToStream, Map, InspectCore};
110    ///
111    /// timely::example(|scope| {
112    ///     (0..10).to_stream(scope)
113    ///            .inspect_container(|event| {
114    ///                match event {
115    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
116    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
117    ///                }
118    ///             });
119    /// });
120    /// ```
121    fn inspect_container<F>(&self, func: F) -> StreamCore<G, C> where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
122}
123
124impl<G: Scope, C: Container> InspectCore<G, C> for StreamCore<G, C> {
125
126    fn inspect_container<F>(&self, mut func: F) -> StreamCore<G, C>
127        where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static
128    {
129        use crate::progress::timestamp::Timestamp;
130        let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum());
131        self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| {
132            if input.frontier.frontier() != frontier.borrow() {
133                frontier.clear();
134                frontier.extend(input.frontier.frontier().iter().cloned());
135                func(Err(frontier.elements()));
136            }
137            input.for_each(|time, data| {
138                func(Ok((&time, &*data)));
139                output.session(&time).give_container(data);
140            });
141        })
142    }
143}