timely/synchronization/
sequence.rs

1//! A shared ordered log.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::time::{Instant, Duration};
6use std::collections::VecDeque;
7
8use crate::{communication::Allocate, ExchangeData, PartialOrder};
9use crate::scheduling::Scheduler;
10use crate::worker::Worker;
11use crate::dataflow::channels::pact::Exchange;
12use crate::dataflow::operators::generic::operator::source;
13use crate::dataflow::operators::generic::operator::Operator;
14use crate::scheduling::activate::Activator;
15
16// A Sequencer needs all operators firing with high frequency, because
17// it uses the timer to gauge progress. If other workers cease
18// advancing their own capabilities, although they might receive a
19// record they may not actually tick forward their own source clocks,
20// and no one will actually form the sequence.
21//
22// A CatchupActivator is an activator with an optional timestamp
23// attached. This allows us to represent a special state, where after
24// receiving an action from another worker, each of the other workers
25// will keep scheduling its source operator, until its capability
26// timestamp exceeds the greatest timestamp that the sink has
27// received.
28//
// This allows operators to go quiet again until a new request shows
30// up. The operators lose the ability to confirm that nothing is
31// scheduled for a particular time (they could request this with a
32// no-op event bearing a timestamp), but everyone still sees the same
33// sequence.
34struct CatchupActivator {
35    pub catchup_until: Option<Duration>,
36    activator: Activator,
37}
38
39impl CatchupActivator {
40    pub fn activate(&self) {
41        self.activator.activate();
42    }
43}
44
45/// Orders elements inserted across all workers.
46///
47/// A Sequencer allows each worker to insert into a consistent ordered
48/// sequence that is seen by all workers in the same order.
49pub struct Sequencer<T> {
50    activator: Rc<RefCell<Option<CatchupActivator>>>,
51    send: Rc<RefCell<VecDeque<T>>>, // proposed items.
52    recv: Rc<RefCell<VecDeque<T>>>, // sequenced items.
53}
54
55impl<T: ExchangeData> Sequencer<T> {
56
57    /// Creates a new Sequencer.
58    ///
59    /// The `timer` instant is used to synchronize the workers, who use this
60    /// elapsed time as their timestamp. Elements are ordered by this time,
61    /// and cannot be made visible until all workers have reached the time.
62    ///
63    /// # Examples
64    ///
65    /// ```rust
66    /// use std::time::{Instant, Duration};
67    ///
68    /// use timely::Config;
69    /// use timely::synchronization::Sequencer;
70    ///
71    /// timely::execute(Config::process(4), |worker| {
72    ///     let timer = Instant::now();
73    ///     let mut sequencer = Sequencer::new(worker, timer);
74    ///
75    ///     for round in 0 .. 10 {
76    ///
77    ///         // Sleep, and then send an announcement on wake-up.
78    ///         std::thread::sleep(Duration::from_millis(1 + worker.index() as u64));
79    ///         sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round));
80    ///
81    ///         // Ensures the pushed string is sent.
82    ///         worker.step();
83    ///
84    ///         // Read out received announcements.
85    ///         while let Some(element) = sequencer.next() {
86    ///             println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element);
87    ///         }
88    ///     }
89    /// }).expect("Timely computation did not complete correctly.");
90    /// ```
91    pub fn new<A: Allocate>(worker: &mut Worker<A>, timer: Instant) -> Self {
92        Sequencer::preloaded(worker, timer, VecDeque::new())
93    }
94
95    /// Creates a new Sequencer preloaded with a queue of
96    /// elements.
97    pub fn preloaded<A: Allocate>(worker: &mut Worker<A>, timer: Instant, preload: VecDeque<T>) -> Self {
98
99        let send: Rc<RefCell<VecDeque<T>>> = Rc::new(RefCell::new(VecDeque::new()));
100        let recv = Rc::new(RefCell::new(preload));
101        let send_weak = Rc::downgrade(&send);
102        let recv_weak = Rc::downgrade(&recv);
103
104        // The SequenceInput activator will be held by the sequencer,
105        // by the operator itself, and by the sink operator. We can
106        // only initialize the activator once we obtain the operator
107        // address.
108        let activator = Rc::new(RefCell::new(None));
109        let activator_source = Rc::clone(&activator);
110        let activator_sink = Rc::clone(&activator);
111
112        // build a dataflow used to serialize and circulate commands
113        worker.dataflow::<Duration,_,_>(move |dataflow| {
114
115            let scope = dataflow.clone();
116            let peers = dataflow.peers();
117
118            let mut recvd = Vec::new();
119
120            // monotonic counter to maintain per-worker total order.
121            let mut counter = 0;
122
123            // a source that attempts to pull from `recv` and produce commands for everyone
124            source(dataflow, "SequenceInput", move |capability, info| {
125
126                // initialize activator, now that we have the address
127                activator_source
128                    .borrow_mut()
129                    .replace(CatchupActivator {
130                        activator: scope.activator_for(info.address),
131                        catchup_until: None,
132                    });
133
134                // so we can drop, if input queue vanishes.
135                let mut capability = Some(capability);
136
137                // closure broadcasts any commands it grabs.
138                move |output| {
139
140                    if let Some(send_queue) = send_weak.upgrade() {
141
142                        // capability *should* still be non-None.
143                        let capability = capability.as_mut().expect("Capability unavailable");
144
145                        // downgrade capability to current time.
146                        capability.downgrade(&timer.elapsed());
147
148                        // drain and broadcast `send`.
149                        let mut session = output.session(&capability);
150                        let mut borrow = send_queue.borrow_mut();
151                        for element in borrow.drain(..) {
152                            for worker_index in 0 .. peers {
153                                session.give((worker_index, counter, element.clone()));
154                            }
155                            counter += 1;
156                        }
157
158                        let mut activator_borrow = activator_source.borrow_mut();
159                        let activator = activator_borrow.as_mut().unwrap();
160
161                        if let Some(t) = activator.catchup_until {
162                            if capability.time().less_than(&t) {
163                                activator.activate();
164                            } else {
165                                activator.catchup_until = None;
166                            }
167                        }
168                    } else {
169                        capability = None;
170                    }
171                }
172            })
173            .sink(
174                Exchange::new(|x: &(usize, usize, T)| x.0 as u64),
175                "SequenceOutput",
176                move |(input, frontier)| {
177
178                    // grab each command and queue it up
179                    input.for_each_time(|time, data| {
180                        for (worker, counter, element) in data.flat_map(|d| d.drain(..)) {
181                            recvd.push(((*time.time(), worker, counter), element));
182                        }
183                    });
184
185                    recvd.sort_by(|x,y| x.0.cmp(&y.0));
186
187                    if let Some(last) = recvd.last() {
188                        let mut activator_borrow = activator_sink.borrow_mut();
189                        let activator = activator_borrow.as_mut().unwrap();
190
191                        activator.catchup_until = Some((last.0).0);
192                        activator.activate();
193                    }
194
195                    // determine how many (which) elements to read from `recvd`.
196                    let count = recvd.iter().filter(|&((ref time, _, _), _)| !frontier.less_equal(time)).count();
197                    let iter = recvd.drain(..count);
198
199                    if let Some(recv_queue) = recv_weak.upgrade() {
200                        recv_queue.borrow_mut().extend(iter.map(|(_,elem)| elem));
201                    }
202                }
203            );
204        });
205
206        Sequencer { activator, send, recv, }
207    }
208
209    /// Adds an element to the shared log.
210    pub fn push(&mut self, element: T) {
211        self.send.borrow_mut().push_back(element);
212        self.activator.borrow_mut().as_mut().unwrap().activate();
213    }
214}
215
216impl<T> Iterator for Sequencer<T> {
217    type Item = T;
218    fn next(&mut self) -> Option<T> {
219        self.recv.borrow_mut().pop_front()
220    }
221}
222
223// We should activate on drop, as this will cause the source to drop its capability.
224impl<T> Drop for Sequencer<T> {
225    fn drop(&mut self) {
226        self.activator
227            .borrow()
228            .as_ref()
229            .expect("Sequencer.activator unavailable")
230            .activate()
231    }
232}