Skip to main content

timely/synchronization/
sequence.rs

1//! A shared ordered log.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::time::{Instant, Duration};
6use std::collections::VecDeque;
7
8use crate::{ExchangeData, PartialOrder};
9use crate::worker::Worker;
10use crate::dataflow::channels::pact::Exchange;
11use crate::dataflow::operators::generic::operator::source;
12use crate::dataflow::operators::generic::operator::Operator;
13use crate::scheduling::activate::Activator;
14
15// A Sequencer needs all operators firing with high frequency, because
16// it uses the timer to gauge progress. If other workers cease
17// advancing their own capabilities, although they might receive a
18// record they may not actually tick forward their own source clocks,
19// and no one will actually form the sequence.
20//
21// A CatchupActivator is an activator with an optional timestamp
22// attached. This allows us to represent a special state, where after
23// receiving an action from another worker, each of the other workers
24// will keep scheduling its source operator, until its capability
25// timestamp exceeds the greatest timestamp that the sink has
26// received.
27//
28// This allows operators to go quiet again until a new requests shows
29// up. The operators lose the ability to confirm that nothing is
30// scheduled for a particular time (they could request this with a
31// no-op event bearing a timestamp), but everyone still sees the same
32// sequence.
33struct CatchupActivator {
34    pub catchup_until: Option<Duration>,
35    activator: Activator,
36}
37
38impl CatchupActivator {
39    pub fn activate(&self) {
40        self.activator.activate();
41    }
42}
43
44/// Orders elements inserted across all workers.
45///
46/// A Sequencer allows each worker to insert into a consistent ordered
47/// sequence that is seen by all workers in the same order.
48pub struct Sequencer<T> {
49    activator: Rc<RefCell<Option<CatchupActivator>>>,
50    send: Rc<RefCell<VecDeque<T>>>, // proposed items.
51    recv: Rc<RefCell<VecDeque<T>>>, // sequenced items.
52}
53
54impl<T: ExchangeData+Clone> Sequencer<T> {
55
56    /// Creates a new Sequencer.
57    ///
58    /// The `timer` instant is used to synchronize the workers, who use this
59    /// elapsed time as their timestamp. Elements are ordered by this time,
60    /// and cannot be made visible until all workers have reached the time.
61    ///
62    /// # Examples
63    ///
64    /// ```rust
65    /// use std::time::{Instant, Duration};
66    ///
67    /// use timely::Config;
68    /// use timely::synchronization::Sequencer;
69    ///
70    /// timely::execute(Config::process(4), |worker| {
71    ///     let timer = Instant::now();
72    ///     let mut sequencer = Sequencer::new(worker, timer);
73    ///
74    ///     for round in 0 .. 10 {
75    ///
76    ///         // Sleep, and then send an announcement on wake-up.
77    ///         std::thread::sleep(Duration::from_millis(1 + worker.index() as u64));
78    ///         sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round));
79    ///
80    ///         // Ensures the pushed string is sent.
81    ///         worker.step();
82    ///
83    ///         // Read out received announcements.
84    ///         while let Some(element) = sequencer.next() {
85    ///             println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element);
86    ///         }
87    ///     }
88    /// }).expect("Timely computation did not complete correctly.");
89    /// ```
90    pub fn new(worker: &mut Worker, timer: Instant) -> Self {
91        Sequencer::preloaded(worker, timer, VecDeque::new())
92    }
93
94    /// Creates a new Sequencer preloaded with a queue of
95    /// elements.
96    pub fn preloaded(worker: &mut Worker, timer: Instant, preload: VecDeque<T>) -> Self {
97
98        let send: Rc<RefCell<VecDeque<T>>> = Rc::new(RefCell::new(VecDeque::new()));
99        let recv = Rc::new(RefCell::new(preload));
100        let send_weak = Rc::downgrade(&send);
101        let recv_weak = Rc::downgrade(&recv);
102
103        // The SequenceInput activator will be held by the sequencer,
104        // by the operator itself, and by the sink operator. We can
105        // only initialize the activator once we obtain the operator
106        // address.
107        let activator = Rc::new(RefCell::new(None));
108        let activator_source = Rc::clone(&activator);
109        let activator_sink = Rc::clone(&activator);
110
111        // build a dataflow used to serialize and circulate commands
112        worker.dataflow::<Duration,_,_>(move |scope| {
113
114            let peers = scope.peers();
115
116            let mut recvd = Vec::new();
117
118            // monotonic counter to maintain per-worker total order.
119            let mut counter = 0;
120
121            // a source that attempts to pull from `recv` and produce commands for everyone
122            source(scope, "SequenceInput", move |capability, info| {
123
124                // initialize activator, now that we have the address
125                activator_source
126                    .borrow_mut()
127                    .replace(CatchupActivator {
128                        activator: scope.activator_for(info.address),
129                        catchup_until: None,
130                    });
131
132                // so we can drop, if input queue vanishes.
133                let mut capability = Some(capability);
134
135                // closure broadcasts any commands it grabs.
136                move |output| {
137
138                    if let Some(send_queue) = send_weak.upgrade() {
139
140                        // capability *should* still be non-None.
141                        let capability = capability.as_mut().expect("Capability unavailable");
142
143                        // downgrade capability to current time.
144                        capability.downgrade(&timer.elapsed());
145
146                        // drain and broadcast `send`.
147                        let mut session = output.session(&capability);
148                        let mut borrow = send_queue.borrow_mut();
149                        for element in borrow.drain(..) {
150                            for worker_index in 0 .. peers {
151                                session.give((worker_index, counter, element.clone()));
152                            }
153                            counter += 1;
154                        }
155
156                        let mut activator_borrow = activator_source.borrow_mut();
157                        let activator = activator_borrow.as_mut().unwrap();
158
159                        if let Some(t) = activator.catchup_until {
160                            if capability.time().less_than(&t) {
161                                activator.activate();
162                            } else {
163                                activator.catchup_until = None;
164                            }
165                        }
166                    } else {
167                        capability = None;
168                    }
169                }
170            })
171            .sink(
172                Exchange::new(|x: &(usize, usize, T)| x.0 as u64),
173                "SequenceOutput",
174                move |(input, frontier)| {
175
176                    // grab each command and queue it up
177                    input.for_each_time(|time, data| {
178                        for (worker, counter, element) in data.flat_map(|d| d.drain(..)) {
179                            recvd.push(((*time.time(), worker, counter), element));
180                        }
181                    });
182
183                    recvd.sort_unstable_by(|x,y| x.0.cmp(&y.0));
184
185                    if let Some(last) = recvd.last() {
186                        let mut activator_borrow = activator_sink.borrow_mut();
187                        let activator = activator_borrow.as_mut().unwrap();
188
189                        activator.catchup_until = Some((last.0).0);
190                        activator.activate();
191                    }
192
193                    // determine how many (which) elements to read from `recvd`.
194                    let count = recvd.iter().filter(|&((ref time, _, _), _)| !frontier.less_equal(time)).count();
195                    let iter = recvd.drain(..count);
196
197                    if let Some(recv_queue) = recv_weak.upgrade() {
198                        recv_queue.borrow_mut().extend(iter.map(|(_,elem)| elem));
199                    }
200                }
201            );
202        });
203
204        Sequencer { activator, send, recv, }
205    }
206
207    /// Adds an element to the shared log.
208    pub fn push(&mut self, element: T) {
209        self.send.borrow_mut().push_back(element);
210        self.activator.borrow_mut().as_mut().unwrap().activate();
211    }
212}
213
214impl<T> Iterator for Sequencer<T> {
215    type Item = T;
216    fn next(&mut self) -> Option<T> {
217        self.recv.borrow_mut().pop_front()
218    }
219}
220
221// We should activate on drop, as this will cause the source to drop its capability.
222impl<T> Drop for Sequencer<T> {
223    fn drop(&mut self) {
224        self.activator
225            .borrow()
226            .as_ref()
227            .expect("Sequencer.activator unavailable")
228            .activate()
229    }
230}