timely/scheduling/
activate.rs

1//! Parking and unparking timely fibers.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::thread::Thread;
6use std::collections::BinaryHeap;
7use std::time::{Duration, Instant};
8use std::cmp::Reverse;
9use std::sync::mpsc::{Sender, Receiver};
10
11/// Methods required to act as a timely scheduler.
12///
13/// The core methods are the activation of "paths", sequences of integers, and
14/// the enumeration of active paths by prefix. A scheduler may delay the report
15/// of a path indefinitely, but it should report at least one extension for the
16/// empty path `&[]` or risk parking the worker thread without a certain unpark.
17///
18/// There is no known harm to "spurious wake-ups" where a not-active path is
19/// returned through `extensions()`.
20pub trait Scheduler {
21    /// Mark a path as immediately scheduleable.
22    fn activate(&mut self, path: &[usize]);
23    /// Populates `dest` with next identifiers on active extensions of `path`.
24    ///
25    /// This method is where a scheduler is allowed to exercise some discretion,
26    /// in that it does not need to present *all* extensions, but it can instead
27    /// present only those that the runtime should schedule.
28    fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>);
29}
30
31// Trait objects can be schedulers too.
32impl Scheduler for Box<dyn Scheduler> {
33    fn activate(&mut self, path: &[usize]) { (**self).activate(path) }
34    fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>) { (**self).extensions(path, dest) }
35}
36
37/// Allocation-free activation tracker.
38#[derive(Debug)]
39pub struct Activations {
40    clean: usize,
41    /// `(offset, length)`
42    bounds: Vec<(usize, usize)>,
43    slices: Vec<usize>,
44    buffer: Vec<usize>,
45
46    // Inter-thread activations.
47    tx: Sender<Vec<usize>>,
48    rx: Receiver<Vec<usize>>,
49
50    // Delayed activations.
51    timer: Option<Instant>,
52    queue: BinaryHeap<Reverse<(Duration, Vec<usize>)>>,
53}
54
55impl Activations {
56
57    /// Creates a new activation tracker.
58    pub fn new(timer: Option<Instant>) -> Self {
59        let (tx, rx) = std::sync::mpsc::channel();
60        Self {
61            clean: 0,
62            bounds: Vec::new(),
63            slices: Vec::new(),
64            buffer: Vec::new(),
65            tx,
66            rx,
67            timer,
68            queue: BinaryHeap::new(),
69        }
70    }
71
72    /// Activates the task addressed by `path`.
73    pub fn activate(&mut self, path: &[usize]) {
74        self.bounds.push((self.slices.len(), path.len()));
75        self.slices.extend(path);
76    }
77
78    /// Schedules a future activation for the task addressed by `path`.
79    pub fn activate_after(&mut self, path: &[usize], delay: Duration) {
80        if let Some(timer) = self.timer {
81            // TODO: We could have a minimum delay and immediately schedule anything less than that delay.
82            if delay == Duration::new(0, 0) {
83                self.activate(path);
84            }
85            else {
86                let moment = timer.elapsed() + delay;
87                self.queue.push(Reverse((moment, path.to_vec())));
88            }
89        } 
90        else {
91            self.activate(path);
92        }
93    }
94
95    /// Discards the current active set and presents the next active set.
96    pub fn advance(&mut self) {
97
98        // Drain inter-thread activations.
99        while let Ok(path) = self.rx.try_recv() {
100            self.activate(&path[..])
101        }
102
103        // Drain timer-based activations.
104        if let Some(timer) = self.timer {
105            if !self.queue.is_empty() {
106                let now = timer.elapsed();
107                while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
108                    let Reverse((_time, path)) = self.queue.pop().unwrap();
109                    self.activate(&path[..]);
110                }
111            }
112        }
113
114        self.bounds.drain(.. self.clean);
115
116        {   // Scoped, to allow borrow to drop.
117            let slices = &self.slices[..];
118            self.bounds.sort_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
119            self.bounds.dedup_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
120        }
121
122        // Compact the slices.
123        self.buffer.clear();
124        for (offset, length) in self.bounds.iter_mut() {
125            self.buffer.extend(&self.slices[*offset .. (*offset + *length)]);
126            *offset = self.buffer.len() - *length;
127        }
128        ::std::mem::swap(&mut self.buffer, &mut self.slices);
129
130        self.clean = self.bounds.len();
131    }
132
133    /// Maps a function across activated paths.
134    pub fn map_active(&self, logic: impl Fn(&[usize])) {
135        for (offset, length) in self.bounds.iter() {
136            logic(&self.slices[*offset .. (*offset + *length)]);
137        }
138    }
139
140    /// Sets as active any symbols that follow `path`.
141    pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) {
142
143        let position =
144        self.bounds[..self.clean]
145            .binary_search_by_key(&path, |x| &self.slices[x.0 .. (x.0 + x.1)]);
146        let position = match position {
147            Ok(x) => x,
148            Err(x) => x,
149        };
150
151        let mut previous = None;
152        self.bounds
153            .iter()
154            .cloned()
155            .skip(position)
156            .map(|(offset, length)| &self.slices[offset .. (offset + length)])
157            .take_while(|x| x.starts_with(path))
158            .for_each(|x| {
159                // push non-empty, non-duplicate extensions.
160                if let Some(extension) = x.get(path.len()) {
161                    if previous != Some(*extension) {
162                        action(*extension);
163                        previous = Some(*extension);
164                    }
165                }
166            });
167    }
168
169    /// Constructs a thread-safe `SyncActivations` handle to this activator.
170    pub fn sync(&self) -> SyncActivations {
171        SyncActivations {
172            tx: self.tx.clone(),
173            thread: std::thread::current(),
174        }
175    }
176
177    /// Time until next scheduled event.
178    ///
179    /// This method should be used before putting a worker thread to sleep, as it
180    /// indicates the amount of time before the thread should be unparked for the
181    /// next scheduled activation.
182    pub fn empty_for(&self) -> Option<Duration> {
183        if !self.bounds.is_empty() || self.timer.is_none() {
184            Some(Duration::new(0,0))
185        }
186        else {
187            self.queue.peek().map(|Reverse((t,_a))| {
188                let elapsed = self.timer.unwrap().elapsed();
189                if t < &elapsed { Duration::new(0,0) }
190                else { *t - elapsed }
191            })
192        }
193    }
194}
195
196/// A thread-safe handle to an `Activations`.
197#[derive(Clone, Debug)]
198pub struct SyncActivations {
199    tx: Sender<Vec<usize>>,
200    thread: Thread,
201}
202
203impl SyncActivations {
204    /// Unparks the task addressed by `path` and unparks the associated worker
205    /// thread.
206    pub fn activate(&self, path: Vec<usize>) -> Result<(), SyncActivationError> {
207        self.activate_batch(std::iter::once(path))
208    }
209
210    /// Unparks the tasks addressed by `paths` and unparks the associated worker
211    /// thread.
212    ///
213    /// This method can be more efficient than calling `activate` repeatedly, as
214    /// it only unparks the worker thread after sending all of the activations.
215    pub fn activate_batch<I>(&self, paths: I) -> Result<(), SyncActivationError>
216    where
217        I: IntoIterator<Item = Vec<usize>>
218    {
219        for path in paths.into_iter() {
220            self.tx.send(path).map_err(|_| SyncActivationError)?;
221        }
222        self.thread.unpark();
223        Ok(())
224    }
225}
226
227/// A capability to activate a specific path.
228#[derive(Clone, Debug)]
229pub struct Activator {
230    path: Rc<[usize]>,
231    queue: Rc<RefCell<Activations>>,
232}
233
234impl Activator {
235    /// Creates a new activation handle
236    pub fn new(path: Rc<[usize]>, queue: Rc<RefCell<Activations>>) -> Self {
237        Self {
238            path,
239            queue,
240        }
241    }
242    /// Activates the associated path.
243    pub fn activate(&self) {
244        self.queue
245            .borrow_mut()
246            .activate(&self.path[..]);
247    }
248
249    /// Activates the associated path after a specified duration.
250    pub fn activate_after(&self, delay: Duration) {
251        if delay == Duration::new(0, 0) {
252            self.activate();
253        }
254        else {
255            self.queue
256                .borrow_mut()
257                .activate_after(&self.path[..], delay);
258        }
259    }
260}
261
262/// A thread-safe version of `Activator`.
263#[derive(Clone, Debug)]
264pub struct SyncActivator {
265    path: Vec<usize>,
266    queue: SyncActivations,
267}
268
269impl SyncActivator {
270    /// Creates a new thread-safe activation handle.
271    pub fn new(path: Vec<usize>, queue: SyncActivations) -> Self {
272        Self {
273            path,
274            queue,
275        }
276    }
277
278    /// Activates the associated path and unparks the associated worker thread.
279    pub fn activate(&self) -> Result<(), SyncActivationError> {
280        self.queue.activate(self.path.clone())
281    }
282}
283
284/// The error returned when activation fails across thread boundaries because
285/// the receiving end has hung up.
286#[derive(Clone, Copy, Debug)]
287pub struct SyncActivationError;
288
289impl std::fmt::Display for SyncActivationError {
290    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
291        f.write_str("sync activation error in timely")
292    }
293}
294
295impl std::error::Error for SyncActivationError {}
296
297/// A wrapper that unparks on drop.
298#[derive(Clone, Debug)]
299pub struct ActivateOnDrop<T>  {
300    wrapped: T,
301    address: Rc<[usize]>,
302    activator: Rc<RefCell<Activations>>,
303}
304
305use std::ops::{Deref, DerefMut};
306
307impl<T> ActivateOnDrop<T> {
308    /// Wraps an element so that it is unparked on drop.
309    pub fn new(wrapped: T, address: Rc<[usize]>, activator: Rc<RefCell<Activations>>) -> Self {
310        Self { wrapped, address, activator }
311    }
312}
313
314impl<T> Deref for ActivateOnDrop<T> {
315    type Target = T;
316    fn deref(&self) -> &Self::Target {
317        &self.wrapped
318    }
319}
320
321impl<T> DerefMut for ActivateOnDrop<T> {
322    fn deref_mut(&mut self) -> &mut Self::Target {
323        &mut self.wrapped
324    }
325}
326
327impl<T> Drop for ActivateOnDrop<T> {
328    fn drop(&mut self) {
329        self.activator.borrow_mut().activate(&self.address[..]);
330    }
331}