Skip to main content

timely/synchronization/
barrier.rs

1//! Barrier synchronization.
2
3use crate::dataflow::{InputHandleVec, ProbeHandle};
4use crate::worker::Worker;
5
6/// A re-usable barrier synchronization mechanism.
7pub struct Barrier {
8    input: InputHandleVec<usize, ()>,
9    probe: ProbeHandle<usize>,
10    worker: Worker,
11}
12
13impl Barrier {
14
15    /// Allocates a new barrier.
16    pub fn new(worker: &mut Worker) -> Self {
17        use crate::dataflow::operators::{Input, Probe};
18        let (input, probe) = worker.dataflow(|scope| {
19            let (handle, stream) = scope.new_input::<Vec<()>>();
20            (handle, stream.probe().0)
21        });
22        Barrier { input, probe, worker: worker.clone() }
23    }
24
25    /// Blocks until all other workers have reached this barrier.
26    ///
27    /// This method does *not* block dataflow execution, which continues
28    /// to execute while we await the arrival of the other workers.
29    pub fn wait(&mut self) {
30        self.advance();
31        while !self.reached() {
32            self.worker.step();
33        }
34    }
35
36    /// Advances this worker to the next barrier stage.
37    ///
38    /// This change is not communicated until `worker.step()` is called.
39    #[inline]
40    pub fn advance(&mut self) {
41        let round = *self.input.time();
42        self.input.advance_to(round + 1);
43    }
44
45    /// Indicates that the barrier has been reached by all workers.
46    ///
47    /// This method may not change until `worker.step()` is called.
48    #[inline]
49    pub fn reached(&mut self) -> bool {
50        !self.probe.less_than(self.input.time())
51    }
52}