timely/dataflow/operators/core/inspect.rs
//! Extension traits and implementations for observing and acting on streamed data.
2
3use crate::Container;
4use crate::progress::Timestamp;
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::Stream;
7use crate::dataflow::operators::generic::Operator;
8
9/// Methods to inspect records and batches of records on a stream.
10pub trait Inspect<T: Timestamp, C>: InspectCore<T, C> + Sized
11where
12 for<'a> &'a C: IntoIterator,
13{
14 /// Runs a supplied closure on each observed data element.
15 ///
16 /// # Examples
17 /// ```
18 /// use timely::dataflow::operators::{ToStream, Inspect};
19 ///
20 /// timely::example(|scope| {
21 /// (0..10).to_stream(scope)
22 /// .container::<Vec<_>>()
23 /// .inspect(|x| println!("seen: {:?}", x));
24 /// });
25 /// ```
26 fn inspect<F>(self, mut func: F) -> Self
27 where
28 F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static,
29 {
30 self.inspect_batch(move |_, data| {
31 for datum in data.into_iter() { func(datum); }
32 })
33 }
34
35 /// Runs a supplied closure on each observed data element and associated time.
36 ///
37 /// # Examples
38 /// ```
39 /// use timely::dataflow::operators::{ToStream, Inspect};
40 ///
41 /// timely::example(|scope| {
42 /// (0..10).to_stream(scope)
43 /// .container::<Vec<_>>()
44 /// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
45 /// });
46 /// ```
47 fn inspect_time<F>(self, mut func: F) -> Self
48 where
49 F: for<'a> FnMut(&T, <&'a C as IntoIterator>::Item) + 'static,
50 {
51 self.inspect_batch(move |time, data| {
52 for datum in data.into_iter() {
53 func(time, datum);
54 }
55 })
56 }
57
58 /// Runs a supplied closure on each observed data batch (time and data slice).
59 ///
60 /// # Examples
61 /// ```
62 /// use timely::dataflow::operators::{ToStream, Inspect};
63 ///
64 /// timely::example(|scope| {
65 /// (0..10).to_stream(scope)
66 /// .container::<Vec<_>>()
67 /// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
68 /// });
69 /// ```
70 fn inspect_batch(self, mut func: impl FnMut(&T, &C)+'static) -> Self {
71 self.inspect_core(move |event| {
72 if let Ok((time, data)) = event {
73 func(time, data);
74 }
75 })
76 }
77
78 /// Runs a supplied closure on each observed data batch, and each frontier advancement.
79 ///
80 /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
81 /// and `Err` for frontiers. Frontiers are only presented when they change.
82 ///
83 /// # Examples
84 /// ```
85 /// use timely::dataflow::operators::{ToStream, Inspect};
86 ///
87 /// timely::example(|scope| {
88 /// (0..10).to_stream(scope)
89 /// .container::<Vec<_>>()
90 /// .inspect_core(|event| {
91 /// match event {
92 /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
93 /// Err(frontier) => println!("frontier advanced to {:?}", frontier),
94 /// }
95 /// });
96 /// });
97 /// ```
98 fn inspect_core<F>(self, func: F) -> Self where F: FnMut(Result<(&T, &C), &[T]>)+'static;
99}
100
101impl<T: Timestamp, C: Container> Inspect<T, C> for Stream<'_, T, C>
102where
103 for<'a> &'a C: IntoIterator,
104{
105 fn inspect_core<F>(self, func: F) -> Self where F: FnMut(Result<(&T, &C), &[T]>) + 'static {
106 self.inspect_container(func)
107 }
108}
109
110/// Inspect containers
111pub trait InspectCore<T: Timestamp, C> {
112 /// Runs a supplied closure on each observed container, and each frontier advancement.
113 ///
114 /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
115 /// and `Err` for frontiers. Frontiers are only presented when they change.
116 ///
117 /// # Examples
118 /// ```
119 /// use timely::dataflow::operators::{ToStream, InspectCore};
120 ///
121 /// timely::example(|scope| {
122 /// (0..10).to_stream(scope)
123 /// .container::<Vec<_>>()
124 /// .inspect_container(|event| {
125 /// match event {
126 /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
127 /// Err(frontier) => println!("frontier advanced to {:?}", frontier),
128 /// }
129 /// });
130 /// });
131 /// ```
132 fn inspect_container<F>(self, func: F) -> Self where F: FnMut(Result<(&T, &C), &[T]>)+'static;
133}
134
135impl<T: Timestamp, C: Container> InspectCore<T, C> for Stream<'_, T, C> {
136
137 fn inspect_container<F>(self, mut func: F) -> Self
138 where F: FnMut(Result<(&T, &C), &[T]>)+'static
139 {
140 let mut frontier = crate::progress::Antichain::from_elem(T::minimum());
141 self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |(input, chain), output| {
142 if chain.frontier() != frontier.borrow() {
143 frontier.clear();
144 frontier.extend(chain.frontier().iter().cloned());
145 func(Err(frontier.elements()));
146 }
147 input.for_each_time(|time, data| {
148 let mut session = output.session(&time);
149 for data in data {
150 func(Ok((&time, &*data)));
151 session.give_container(data);
152 }
153 });
154 })
155 }
156}