timely/dataflow/operators/core/inspect.rs
//! Extension trait and implementation for observing and acting on streamed data.
2
3use crate::Container;
4use crate::container::IterContainer;
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::{Scope, StreamCore};
7use crate::dataflow::operators::generic::Operator;
8
9/// Methods to inspect records and batches of records on a stream.
10pub trait Inspect<G: Scope, C: IterContainer>: InspectCore<G, C> + Sized {
11 /// Runs a supplied closure on each observed data element.
12 ///
13 /// # Examples
14 /// ```
15 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
16 ///
17 /// timely::example(|scope| {
18 /// (0..10).to_stream(scope)
19 /// .inspect(|x| println!("seen: {:?}", x));
20 /// });
21 /// ```
22 fn inspect<F>(&self, mut func: F) -> Self
23 where
24 F: for<'a> FnMut(C::ItemRef<'a>) + 'static,
25 {
26 self.inspect_batch(move |_, data| {
27 for datum in data.iter() { func(datum); }
28 })
29 }
30
31 /// Runs a supplied closure on each observed data element and associated time.
32 ///
33 /// # Examples
34 /// ```
35 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
36 ///
37 /// timely::example(|scope| {
38 /// (0..10).to_stream(scope)
39 /// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
40 /// });
41 /// ```
42 fn inspect_time<F>(&self, mut func: F) -> Self
43 where
44 F: for<'a> FnMut(&G::Timestamp, C::ItemRef<'a>) + 'static,
45 {
46 self.inspect_batch(move |time, data| {
47 for datum in data.iter() {
48 func(time, datum);
49 }
50 })
51 }
52
53 /// Runs a supplied closure on each observed data batch (time and data slice).
54 ///
55 /// # Examples
56 /// ```
57 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
58 ///
59 /// timely::example(|scope| {
60 /// (0..10).to_stream(scope)
61 /// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
62 /// });
63 /// ```
64 fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self {
65 self.inspect_core(move |event| {
66 if let Ok((time, data)) = event {
67 func(time, data);
68 }
69 })
70 }
71
72 /// Runs a supplied closure on each observed data batch, and each frontier advancement.
73 ///
74 /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
75 /// and `Err` for frontiers. Frontiers are only presented when they change.
76 ///
77 /// # Examples
78 /// ```
79 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
80 ///
81 /// timely::example(|scope| {
82 /// (0..10).to_stream(scope)
83 /// .inspect_core(|event| {
84 /// match event {
85 /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
86 /// Err(frontier) => println!("frontier advanced to {:?}", frontier),
87 /// }
88 /// });
89 /// });
90 /// ```
91 fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
92}
93
94impl<G: Scope, C: Container + IterContainer> Inspect<G, C> for StreamCore<G, C> {
95 fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static {
96 self.inspect_container(func)
97 }
98}
99
100/// Inspect containers
101pub trait InspectCore<G: Scope, C> {
102 /// Runs a supplied closure on each observed container, and each frontier advancement.
103 ///
104 /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
105 /// and `Err` for frontiers. Frontiers are only presented when they change.
106 ///
107 /// # Examples
108 /// ```
109 /// use timely::dataflow::operators::{ToStream, Map, InspectCore};
110 ///
111 /// timely::example(|scope| {
112 /// (0..10).to_stream(scope)
113 /// .inspect_container(|event| {
114 /// match event {
115 /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
116 /// Err(frontier) => println!("frontier advanced to {:?}", frontier),
117 /// }
118 /// });
119 /// });
120 /// ```
121 fn inspect_container<F>(&self, func: F) -> StreamCore<G, C> where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
122}
123
124impl<G: Scope, C: Container> InspectCore<G, C> for StreamCore<G, C> {
125
126 fn inspect_container<F>(&self, mut func: F) -> StreamCore<G, C>
127 where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static
128 {
129 use crate::progress::timestamp::Timestamp;
130 let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum());
131 self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| {
132 if input.frontier.frontier() != frontier.borrow() {
133 frontier.clear();
134 frontier.extend(input.frontier.frontier().iter().cloned());
135 func(Err(frontier.elements()));
136 }
137 input.for_each(|time, data| {
138 func(Ok((&time, &*data)));
139 output.session(&time).give_container(data);
140 });
141 })
142 }
143}