timely/dataflow/operators/core/inspect.rs
//! Extension traits and implementations for observing and acting on streamed data.
2
3use crate::{Container, Data};
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Scope, StreamCore};
6use crate::dataflow::operators::generic::Operator;
7
8/// Methods to inspect records and batches of records on a stream.
9pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
10 /// Runs a supplied closure on each observed data element.
11 ///
12 /// # Examples
13 /// ```
14 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
15 ///
16 /// timely::example(|scope| {
17 /// (0..10).to_stream(scope)
18 /// .inspect(|x| println!("seen: {:?}", x));
19 /// });
20 /// ```
21 fn inspect<F>(&self, mut func: F) -> Self
22 where
23 F: for<'a> FnMut(C::ItemRef<'a>) + 'static,
24 {
25 self.inspect_batch(move |_, data| {
26 for datum in data.iter() { func(datum); }
27 })
28 }
29
30 /// Runs a supplied closure on each observed data element and associated time.
31 ///
32 /// # Examples
33 /// ```
34 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
35 ///
36 /// timely::example(|scope| {
37 /// (0..10).to_stream(scope)
38 /// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
39 /// });
40 /// ```
41 fn inspect_time<F>(&self, mut func: F) -> Self
42 where
43 F: for<'a> FnMut(&G::Timestamp, C::ItemRef<'a>) + 'static,
44 {
45 self.inspect_batch(move |time, data| {
46 for datum in data.iter() {
47 func(time, datum);
48 }
49 })
50 }
51
52 /// Runs a supplied closure on each observed data batch (time and data slice).
53 ///
54 /// # Examples
55 /// ```
56 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
57 ///
58 /// timely::example(|scope| {
59 /// (0..10).to_stream(scope)
60 /// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
61 /// });
62 /// ```
63 fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self {
64 self.inspect_core(move |event| {
65 if let Ok((time, data)) = event {
66 func(time, data);
67 }
68 })
69 }
70
71 /// Runs a supplied closure on each observed data batch, and each frontier advancement.
72 ///
73 /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
74 /// and `Err` for frontiers. Frontiers are only presented when they change.
75 ///
76 /// # Examples
77 /// ```
78 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
79 ///
80 /// timely::example(|scope| {
81 /// (0..10).to_stream(scope)
82 /// .inspect_core(|event| {
83 /// match event {
84 /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
85 /// Err(frontier) => println!("frontier advanced to {:?}", frontier),
86 /// }
87 /// });
88 /// });
89 /// ```
90 fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
91}
92
93impl<G: Scope, C: Container + Data> Inspect<G, C> for StreamCore<G, C> {
94 fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static {
95 self.inspect_container(func)
96 }
97}
98
99/// Inspect containers
100pub trait InspectCore<G: Scope, C: Container> {
101 /// Runs a supplied closure on each observed container, and each frontier advancement.
102 ///
103 /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
104 /// and `Err` for frontiers. Frontiers are only presented when they change.
105 ///
106 /// # Examples
107 /// ```
108 /// use timely::dataflow::operators::{ToStream, Map, InspectCore};
109 ///
110 /// timely::example(|scope| {
111 /// (0..10).to_stream(scope)
112 /// .inspect_container(|event| {
113 /// match event {
114 /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
115 /// Err(frontier) => println!("frontier advanced to {:?}", frontier),
116 /// }
117 /// });
118 /// });
119 /// ```
120 fn inspect_container<F>(&self, func: F) -> StreamCore<G, C> where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
121}
122
123impl<G: Scope, C: Container + Data> InspectCore<G, C> for StreamCore<G, C> {
124
125 fn inspect_container<F>(&self, mut func: F) -> StreamCore<G, C>
126 where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static
127 {
128 use crate::progress::timestamp::Timestamp;
129 let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum());
130 self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| {
131 if input.frontier.frontier() != frontier.borrow() {
132 frontier.clear();
133 frontier.extend(input.frontier.frontier().iter().cloned());
134 func(Err(frontier.elements()));
135 }
136 input.for_each(|time, data| {
137 func(Ok((&time, &*data)));
138 output.session(&time).give_container(data);
139 });
140 })
141 }
142}