timely/synchronization/
barrier.rs1use crate::dataflow::{InputHandleVec, ProbeHandle};
4use crate::worker::Worker;
5
6pub struct Barrier {
8 input: InputHandleVec<usize, ()>,
9 probe: ProbeHandle<usize>,
10 worker: Worker,
11}
12
13impl Barrier {
14
15 pub fn new(worker: &mut Worker) -> Self {
17 use crate::dataflow::operators::{Input, Probe};
18 let (input, probe) = worker.dataflow(|scope| {
19 let (handle, stream) = scope.new_input::<Vec<()>>();
20 (handle, stream.probe().0)
21 });
22 Barrier { input, probe, worker: worker.clone() }
23 }
24
25 pub fn wait(&mut self) {
30 self.advance();
31 while !self.reached() {
32 self.worker.step();
33 }
34 }
35
36 #[inline]
40 pub fn advance(&mut self) {
41 let round = *self.input.time();
42 self.input.advance_to(round + 1);
43 }
44
45 #[inline]
49 pub fn reached(&mut self) -> bool {
50 !self.probe.less_than(self.input.time())
51 }
52}