timely/synchronization/
barrier.rs

1//! Barrier synchronization.
2
3use crate::communication::Allocate;
4use crate::dataflow::{InputHandle, ProbeHandle};
5use crate::worker::Worker;
6
7/// A re-usable barrier synchronization mechanism.
8pub struct Barrier<A: Allocate> {
9    input: InputHandle<usize, ()>,
10    probe: ProbeHandle<usize>,
11    worker: Worker<A>,
12}
13
14impl<A: Allocate> Barrier<A> {
15
16    /// Allocates a new barrier.
17    pub fn new(worker: &mut Worker<A>) -> Self {
18        use crate::dataflow::operators::{Input, Probe};
19        let (input, probe) = worker.dataflow(|scope| {
20            let (handle, stream) = scope.new_input::<()>();
21            (handle, stream.probe())
22        });
23        Barrier { input, probe, worker: worker.clone() }
24    }
25
26    /// Blocks until all other workers have reached this barrier.
27    ///
28    /// This method does *not* block dataflow execution, which continues
29    /// to execute while we await the arrival of the other workers.
30    pub fn wait(&mut self) {
31        self.advance();
32        while !self.reached() {
33            self.worker.step();
34        }
35    }
36
37    /// Advances this worker to the next barrier stage.
38    ///
39    /// This change is not communicated until `worker.step()` is called.
40    #[inline]
41    pub fn advance(&mut self) {
42        let round = *self.input.time();
43        self.input.advance_to(round + 1);
44    }
45
46    /// Indicates that the barrier has been reached by all workers.
47    ///
48    /// This method may not change until `worker.step()` is called.
49    #[inline]
50    pub fn reached(&mut self) -> bool {
51        !self.probe.less_than(self.input.time())
52    }
53}
54