Skip to main content

timely/dataflow/operators/core/
probe.rs

1//! Monitor progress at a `Stream`.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::progress::Timestamp;
7use crate::progress::frontier::{AntichainRef, MutableAntichain};
8use crate::dataflow::channels::pushers::Counter as PushCounter;
9use crate::dataflow::channels::pact::Pipeline;
10use crate::dataflow::channels::pullers::Counter as PullCounter;
11use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
12
13
14use crate::dataflow::{Stream, Scope};
15use crate::Container;
16use crate::dataflow::channels::Message;
17
18/// Monitors progress at a `Stream`.
19pub trait Probe<G: Scope, C: Container> {
20    /// Constructs a progress probe which indicates which timestamps have elapsed at the operator.
21    ///
22    /// Returns a tuple of a probe handle and the input stream.
23    ///
24    /// # Examples
25    /// ```
26    /// use timely::*;
27    /// use timely::dataflow::Scope;
28    /// use timely::dataflow::operators::{Input, Probe, Inspect};
29    ///
30    /// // construct and execute a timely dataflow
31    /// timely::execute(Config::thread(), |worker| {
32    ///
33    ///     // add an input and base computation off of it
34    ///     let (mut input, probe) = worker.dataflow(|scope| {
35    ///         let (input, stream) = scope.new_input::<Vec<_>>();
36    ///         let (probe, _) = stream.inspect(|x| println!("hello {:?}", x))
37    ///                                .probe();
38    ///         (input, probe)
39    ///     });
40    ///
41    ///     // introduce input, advance computation
42    ///     for round in 0..10 {
43    ///         input.send(round);
44    ///         input.advance_to(round + 1);
45    ///         worker.step_while(|| probe.less_than(input.time()));
46    ///     }
47    /// }).unwrap();
48    /// ```
49    fn probe(self) -> (Handle<G::Timestamp>, Self);
50
51    /// Inserts a progress probe in a stream.
52    ///
53    /// # Examples
54    /// ```
55    /// use timely::*;
56    /// use timely::dataflow::Scope;
57    /// use timely::dataflow::operators::{Input, Probe, Inspect};
58    /// use timely::dataflow::operators::probe::Handle;
59    ///
60    /// // construct and execute a timely dataflow
61    /// timely::execute(Config::thread(), |worker| {
62    ///
63    ///     // add an input and base computation off of it
64    ///     let mut probe = Handle::new();
65    ///     let mut input = worker.dataflow(|scope| {
66    ///         let (input, stream) = scope.new_input::<Vec<_>>();
67    ///         stream.probe_with(&mut probe)
68    ///               .inspect(|x| println!("hello {:?}", x));
69    ///
70    ///         input
71    ///     });
72    ///
73    ///     // introduce input, advance computation
74    ///     for round in 0..10 {
75    ///         input.send(round);
76    ///         input.advance_to(round + 1);
77    ///         worker.step_while(|| probe.less_than(input.time()));
78    ///     }
79    /// }).unwrap();
80    /// ```
81    fn probe_with(self, handle: &Handle<G::Timestamp>) -> Stream<G, C>;
82}
83
84impl<G: Scope, C: Container> Probe<G, C> for Stream<G, C> {
85    fn probe(self) -> (Handle<G::Timestamp>, Self) {
86
87        // the frontier is shared state; scope updates, handle reads.
88        let handle = Handle::<G::Timestamp>::new();
89        let stream = self.probe_with(&handle);
90        (handle, stream)
91    }
92    fn probe_with(self, handle: &Handle<G::Timestamp>) -> Stream<G, C> {
93
94        let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope());
95        let mut input = PullCounter::new(builder.new_input(self, Pipeline));
96        let (tee, stream) = builder.new_output();
97        let mut output = PushCounter::new(tee);
98
99        // Conservatively introduce a minimal time to the handle.
100        // This will be relaxed when the operator is first scheduled and can see its frontier.
101        handle.frontier.borrow_mut().update_iter(std::iter::once((Timestamp::minimum(), 1)));
102
103        let shared_frontier = Rc::downgrade(&handle.frontier);
104        let mut started = false;
105
106        builder.build(
107            move |progress| {
108
109                // Mirror presented frontier changes into the shared handle.
110                if let Some(shared_frontier) = shared_frontier.upgrade() {
111                    let mut borrow = shared_frontier.borrow_mut();
112                    borrow.update_iter(progress.frontiers[0].drain());
113                }
114
115                // At initialization, we have a few tasks.
116                if !started {
117                    // We must discard the capability held by `OpereratorCore`.
118                    progress.internals[0].update(G::Timestamp::minimum(), -1);
119                    // We must retract the conservative hold in the shared handle.
120                    if let Some(shared_frontier) = shared_frontier.upgrade() {
121                        let mut borrow = shared_frontier.borrow_mut();
122                        borrow.update_iter(std::iter::once((Timestamp::minimum(), -1)));
123                    }
124                    started = true;
125                }
126
127                while let Some(message) = input.next() {
128                    Message::push_at(&mut message.data, message.time.clone(), &mut output);
129                }
130                use timely_communication::Push;
131                output.done();
132
133                // extract what we know about progress from the input and output adapters.
134                input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]);
135                output.produced().borrow_mut().drain_into(&mut progress.produceds[0]);
136
137                false
138            },
139        );
140
141        stream
142    }
143}
144
145/// Reports information about progress at the probe.
146#[derive(Debug)]
147pub struct Handle<T:Timestamp> {
148    frontier: Rc<RefCell<MutableAntichain<T>>>
149}
150
151impl<T: Timestamp> Handle<T> {
152    /// Returns `true` iff the frontier is strictly less than `time`.
153    #[inline] pub fn less_than(&self, time: &T) -> bool { self.frontier.borrow().less_than(time) }
154    /// Returns `true` iff the frontier is less than or equal to `time`.
155    #[inline] pub fn less_equal(&self, time: &T) -> bool { self.frontier.borrow().less_equal(time) }
156    /// Returns `true` iff the frontier is empty.
157    #[inline] pub fn done(&self) -> bool { self.frontier.borrow().is_empty() }
158    /// Allocates a new handle.
159    #[inline] pub fn new() -> Self { Handle { frontier: Rc::new(RefCell::new(MutableAntichain::new())) } }
160
161    /// Invokes a method on the frontier, returning its result.
162    ///
163    /// This method allows inspection of the frontier, which cannot be returned by reference as
164    /// it is on the other side of a `RefCell`.
165    ///
166    /// # Examples
167    ///
168    /// ```
169    /// use timely::dataflow::operators::probe::Handle;
170    ///
171    /// let handle = Handle::<usize>::new();
172    /// let frontier = handle.with_frontier(|frontier| frontier.to_vec());
173    /// ```
174    #[inline]
175    pub fn with_frontier<R, F: FnMut(AntichainRef<T>)->R>(&self, mut function: F) -> R {
176        function(self.frontier.borrow().frontier())
177    }
178}
179
180impl<T: Timestamp> Clone for Handle<T> {
181    fn clone(&self) -> Self {
182        Handle {
183            frontier: Rc::clone(&self.frontier)
184        }
185    }
186}
187
188impl<T> Default for Handle<T>
189where
190    T: Timestamp,
191{
192    fn default() -> Self {
193        Self::new()
194    }
195}
196
#[cfg(test)]
mod tests {

    use crate::Config;
    use crate::dataflow::operators::{Input, Probe};

    /// Drives an input through ten rounds and checks that the probe tracks
    /// each advance, then reports completion after the input is closed.
    #[test]
    fn probe() {

        // Build and run a single-threaded timely computation.
        crate::execute(Config::thread(), |worker| {

            // An input whose stream is probed directly; only the handle is kept.
            let (mut input, probe) = worker.dataflow(move |scope| {
                let (input, stream) = scope.new_input::<Vec<String>>();
                let (handle, _) = stream.probe();
                (input, handle)
            });

            // Before each advance the probe must sit exactly at `round`.
            for round in 0..10 {
                let next = round + 1;
                assert!(!probe.done());
                assert!(probe.less_equal(&round));
                assert!(probe.less_than(&next));
                input.advance_to(next);
                worker.step();
            }

            // Close the input so that the frontier can become empty.
            input.close();

            // Give the worker a few steps to drain remaining progress updates.
            for _ in 0..4 {
                worker.step();
            }
            assert!(probe.done());
        }).unwrap();
    }

}