timely/dataflow/operators/core/inspect.rs
//! Extension trait and implementation for observing and acting on streamed data.
2
3use crate::Container;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Scope, StreamCore};
6use crate::dataflow::operators::generic::Operator;
7
8/// Methods to inspect records and batches of records on a stream.
9pub trait Inspect<G: Scope, C>: InspectCore<G, C> + Sized
10where
11 for<'a> &'a C: IntoIterator,
12{
13 /// Runs a supplied closure on each observed data element.
14 ///
15 /// # Examples
16 /// ```
17 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
18 ///
19 /// timely::example(|scope| {
20 /// (0..10).to_stream(scope)
21 /// .inspect(|x| println!("seen: {:?}", x));
22 /// });
23 /// ```
24 fn inspect<F>(&self, mut func: F) -> Self
25 where
26 F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static,
27 {
28 self.inspect_batch(move |_, data| {
29 for datum in data.into_iter() { func(datum); }
30 })
31 }
32
33 /// Runs a supplied closure on each observed data element and associated time.
34 ///
35 /// # Examples
36 /// ```
37 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
38 ///
39 /// timely::example(|scope| {
40 /// (0..10).to_stream(scope)
41 /// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
42 /// });
43 /// ```
44 fn inspect_time<F>(&self, mut func: F) -> Self
45 where
46 F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static,
47 {
48 self.inspect_batch(move |time, data| {
49 for datum in data.into_iter() {
50 func(time, datum);
51 }
52 })
53 }
54
55 /// Runs a supplied closure on each observed data batch (time and data slice).
56 ///
57 /// # Examples
58 /// ```
59 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
60 ///
61 /// timely::example(|scope| {
62 /// (0..10).to_stream(scope)
63 /// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
64 /// });
65 /// ```
66 fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self {
67 self.inspect_core(move |event| {
68 if let Ok((time, data)) = event {
69 func(time, data);
70 }
71 })
72 }
73
74 /// Runs a supplied closure on each observed data batch, and each frontier advancement.
75 ///
76 /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
77 /// and `Err` for frontiers. Frontiers are only presented when they change.
78 ///
79 /// # Examples
80 /// ```
81 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
82 ///
83 /// timely::example(|scope| {
84 /// (0..10).to_stream(scope)
85 /// .inspect_core(|event| {
86 /// match event {
87 /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
88 /// Err(frontier) => println!("frontier advanced to {:?}", frontier),
89 /// }
90 /// });
91 /// });
92 /// ```
93 fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
94}
95
96impl<G: Scope, C: Container> Inspect<G, C> for StreamCore<G, C>
97where
98 for<'a> &'a C: IntoIterator,
99{
100 fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static {
101 self.inspect_container(func)
102 }
103}
104
105/// Inspect containers
106pub trait InspectCore<G: Scope, C> {
107 /// Runs a supplied closure on each observed container, and each frontier advancement.
108 ///
109 /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
110 /// and `Err` for frontiers. Frontiers are only presented when they change.
111 ///
112 /// # Examples
113 /// ```
114 /// use timely::dataflow::operators::{ToStream, Map, InspectCore};
115 ///
116 /// timely::example(|scope| {
117 /// (0..10).to_stream(scope)
118 /// .inspect_container(|event| {
119 /// match event {
120 /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
121 /// Err(frontier) => println!("frontier advanced to {:?}", frontier),
122 /// }
123 /// });
124 /// });
125 /// ```
126 fn inspect_container<F>(&self, func: F) -> StreamCore<G, C> where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
127}
128
129impl<G: Scope, C: Container> InspectCore<G, C> for StreamCore<G, C> {
130
131 fn inspect_container<F>(&self, mut func: F) -> StreamCore<G, C>
132 where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static
133 {
134 use crate::progress::timestamp::Timestamp;
135 let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum());
136 self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |(input, chain), output| {
137 if chain.frontier() != frontier.borrow() {
138 frontier.clear();
139 frontier.extend(chain.frontier().iter().cloned());
140 func(Err(frontier.elements()));
141 }
142 input.for_each_time(|time, data| {
143 let mut session = output.session(&time);
144 for data in data {
145 func(Ok((&time, &*data)));
146 session.give_container(data);
147 }
148 });
149 })
150 }
151}