timely/dataflow/operators/core/inspect.rs
//! Extension trait and implementation for observing and acting on streamed data.
2
3use crate::Container;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Scope, Stream};
6use crate::dataflow::operators::generic::Operator;
7
8/// Methods to inspect records and batches of records on a stream.
9pub trait Inspect<G: Scope, C>: InspectCore<G, C> + Sized
10where
11 for<'a> &'a C: IntoIterator,
12{
13 /// Runs a supplied closure on each observed data element.
14 ///
15 /// # Examples
16 /// ```
17 /// use timely::dataflow::operators::{ToStream, Inspect};
18 ///
19 /// timely::example(|scope| {
20 /// (0..10).to_stream(scope)
21 /// .container::<Vec<_>>()
22 /// .inspect(|x| println!("seen: {:?}", x));
23 /// });
24 /// ```
25 fn inspect<F>(self, mut func: F) -> Self
26 where
27 F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static,
28 {
29 self.inspect_batch(move |_, data| {
30 for datum in data.into_iter() { func(datum); }
31 })
32 }
33
34 /// Runs a supplied closure on each observed data element and associated time.
35 ///
36 /// # Examples
37 /// ```
38 /// use timely::dataflow::operators::{ToStream, Inspect};
39 ///
40 /// timely::example(|scope| {
41 /// (0..10).to_stream(scope)
42 /// .container::<Vec<_>>()
43 /// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
44 /// });
45 /// ```
46 fn inspect_time<F>(self, mut func: F) -> Self
47 where
48 F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static,
49 {
50 self.inspect_batch(move |time, data| {
51 for datum in data.into_iter() {
52 func(time, datum);
53 }
54 })
55 }
56
57 /// Runs a supplied closure on each observed data batch (time and data slice).
58 ///
59 /// # Examples
60 /// ```
61 /// use timely::dataflow::operators::{ToStream, Inspect};
62 ///
63 /// timely::example(|scope| {
64 /// (0..10).to_stream(scope)
65 /// .container::<Vec<_>>()
66 /// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
67 /// });
68 /// ```
69 fn inspect_batch(self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self {
70 self.inspect_core(move |event| {
71 if let Ok((time, data)) = event {
72 func(time, data);
73 }
74 })
75 }
76
77 /// Runs a supplied closure on each observed data batch, and each frontier advancement.
78 ///
79 /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
80 /// and `Err` for frontiers. Frontiers are only presented when they change.
81 ///
82 /// # Examples
83 /// ```
84 /// use timely::dataflow::operators::{ToStream, Inspect};
85 ///
86 /// timely::example(|scope| {
87 /// (0..10).to_stream(scope)
88 /// .container::<Vec<_>>()
89 /// .inspect_core(|event| {
90 /// match event {
91 /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
92 /// Err(frontier) => println!("frontier advanced to {:?}", frontier),
93 /// }
94 /// });
95 /// });
96 /// ```
97 fn inspect_core<F>(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
98}
99
100impl<G: Scope, C: Container> Inspect<G, C> for Stream<G, C>
101where
102 for<'a> &'a C: IntoIterator,
103{
104 fn inspect_core<F>(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static {
105 self.inspect_container(func)
106 }
107}
108
109/// Inspect containers
110pub trait InspectCore<G: Scope, C> {
111 /// Runs a supplied closure on each observed container, and each frontier advancement.
112 ///
113 /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
114 /// and `Err` for frontiers. Frontiers are only presented when they change.
115 ///
116 /// # Examples
117 /// ```
118 /// use timely::dataflow::operators::{ToStream, InspectCore};
119 ///
120 /// timely::example(|scope| {
121 /// (0..10).to_stream(scope)
122 /// .container::<Vec<_>>()
123 /// .inspect_container(|event| {
124 /// match event {
125 /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
126 /// Err(frontier) => println!("frontier advanced to {:?}", frontier),
127 /// }
128 /// });
129 /// });
130 /// ```
131 fn inspect_container<F>(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
132}
133
134impl<G: Scope, C: Container> InspectCore<G, C> for Stream<G, C> {
135
136 fn inspect_container<F>(self, mut func: F) -> Self
137 where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static
138 {
139 use crate::progress::timestamp::Timestamp;
140 let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum());
141 self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |(input, chain), output| {
142 if chain.frontier() != frontier.borrow() {
143 frontier.clear();
144 frontier.extend(chain.frontier().iter().cloned());
145 func(Err(frontier.elements()));
146 }
147 input.for_each_time(|time, data| {
148 let mut session = output.session(&time);
149 for data in data {
150 func(Ok((&time, &*data)));
151 session.give_container(data);
152 }
153 });
154 })
155 }
156}