timely/synchronization/sequence.rs
1//! A shared ordered log.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::time::{Instant, Duration};
6use std::collections::VecDeque;
7
8use crate::{ExchangeData, PartialOrder};
9use crate::worker::Worker;
10use crate::dataflow::channels::pact::Exchange;
11use crate::dataflow::operators::generic::operator::source;
12use crate::dataflow::operators::generic::operator::Operator;
13use crate::scheduling::activate::Activator;
14
15// A Sequencer needs all operators firing with high frequency, because
16// it uses the timer to gauge progress. If other workers cease
17// advancing their own capabilities, although they might receive a
18// record they may not actually tick forward their own source clocks,
19// and no one will actually form the sequence.
20//
21// A CatchupActivator is an activator with an optional timestamp
22// attached. This allows us to represent a special state, where after
23// receiving an action from another worker, each of the other workers
24// will keep scheduling its source operator, until its capability
25// timestamp exceeds the greatest timestamp that the sink has
26// received.
27//
// This allows operators to go quiet again until a new request shows
29// up. The operators lose the ability to confirm that nothing is
30// scheduled for a particular time (they could request this with a
31// no-op event bearing a timestamp), but everyone still sees the same
32// sequence.
/// An `Activator` paired with an optional "catch-up" timestamp.
struct CatchupActivator {
    /// When `Some(t)`, the source operator re-activates itself on each
    /// invocation while its capability time is still less than `t`, and
    /// resets this field to `None` once it is not. The sink operator sets it
    /// to the greatest timestamp among the elements it is still buffering.
    pub catchup_until: Option<Duration>,
    /// Activator built from the source operator's address.
    activator: Activator,
}
37
impl CatchupActivator {
    /// Forwards to the wrapped `Activator::activate`.
    ///
    /// `catchup_until` is neither read nor modified here.
    pub fn activate(&self) {
        self.activator.activate();
    }
}
43
/// Orders elements inserted across all workers.
///
/// A Sequencer allows each worker to insert into a consistent ordered
/// sequence that is seen by all workers in the same order.
///
/// Elements are proposed with `push` and read back, once sequenced, through
/// the `Iterator` implementation.
pub struct Sequencer<T> {
    /// Activator shared with the source and sink operators. It starts out as
    /// `None` and is filled in while the source operator is constructed.
    activator: Rc<RefCell<Option<CatchupActivator>>>,
    /// Locally proposed items, drained and broadcast by the source operator.
    send: Rc<RefCell<VecDeque<T>>>,
    /// Sequenced items, appended by the sink operator and popped by `next`.
    recv: Rc<RefCell<VecDeque<T>>>,
}
53
54impl<T: ExchangeData+Clone> Sequencer<T> {
55
56 /// Creates a new Sequencer.
57 ///
58 /// The `timer` instant is used to synchronize the workers, who use this
59 /// elapsed time as their timestamp. Elements are ordered by this time,
60 /// and cannot be made visible until all workers have reached the time.
61 ///
62 /// # Examples
63 ///
64 /// ```rust
65 /// use std::time::{Instant, Duration};
66 ///
67 /// use timely::Config;
68 /// use timely::synchronization::Sequencer;
69 ///
70 /// timely::execute(Config::process(4), |worker| {
71 /// let timer = Instant::now();
72 /// let mut sequencer = Sequencer::new(worker, timer);
73 ///
74 /// for round in 0 .. 10 {
75 ///
76 /// // Sleep, and then send an announcement on wake-up.
77 /// std::thread::sleep(Duration::from_millis(1 + worker.index() as u64));
78 /// sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round));
79 ///
80 /// // Ensures the pushed string is sent.
81 /// worker.step();
82 ///
83 /// // Read out received announcements.
84 /// while let Some(element) = sequencer.next() {
85 /// println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element);
86 /// }
87 /// }
88 /// }).expect("Timely computation did not complete correctly.");
89 /// ```
90 pub fn new(worker: &mut Worker, timer: Instant) -> Self {
91 Sequencer::preloaded(worker, timer, VecDeque::new())
92 }
93
94 /// Creates a new Sequencer preloaded with a queue of
95 /// elements.
96 pub fn preloaded(worker: &mut Worker, timer: Instant, preload: VecDeque<T>) -> Self {
97
98 let send: Rc<RefCell<VecDeque<T>>> = Rc::new(RefCell::new(VecDeque::new()));
99 let recv = Rc::new(RefCell::new(preload));
100 let send_weak = Rc::downgrade(&send);
101 let recv_weak = Rc::downgrade(&recv);
102
103 // The SequenceInput activator will be held by the sequencer,
104 // by the operator itself, and by the sink operator. We can
105 // only initialize the activator once we obtain the operator
106 // address.
107 let activator = Rc::new(RefCell::new(None));
108 let activator_source = Rc::clone(&activator);
109 let activator_sink = Rc::clone(&activator);
110
111 // build a dataflow used to serialize and circulate commands
112 worker.dataflow::<Duration,_,_>(move |scope| {
113
114 let peers = scope.peers();
115
116 let mut recvd = Vec::new();
117
118 // monotonic counter to maintain per-worker total order.
119 let mut counter = 0;
120
121 // a source that attempts to pull from `recv` and produce commands for everyone
122 source(scope, "SequenceInput", move |capability, info| {
123
124 // initialize activator, now that we have the address
125 activator_source
126 .borrow_mut()
127 .replace(CatchupActivator {
128 activator: scope.activator_for(info.address),
129 catchup_until: None,
130 });
131
132 // so we can drop, if input queue vanishes.
133 let mut capability = Some(capability);
134
135 // closure broadcasts any commands it grabs.
136 move |output| {
137
138 if let Some(send_queue) = send_weak.upgrade() {
139
140 // capability *should* still be non-None.
141 let capability = capability.as_mut().expect("Capability unavailable");
142
143 // downgrade capability to current time.
144 capability.downgrade(&timer.elapsed());
145
146 // drain and broadcast `send`.
147 let mut session = output.session(&capability);
148 let mut borrow = send_queue.borrow_mut();
149 for element in borrow.drain(..) {
150 for worker_index in 0 .. peers {
151 session.give((worker_index, counter, element.clone()));
152 }
153 counter += 1;
154 }
155
156 let mut activator_borrow = activator_source.borrow_mut();
157 let activator = activator_borrow.as_mut().unwrap();
158
159 if let Some(t) = activator.catchup_until {
160 if capability.time().less_than(&t) {
161 activator.activate();
162 } else {
163 activator.catchup_until = None;
164 }
165 }
166 } else {
167 capability = None;
168 }
169 }
170 })
171 .sink(
172 Exchange::new(|x: &(usize, usize, T)| x.0 as u64),
173 "SequenceOutput",
174 move |(input, frontier)| {
175
176 // grab each command and queue it up
177 input.for_each_time(|time, data| {
178 for (worker, counter, element) in data.flat_map(|d| d.drain(..)) {
179 recvd.push(((*time.time(), worker, counter), element));
180 }
181 });
182
183 recvd.sort_unstable_by(|x,y| x.0.cmp(&y.0));
184
185 if let Some(last) = recvd.last() {
186 let mut activator_borrow = activator_sink.borrow_mut();
187 let activator = activator_borrow.as_mut().unwrap();
188
189 activator.catchup_until = Some((last.0).0);
190 activator.activate();
191 }
192
193 // determine how many (which) elements to read from `recvd`.
194 let count = recvd.iter().filter(|&((ref time, _, _), _)| !frontier.less_equal(time)).count();
195 let iter = recvd.drain(..count);
196
197 if let Some(recv_queue) = recv_weak.upgrade() {
198 recv_queue.borrow_mut().extend(iter.map(|(_,elem)| elem));
199 }
200 }
201 );
202 });
203
204 Sequencer { activator, send, recv, }
205 }
206
207 /// Adds an element to the shared log.
208 pub fn push(&mut self, element: T) {
209 self.send.borrow_mut().push_back(element);
210 self.activator.borrow_mut().as_mut().unwrap().activate();
211 }
212}
213
214impl<T> Iterator for Sequencer<T> {
215 type Item = T;
216 fn next(&mut self) -> Option<T> {
217 self.recv.borrow_mut().pop_front()
218 }
219}
220
221// We should activate on drop, as this will cause the source to drop its capability.
222impl<T> Drop for Sequencer<T> {
223 fn drop(&mut self) {
224 self.activator
225 .borrow()
226 .as_ref()
227 .expect("Sequencer.activator unavailable")
228 .activate()
229 }
230}