1use std::rc::Rc;
4use std::cell::RefCell;
5use std::thread::Thread;
6use std::collections::BinaryHeap;
7use std::time::{Duration, Instant};
8use std::cmp::Reverse;
9use crossbeam_channel::{Sender, Receiver};
10
11pub trait Scheduler {
21 fn activate(&mut self, path: &[usize]);
23 fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>);
29}
30
31impl Scheduler for Box<dyn Scheduler> {
33 fn activate(&mut self, path: &[usize]) { (**self).activate(path) }
34 fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>) { (**self).extensions(path, dest) }
35}
36
37#[derive(Debug)]
39pub struct Activations {
40 clean: usize,
41 bounds: Vec<(usize, usize)>,
43 slices: Vec<usize>,
44 buffer: Vec<usize>,
45
46 tx: Sender<Vec<usize>>,
48 rx: Receiver<Vec<usize>>,
49
50 timer: Instant,
52 queue: BinaryHeap<Reverse<(Duration, Vec<usize>)>>,
53}
54
55impl Activations {
56
57 pub fn new(timer: Instant) -> Self {
59 let (tx, rx) = crossbeam_channel::unbounded();
60 Self {
61 clean: 0,
62 bounds: Vec::new(),
63 slices: Vec::new(),
64 buffer: Vec::new(),
65 tx,
66 rx,
67 timer,
68 queue: BinaryHeap::new(),
69 }
70 }
71
72 pub fn activate(&mut self, path: &[usize]) {
74 self.bounds.push((self.slices.len(), path.len()));
75 self.slices.extend(path);
76 }
77
78 pub fn activate_after(&mut self, path: &[usize], delay: Duration) {
80 if delay == Duration::new(0, 0) {
82 self.activate(path);
83 }
84 else {
85 let moment = self.timer.elapsed() + delay;
86 self.queue.push(Reverse((moment, path.to_vec())));
87 }
88 }
89
90 pub fn advance(&mut self) {
92
93 while let Ok(path) = self.rx.try_recv() {
95 self.activate(&path[..])
96 }
97
98 if !self.queue.is_empty() {
100 let now = self.timer.elapsed();
101 while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
102 let Reverse((_time, path)) = self.queue.pop().unwrap();
103 self.activate(&path[..]);
104 }
105 }
106
107 self.bounds.drain(.. self.clean);
108
109 { let slices = &self.slices[..];
111 self.bounds.sort_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
112 self.bounds.dedup_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
113 }
114
115 self.buffer.clear();
117 for (offset, length) in self.bounds.iter_mut() {
118 self.buffer.extend(&self.slices[*offset .. (*offset + *length)]);
119 *offset = self.buffer.len() - *length;
120 }
121 ::std::mem::swap(&mut self.buffer, &mut self.slices);
122
123 self.clean = self.bounds.len();
124 }
125
126 pub fn map_active(&self, logic: impl Fn(&[usize])) {
128 for (offset, length) in self.bounds.iter() {
129 logic(&self.slices[*offset .. (*offset + *length)]);
130 }
131 }
132
133 pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) {
135
136 let position =
137 self.bounds[..self.clean]
138 .binary_search_by_key(&path, |x| &self.slices[x.0 .. (x.0 + x.1)]);
139 let position = match position {
140 Ok(x) => x,
141 Err(x) => x,
142 };
143
144 let mut previous = None;
145 self.bounds
146 .iter()
147 .cloned()
148 .skip(position)
149 .map(|(offset, length)| &self.slices[offset .. (offset + length)])
150 .take_while(|x| x.starts_with(path))
151 .for_each(|x| {
152 if let Some(extension) = x.get(path.len()) {
154 if previous != Some(*extension) {
155 action(*extension);
156 previous = Some(*extension);
157 }
158 }
159 });
160 }
161
162 pub fn sync(&self) -> SyncActivations {
164 SyncActivations {
165 tx: self.tx.clone(),
166 thread: std::thread::current(),
167 }
168 }
169
170 pub fn empty_for(&self) -> Option<Duration> {
176 if !self.bounds.is_empty() {
177 Some(Duration::new(0,0))
178 }
179 else {
180 self.queue.peek().map(|Reverse((t,_a))| {
181 let elapsed = self.timer.elapsed();
182 if t < &elapsed { Duration::new(0,0) }
183 else { *t - elapsed }
184 })
185 }
186 }
187}
188
189#[derive(Clone, Debug)]
191pub struct SyncActivations {
192 tx: Sender<Vec<usize>>,
193 thread: Thread,
194}
195
196impl SyncActivations {
197 pub fn activate(&self, path: Vec<usize>) -> Result<(), SyncActivationError> {
200 self.activate_batch(std::iter::once(path))
201 }
202
203 pub fn activate_batch<I>(&self, paths: I) -> Result<(), SyncActivationError>
209 where
210 I: IntoIterator<Item = Vec<usize>>
211 {
212 for path in paths.into_iter() {
213 self.tx.send(path).map_err(|_| SyncActivationError)?;
214 }
215 self.thread.unpark();
216 Ok(())
217 }
218}
219
220#[derive(Clone, Debug)]
222pub struct Activator {
223 path: Rc<[usize]>,
224 queue: Rc<RefCell<Activations>>,
225}
226
227impl Activator {
228 pub fn new(path: Rc<[usize]>, queue: Rc<RefCell<Activations>>) -> Self {
230 Self {
231 path,
232 queue,
233 }
234 }
235 pub fn activate(&self) {
237 self.queue
238 .borrow_mut()
239 .activate(&self.path[..]);
240 }
241
242 pub fn activate_after(&self, delay: Duration) {
244 if delay == Duration::new(0, 0) {
245 self.activate();
246 }
247 else {
248 self.queue
249 .borrow_mut()
250 .activate_after(&self.path[..], delay);
251 }
252 }
253}
254
255#[derive(Clone, Debug)]
257pub struct SyncActivator {
258 path: Vec<usize>,
259 queue: SyncActivations,
260}
261
262impl SyncActivator {
263 pub fn new(path: Vec<usize>, queue: SyncActivations) -> Self {
265 Self {
266 path,
267 queue,
268 }
269 }
270
271 pub fn activate(&self) -> Result<(), SyncActivationError> {
273 self.queue.activate(self.path.clone())
274 }
275}
276
277#[derive(Clone, Copy, Debug)]
280pub struct SyncActivationError;
281
282impl std::fmt::Display for SyncActivationError {
283 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
284 f.write_str("sync activation error in timely")
285 }
286}
287
288impl std::error::Error for SyncActivationError {}
289
290#[derive(Clone, Debug)]
292pub struct ActivateOnDrop<T> {
293 wrapped: T,
294 address: Rc<[usize]>,
295 activator: Rc<RefCell<Activations>>,
296}
297
298use std::ops::{Deref, DerefMut};
299
300impl<T> ActivateOnDrop<T> {
301 pub fn new(wrapped: T, address: Rc<[usize]>, activator: Rc<RefCell<Activations>>) -> Self {
303 Self { wrapped, address, activator }
304 }
305}
306
307impl<T> Deref for ActivateOnDrop<T> {
308 type Target = T;
309 fn deref(&self) -> &Self::Target {
310 &self.wrapped
311 }
312}
313
314impl<T> DerefMut for ActivateOnDrop<T> {
315 fn deref_mut(&mut self) -> &mut Self::Target {
316 &mut self.wrapped
317 }
318}
319
320impl<T> Drop for ActivateOnDrop<T> {
321 fn drop(&mut self) {
322 self.activator.borrow_mut().activate(&self.address[..]);
323 }
324}