1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
//! A shared ordered log.
use std::rc::Rc;
use std::cell::RefCell;
use std::time::{Instant, Duration};
use std::collections::VecDeque;
use crate::{communication::Allocate, ExchangeData, PartialOrder};
use crate::scheduling::Scheduler;
use crate::worker::Worker;
use crate::dataflow::channels::pact::Exchange;
use crate::dataflow::operators::generic::operator::source;
use crate::dataflow::operators::generic::operator::Operator;
use crate::scheduling::activate::Activator;
// A Sequencer needs all operators firing with high frequency, because
// it uses the timer to gauge progress. If other workers cease
// advancing their own capabilities, although they might receive a
// record they may not actually tick forward their own source clocks,
// and no one will actually form the sequence.
//
// A CatchupActivator is an activator with an optional timestamp
// attached. This allows us to represent a special state, where after
// receiving an action from another worker, each of the other workers
// will keep scheduling its source operator, until its capability
// timestamp exceeds the greatest timestamp that the sink has
// received.
//
// This allows operators to go quiet again until a new requests shows
// up. The operators lose the ability to confirm that nothing is
// scheduled for a particular time (they could request this with a
// no-op event bearing a timestamp), but everyone still sees the same
// sequence.
struct CatchupActivator {
pub catchup_until: Option<Duration>,
activator: Activator,
}
impl CatchupActivator {
pub fn activate(&self) {
self.activator.activate();
}
}
/// Orders elements inserted across all workers.
///
/// A Sequencer allows each worker to insert into a consistent ordered
/// sequence that is seen by all workers in the same order.
pub struct Sequencer<T> {
activator: Rc<RefCell<Option<CatchupActivator>>>,
send: Rc<RefCell<VecDeque<T>>>, // proposed items.
recv: Rc<RefCell<VecDeque<T>>>, // sequenced items.
}
impl<T: ExchangeData> Sequencer<T> {
/// Creates a new Sequencer.
///
/// The `timer` instant is used to synchronize the workers, who use this
/// elapsed time as their timestamp. Elements are ordered by this time,
/// and cannot be made visible until all workers have reached the time.
///
/// # Examples
///
/// ```rust
/// use std::time::{Instant, Duration};
///
/// use timely::Config;
/// use timely::synchronization::Sequencer;
///
/// timely::execute(Config::process(4), |worker| {
/// let timer = Instant::now();
/// let mut sequencer = Sequencer::new(worker, timer);
///
/// for round in 0 .. 10 {
///
/// // Sleep, and then send an announcement on wake-up.
/// std::thread::sleep(Duration::from_millis(1 + worker.index() as u64));
/// sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round));
///
/// // Ensures the pushed string is sent.
/// worker.step();
///
/// // Read out received announcements.
/// while let Some(element) = sequencer.next() {
/// println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element);
/// }
/// }
/// }).expect("Timely computation did not complete correctly.");
/// ```
pub fn new<A: Allocate>(worker: &mut Worker<A>, timer: Instant) -> Self {
Sequencer::preloaded(worker, timer, VecDeque::new())
}
/// Creates a new Sequencer preloaded with a queue of
/// elements.
pub fn preloaded<A: Allocate>(worker: &mut Worker<A>, timer: Instant, preload: VecDeque<T>) -> Self {
let send: Rc<RefCell<VecDeque<T>>> = Rc::new(RefCell::new(VecDeque::new()));
let recv = Rc::new(RefCell::new(preload));
let send_weak = Rc::downgrade(&send);
let recv_weak = Rc::downgrade(&recv);
// The SequenceInput activator will be held by the sequencer,
// by the operator itself, and by the sink operator. We can
// only initialize the activator once we obtain the operator
// address.
let activator = Rc::new(RefCell::new(None));
let activator_source = activator.clone();
let activator_sink = activator.clone();
// build a dataflow used to serialize and circulate commands
worker.dataflow::<Duration,_,_>(move |dataflow| {
let scope = dataflow.clone();
let peers = dataflow.peers();
let mut recvd = Vec::new();
// monotonic counter to maintain per-worker total order.
let mut counter = 0;
// a source that attempts to pull from `recv` and produce commands for everyone
source(dataflow, "SequenceInput", move |capability, info| {
// initialize activator, now that we have the address
activator_source
.borrow_mut()
.replace(CatchupActivator {
activator: scope.activator_for(info.address),
catchup_until: None,
});
// so we can drop, if input queue vanishes.
let mut capability = Some(capability);
// closure broadcasts any commands it grabs.
move |output| {
if let Some(send_queue) = send_weak.upgrade() {
// capability *should* still be non-None.
let capability = capability.as_mut().expect("Capability unavailable");
// downgrade capability to current time.
capability.downgrade(&timer.elapsed());
// drain and broadcast `send`.
let mut session = output.session(&capability);
let mut borrow = send_queue.borrow_mut();
for element in borrow.drain(..) {
for worker_index in 0 .. peers {
session.give((worker_index, counter, element.clone()));
}
counter += 1;
}
let mut activator_borrow = activator_source.borrow_mut();
let activator = activator_borrow.as_mut().unwrap();
if let Some(t) = activator.catchup_until {
if capability.time().less_than(&t) {
activator.activate();
} else {
activator.catchup_until = None;
}
}
} else {
capability = None;
}
}
})
.sink(
Exchange::new(|x: &(usize, usize, T)| x.0 as u64),
"SequenceOutput",
move |input| {
// grab each command and queue it up
input.for_each(|time, data| {
recvd.reserve(data.len());
for (worker, counter, element) in data.drain(..) {
recvd.push(((*time.time(), worker, counter), element));
}
});
recvd.sort_by(|x,y| x.0.cmp(&y.0));
if let Some(last) = recvd.last() {
let mut activator_borrow = activator_sink.borrow_mut();
let activator = activator_borrow.as_mut().unwrap();
activator.catchup_until = Some((last.0).0);
activator.activate();
}
// determine how many (which) elements to read from `recvd`.
let count = recvd.iter().filter(|&((ref time, _, _), _)| !input.frontier().less_equal(time)).count();
let iter = recvd.drain(..count);
if let Some(recv_queue) = recv_weak.upgrade() {
recv_queue.borrow_mut().extend(iter.map(|(_,elem)| elem));
}
}
);
});
Sequencer { activator, send, recv, }
}
/// Adds an element to the shared log.
pub fn push(&mut self, element: T) {
self.send.borrow_mut().push_back(element);
self.activator.borrow_mut().as_mut().unwrap().activate();
}
}
impl<T> Iterator for Sequencer<T> {
type Item = T;
fn next(&mut self) -> Option<T> {
self.recv.borrow_mut().pop_front()
}
}
// We should activate on drop, as this will cause the source to drop its capability.
impl<T> Drop for Sequencer<T> {
fn drop(&mut self) {
self.activator
.borrow()
.as_ref()
.expect("Sequencer.activator unavailable")
.activate()
}
}