1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
//! Barrier synchronization.
use crate::communication::Allocate;
use crate::dataflow::{InputHandle, ProbeHandle};
use crate::worker::Worker;
/// A re-usable barrier synchronization mechanism.
pub struct Barrier<A: Allocate> {
input: InputHandle<usize, ()>,
probe: ProbeHandle<usize>,
worker: Worker<A>,
}
impl<A: Allocate> Barrier<A> {
/// Allocates a new barrier.
pub fn new(worker: &mut Worker<A>) -> Self {
use crate::dataflow::operators::{Input, Probe};
let (input, probe) = worker.dataflow(|scope| {
let (handle, stream) = scope.new_input::<()>();
(handle, stream.probe())
});
Barrier { input, probe, worker: worker.clone() }
}
/// Blocks until all other workers have reached this barrier.
///
/// This method does *not* block dataflow execution, which continues
/// to execute while we await the arrival of the other workers.
pub fn wait(&mut self) {
self.advance();
while !self.reached() {
self.worker.step();
}
}
/// Advances this worker to the next barrier stage.
///
/// This change is not communicated until `worker.step()` is called.
#[inline]
pub fn advance(&mut self) {
let round = *self.input.time();
self.input.advance_to(round + 1);
}
/// Indicates that the barrier has been reached by all workers.
///
/// This method may not change until `worker.step()` is called.
#[inline]
pub fn reached(&mut self) -> bool {
!self.probe.less_than(self.input.time())
}
}