timely/synchronization/
barrier.rs
1use crate::communication::Allocate;
4use crate::dataflow::{InputHandle, ProbeHandle};
5use crate::worker::Worker;
6
7pub struct Barrier<A: Allocate> {
9 input: InputHandle<usize, ()>,
10 probe: ProbeHandle<usize>,
11 worker: Worker<A>,
12}
13
14impl<A: Allocate> Barrier<A> {
15
16 pub fn new(worker: &mut Worker<A>) -> Self {
18 use crate::dataflow::operators::{Input, Probe};
19 let (input, probe) = worker.dataflow(|scope| {
20 let (handle, stream) = scope.new_input::<()>();
21 (handle, stream.probe())
22 });
23 Barrier { input, probe, worker: worker.clone() }
24 }
25
26 pub fn wait(&mut self) {
31 self.advance();
32 while !self.reached() {
33 self.worker.step();
34 }
35 }
36
37 #[inline]
41 pub fn advance(&mut self) {
42 let round = *self.input.time();
43 self.input.advance_to(round + 1);
44 }
45
46 #[inline]
50 pub fn reached(&mut self) -> bool {
51 !self.probe.less_than(self.input.time())
52 }
53}
54