// timely/scheduling/activate.rs

//! Parking and unparking timely fibers.

use std::rc::Rc;
use std::cell::RefCell;
use std::thread::Thread;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};
use std::cmp::Reverse;
use crossbeam_channel::{Sender, Receiver};
10
/// Methods required to act as a timely scheduler.
///
/// The core methods are the activation of "paths", sequences of integers, and
/// the enumeration of active paths by prefix. A scheduler may delay the report
/// of a path indefinitely, but it should report at least one extension for the
/// empty path `&[]` or risk parking the worker thread without a certain unpark.
///
/// There is no known harm to "spurious wake-ups" where a not-active path is
/// returned through `extensions()`.
pub trait Scheduler {
    /// Marks a path as immediately scheduleable.
    fn activate(&mut self, path: &[usize]);
    /// Populates `dest` with next identifiers on active extensions of `path`.
    ///
    /// This method is where a scheduler is allowed to exercise some discretion,
    /// in that it does not need to present *all* extensions, but it can instead
    /// present only those that the runtime should schedule.
    fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>);
}
30
31// Trait objects can be schedulers too.
32impl Scheduler for Box<dyn Scheduler> {
33    fn activate(&mut self, path: &[usize]) { (**self).activate(path) }
34    fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>) { (**self).extensions(path, dest) }
35}
36
/// Allocation-free activation tracker.
#[derive(Debug)]
pub struct Activations {
    /// Number of leading entries of `bounds` that are sorted, deduplicated,
    /// and compacted — the "clean" prefix produced by the last `advance`.
    clean: usize,
    /// `(offset, length)` pairs locating each activated path within `slices`.
    bounds: Vec<(usize, usize)>,
    /// Concatenated path elements, indexed through `bounds`.
    slices: Vec<usize>,
    /// Scratch space used by `advance` to compact `slices`.
    buffer: Vec<usize>,

    // Inter-thread activations.
    tx: Sender<Vec<usize>>,
    rx: Receiver<Vec<usize>>,

    // Delayed activations.
    /// Epoch against which delayed activation deadlines are measured.
    timer: Instant,
    /// Pending `(deadline, path)` pairs; `Reverse` yields soonest-first order.
    queue: BinaryHeap<Reverse<(Duration, Vec<usize>)>>,
}
54
impl Activations {

    /// Creates a new activation tracker, timing delayed activations against `timer`.
    pub fn new(timer: Instant) -> Self {
        let (tx, rx) = crossbeam_channel::unbounded();
        Self {
            clean: 0,
            bounds: Vec::new(),
            slices: Vec::new(),
            buffer: Vec::new(),
            tx,
            rx,
            timer,
            queue: BinaryHeap::new(),
        }
    }

    /// Activates the task addressed by `path`.
    pub fn activate(&mut self, path: &[usize]) {
        // Append to the unsorted ("dirty") suffix of `bounds`; the entry is
        // folded into the sorted prefix by the next call to `advance`.
        self.bounds.push((self.slices.len(), path.len()));
        self.slices.extend(path);
    }

    /// Schedules a future activation for the task addressed by `path`.
    pub fn activate_after(&mut self, path: &[usize], delay: Duration) {
        // TODO: We could have a minimum delay and immediately schedule anything less than that delay.
        if delay == Duration::new(0, 0) {
            self.activate(path);
        }
        else {
            // Deadlines are stored as offsets from `self.timer`; `Reverse`
            // turns the max-heap into a min-heap ordered by soonest deadline.
            let moment = self.timer.elapsed() + delay;
            self.queue.push(Reverse((moment, path.to_vec())));
        }
    }

    /// Discards the current active set and presents the next active set.
    pub fn advance(&mut self) {

        // Drain inter-thread activations.
        while let Ok(path) = self.rx.try_recv() {
            self.activate(&path[..])
        }

        // Drain timer-based activations.
        if !self.queue.is_empty() {
            let now = self.timer.elapsed();
            // Pop entries in deadline order until the next deadline is in the future.
            while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
                let Reverse((_time, path)) = self.queue.pop().unwrap();
                self.activate(&path[..]);
            }
        }

        // Discard the previously presented (clean) prefix of activations.
        self.bounds.drain(.. self.clean);

        {   // Scoped, to allow borrow to drop.
            // Sort and deduplicate entries by the path contents they reference,
            // so that `for_extensions` can binary search by path.
            let slices = &self.slices[..];
            self.bounds.sort_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
            self.bounds.dedup_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
        }

        // Compact the slices.
        self.buffer.clear();
        for (offset, length) in self.bounds.iter_mut() {
            self.buffer.extend(&self.slices[*offset .. (*offset + *length)]);
            *offset = self.buffer.len() - *length;
        }
        ::std::mem::swap(&mut self.buffer, &mut self.slices);

        // Every entry now present in `bounds` is sorted and compacted.
        self.clean = self.bounds.len();
    }

    /// Maps a function across activated paths.
    pub fn map_active(&self, logic: impl Fn(&[usize])) {
        for (offset, length) in self.bounds.iter() {
            logic(&self.slices[*offset .. (*offset + *length)]);
        }
    }

    /// Calls `action` once per distinct next symbol of active paths extending `path`.
    pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) {

        // Locate the first sorted entry that could start with `path`; a miss
        // (`Err`) still yields the correct insertion point to scan from.
        let position =
        self.bounds[..self.clean]
            .binary_search_by_key(&path, |x| &self.slices[x.0 .. (x.0 + x.1)]);
        let position = match position {
            Ok(x) => x,
            Err(x) => x,
        };

        // NOTE(review): the scan below visits all of `bounds`, but only
        // `bounds[..self.clean]` is sorted; presumably `advance` has been
        // called so that no dirty suffix exists — TODO confirm with callers.
        let mut previous = None;
        self.bounds
            .iter()
            .cloned()
            .skip(position)
            .map(|(offset, length)| &self.slices[offset .. (offset + length)])
            .take_while(|x| x.starts_with(path))
            .for_each(|x| {
                // push non-empty, non-duplicate extensions.
                if let Some(extension) = x.get(path.len()) {
                    if previous != Some(*extension) {
                        action(*extension);
                        previous = Some(*extension);
                    }
                }
            });
    }

    /// Constructs a thread-safe `SyncActivations` handle to this activator.
    ///
    /// The handle captures the *calling* thread for unparking; presumably this
    /// is invoked from the worker thread that parks — confirm at call sites.
    pub fn sync(&self) -> SyncActivations {
        SyncActivations {
            tx: self.tx.clone(),
            thread: std::thread::current(),
        }
    }

    /// Time until next scheduled event.
    ///
    /// This method should be used before putting a worker thread to sleep, as it
    /// indicates the amount of time before the thread should be unparked for the
    /// next scheduled activation. Returns a zero duration when activations are
    /// already pending, and `None` when nothing is pending or scheduled.
    pub fn empty_for(&self) -> Option<Duration> {
        if !self.bounds.is_empty() {
            Some(Duration::new(0,0))
        }
        else {
            self.queue.peek().map(|Reverse((t,_a))| {
                let elapsed = self.timer.elapsed();
                // Clamp at zero if the deadline has already passed.
                if t < &elapsed { Duration::new(0,0) }
                else { *t - elapsed }
            })
        }
    }
}
188
/// A thread-safe handle to an `Activations`.
#[derive(Clone, Debug)]
pub struct SyncActivations {
    /// Sending endpoint for activations, drained by `Activations::advance`.
    tx: Sender<Vec<usize>>,
    /// Handle used to unpark the captured thread after sending.
    thread: Thread,
}
195
196impl SyncActivations {
197    /// Unparks the task addressed by `path` and unparks the associated worker
198    /// thread.
199    pub fn activate(&self, path: Vec<usize>) -> Result<(), SyncActivationError> {
200        self.activate_batch(std::iter::once(path))
201    }
202
203    /// Unparks the tasks addressed by `paths` and unparks the associated worker
204    /// thread.
205    ///
206    /// This method can be more efficient than calling `activate` repeatedly, as
207    /// it only unparks the worker thread after sending all of the activations.
208    pub fn activate_batch<I>(&self, paths: I) -> Result<(), SyncActivationError>
209    where
210        I: IntoIterator<Item = Vec<usize>>
211    {
212        for path in paths.into_iter() {
213            self.tx.send(path).map_err(|_| SyncActivationError)?;
214        }
215        self.thread.unpark();
216        Ok(())
217    }
218}
219
/// A capability to activate a specific path.
#[derive(Clone, Debug)]
pub struct Activator {
    /// The fixed path this capability activates.
    path: Rc<[usize]>,
    /// Shared activation tracker for the owning worker.
    queue: Rc<RefCell<Activations>>,
}
226
227impl Activator {
228    /// Creates a new activation handle
229    pub fn new(path: Rc<[usize]>, queue: Rc<RefCell<Activations>>) -> Self {
230        Self {
231            path,
232            queue,
233        }
234    }
235    /// Activates the associated path.
236    pub fn activate(&self) {
237        self.queue
238            .borrow_mut()
239            .activate(&self.path[..]);
240    }
241
242    /// Activates the associated path after a specified duration.
243    pub fn activate_after(&self, delay: Duration) {
244        if delay == Duration::new(0, 0) {
245            self.activate();
246        }
247        else {
248            self.queue
249                .borrow_mut()
250                .activate_after(&self.path[..], delay);
251        }
252    }
253}
254
/// A thread-safe version of `Activator`.
#[derive(Clone, Debug)]
pub struct SyncActivator {
    /// The fixed path this capability activates.
    path: Vec<usize>,
    /// Thread-safe channel into the worker's `Activations`.
    queue: SyncActivations,
}
261
262impl SyncActivator {
263    /// Creates a new thread-safe activation handle.
264    pub fn new(path: Vec<usize>, queue: SyncActivations) -> Self {
265        Self {
266            path,
267            queue,
268        }
269    }
270
271    /// Activates the associated path and unparks the associated worker thread.
272    pub fn activate(&self) -> Result<(), SyncActivationError> {
273        self.queue.activate(self.path.clone())
274    }
275}
276
/// The error returned when activation fails across thread boundaries because
/// the receiving end has hung up.
#[derive(Clone, Copy, Debug)]
pub struct SyncActivationError;

impl std::fmt::Display for SyncActivationError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_str("sync activation error in timely")
    }
}

// `Display` above provides the error text; no source error to report.
impl std::error::Error for SyncActivationError {}
289
/// A wrapper that unparks on drop.
#[derive(Clone, Debug)]
pub struct ActivateOnDrop<T>  {
    /// The wrapped element, exposed through `Deref`/`DerefMut`.
    wrapped: T,
    /// Path activated when the wrapper is dropped.
    address: Rc<[usize]>,
    /// Shared activation tracker that receives the drop-time activation.
    activator: Rc<RefCell<Activations>>,
}
297
298use std::ops::{Deref, DerefMut};
299
300impl<T> ActivateOnDrop<T> {
301    /// Wraps an element so that it is unparked on drop.
302    pub fn new(wrapped: T, address: Rc<[usize]>, activator: Rc<RefCell<Activations>>) -> Self {
303        Self { wrapped, address, activator }
304    }
305}
306
307impl<T> Deref for ActivateOnDrop<T> {
308    type Target = T;
309    fn deref(&self) -> &Self::Target {
310        &self.wrapped
311    }
312}
313
314impl<T> DerefMut for ActivateOnDrop<T> {
315    fn deref_mut(&mut self) -> &mut Self::Target {
316        &mut self.wrapped
317    }
318}
319
320impl<T> Drop for ActivateOnDrop<T> {
321    fn drop(&mut self) {
322        self.activator.borrow_mut().activate(&self.address[..]);
323    }
324}