tokio_util/time/delay_queue.rs
1//! A queue of delayed elements.
2//!
3//! See [`DelayQueue`] for more details.
4//!
5//! [`DelayQueue`]: struct@DelayQueue
6
7use crate::time::wheel::{self, Wheel};
8
9use tokio::time::{sleep_until, Duration, Instant, Sleep};
10
11use core::ops::{Index, IndexMut};
12use slab::Slab;
13use std::cmp;
14use std::collections::HashMap;
15use std::convert::From;
16use std::fmt;
17use std::fmt::Debug;
18use std::future::Future;
19use std::marker::PhantomData;
20use std::pin::Pin;
21use std::task::{self, ready, Poll, Waker};
22
23/// A queue of delayed elements.
24///
25/// Once an element is inserted into the `DelayQueue`, it is yielded once the
26/// specified deadline has been reached.
27///
28/// # Usage
29///
30/// Elements are inserted into `DelayQueue` using the [`insert`] or
31/// [`insert_at`] methods. A deadline is provided with the item and a [`Key`] is
32/// returned. The key is used to remove the entry or to change the deadline at
33/// which it should be yielded back.
34///
/// Once delays have been configured, the `DelayQueue` is used via its
/// [`Stream`] implementation or by calling [`poll_expired`] directly. If an entry
/// has reached its deadline, it is returned. If not, `Poll::Pending` is returned,
/// indicating that the current task will be notified once the deadline has been reached.
39///
40/// # `Stream` implementation
41///
42/// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have
43/// expired, no items are returned. In this case, [`Poll::Pending`] is returned and the
44/// current task is registered to be notified once the next item's delay has
45/// expired.
46///
/// If no items are in the queue, i.e. `is_empty()` returns `true`, then
/// [`poll_expired`] returns `Poll::Ready(None)`. This indicates that the stream has
/// reached an end. However, if a new item is inserted *afterwards*, [`poll_expired`]
/// will once again start returning items or `Poll::Pending`.
51///
52/// Items are returned ordered by their expirations. Items that are configured
53/// to expire first will be returned first. There are no ordering guarantees
54/// for items configured to expire at the same instant. Also note that delays are
55/// rounded to the closest millisecond.
56///
57/// # Implementation
58///
59/// The [`DelayQueue`] is backed by a separate instance of a timer wheel similar to that used internally
60/// by Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same
61/// performance and scalability benefits.
62///
63/// State associated with each entry is stored in a [`slab`]. This amortizes the cost of allocation,
64/// and allows reuse of the memory allocated for expired entries.
65///
66/// Capacity can be checked using [`capacity`] and allocated preemptively by using
67/// the [`reserve`] method.
68///
69/// # Cancellation safety
70///
71/// [`DelayQueue`]'s implementation of [`StreamExt::next`] is cancellation safe.
72///
/// # Examples
74///
75/// Using [`DelayQueue`] to manage cache entries.
76///
77/// ```rust,no_run
78/// use tokio_util::time::{DelayQueue, delay_queue};
79///
80/// use std::collections::HashMap;
81/// use std::task::{ready, Context, Poll};
82/// use std::time::Duration;
83/// # type CacheKey = String;
84/// # type Value = String;
85///
86/// struct Cache {
87/// entries: HashMap<CacheKey, (Value, delay_queue::Key)>,
88/// expirations: DelayQueue<CacheKey>,
89/// }
90///
91/// const TTL_SECS: u64 = 30;
92///
93/// impl Cache {
94/// fn insert(&mut self, key: CacheKey, value: Value) {
95/// let delay = self.expirations
96/// .insert(key.clone(), Duration::from_secs(TTL_SECS));
97///
98/// self.entries.insert(key, (value, delay));
99/// }
100///
101/// fn get(&self, key: &CacheKey) -> Option<&Value> {
102/// self.entries.get(key)
103/// .map(|&(ref v, _)| v)
104/// }
105///
106/// fn remove(&mut self, key: &CacheKey) {
107/// if let Some((_, cache_key)) = self.entries.remove(key) {
108/// self.expirations.remove(&cache_key);
109/// }
110/// }
111///
112/// fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<()> {
113/// while let Some(entry) = ready!(self.expirations.poll_expired(cx)) {
114/// self.entries.remove(entry.get_ref());
115/// }
116///
117/// Poll::Ready(())
118/// }
119/// }
120/// ```
121///
122/// [`insert`]: method@Self::insert
123/// [`insert_at`]: method@Self::insert_at
124/// [`Key`]: struct@Key
125/// [`Stream`]: https://docs.rs/futures/0.3.31/futures/stream/trait.Stream.html
126/// [`StreamExt::next`]: https://docs.rs/tokio-stream/0.1.17/tokio_stream/trait.StreamExt.html#method.next
127/// [`poll_expired`]: method@Self::poll_expired
/// [`DelayQueue::poll_expired`]: method@Self::poll_expired
129/// [`DelayQueue`]: struct@DelayQueue
130/// [`sleep`]: fn@tokio::time::sleep
131/// [`slab`]: slab
132/// [`capacity`]: method@Self::capacity
133/// [`reserve`]: method@Self::reserve
#[derive(Debug)]
pub struct DelayQueue<T> {
    /// Backing storage for every entry (`Data<T>`), addressed by [`Key`].
    slab: SlabStorage<T>,

    /// Timer wheel tracking the entries whose deadline has not been reached
    /// yet. Deadlines that the wheel rejects as already elapsed go to
    /// `expired` instead.
    wheel: Wheel<Stack<T>>,

    /// Entries that were inserted (or reset) when already expired. These
    /// cannot be stored in the wheel; they are flagged via `Data::expired`.
    expired: Stack<T>,

    /// Delay expiring when the *first* item in the queue expires. `None`
    /// until the first insertion and again after `clear`, or once the wheel
    /// has no deadline left.
    delay: Option<Pin<Box<Sleep>>>,

    /// Wheel polling state. Starts at 0.
    // NOTE(review): presumably the wheel time (ms since `start`) seen by the
    // last poll; the polling code is outside this view, so confirm there.
    wheel_now: u64,

    /// Instant at which the timer starts. Entry deadlines are stored as
    /// millisecond offsets from this instant.
    start: Instant,

    /// Waker that is invoked when we potentially need to reset the timer.
    /// Because we lazily create the timer when the first entry is created, we
    /// need to awaken any poller that polled us before that point.
    waker: Option<Waker>,
}
160
#[derive(Default)]
// Slab-backed storage for queue entries that keeps user-facing `Key`s stable
// across `compact` calls by translating them to internal slab indices.
struct SlabStorage<T> {
    // The slab holding the entries themselves, indexed by `KeyInternal::index`.
    inner: Slab<Data<T>>,

    // A `compact` call requires a re-mapping of the `Key`s that were changed
    // during the `compact` call of the `slab`. Since the keys that were given out
    // cannot be changed retroactively we need to keep track of these re-mappings.
    // The keys of `key_map` correspond to the old keys that were given out and
    // the values to the `Key`s that were re-mapped by the `compact` call.
    key_map: HashMap<Key, KeyInternal>,

    // Index used to create new keys to hand out. Only advanced when a slab
    // index collides with a key that is already present in `key_map`.
    next_key_index: usize,

    // Whether `compact` has been called, necessary in order to decide whether
    // to include keys in `key_map`. Reset to `false` by `clear`.
    compact_called: bool,
}
179
180impl<T> SlabStorage<T> {
181 pub(crate) fn with_capacity(capacity: usize) -> SlabStorage<T> {
182 SlabStorage {
183 inner: Slab::with_capacity(capacity),
184 key_map: HashMap::new(),
185 next_key_index: 0,
186 compact_called: false,
187 }
188 }
189
190 // Inserts data into the inner slab and re-maps keys if necessary
191 pub(crate) fn insert(&mut self, val: Data<T>) -> Key {
192 let mut key = KeyInternal::new(self.inner.insert(val));
193 let key_contained = self.key_map.contains_key(&key.into());
194
195 if key_contained {
196 // It's possible that a `compact` call creates capacity in `self.inner` in
197 // such a way that a `self.inner.insert` call creates a `key` which was
198 // previously given out during an `insert` call prior to the `compact` call.
199 // If `key` is contained in `self.key_map`, we have encountered this exact situation,
200 // We need to create a new key `key_to_give_out` and include the relation
201 // `key_to_give_out` -> `key` in `self.key_map`.
202 let key_to_give_out = self.create_new_key();
203 assert!(!self.key_map.contains_key(&key_to_give_out.into()));
204 self.key_map.insert(key_to_give_out.into(), key);
205 key = key_to_give_out;
206 } else if self.compact_called {
207 // Include an identity mapping in `self.key_map` in order to allow us to
208 // panic if a key that was handed out is removed more than once.
209 self.key_map.insert(key.into(), key);
210 }
211
212 key.into()
213 }
214
215 // Re-map the key in case compact was previously called.
216 // Note: Since we include identity mappings in key_map after compact was called,
217 // we have information about all keys that were handed out. In the case in which
218 // compact was called and we try to remove a Key that was previously removed
219 // we can detect invalid keys if no key is found in `key_map`. This is necessary
220 // in order to prevent situations in which a previously removed key
221 // corresponds to a re-mapped key internally and which would then be incorrectly
222 // removed from the slab.
223 //
224 // Example to illuminate this problem:
225 //
226 // Let's assume our `key_map` is {1 -> 2, 2 -> 1} and we call remove(1). If we
227 // were to remove 1 again, we would not find it inside `key_map` anymore.
228 // If we were to imply from this that no re-mapping was necessary, we would
229 // incorrectly remove 1 from `self.slab.inner`, which corresponds to the
230 // handed-out key 2.
231 pub(crate) fn remove(&mut self, key: &Key) -> Data<T> {
232 let remapped_key = if self.compact_called {
233 match self.key_map.remove(key) {
234 Some(key_internal) => key_internal,
235 None => panic!("invalid key"),
236 }
237 } else {
238 (*key).into()
239 };
240
241 self.inner.remove(remapped_key.index)
242 }
243
244 pub(crate) fn shrink_to_fit(&mut self) {
245 self.inner.shrink_to_fit();
246 self.key_map.shrink_to_fit();
247 }
248
249 pub(crate) fn compact(&mut self) {
250 if !self.compact_called {
251 for (key, _) in self.inner.iter() {
252 self.key_map.insert(Key::new(key), KeyInternal::new(key));
253 }
254 }
255
256 let mut remapping = HashMap::new();
257 self.inner.compact(|_, from, to| {
258 remapping.insert(from, to);
259 true
260 });
261
262 // At this point `key_map` contains a mapping for every element.
263 for internal_key in self.key_map.values_mut() {
264 if let Some(new_internal_key) = remapping.get(&internal_key.index) {
265 *internal_key = KeyInternal::new(*new_internal_key);
266 }
267 }
268
269 if self.key_map.capacity() > 2 * self.key_map.len() {
270 self.key_map.shrink_to_fit();
271 }
272
273 self.compact_called = true;
274 }
275
276 // Tries to re-map a `Key` that was given out to the user to its
277 // corresponding internal key.
278 fn remap_key(&self, key: &Key) -> Option<KeyInternal> {
279 let key_map = &self.key_map;
280 if self.compact_called {
281 key_map.get(key).copied()
282 } else {
283 Some((*key).into())
284 }
285 }
286
287 fn create_new_key(&mut self) -> KeyInternal {
288 while self.key_map.contains_key(&Key::new(self.next_key_index)) {
289 self.next_key_index = self.next_key_index.wrapping_add(1);
290 }
291
292 KeyInternal::new(self.next_key_index)
293 }
294
295 pub(crate) fn len(&self) -> usize {
296 self.inner.len()
297 }
298
299 pub(crate) fn capacity(&self) -> usize {
300 self.inner.capacity()
301 }
302
303 pub(crate) fn clear(&mut self) {
304 self.inner.clear();
305 self.key_map.clear();
306 self.compact_called = false;
307 }
308
309 pub(crate) fn reserve(&mut self, additional: usize) {
310 self.inner.reserve(additional);
311
312 if self.compact_called {
313 self.key_map.reserve(additional);
314 }
315 }
316
317 pub(crate) fn is_empty(&self) -> bool {
318 self.inner.is_empty()
319 }
320
321 pub(crate) fn contains(&self, key: &Key) -> bool {
322 let remapped_key = self.remap_key(key);
323
324 match remapped_key {
325 Some(internal_key) => self.inner.contains(internal_key.index),
326 None => false,
327 }
328 }
329}
330
331impl<T> fmt::Debug for SlabStorage<T>
332where
333 T: fmt::Debug,
334{
335 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
336 if fmt.alternate() {
337 fmt.debug_map().entries(self.inner.iter()).finish()
338 } else {
339 fmt.debug_struct("Slab")
340 .field("len", &self.len())
341 .field("cap", &self.capacity())
342 .finish()
343 }
344 }
345}
346
347impl<T> Index<Key> for SlabStorage<T> {
348 type Output = Data<T>;
349
350 fn index(&self, key: Key) -> &Self::Output {
351 let remapped_key = self.remap_key(&key);
352
353 match remapped_key {
354 Some(internal_key) => &self.inner[internal_key.index],
355 None => panic!("Invalid index {}", key.index),
356 }
357 }
358}
359
360impl<T> IndexMut<Key> for SlabStorage<T> {
361 fn index_mut(&mut self, key: Key) -> &mut Data<T> {
362 let remapped_key = self.remap_key(&key);
363
364 match remapped_key {
365 Some(internal_key) => &mut self.inner[internal_key.index],
366 None => panic!("Invalid index {}", key.index),
367 }
368 }
369}
370
/// An entry in `DelayQueue` that has expired and been removed.
///
/// Values are returned by [`DelayQueue::poll_expired`]. The same type is also
/// returned by [`DelayQueue::remove`] and [`DelayQueue::try_remove`] for
/// entries that are taken out before their deadline.
///
/// [`DelayQueue::poll_expired`]: method@DelayQueue::poll_expired
/// [`DelayQueue::remove`]: method@DelayQueue::remove
/// [`DelayQueue::try_remove`]: method@DelayQueue::try_remove
#[derive(Debug)]
pub struct Expired<T> {
    /// The data stored in the queue
    data: T,

    /// The expiration time, reconstructed from the queue's start instant and
    /// the entry's millisecond offset
    deadline: Instant,

    /// The key associated with the entry
    key: Key,
}
387
/// Token to a value stored in a `DelayQueue`.
///
/// Instances of `Key` are returned by [`DelayQueue::insert`]. See [`DelayQueue`]
/// documentation for more details.
///
/// [`DelayQueue`]: struct@DelayQueue
/// [`DelayQueue::insert`]: method@DelayQueue::insert
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Key {
    /// Opaque index as handed out to the user. After a `compact` call this may
    /// differ from the slab slot that actually holds the entry.
    index: usize,
}
399
// Whereas `Key` is given out to users that use `DelayQueue`, internally we use
// `KeyInternal` as the key type in order to make the logic of mapping between keys
// as a result of `compact` calls clearer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct KeyInternal {
    // Index of the slot inside `SlabStorage::inner`.
    index: usize,
}
407
// Stack of queue entries identified by `Key`. Only the head is stored here;
// the links between entries live in the `next` / `prev` fields of `Data`.
#[derive(Debug)]
struct Stack<T> {
    /// Head of the stack
    head: Option<Key>,
    /// Ties the stack to the entry type `T` without storing a `T`
    _p: PhantomData<fn() -> T>,
}
414
// Per-entry record kept in the slab: the user's value plus the bookkeeping
// needed to schedule it and to link it into a `Stack`.
#[derive(Debug)]
struct Data<T> {
    /// The data being stored in the queue and will be returned at the requested
    /// instant.
    inner: T,

    /// The instant at which the item is returned, expressed in milliseconds
    /// since the owning queue's `start` instant.
    when: u64,

    /// Set to true when stored in the `expired` queue
    expired: bool,

    /// Next entry in the stack
    next: Option<Key>,

    /// Previous entry in the stack
    prev: Option<Key>,
}
433
/// Maximum number of entries the queue can handle (2^30 - 1)
const MAX_ENTRIES: usize = 0x3FFF_FFFF;
436
437impl<T> DelayQueue<T> {
438 /// Creates a new, empty, `DelayQueue`.
439 ///
440 /// The queue will not allocate storage until items are inserted into it.
441 ///
442 /// # Examples
443 ///
444 /// ```rust
445 /// # use tokio_util::time::DelayQueue;
446 /// let delay_queue: DelayQueue<u32> = DelayQueue::new();
447 /// ```
448 pub fn new() -> DelayQueue<T> {
449 DelayQueue::with_capacity(0)
450 }
451
452 /// Creates a new, empty, `DelayQueue` with the specified capacity.
453 ///
454 /// The queue will be able to hold at least `capacity` elements without
455 /// reallocating. If `capacity` is 0, the queue will not allocate for
456 /// storage.
457 ///
458 /// # Examples
459 ///
460 /// ```rust
461 /// # use tokio_util::time::DelayQueue;
462 /// # use std::time::Duration;
463 ///
464 /// # #[tokio::main(flavor = "current_thread")]
465 /// # async fn main() {
466 /// let mut delay_queue = DelayQueue::with_capacity(10);
467 ///
468 /// // These insertions are done without further allocation
469 /// for i in 0..10 {
470 /// delay_queue.insert(i, Duration::from_secs(i));
471 /// }
472 ///
473 /// // This will make the queue allocate additional storage
474 /// delay_queue.insert(11, Duration::from_secs(11));
475 /// # }
476 /// ```
477 pub fn with_capacity(capacity: usize) -> DelayQueue<T> {
478 DelayQueue {
479 wheel: Wheel::new(),
480 slab: SlabStorage::with_capacity(capacity),
481 expired: Stack::default(),
482 delay: None,
483 wheel_now: 0,
484 start: Instant::now(),
485 waker: None,
486 }
487 }
488
489 /// Inserts `value` into the queue set to expire at a specific instant in
490 /// time.
491 ///
492 /// This function is identical to `insert`, but takes an `Instant` instead
493 /// of a `Duration`.
494 ///
495 /// `value` is stored in the queue until `when` is reached. At which point,
496 /// `value` will be returned from [`poll_expired`]. If `when` has already been
497 /// reached, then `value` is immediately made available to poll.
498 ///
499 /// The return value represents the insertion and is used as an argument to
500 /// [`remove`] and [`reset`]. Note that [`Key`] is a token and is reused once
501 /// `value` is removed from the queue either by calling [`poll_expired`] after
502 /// `when` is reached or by calling [`remove`]. At this point, the caller
503 /// must take care to not use the returned [`Key`] again as it may reference
504 /// a different item in the queue.
505 ///
506 /// See [type] level documentation for more details.
507 ///
508 /// # Panics
509 ///
510 /// This function panics if `when` is too far in the future.
511 ///
512 /// # Examples
513 ///
514 /// Basic usage
515 ///
516 /// ```rust
517 /// use tokio::time::{Duration, Instant};
518 /// use tokio_util::time::DelayQueue;
519 ///
520 /// # #[tokio::main(flavor = "current_thread")]
521 /// # async fn main() {
522 /// let mut delay_queue = DelayQueue::new();
523 /// let key = delay_queue.insert_at(
524 /// "foo", Instant::now() + Duration::from_secs(5));
525 ///
526 /// // Remove the entry
527 /// let item = delay_queue.remove(&key);
528 /// assert_eq!(*item.get_ref(), "foo");
529 /// # }
530 /// ```
531 ///
532 /// [`poll_expired`]: method@Self::poll_expired
533 /// [`remove`]: method@Self::remove
534 /// [`reset`]: method@Self::reset
535 /// [`Key`]: struct@Key
536 /// [type]: #
537 #[track_caller]
538 pub fn insert_at(&mut self, value: T, when: Instant) -> Key {
539 assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded");
540
541 // Normalize the deadline. Values cannot be set to expire in the past.
542 let when = self.normalize_deadline(when);
543
544 // Insert the value in the store
545 let key = self.slab.insert(Data {
546 inner: value,
547 when,
548 expired: false,
549 next: None,
550 prev: None,
551 });
552
553 self.insert_idx(when, key);
554
555 // Set a new delay if the current's deadline is later than the one of the new item
556 let should_set_delay = if let Some(ref delay) = self.delay {
557 let current_exp = self.normalize_deadline(delay.deadline());
558 current_exp > when
559 } else {
560 true
561 };
562
563 if should_set_delay {
564 if let Some(waker) = self.waker.take() {
565 waker.wake();
566 }
567
568 let delay_time = self.start + Duration::from_millis(when);
569 if let Some(ref mut delay) = &mut self.delay {
570 delay.as_mut().reset(delay_time);
571 } else {
572 self.delay = Some(Box::pin(sleep_until(delay_time)));
573 }
574 }
575
576 key
577 }
578
579 /// Attempts to pull out the next value of the delay queue, registering the
580 /// current task for wakeup if the value is not yet available, and returning
581 /// `None` if the queue is exhausted.
582 pub fn poll_expired(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Expired<T>>> {
583 if !self
584 .waker
585 .as_ref()
586 .map(|w| w.will_wake(cx.waker()))
587 .unwrap_or(false)
588 {
589 self.waker = Some(cx.waker().clone());
590 }
591
592 let item = ready!(self.poll_idx(cx));
593 Poll::Ready(item.map(|key| {
594 let data = self.slab.remove(&key);
595 debug_assert!(data.next.is_none());
596 debug_assert!(data.prev.is_none());
597
598 Expired {
599 key,
600 data: data.inner,
601 deadline: self.start + Duration::from_millis(data.when),
602 }
603 }))
604 }
605
606 /// Inserts `value` into the queue set to expire after the requested duration
607 /// elapses.
608 ///
609 /// This function is identical to `insert_at`, but takes a `Duration`
610 /// instead of an `Instant`.
611 ///
612 /// `value` is stored in the queue until `timeout` duration has
613 /// elapsed after `insert` was called. At that point, `value` will
614 /// be returned from [`poll_expired`]. If `timeout` is a `Duration` of
615 /// zero, then `value` is immediately made available to poll.
616 ///
617 /// The return value represents the insertion and is used as an
618 /// argument to [`remove`] and [`reset`]. Note that [`Key`] is a
619 /// token and is reused once `value` is removed from the queue
620 /// either by calling [`poll_expired`] after `timeout` has elapsed
621 /// or by calling [`remove`]. At this point, the caller must not
622 /// use the returned [`Key`] again as it may reference a different
623 /// item in the queue.
624 ///
625 /// See [type] level documentation for more details.
626 ///
627 /// # Panics
628 ///
629 /// This function panics if `timeout` is greater than the maximum
630 /// duration supported by the timer in the current `Runtime`.
631 ///
632 /// # Examples
633 ///
634 /// Basic usage
635 ///
636 /// ```rust
637 /// use tokio_util::time::DelayQueue;
638 /// use std::time::Duration;
639 ///
640 /// # #[tokio::main(flavor = "current_thread")]
641 /// # async fn main() {
642 /// let mut delay_queue = DelayQueue::new();
643 /// let key = delay_queue.insert("foo", Duration::from_secs(5));
644 ///
645 /// // Remove the entry
646 /// let item = delay_queue.remove(&key);
647 /// assert_eq!(*item.get_ref(), "foo");
648 /// # }
649 /// ```
650 ///
651 /// [`poll_expired`]: method@Self::poll_expired
652 /// [`remove`]: method@Self::remove
653 /// [`reset`]: method@Self::reset
654 /// [`Key`]: struct@Key
655 /// [type]: #
656 #[track_caller]
657 pub fn insert(&mut self, value: T, timeout: Duration) -> Key {
658 self.insert_at(value, Instant::now() + timeout)
659 }
660
661 #[track_caller]
662 fn insert_idx(&mut self, when: u64, key: Key) {
663 use self::wheel::{InsertError, Stack};
664
665 // Register the deadline with the timer wheel
666 match self.wheel.insert(when, key, &mut self.slab) {
667 Ok(_) => {}
668 Err((_, InsertError::Elapsed)) => {
669 self.slab[key].expired = true;
670 // The delay is already expired, store it in the expired queue
671 self.expired.push(key, &mut self.slab);
672 }
673 Err((_, err)) => panic!("invalid deadline; err={err:?}"),
674 }
675 }
676
677 /// Returns the deadline of the item associated with `key`.
678 ///
679 /// Since the queue operates at millisecond granularity, the returned
680 /// deadline may not exactly match the value that was given when initially
681 /// inserting the item into the queue.
682 ///
683 /// # Panics
684 ///
685 /// This function panics if `key` is not contained by the queue.
686 ///
687 /// # Examples
688 ///
689 /// Basic usage
690 ///
691 /// ```rust
692 /// use tokio_util::time::DelayQueue;
693 /// use std::time::Duration;
694 ///
695 /// # #[tokio::main(flavor = "current_thread")]
696 /// # async fn main() {
697 /// let mut delay_queue = DelayQueue::new();
698 ///
699 /// let key1 = delay_queue.insert("foo", Duration::from_secs(5));
700 /// let key2 = delay_queue.insert("bar", Duration::from_secs(10));
701 ///
702 /// assert!(delay_queue.deadline(&key1) < delay_queue.deadline(&key2));
703 /// # }
704 /// ```
705 #[track_caller]
706 pub fn deadline(&self, key: &Key) -> Instant {
707 self.start + Duration::from_millis(self.slab[*key].when)
708 }
709
710 /// Removes the key from the expired queue or the timer wheel
711 /// depending on its expiration status.
712 ///
713 /// # Panics
714 ///
715 /// Panics if the key is not contained in the expired queue or the wheel.
716 #[track_caller]
717 fn remove_key(&mut self, key: &Key) {
718 use crate::time::wheel::Stack;
719
720 // Special case the `expired` queue
721 if self.slab[*key].expired {
722 self.expired.remove(key, &mut self.slab);
723 } else {
724 self.wheel.remove(key, &mut self.slab);
725 }
726 }
727
728 /// Removes the item associated with `key` from the queue.
729 ///
    /// There must be an item associated with `key`. The function returns the
    /// removed item as well as the `Instant` at which the delay would have
    /// expired.
733 ///
734 /// # Panics
735 ///
736 /// The function panics if `key` is not contained by the queue.
737 ///
738 /// # Examples
739 ///
740 /// Basic usage
741 ///
742 /// ```rust
743 /// use tokio_util::time::DelayQueue;
744 /// use std::time::Duration;
745 ///
746 /// # #[tokio::main(flavor = "current_thread")]
747 /// # async fn main() {
748 /// let mut delay_queue = DelayQueue::new();
749 /// let key = delay_queue.insert("foo", Duration::from_secs(5));
750 ///
751 /// // Remove the entry
752 /// let item = delay_queue.remove(&key);
753 /// assert_eq!(*item.get_ref(), "foo");
754 /// # }
755 /// ```
756 #[track_caller]
757 pub fn remove(&mut self, key: &Key) -> Expired<T> {
758 let prev_deadline = self.next_deadline();
759
760 self.remove_key(key);
761 let data = self.slab.remove(key);
762
763 let next_deadline = self.next_deadline();
764 if prev_deadline != next_deadline {
765 match (next_deadline, &mut self.delay) {
766 (None, _) => self.delay = None,
767 (Some(deadline), Some(delay)) => delay.as_mut().reset(deadline),
768 (Some(deadline), None) => self.delay = Some(Box::pin(sleep_until(deadline))),
769 }
770 }
771
772 if self.slab.is_empty() {
773 if let Some(waker) = self.waker.take() {
774 waker.wake();
775 }
776 }
777
778 Expired {
779 key: Key::new(key.index),
780 data: data.inner,
781 deadline: self.start + Duration::from_millis(data.when),
782 }
783 }
784
785 /// Attempts to remove the item associated with `key` from the queue.
786 ///
787 /// Removes the item associated with `key`, and returns it along with the
788 /// `Instant` at which it would have expired, if it exists.
789 ///
790 /// Returns `None` if `key` is not in the queue.
791 ///
792 /// # Examples
793 ///
794 /// Basic usage
795 ///
796 /// ```rust
797 /// use tokio_util::time::DelayQueue;
798 /// use std::time::Duration;
799 ///
800 /// # #[tokio::main(flavor = "current_thread")]
801 /// # async fn main() {
802 /// let mut delay_queue = DelayQueue::new();
803 /// let key = delay_queue.insert("foo", Duration::from_secs(5));
804 ///
805 /// // The item is in the queue, `try_remove` returns `Some(Expired("foo"))`.
806 /// let item = delay_queue.try_remove(&key);
807 /// assert_eq!(item.unwrap().into_inner(), "foo");
808 ///
809 /// // The item is not in the queue anymore, `try_remove` returns `None`.
810 /// let item = delay_queue.try_remove(&key);
811 /// assert!(item.is_none());
812 /// # }
813 /// ```
814 pub fn try_remove(&mut self, key: &Key) -> Option<Expired<T>> {
815 if self.slab.contains(key) {
816 Some(self.remove(key))
817 } else {
818 None
819 }
820 }
821
822 /// Sets the delay of the item associated with `key` to expire at `when`.
823 ///
824 /// This function is identical to `reset` but takes an `Instant` instead of
825 /// a `Duration`.
826 ///
827 /// The item remains in the queue but the delay is set to expire at `when`.
828 /// If `when` is in the past, then the item is immediately made available to
829 /// the caller.
830 ///
831 /// # Panics
832 ///
833 /// This function panics if `when` is too far in the future or if `key` is
834 /// not contained by the queue.
835 ///
836 /// # Examples
837 ///
838 /// Basic usage
839 ///
840 /// ```rust
841 /// use tokio::time::{Duration, Instant};
842 /// use tokio_util::time::DelayQueue;
843 ///
844 /// # #[tokio::main(flavor = "current_thread")]
845 /// # async fn main() {
846 /// let mut delay_queue = DelayQueue::new();
847 /// let key = delay_queue.insert("foo", Duration::from_secs(5));
848 ///
849 /// // "foo" is scheduled to be returned in 5 seconds
850 ///
851 /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10));
852 ///
853 /// // "foo" is now scheduled to be returned in 10 seconds
854 /// # }
855 /// ```
856 #[track_caller]
857 pub fn reset_at(&mut self, key: &Key, when: Instant) {
858 self.remove_key(key);
859
860 // Normalize the deadline. Values cannot be set to expire in the past.
861 let when = self.normalize_deadline(when);
862
863 self.slab[*key].when = when;
864 self.slab[*key].expired = false;
865
866 self.insert_idx(when, *key);
867
868 let next_deadline = self.next_deadline();
869 if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) {
870 // This should awaken us if necessary (ie, if already expired)
871 delay.as_mut().reset(deadline);
872 }
873 }
874
875 /// Shrink the capacity of the slab, which `DelayQueue` uses internally for storage allocation.
876 /// This function is not guaranteed to, and in most cases, won't decrease the capacity of the slab
877 /// to the number of elements still contained in it, because elements cannot be moved to a different
878 /// index. To decrease the capacity to the size of the slab use [`compact`].
879 ///
880 /// This function can take O(n) time even when the capacity cannot be reduced or the allocation is
881 /// shrunk in place. Repeated calls run in O(1) though.
882 ///
883 /// [`compact`]: method@Self::compact
    pub fn shrink_to_fit(&mut self) {
        // Delegates to the storage layer, which shrinks both the slab and the
        // key re-mapping table. Entries are not moved, so all keys stay valid.
        self.slab.shrink_to_fit();
    }
887
888 /// Shrink the capacity of the slab, which `DelayQueue` uses internally for storage allocation,
889 /// to the number of elements that are contained in it.
890 ///
    /// This method runs in O(n).
892 ///
893 /// # Examples
894 ///
895 /// Basic usage
896 ///
897 /// ```rust
898 /// use tokio_util::time::DelayQueue;
899 /// use std::time::Duration;
900 ///
901 /// # #[tokio::main(flavor = "current_thread")]
902 /// # async fn main() {
903 /// let mut delay_queue = DelayQueue::with_capacity(10);
904 ///
905 /// let key1 = delay_queue.insert(5, Duration::from_secs(5));
906 /// let key2 = delay_queue.insert(10, Duration::from_secs(10));
907 /// let key3 = delay_queue.insert(15, Duration::from_secs(15));
908 ///
909 /// delay_queue.remove(&key2);
910 ///
911 /// delay_queue.compact();
912 /// assert_eq!(delay_queue.capacity(), 2);
913 /// # }
914 /// ```
    pub fn compact(&mut self) {
        // Delegates to the storage layer, which may move entries to other slab
        // slots and records a key re-mapping so that every `Key` handed out
        // earlier keeps referring to the same entry.
        self.slab.compact();
    }
918
919 /// Gets the [`Key`] that [`poll_expired`] will pull out of the queue next, without
920 /// pulling it out or waiting for the deadline to expire.
921 ///
922 /// Entries that have already expired may be returned in any order, but it is
923 /// guaranteed that this method returns them in the same order as when items
924 /// are popped from the `DelayQueue`.
925 ///
926 /// # Examples
927 ///
928 /// Basic usage
929 ///
930 /// ```rust
931 /// use tokio_util::time::DelayQueue;
932 /// use std::time::Duration;
933 ///
934 /// # #[tokio::main(flavor = "current_thread")]
935 /// # async fn main() {
936 /// let mut delay_queue = DelayQueue::new();
937 ///
938 /// let key1 = delay_queue.insert("foo", Duration::from_secs(10));
939 /// let key2 = delay_queue.insert("bar", Duration::from_secs(5));
940 /// let key3 = delay_queue.insert("baz", Duration::from_secs(15));
941 ///
942 /// assert_eq!(delay_queue.peek().unwrap(), key2);
943 /// # }
944 /// ```
945 ///
946 /// [`Key`]: struct@Key
947 /// [`poll_expired`]: method@Self::poll_expired
948 pub fn peek(&self) -> Option<Key> {
949 use self::wheel::Stack;
950
951 self.expired.peek().or_else(|| self.wheel.peek())
952 }
953
954 /// Returns the next time to poll as determined by the wheel.
955 ///
956 /// Note that this does not include deadlines in the `expired` queue.
957 fn next_deadline(&self) -> Option<Instant> {
958 self.wheel
959 .poll_at()
960 .map(|poll_at| self.start + Duration::from_millis(poll_at))
961 }
962
963 /// Sets the delay of the item associated with `key` to expire after
964 /// `timeout`.
965 ///
966 /// This function is identical to `reset_at` but takes a `Duration` instead
967 /// of an `Instant`.
968 ///
969 /// The item remains in the queue but the delay is set to expire after
970 /// `timeout`. If `timeout` is zero, then the item is immediately made
971 /// available to the caller.
972 ///
973 /// # Panics
974 ///
975 /// This function panics if `timeout` is greater than the maximum supported
976 /// duration or if `key` is not contained by the queue.
977 ///
978 /// # Examples
979 ///
980 /// Basic usage
981 ///
982 /// ```rust
983 /// use tokio_util::time::DelayQueue;
984 /// use std::time::Duration;
985 ///
986 /// # #[tokio::main(flavor = "current_thread")]
987 /// # async fn main() {
988 /// let mut delay_queue = DelayQueue::new();
989 /// let key = delay_queue.insert("foo", Duration::from_secs(5));
990 ///
991 /// // "foo" is scheduled to be returned in 5 seconds
992 ///
993 /// delay_queue.reset(&key, Duration::from_secs(10));
994 ///
995 /// // "foo"is now scheduled to be returned in 10 seconds
996 /// # }
997 /// ```
998 #[track_caller]
999 pub fn reset(&mut self, key: &Key, timeout: Duration) {
1000 self.reset_at(key, Instant::now() + timeout);
1001 }
1002
1003 /// Clears the queue, removing all items.
1004 ///
1005 /// After calling `clear`, [`poll_expired`] will return `Ok(Ready(None))`.
1006 ///
1007 /// Note that this method has no effect on the allocated capacity.
1008 ///
1009 /// [`poll_expired`]: method@Self::poll_expired
1010 ///
1011 /// # Examples
1012 ///
1013 /// ```rust
1014 /// use tokio_util::time::DelayQueue;
1015 /// use std::time::Duration;
1016 ///
1017 /// # #[tokio::main(flavor = "current_thread")]
1018 /// # async fn main() {
1019 /// let mut delay_queue = DelayQueue::new();
1020 ///
1021 /// delay_queue.insert("foo", Duration::from_secs(5));
1022 ///
1023 /// assert!(!delay_queue.is_empty());
1024 ///
1025 /// delay_queue.clear();
1026 ///
1027 /// assert!(delay_queue.is_empty());
1028 /// # }
1029 /// ```
1030 pub fn clear(&mut self) {
1031 self.slab.clear();
1032 self.expired = Stack::default();
1033 self.wheel = Wheel::new();
1034 self.delay = None;
1035 }
1036
/// Returns the number of elements the queue can hold without reallocating.
///
/// The value is the capacity reported by the queue's backing slab.
///
/// # Examples
///
/// ```rust
/// use tokio_util::time::DelayQueue;
///
/// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
/// assert_eq!(delay_queue.capacity(), 10);
/// ```
pub fn capacity(&self) -> usize {
    self.slab.capacity()
}
1050
/// Returns the number of elements currently in the queue.
///
/// The count is the length of the queue's backing slab.
///
/// # Examples
///
/// ```rust
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// let mut delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
/// assert_eq!(delay_queue.len(), 0);
/// delay_queue.insert(3, Duration::from_secs(5));
/// assert_eq!(delay_queue.len(), 1);
/// # }
/// ```
pub fn len(&self) -> usize {
    self.slab.len()
}
1070
1071 /// Reserves capacity for at least `additional` more items to be queued
1072 /// without allocating.
1073 ///
1074 /// `reserve` does nothing if the queue already has sufficient capacity for
1075 /// `additional` more values. If more capacity is required, a new segment of
1076 /// memory will be allocated and all existing values will be copied into it.
1077 /// As such, if the queue is already very large, a call to `reserve` can end
1078 /// up being expensive.
1079 ///
1080 /// The queue may reserve more than `additional` extra space in order to
1081 /// avoid frequent reallocations.
1082 ///
1083 /// # Panics
1084 ///
1085 /// Panics if the new capacity exceeds the maximum number of entries the
1086 /// queue can contain.
1087 ///
1088 /// # Examples
1089 ///
1090 /// ```
1091 /// use tokio_util::time::DelayQueue;
1092 /// use std::time::Duration;
1093 ///
1094 /// # #[tokio::main(flavor = "current_thread")]
1095 /// # async fn main() {
1096 /// let mut delay_queue = DelayQueue::new();
1097 ///
1098 /// delay_queue.insert("hello", Duration::from_secs(10));
1099 /// delay_queue.reserve(10);
1100 ///
1101 /// assert!(delay_queue.capacity() >= 11);
1102 /// # }
1103 /// ```
1104 #[track_caller]
1105 pub fn reserve(&mut self, additional: usize) {
1106 assert!(
1107 self.slab.capacity() + additional <= MAX_ENTRIES,
1108 "max queue capacity exceeded"
1109 );
1110 self.slab.reserve(additional);
1111 }
1112
/// Returns `true` if there are no items in the queue.
///
/// Note that this function returns `false` as long as at least one item is
/// queued, even when none of the queued items has expired yet and a call to
/// `poll` would therefore return `Poll::Pending`.
///
/// # Examples
///
/// ```
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// let mut delay_queue = DelayQueue::new();
/// assert!(delay_queue.is_empty());
///
/// delay_queue.insert("hello", Duration::from_secs(5));
/// assert!(!delay_queue.is_empty());
/// # }
/// ```
pub fn is_empty(&self) -> bool {
    self.slab.is_empty()
}
1136
1137 /// Polls the queue, returning the index of the next slot in the slab that
1138 /// should be returned.
1139 ///
1140 /// A slot should be returned when the associated deadline has been reached.
1141 fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Key>> {
1142 use self::wheel::Stack;
1143
1144 let expired = self.expired.pop(&mut self.slab);
1145
1146 if expired.is_some() {
1147 return Poll::Ready(expired);
1148 }
1149
1150 loop {
1151 if let Some(ref mut delay) = self.delay {
1152 if !delay.is_elapsed() {
1153 ready!(Pin::new(&mut *delay).poll(cx));
1154 }
1155
1156 let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down);
1157
1158 self.wheel_now = now;
1159 }
1160
1161 // We poll the wheel to get the next value out before finding the next deadline.
1162 let wheel_idx = self.wheel.poll(self.wheel_now, &mut self.slab);
1163
1164 self.delay = self.next_deadline().map(|when| Box::pin(sleep_until(when)));
1165
1166 if let Some(idx) = wheel_idx {
1167 return Poll::Ready(Some(idx));
1168 }
1169
1170 if self.delay.is_none() {
1171 return Poll::Ready(None);
1172 }
1173 }
1174 }
1175
1176 fn normalize_deadline(&self, when: Instant) -> u64 {
1177 let when = if when < self.start {
1178 0
1179 } else {
1180 crate::time::ms(when - self.start, crate::time::Round::Up)
1181 };
1182
1183 cmp::max(when, self.wheel.elapsed())
1184 }
1185}
1186
// `DelayQueue` never puts `T` in a `Pin`, so it is declared `Unpin` for every
// `T`. The `Stream` implementation relies on this to call `Pin::get_mut`.
impl<T> Unpin for DelayQueue<T> {}
1189
1190impl<T> Default for DelayQueue<T> {
1191 fn default() -> DelayQueue<T> {
1192 DelayQueue::new()
1193 }
1194}
1195
1196impl<T> futures_core::Stream for DelayQueue<T> {
1197 // DelayQueue seems much more specific, where a user may care that it
1198 // has reached capacity, so return those errors instead of panicking.
1199 type Item = Expired<T>;
1200
1201 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
1202 DelayQueue::poll_expired(self.get_mut(), cx)
1203 }
1204}
1205
1206impl<T> wheel::Stack for Stack<T> {
1207 type Owned = Key;
1208 type Borrowed = Key;
1209 type Store = SlabStorage<T>;
1210
1211 fn is_empty(&self) -> bool {
1212 self.head.is_none()
1213 }
1214
1215 fn push(&mut self, item: Self::Owned, store: &mut Self::Store) {
1216 // Ensure the entry is not already in a stack.
1217 debug_assert!(store[item].next.is_none());
1218 debug_assert!(store[item].prev.is_none());
1219
1220 // Remove the old head entry
1221 let old = self.head.take();
1222
1223 if let Some(idx) = old {
1224 store[idx].prev = Some(item);
1225 }
1226
1227 store[item].next = old;
1228 self.head = Some(item);
1229 }
1230
1231 fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned> {
1232 if let Some(key) = self.head {
1233 self.head = store[key].next;
1234
1235 if let Some(idx) = self.head {
1236 store[idx].prev = None;
1237 }
1238
1239 store[key].next = None;
1240 debug_assert!(store[key].prev.is_none());
1241
1242 Some(key)
1243 } else {
1244 None
1245 }
1246 }
1247
1248 fn peek(&self) -> Option<Self::Owned> {
1249 self.head
1250 }
1251
1252 #[track_caller]
1253 fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) {
1254 let key = *item;
1255 assert!(store.contains(item));
1256
1257 // Ensure that the entry is in fact contained by the stack
1258 debug_assert!({
1259 // This walks the full linked list even if an entry is found.
1260 let mut next = self.head;
1261 let mut contains = false;
1262
1263 while let Some(idx) = next {
1264 let data = &store[idx];
1265
1266 if idx == *item {
1267 debug_assert!(!contains);
1268 contains = true;
1269 }
1270
1271 next = data.next;
1272 }
1273
1274 contains
1275 });
1276
1277 if let Some(next) = store[key].next {
1278 store[next].prev = store[key].prev;
1279 }
1280
1281 if let Some(prev) = store[key].prev {
1282 store[prev].next = store[key].next;
1283 } else {
1284 self.head = store[key].next;
1285 }
1286
1287 store[key].next = None;
1288 store[key].prev = None;
1289 }
1290
1291 fn when(item: &Self::Borrowed, store: &Self::Store) -> u64 {
1292 store[*item].when
1293 }
1294}
1295
1296impl<T> Default for Stack<T> {
1297 fn default() -> Stack<T> {
1298 Stack {
1299 head: None,
1300 _p: PhantomData,
1301 }
1302 }
1303}
1304
1305impl Key {
1306 pub(crate) fn new(index: usize) -> Key {
1307 Key { index }
1308 }
1309}
1310
1311impl KeyInternal {
1312 pub(crate) fn new(index: usize) -> KeyInternal {
1313 KeyInternal { index }
1314 }
1315}
1316
1317impl From<Key> for KeyInternal {
1318 fn from(item: Key) -> Self {
1319 KeyInternal::new(item.index)
1320 }
1321}
1322
1323impl From<KeyInternal> for Key {
1324 fn from(item: KeyInternal) -> Self {
1325 Key::new(item.index)
1326 }
1327}
1328
1329impl<T> Expired<T> {
1330 /// Returns a reference to the inner value.
1331 pub fn get_ref(&self) -> &T {
1332 &self.data
1333 }
1334
1335 /// Returns a mutable reference to the inner value.
1336 pub fn get_mut(&mut self) -> &mut T {
1337 &mut self.data
1338 }
1339
1340 /// Consumes `self` and returns the inner value.
1341 pub fn into_inner(self) -> T {
1342 self.data
1343 }
1344
1345 /// Returns the deadline that the expiration was set to.
1346 pub fn deadline(&self) -> Instant {
1347 self.deadline
1348 }
1349
1350 /// Returns the key that the expiration is indexed by.
1351 pub fn key(&self) -> Key {
1352 self.key
1353 }
1354}