timely/dataflow/operators/core/probe.rs
1//! Monitor progress at a `Stream`.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::progress::Timestamp;
7use crate::progress::frontier::{AntichainRef, MutableAntichain};
8use crate::dataflow::channels::pushers::Counter as PushCounter;
9use crate::dataflow::channels::pact::Pipeline;
10use crate::dataflow::channels::pullers::Counter as PullCounter;
11use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
12
13
14use crate::dataflow::{StreamCore, Scope};
15use crate::Container;
16use crate::dataflow::channels::Message;
17
18/// Monitors progress at a `Stream`.
19pub trait Probe<G: Scope, C: Container> {
20 /// Constructs a progress probe which indicates which timestamps have elapsed at the operator.
21 ///
22 /// # Examples
23 /// ```
24 /// use timely::*;
25 /// use timely::dataflow::Scope;
26 /// use timely::dataflow::operators::{Input, Probe, Inspect};
27 ///
28 /// // construct and execute a timely dataflow
29 /// timely::execute(Config::thread(), |worker| {
30 ///
31 /// // add an input and base computation off of it
32 /// let (mut input, probe) = worker.dataflow(|scope| {
33 /// let (input, stream) = scope.new_input();
34 /// let probe = stream.inspect(|x| println!("hello {:?}", x))
35 /// .probe();
36 /// (input, probe)
37 /// });
38 ///
39 /// // introduce input, advance computation
40 /// for round in 0..10 {
41 /// input.send(round);
42 /// input.advance_to(round + 1);
43 /// worker.step_while(|| probe.less_than(input.time()));
44 /// }
45 /// }).unwrap();
46 /// ```
47 fn probe(&self) -> Handle<G::Timestamp>;
48
49 /// Inserts a progress probe in a stream.
50 ///
51 /// # Examples
52 /// ```
53 /// use timely::*;
54 /// use timely::dataflow::Scope;
55 /// use timely::dataflow::operators::{Input, Probe, Inspect};
56 /// use timely::dataflow::operators::probe::Handle;
57 ///
58 /// // construct and execute a timely dataflow
59 /// timely::execute(Config::thread(), |worker| {
60 ///
61 /// // add an input and base computation off of it
62 /// let mut probe = Handle::new();
63 /// let mut input = worker.dataflow(|scope| {
64 /// let (input, stream) = scope.new_input();
65 /// stream.probe_with(&mut probe)
66 /// .inspect(|x| println!("hello {:?}", x));
67 ///
68 /// input
69 /// });
70 ///
71 /// // introduce input, advance computation
72 /// for round in 0..10 {
73 /// input.send(round);
74 /// input.advance_to(round + 1);
75 /// worker.step_while(|| probe.less_than(input.time()));
76 /// }
77 /// }).unwrap();
78 /// ```
79 fn probe_with(&self, handle: &Handle<G::Timestamp>) -> StreamCore<G, C>;
80}
81
82impl<G: Scope, C: Container> Probe<G, C> for StreamCore<G, C> {
83 fn probe(&self) -> Handle<G::Timestamp> {
84
85 // the frontier is shared state; scope updates, handle reads.
86 let handle = Handle::<G::Timestamp>::new();
87 self.probe_with(&handle);
88 handle
89 }
90 fn probe_with(&self, handle: &Handle<G::Timestamp>) -> StreamCore<G, C> {
91
92 let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope());
93 let mut input = PullCounter::new(builder.new_input(self, Pipeline));
94 let (tee, stream) = builder.new_output();
95 let mut output = PushCounter::new(tee);
96
97 let shared_frontier = Rc::downgrade(&handle.frontier);
98 let mut started = false;
99
100 builder.build(
101 move |progress| {
102
103 // surface all frontier changes to the shared frontier.
104 if let Some(shared_frontier) = shared_frontier.upgrade() {
105 let mut borrow = shared_frontier.borrow_mut();
106 borrow.update_iter(progress.frontiers[0].drain());
107 }
108
109 if !started {
110 // discard initial capability.
111 progress.internals[0].update(G::Timestamp::minimum(), -1);
112 started = true;
113 }
114
115 while let Some(message) = input.next() {
116 Message::push_at(&mut message.data, message.time.clone(), &mut output);
117 }
118 use timely_communication::Push;
119 output.done();
120
121 // extract what we know about progress from the input and output adapters.
122 input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]);
123 output.produced().borrow_mut().drain_into(&mut progress.produceds[0]);
124
125 false
126 },
127 );
128
129 stream
130 }
131}
132
133/// Reports information about progress at the probe.
134#[derive(Debug)]
135pub struct Handle<T:Timestamp> {
136 frontier: Rc<RefCell<MutableAntichain<T>>>
137}
138
139impl<T: Timestamp> Handle<T> {
140 /// Returns `true` iff the frontier is strictly less than `time`.
141 #[inline] pub fn less_than(&self, time: &T) -> bool { self.frontier.borrow().less_than(time) }
142 /// Returns `true` iff the frontier is less than or equal to `time`.
143 #[inline] pub fn less_equal(&self, time: &T) -> bool { self.frontier.borrow().less_equal(time) }
144 /// Returns `true` iff the frontier is empty.
145 #[inline] pub fn done(&self) -> bool { self.frontier.borrow().is_empty() }
146 /// Allocates a new handle.
147 #[inline] pub fn new() -> Self { Handle { frontier: Rc::new(RefCell::new(MutableAntichain::new())) } }
148
149 /// Invokes a method on the frontier, returning its result.
150 ///
151 /// This method allows inspection of the frontier, which cannot be returned by reference as
152 /// it is on the other side of a `RefCell`.
153 ///
154 /// # Examples
155 ///
156 /// ```
157 /// use timely::dataflow::operators::probe::Handle;
158 ///
159 /// let handle = Handle::<usize>::new();
160 /// let frontier = handle.with_frontier(|frontier| frontier.to_vec());
161 /// ```
162 #[inline]
163 pub fn with_frontier<R, F: FnMut(AntichainRef<T>)->R>(&self, mut function: F) -> R {
164 function(self.frontier.borrow().frontier())
165 }
166}
167
168impl<T: Timestamp> Clone for Handle<T> {
169 fn clone(&self) -> Self {
170 Handle {
171 frontier: Rc::clone(&self.frontier)
172 }
173 }
174}
175
176impl<T> Default for Handle<T>
177where
178 T: Timestamp,
179{
180 fn default() -> Self {
181 Self::new()
182 }
183}
184
185#[cfg(test)]
186mod tests {
187
188 use crate::Config;
189 use crate::dataflow::operators::{Input, Probe};
190
191 #[test]
192 fn probe() {
193
194 // initializes and runs a timely dataflow computation
195 crate::execute(Config::thread(), |worker| {
196
197 // create a new input, and inspect its output
198 let (mut input, probe) = worker.dataflow(move |scope| {
199 let (input, stream) = scope.new_input::<String>();
200 (input, stream.probe())
201 });
202
203 // introduce data and watch!
204 for round in 0..10 {
205 assert!(!probe.done());
206 assert!(probe.less_equal(&round));
207 assert!(probe.less_than(&(round + 1)));
208 input.advance_to(round + 1);
209 worker.step();
210 }
211
212 // seal the input
213 input.close();
214
215 // finish off any remaining work
216 worker.step();
217 worker.step();
218 worker.step();
219 worker.step();
220 assert!(probe.done());
221 }).unwrap();
222 }
223
224}