1use std::rc::Rc;
4use std::cell::RefCell;
5use std::thread::Thread;
6use std::collections::BinaryHeap;
7use std::time::{Duration, Instant};
8use std::cmp::Reverse;
9use std::sync::mpsc::{Sender, Receiver};
10
/// Interface for something that can record activations of paths and report
/// the extensions of a path.
///
/// A path is a slice of `usize` identifiers. The precise meaning of
/// "activation" and "extension" is defined by each implementor.
pub trait Scheduler {
    /// Records an activation for `path`.
    fn activate(&mut self, path: &[usize]);

    /// Reports extensions of `path` through `dest`.
    ///
    /// NOTE(review): presumably implementors append to `dest` rather than
    /// overwrite it; confirm against the concrete implementations.
    fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>);
}

/// A boxed scheduler is itself a scheduler: every call is forwarded to the
/// trait object inside the box.
impl Scheduler for Box<dyn Scheduler> {
    fn activate(&mut self, path: &[usize]) {
        // Dispatch on the inner `dyn Scheduler`, not on the box itself.
        Scheduler::activate(&mut **self, path)
    }

    fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>) {
        Scheduler::extensions(&mut **self, path, dest)
    }
}
36
37#[derive(Debug)]
39pub struct Activations {
40 clean: usize,
41 bounds: Vec<(usize, usize)>,
43 slices: Vec<usize>,
44 buffer: Vec<usize>,
45
46 tx: Sender<Vec<usize>>,
48 rx: Receiver<Vec<usize>>,
49
50 timer: Option<Instant>,
52 queue: BinaryHeap<Reverse<(Duration, Vec<usize>)>>,
53}
54
55impl Activations {
56
57 pub fn new(timer: Option<Instant>) -> Self {
59 let (tx, rx) = std::sync::mpsc::channel();
60 Self {
61 clean: 0,
62 bounds: Vec::new(),
63 slices: Vec::new(),
64 buffer: Vec::new(),
65 tx,
66 rx,
67 timer,
68 queue: BinaryHeap::new(),
69 }
70 }
71
72 pub fn activate(&mut self, path: &[usize]) {
74 self.bounds.push((self.slices.len(), path.len()));
75 self.slices.extend(path);
76 }
77
78 pub fn activate_after(&mut self, path: &[usize], delay: Duration) {
80 if let Some(timer) = self.timer {
81 if delay == Duration::new(0, 0) {
83 self.activate(path);
84 }
85 else {
86 let moment = timer.elapsed() + delay;
87 self.queue.push(Reverse((moment, path.to_vec())));
88 }
89 }
90 else {
91 self.activate(path);
92 }
93 }
94
95 pub fn advance(&mut self) {
97
98 while let Ok(path) = self.rx.try_recv() {
100 self.activate(&path[..])
101 }
102
103 if let Some(timer) = self.timer {
105 if !self.queue.is_empty() {
106 let now = timer.elapsed();
107 while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
108 let Reverse((_time, path)) = self.queue.pop().unwrap();
109 self.activate(&path[..]);
110 }
111 }
112 }
113
114 self.bounds.drain(.. self.clean);
115
116 { let slices = &self.slices[..];
118 self.bounds.sort_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
119 self.bounds.dedup_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
120 }
121
122 self.buffer.clear();
124 for (offset, length) in self.bounds.iter_mut() {
125 self.buffer.extend(&self.slices[*offset .. (*offset + *length)]);
126 *offset = self.buffer.len() - *length;
127 }
128 ::std::mem::swap(&mut self.buffer, &mut self.slices);
129
130 self.clean = self.bounds.len();
131 }
132
133 pub fn map_active(&self, logic: impl Fn(&[usize])) {
135 for (offset, length) in self.bounds.iter() {
136 logic(&self.slices[*offset .. (*offset + *length)]);
137 }
138 }
139
140 pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) {
142
143 let position =
144 self.bounds[..self.clean]
145 .binary_search_by_key(&path, |x| &self.slices[x.0 .. (x.0 + x.1)]);
146 let position = match position {
147 Ok(x) => x,
148 Err(x) => x,
149 };
150
151 let mut previous = None;
152 self.bounds
153 .iter()
154 .cloned()
155 .skip(position)
156 .map(|(offset, length)| &self.slices[offset .. (offset + length)])
157 .take_while(|x| x.starts_with(path))
158 .for_each(|x| {
159 if let Some(extension) = x.get(path.len()) {
161 if previous != Some(*extension) {
162 action(*extension);
163 previous = Some(*extension);
164 }
165 }
166 });
167 }
168
169 pub fn sync(&self) -> SyncActivations {
171 SyncActivations {
172 tx: self.tx.clone(),
173 thread: std::thread::current(),
174 }
175 }
176
177 pub fn empty_for(&self) -> Option<Duration> {
183 if !self.bounds.is_empty() || self.timer.is_none() {
184 Some(Duration::new(0,0))
185 }
186 else {
187 self.queue.peek().map(|Reverse((t,_a))| {
188 let elapsed = self.timer.unwrap().elapsed();
189 if t < &elapsed { Duration::new(0,0) }
190 else { *t - elapsed }
191 })
192 }
193 }
194}
195
196#[derive(Clone, Debug)]
198pub struct SyncActivations {
199 tx: Sender<Vec<usize>>,
200 thread: Thread,
201}
202
203impl SyncActivations {
204 pub fn activate(&self, path: Vec<usize>) -> Result<(), SyncActivationError> {
207 self.activate_batch(std::iter::once(path))
208 }
209
210 pub fn activate_batch<I>(&self, paths: I) -> Result<(), SyncActivationError>
216 where
217 I: IntoIterator<Item = Vec<usize>>
218 {
219 for path in paths.into_iter() {
220 self.tx.send(path).map_err(|_| SyncActivationError)?;
221 }
222 self.thread.unpark();
223 Ok(())
224 }
225}
226
227#[derive(Clone, Debug)]
229pub struct Activator {
230 path: Rc<[usize]>,
231 queue: Rc<RefCell<Activations>>,
232}
233
234impl Activator {
235 pub fn new(path: Rc<[usize]>, queue: Rc<RefCell<Activations>>) -> Self {
237 Self {
238 path,
239 queue,
240 }
241 }
242 pub fn activate(&self) {
244 self.queue
245 .borrow_mut()
246 .activate(&self.path[..]);
247 }
248
249 pub fn activate_after(&self, delay: Duration) {
251 if delay == Duration::new(0, 0) {
252 self.activate();
253 }
254 else {
255 self.queue
256 .borrow_mut()
257 .activate_after(&self.path[..], delay);
258 }
259 }
260}
261
262#[derive(Clone, Debug)]
264pub struct SyncActivator {
265 path: Vec<usize>,
266 queue: SyncActivations,
267}
268
269impl SyncActivator {
270 pub fn new(path: Vec<usize>, queue: SyncActivations) -> Self {
272 Self {
273 path,
274 queue,
275 }
276 }
277
278 pub fn activate(&self) -> Result<(), SyncActivationError> {
280 self.queue.activate(self.path.clone())
281 }
282}
283
/// Error returned when a path could not be submitted through a
/// `SyncActivations` handle.
#[derive(Clone, Copy, Debug)]
pub struct SyncActivationError;

impl std::fmt::Display for SyncActivationError {
    /// Writes a fixed, human-readable message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "sync activation error in timely")
    }
}

impl std::error::Error for SyncActivationError {}
296
297#[derive(Clone, Debug)]
299pub struct ActivateOnDrop<T> {
300 wrapped: T,
301 address: Rc<[usize]>,
302 activator: Rc<RefCell<Activations>>,
303}
304
305use std::ops::{Deref, DerefMut};
306
307impl<T> ActivateOnDrop<T> {
308 pub fn new(wrapped: T, address: Rc<[usize]>, activator: Rc<RefCell<Activations>>) -> Self {
310 Self { wrapped, address, activator }
311 }
312}
313
314impl<T> Deref for ActivateOnDrop<T> {
315 type Target = T;
316 fn deref(&self) -> &Self::Target {
317 &self.wrapped
318 }
319}
320
321impl<T> DerefMut for ActivateOnDrop<T> {
322 fn deref_mut(&mut self) -> &mut Self::Target {
323 &mut self.wrapped
324 }
325}
326
327impl<T> Drop for ActivateOnDrop<T> {
328 fn drop(&mut self) {
329 self.activator.borrow_mut().activate(&self.address[..]);
330 }
331}