tokio_util/time/
delay_queue.rs

1//! A queue of delayed elements.
2//!
3//! See [`DelayQueue`] for more details.
4//!
5//! [`DelayQueue`]: struct@DelayQueue
6
7use crate::time::wheel::{self, Wheel};
8
9use tokio::time::{sleep_until, Duration, Instant, Sleep};
10
11use core::ops::{Index, IndexMut};
12use slab::Slab;
13use std::cmp;
14use std::collections::HashMap;
15use std::convert::From;
16use std::fmt;
17use std::fmt::Debug;
18use std::future::Future;
19use std::marker::PhantomData;
20use std::pin::Pin;
21use std::task::{self, ready, Poll, Waker};
22
23/// A queue of delayed elements.
24///
25/// Once an element is inserted into the `DelayQueue`, it is yielded once the
26/// specified deadline has been reached.
27///
28/// # Usage
29///
30/// Elements are inserted into `DelayQueue` using the [`insert`] or
31/// [`insert_at`] methods. A deadline is provided with the item and a [`Key`] is
32/// returned. The key is used to remove the entry or to change the deadline at
33/// which it should be yielded back.
34///
35/// Once delays have been configured, the `DelayQueue` is used via its
36/// [`Stream`] implementation. [`poll_expired`] is called. If an entry has reached its
37/// deadline, it is returned. If not, `Poll::Pending` is returned indicating that the
38/// current task will be notified once the deadline has been reached.
39///
40/// # `Stream` implementation
41///
42/// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have
43/// expired, no items are returned. In this case, [`Poll::Pending`] is returned and the
44/// current task is registered to be notified once the next item's delay has
45/// expired.
46///
47/// If no items are in the queue, i.e. `is_empty()` returns `true`, then `poll`
48/// returns `Poll::Ready(None)`. This indicates that the stream has reached an end.
49/// However, if a new item is inserted *after*, `poll` will once again start
50/// returning items or `Poll::Pending`.
51///
52/// Items are returned ordered by their expirations. Items that are configured
53/// to expire first will be returned first. There are no ordering guarantees
54/// for items configured to expire at the same instant. Also note that delays are
55/// rounded to the closest millisecond.
56///
57/// # Implementation
58///
59/// The [`DelayQueue`] is backed by a separate instance of a timer wheel similar to that used internally
60/// by Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same
61/// performance and scalability benefits.
62///
63/// State associated with each entry is stored in a [`slab`]. This amortizes the cost of allocation,
64/// and allows reuse of the memory allocated for expired entries.
65///
66/// Capacity can be checked using [`capacity`] and allocated preemptively by using
67/// the [`reserve`] method.
68///
69/// # Cancellation safety
70///
71/// [`DelayQueue`]'s implementation of [`StreamExt::next`] is cancellation safe.
72///
/// # Examples
74///
75/// Using [`DelayQueue`] to manage cache entries.
76///
77/// ```rust,no_run
78/// use tokio_util::time::{DelayQueue, delay_queue};
79///
80/// use std::collections::HashMap;
81/// use std::task::{ready, Context, Poll};
82/// use std::time::Duration;
83/// # type CacheKey = String;
84/// # type Value = String;
85///
86/// struct Cache {
87///     entries: HashMap<CacheKey, (Value, delay_queue::Key)>,
88///     expirations: DelayQueue<CacheKey>,
89/// }
90///
91/// const TTL_SECS: u64 = 30;
92///
93/// impl Cache {
94///     fn insert(&mut self, key: CacheKey, value: Value) {
95///         let delay = self.expirations
96///             .insert(key.clone(), Duration::from_secs(TTL_SECS));
97///
98///         self.entries.insert(key, (value, delay));
99///     }
100///
101///     fn get(&self, key: &CacheKey) -> Option<&Value> {
102///         self.entries.get(key)
103///             .map(|&(ref v, _)| v)
104///     }
105///
106///     fn remove(&mut self, key: &CacheKey) {
107///         if let Some((_, cache_key)) = self.entries.remove(key) {
108///             self.expirations.remove(&cache_key);
109///         }
110///     }
111///
112///     fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<()> {
113///         while let Some(entry) = ready!(self.expirations.poll_expired(cx)) {
114///             self.entries.remove(entry.get_ref());
115///         }
116///
117///         Poll::Ready(())
118///     }
119/// }
120/// ```
121///
122/// [`insert`]: method@Self::insert
123/// [`insert_at`]: method@Self::insert_at
124/// [`Key`]: struct@Key
125/// [`Stream`]: https://docs.rs/futures/0.3.31/futures/stream/trait.Stream.html
126/// [`StreamExt::next`]: https://docs.rs/tokio-stream/0.1.17/tokio_stream/trait.StreamExt.html#method.next
127/// [`poll_expired`]: method@Self::poll_expired
/// [`DelayQueue::poll_expired`]: method@Self::poll_expired
129/// [`DelayQueue`]: struct@DelayQueue
130/// [`sleep`]: fn@tokio::time::sleep
131/// [`slab`]: slab
132/// [`capacity`]: method@Self::capacity
133/// [`reserve`]: method@Self::reserve
#[derive(Debug)]
pub struct DelayQueue<T> {
    /// Per-entry state (`Data<T>`), addressed by the `Key` handed out on
    /// insertion.
    slab: SlabStorage<T>,

    /// Lookup structure tracking all delays in the queue
    wheel: Wheel<Stack<T>>,

    /// Entries whose deadline had already elapsed when they were handed to the
    /// wheel (`InsertError::Elapsed`). These cannot be stored in the wheel, so
    /// they are kept on this separate stack and flagged via `Data::expired`.
    expired: Stack<T>,

    /// Delay expiring when the *first* item in the queue expires. Created
    /// lazily by `insert_at` and set back to `None` by `remove` once no
    /// deadline remains.
    delay: Option<Pin<Box<Sleep>>>,

    /// Wheel polling state
    wheel_now: u64,

    /// Instant at which the timer starts (taken in `with_capacity`). Entry
    /// deadlines are stored as millisecond offsets from this instant.
    start: Instant,

    /// Waker that is invoked when we potentially need to reset the timer.
    /// Because we lazily create the timer when the first entry is created, we
    /// need to awaken any poller that polled us before that point. `remove`
    /// also wakes it when the queue becomes empty.
    waker: Option<Waker>,
}
160
// Slab-backed storage for queue entries that keeps user-visible `Key`s stable
// even after the underlying slab has been compacted.
#[derive(Default)]
struct SlabStorage<T> {
    // The slab holding the actual entries, indexed by `KeyInternal::index`.
    inner: Slab<Data<T>>,

    // A `compact` call requires a re-mapping of the `Key`s that were changed
    // during the `compact` call of the `slab`. Since the keys that were given out
    // cannot be changed retroactively we need to keep track of these re-mappings.
    // The keys of `key_map` correspond to the old keys that were given out and
    // the values to the `Key`s that were re-mapped by the `compact` call.
    // Once `compact` has been called, every live key has an entry here
    // (identity mappings included).
    key_map: HashMap<Key, KeyInternal>,

    // Index used to create new keys to hand out. Only advanced by
    // `create_new_key`, which skips indices already present in `key_map`.
    next_key_index: usize,

    // Whether `compact` has been called, necessary in order to decide whether
    // to include keys in `key_map`. Reset to `false` by `clear`.
    compact_called: bool,
}
179
180impl<T> SlabStorage<T> {
181    pub(crate) fn with_capacity(capacity: usize) -> SlabStorage<T> {
182        SlabStorage {
183            inner: Slab::with_capacity(capacity),
184            key_map: HashMap::new(),
185            next_key_index: 0,
186            compact_called: false,
187        }
188    }
189
190    // Inserts data into the inner slab and re-maps keys if necessary
191    pub(crate) fn insert(&mut self, val: Data<T>) -> Key {
192        let mut key = KeyInternal::new(self.inner.insert(val));
193        let key_contained = self.key_map.contains_key(&key.into());
194
195        if key_contained {
196            // It's possible that a `compact` call creates capacity in `self.inner` in
197            // such a way that a `self.inner.insert` call creates a `key` which was
198            // previously given out during an `insert` call prior to the `compact` call.
199            // If `key` is contained in `self.key_map`, we have encountered this exact situation,
200            // We need to create a new key `key_to_give_out` and include the relation
201            // `key_to_give_out` -> `key` in `self.key_map`.
202            let key_to_give_out = self.create_new_key();
203            assert!(!self.key_map.contains_key(&key_to_give_out.into()));
204            self.key_map.insert(key_to_give_out.into(), key);
205            key = key_to_give_out;
206        } else if self.compact_called {
207            // Include an identity mapping in `self.key_map` in order to allow us to
208            // panic if a key that was handed out is removed more than once.
209            self.key_map.insert(key.into(), key);
210        }
211
212        key.into()
213    }
214
215    // Re-map the key in case compact was previously called.
216    // Note: Since we include identity mappings in key_map after compact was called,
217    // we have information about all keys that were handed out. In the case in which
218    // compact was called and we try to remove a Key that was previously removed
219    // we can detect invalid keys if no key is found in `key_map`. This is necessary
220    // in order to prevent situations in which a previously removed key
221    // corresponds to a re-mapped key internally and which would then be incorrectly
222    // removed from the slab.
223    //
224    // Example to illuminate this problem:
225    //
226    // Let's assume our `key_map` is {1 -> 2, 2 -> 1} and we call remove(1). If we
227    // were to remove 1 again, we would not find it inside `key_map` anymore.
228    // If we were to imply from this that no re-mapping was necessary, we would
229    // incorrectly remove 1 from `self.slab.inner`, which corresponds to the
230    // handed-out key 2.
231    pub(crate) fn remove(&mut self, key: &Key) -> Data<T> {
232        let remapped_key = if self.compact_called {
233            match self.key_map.remove(key) {
234                Some(key_internal) => key_internal,
235                None => panic!("invalid key"),
236            }
237        } else {
238            (*key).into()
239        };
240
241        self.inner.remove(remapped_key.index)
242    }
243
244    pub(crate) fn shrink_to_fit(&mut self) {
245        self.inner.shrink_to_fit();
246        self.key_map.shrink_to_fit();
247    }
248
249    pub(crate) fn compact(&mut self) {
250        if !self.compact_called {
251            for (key, _) in self.inner.iter() {
252                self.key_map.insert(Key::new(key), KeyInternal::new(key));
253            }
254        }
255
256        let mut remapping = HashMap::new();
257        self.inner.compact(|_, from, to| {
258            remapping.insert(from, to);
259            true
260        });
261
262        // At this point `key_map` contains a mapping for every element.
263        for internal_key in self.key_map.values_mut() {
264            if let Some(new_internal_key) = remapping.get(&internal_key.index) {
265                *internal_key = KeyInternal::new(*new_internal_key);
266            }
267        }
268
269        if self.key_map.capacity() > 2 * self.key_map.len() {
270            self.key_map.shrink_to_fit();
271        }
272
273        self.compact_called = true;
274    }
275
276    // Tries to re-map a `Key` that was given out to the user to its
277    // corresponding internal key.
278    fn remap_key(&self, key: &Key) -> Option<KeyInternal> {
279        let key_map = &self.key_map;
280        if self.compact_called {
281            key_map.get(key).copied()
282        } else {
283            Some((*key).into())
284        }
285    }
286
287    fn create_new_key(&mut self) -> KeyInternal {
288        while self.key_map.contains_key(&Key::new(self.next_key_index)) {
289            self.next_key_index = self.next_key_index.wrapping_add(1);
290        }
291
292        KeyInternal::new(self.next_key_index)
293    }
294
295    pub(crate) fn len(&self) -> usize {
296        self.inner.len()
297    }
298
299    pub(crate) fn capacity(&self) -> usize {
300        self.inner.capacity()
301    }
302
303    pub(crate) fn clear(&mut self) {
304        self.inner.clear();
305        self.key_map.clear();
306        self.compact_called = false;
307    }
308
309    pub(crate) fn reserve(&mut self, additional: usize) {
310        self.inner.reserve(additional);
311
312        if self.compact_called {
313            self.key_map.reserve(additional);
314        }
315    }
316
317    pub(crate) fn is_empty(&self) -> bool {
318        self.inner.is_empty()
319    }
320
321    pub(crate) fn contains(&self, key: &Key) -> bool {
322        let remapped_key = self.remap_key(key);
323
324        match remapped_key {
325            Some(internal_key) => self.inner.contains(internal_key.index),
326            None => false,
327        }
328    }
329}
330
331impl<T> fmt::Debug for SlabStorage<T>
332where
333    T: fmt::Debug,
334{
335    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
336        if fmt.alternate() {
337            fmt.debug_map().entries(self.inner.iter()).finish()
338        } else {
339            fmt.debug_struct("Slab")
340                .field("len", &self.len())
341                .field("cap", &self.capacity())
342                .finish()
343        }
344    }
345}
346
347impl<T> Index<Key> for SlabStorage<T> {
348    type Output = Data<T>;
349
350    fn index(&self, key: Key) -> &Self::Output {
351        let remapped_key = self.remap_key(&key);
352
353        match remapped_key {
354            Some(internal_key) => &self.inner[internal_key.index],
355            None => panic!("Invalid index {}", key.index),
356        }
357    }
358}
359
360impl<T> IndexMut<Key> for SlabStorage<T> {
361    fn index_mut(&mut self, key: Key) -> &mut Data<T> {
362        let remapped_key = self.remap_key(&key);
363
364        match remapped_key {
365            Some(internal_key) => &mut self.inner[internal_key.index],
366            None => panic!("Invalid index {}", key.index),
367        }
368    }
369}
370
371/// An entry in `DelayQueue` that has expired and been removed.
372///
373/// Values are returned by [`DelayQueue::poll_expired`].
374///
375/// [`DelayQueue::poll_expired`]: method@DelayQueue::poll_expired
#[derive(Debug)]
pub struct Expired<T> {
    /// The value that was stored in the queue
    data: T,

    /// The expiration time, rebuilt from the queue's start instant plus the
    /// entry's millisecond offset
    deadline: Instant,

    /// The key that was associated with the entry while it was in the queue
    key: Key,
}
387
388/// Token to a value stored in a `DelayQueue`.
389///
390/// Instances of `Key` are returned by [`DelayQueue::insert`]. See [`DelayQueue`]
391/// documentation for more details.
392///
393/// [`DelayQueue`]: struct@DelayQueue
394/// [`DelayQueue::insert`]: method@DelayQueue::insert
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Key {
    // Value handed out to the user. Before any `compact` call it is used
    // directly as the slab index; afterwards it is translated through
    // `SlabStorage::key_map`.
    index: usize,
}
399
// Whereas `Key` is given out to users that use `DelayQueue`, internally we use
// `KeyInternal` as the key type in order to make the logic of mapping between keys
// as a result of `compact` calls clearer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct KeyInternal {
    // Index of the entry inside the backing `Slab`.
    index: usize,
}
407
#[derive(Debug)]
struct Stack<T> {
    /// Head of the stack (key of the top entry, if any)
    head: Option<Key>,
    /// Marker tying the stack to the entry type `T` without storing a `T`;
    /// the entries themselves live in the slab.
    _p: PhantomData<fn() -> T>,
}
414
#[derive(Debug)]
struct Data<T> {
    /// The data being stored in the queue and will be returned at the requested
    /// instant.
    inner: T,

    /// The instant at which the item is returned, stored as a millisecond
    /// offset from the queue's `start` instant.
    when: u64,

    /// Set to true when stored in the `expired` queue; cleared again by
    /// `reset_at` before the entry is re-inserted.
    expired: bool,

    /// Next entry in the stack
    next: Option<Key>,

    /// Previous entry in the stack
    prev: Option<Key>,
}
433
/// Maximum number of entries the queue can handle. `insert_at` asserts that
/// the current length is below this value before storing a new entry.
const MAX_ENTRIES: usize = (1 << 30) - 1;
436
437impl<T> DelayQueue<T> {
438    /// Creates a new, empty, `DelayQueue`.
439    ///
440    /// The queue will not allocate storage until items are inserted into it.
441    ///
442    /// # Examples
443    ///
444    /// ```rust
445    /// # use tokio_util::time::DelayQueue;
446    /// let delay_queue: DelayQueue<u32> = DelayQueue::new();
447    /// ```
448    pub fn new() -> DelayQueue<T> {
449        DelayQueue::with_capacity(0)
450    }
451
452    /// Creates a new, empty, `DelayQueue` with the specified capacity.
453    ///
454    /// The queue will be able to hold at least `capacity` elements without
455    /// reallocating. If `capacity` is 0, the queue will not allocate for
456    /// storage.
457    ///
458    /// # Examples
459    ///
460    /// ```rust
461    /// # use tokio_util::time::DelayQueue;
462    /// # use std::time::Duration;
463    ///
464    /// # #[tokio::main(flavor = "current_thread")]
465    /// # async fn main() {
466    /// let mut delay_queue = DelayQueue::with_capacity(10);
467    ///
468    /// // These insertions are done without further allocation
469    /// for i in 0..10 {
470    ///     delay_queue.insert(i, Duration::from_secs(i));
471    /// }
472    ///
473    /// // This will make the queue allocate additional storage
474    /// delay_queue.insert(11, Duration::from_secs(11));
475    /// # }
476    /// ```
477    pub fn with_capacity(capacity: usize) -> DelayQueue<T> {
478        DelayQueue {
479            wheel: Wheel::new(),
480            slab: SlabStorage::with_capacity(capacity),
481            expired: Stack::default(),
482            delay: None,
483            wheel_now: 0,
484            start: Instant::now(),
485            waker: None,
486        }
487    }
488
489    /// Inserts `value` into the queue set to expire at a specific instant in
490    /// time.
491    ///
492    /// This function is identical to `insert`, but takes an `Instant` instead
493    /// of a `Duration`.
494    ///
495    /// `value` is stored in the queue until `when` is reached. At which point,
496    /// `value` will be returned from [`poll_expired`]. If `when` has already been
497    /// reached, then `value` is immediately made available to poll.
498    ///
499    /// The return value represents the insertion and is used as an argument to
500    /// [`remove`] and [`reset`]. Note that [`Key`] is a token and is reused once
501    /// `value` is removed from the queue either by calling [`poll_expired`] after
502    /// `when` is reached or by calling [`remove`]. At this point, the caller
503    /// must take care to not use the returned [`Key`] again as it may reference
504    /// a different item in the queue.
505    ///
506    /// See [type] level documentation for more details.
507    ///
508    /// # Panics
509    ///
510    /// This function panics if `when` is too far in the future.
511    ///
512    /// # Examples
513    ///
514    /// Basic usage
515    ///
516    /// ```rust
517    /// use tokio::time::{Duration, Instant};
518    /// use tokio_util::time::DelayQueue;
519    ///
520    /// # #[tokio::main(flavor = "current_thread")]
521    /// # async fn main() {
522    /// let mut delay_queue = DelayQueue::new();
523    /// let key = delay_queue.insert_at(
524    ///     "foo", Instant::now() + Duration::from_secs(5));
525    ///
526    /// // Remove the entry
527    /// let item = delay_queue.remove(&key);
528    /// assert_eq!(*item.get_ref(), "foo");
529    /// # }
530    /// ```
531    ///
532    /// [`poll_expired`]: method@Self::poll_expired
533    /// [`remove`]: method@Self::remove
534    /// [`reset`]: method@Self::reset
535    /// [`Key`]: struct@Key
536    /// [type]: #
537    #[track_caller]
538    pub fn insert_at(&mut self, value: T, when: Instant) -> Key {
539        assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded");
540
541        // Normalize the deadline. Values cannot be set to expire in the past.
542        let when = self.normalize_deadline(when);
543
544        // Insert the value in the store
545        let key = self.slab.insert(Data {
546            inner: value,
547            when,
548            expired: false,
549            next: None,
550            prev: None,
551        });
552
553        self.insert_idx(when, key);
554
555        // Set a new delay if the current's deadline is later than the one of the new item
556        let should_set_delay = if let Some(ref delay) = self.delay {
557            let current_exp = self.normalize_deadline(delay.deadline());
558            current_exp > when
559        } else {
560            true
561        };
562
563        if should_set_delay {
564            if let Some(waker) = self.waker.take() {
565                waker.wake();
566            }
567
568            let delay_time = self.start + Duration::from_millis(when);
569            if let Some(ref mut delay) = &mut self.delay {
570                delay.as_mut().reset(delay_time);
571            } else {
572                self.delay = Some(Box::pin(sleep_until(delay_time)));
573            }
574        }
575
576        key
577    }
578
579    /// Attempts to pull out the next value of the delay queue, registering the
580    /// current task for wakeup if the value is not yet available, and returning
581    /// `None` if the queue is exhausted.
582    pub fn poll_expired(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Expired<T>>> {
583        if !self
584            .waker
585            .as_ref()
586            .map(|w| w.will_wake(cx.waker()))
587            .unwrap_or(false)
588        {
589            self.waker = Some(cx.waker().clone());
590        }
591
592        let item = ready!(self.poll_idx(cx));
593        Poll::Ready(item.map(|key| {
594            let data = self.slab.remove(&key);
595            debug_assert!(data.next.is_none());
596            debug_assert!(data.prev.is_none());
597
598            Expired {
599                key,
600                data: data.inner,
601                deadline: self.start + Duration::from_millis(data.when),
602            }
603        }))
604    }
605
606    /// Inserts `value` into the queue set to expire after the requested duration
607    /// elapses.
608    ///
609    /// This function is identical to `insert_at`, but takes a `Duration`
610    /// instead of an `Instant`.
611    ///
612    /// `value` is stored in the queue until `timeout` duration has
613    /// elapsed after `insert` was called. At that point, `value` will
614    /// be returned from [`poll_expired`]. If `timeout` is a `Duration` of
615    /// zero, then `value` is immediately made available to poll.
616    ///
617    /// The return value represents the insertion and is used as an
618    /// argument to [`remove`] and [`reset`]. Note that [`Key`] is a
619    /// token and is reused once `value` is removed from the queue
620    /// either by calling [`poll_expired`] after `timeout` has elapsed
621    /// or by calling [`remove`]. At this point, the caller must not
622    /// use the returned [`Key`] again as it may reference a different
623    /// item in the queue.
624    ///
625    /// See [type] level documentation for more details.
626    ///
627    /// # Panics
628    ///
629    /// This function panics if `timeout` is greater than the maximum
630    /// duration supported by the timer in the current `Runtime`.
631    ///
632    /// # Examples
633    ///
634    /// Basic usage
635    ///
636    /// ```rust
637    /// use tokio_util::time::DelayQueue;
638    /// use std::time::Duration;
639    ///
640    /// # #[tokio::main(flavor = "current_thread")]
641    /// # async fn main() {
642    /// let mut delay_queue = DelayQueue::new();
643    /// let key = delay_queue.insert("foo", Duration::from_secs(5));
644    ///
645    /// // Remove the entry
646    /// let item = delay_queue.remove(&key);
647    /// assert_eq!(*item.get_ref(), "foo");
648    /// # }
649    /// ```
650    ///
651    /// [`poll_expired`]: method@Self::poll_expired
652    /// [`remove`]: method@Self::remove
653    /// [`reset`]: method@Self::reset
654    /// [`Key`]: struct@Key
655    /// [type]: #
656    #[track_caller]
657    pub fn insert(&mut self, value: T, timeout: Duration) -> Key {
658        self.insert_at(value, Instant::now() + timeout)
659    }
660
661    #[track_caller]
662    fn insert_idx(&mut self, when: u64, key: Key) {
663        use self::wheel::{InsertError, Stack};
664
665        // Register the deadline with the timer wheel
666        match self.wheel.insert(when, key, &mut self.slab) {
667            Ok(_) => {}
668            Err((_, InsertError::Elapsed)) => {
669                self.slab[key].expired = true;
670                // The delay is already expired, store it in the expired queue
671                self.expired.push(key, &mut self.slab);
672            }
673            Err((_, err)) => panic!("invalid deadline; err={err:?}"),
674        }
675    }
676
677    /// Returns the deadline of the item associated with `key`.
678    ///
679    /// Since the queue operates at millisecond granularity, the returned
680    /// deadline may not exactly match the value that was given when initially
681    /// inserting the item into the queue.
682    ///
683    /// # Panics
684    ///
685    /// This function panics if `key` is not contained by the queue.
686    ///
687    /// # Examples
688    ///
689    /// Basic usage
690    ///
691    /// ```rust
692    /// use tokio_util::time::DelayQueue;
693    /// use std::time::Duration;
694    ///
695    /// # #[tokio::main(flavor = "current_thread")]
696    /// # async fn main() {
697    /// let mut delay_queue = DelayQueue::new();
698    ///
699    /// let key1 = delay_queue.insert("foo", Duration::from_secs(5));
700    /// let key2 = delay_queue.insert("bar", Duration::from_secs(10));
701    ///
702    /// assert!(delay_queue.deadline(&key1) < delay_queue.deadline(&key2));
703    /// # }
704    /// ```
705    #[track_caller]
706    pub fn deadline(&self, key: &Key) -> Instant {
707        self.start + Duration::from_millis(self.slab[*key].when)
708    }
709
710    /// Removes the key from the expired queue or the timer wheel
711    /// depending on its expiration status.
712    ///
713    /// # Panics
714    ///
715    /// Panics if the key is not contained in the expired queue or the wheel.
716    #[track_caller]
717    fn remove_key(&mut self, key: &Key) {
718        use crate::time::wheel::Stack;
719
720        // Special case the `expired` queue
721        if self.slab[*key].expired {
722            self.expired.remove(key, &mut self.slab);
723        } else {
724            self.wheel.remove(key, &mut self.slab);
725        }
726    }
727
728    /// Removes the item associated with `key` from the queue.
729    ///
730    /// There must be an item associated with `key`. The function returns the
    /// removed item as well as the `Instant` at which the delay would have
    /// expired.
733    ///
734    /// # Panics
735    ///
736    /// The function panics if `key` is not contained by the queue.
737    ///
738    /// # Examples
739    ///
740    /// Basic usage
741    ///
742    /// ```rust
743    /// use tokio_util::time::DelayQueue;
744    /// use std::time::Duration;
745    ///
746    /// # #[tokio::main(flavor = "current_thread")]
747    /// # async fn main() {
748    /// let mut delay_queue = DelayQueue::new();
749    /// let key = delay_queue.insert("foo", Duration::from_secs(5));
750    ///
751    /// // Remove the entry
752    /// let item = delay_queue.remove(&key);
753    /// assert_eq!(*item.get_ref(), "foo");
754    /// # }
755    /// ```
756    #[track_caller]
757    pub fn remove(&mut self, key: &Key) -> Expired<T> {
758        let prev_deadline = self.next_deadline();
759
760        self.remove_key(key);
761        let data = self.slab.remove(key);
762
763        let next_deadline = self.next_deadline();
764        if prev_deadline != next_deadline {
765            match (next_deadline, &mut self.delay) {
766                (None, _) => self.delay = None,
767                (Some(deadline), Some(delay)) => delay.as_mut().reset(deadline),
768                (Some(deadline), None) => self.delay = Some(Box::pin(sleep_until(deadline))),
769            }
770        }
771
772        if self.slab.is_empty() {
773            if let Some(waker) = self.waker.take() {
774                waker.wake();
775            }
776        }
777
778        Expired {
779            key: Key::new(key.index),
780            data: data.inner,
781            deadline: self.start + Duration::from_millis(data.when),
782        }
783    }
784
785    /// Attempts to remove the item associated with `key` from the queue.
786    ///
787    /// Removes the item associated with `key`, and returns it along with the
788    /// `Instant` at which it would have expired, if it exists.
789    ///
790    /// Returns `None` if `key` is not in the queue.
791    ///
792    /// # Examples
793    ///
794    /// Basic usage
795    ///
796    /// ```rust
797    /// use tokio_util::time::DelayQueue;
798    /// use std::time::Duration;
799    ///
800    /// # #[tokio::main(flavor = "current_thread")]
801    /// # async fn main() {
802    /// let mut delay_queue = DelayQueue::new();
803    /// let key = delay_queue.insert("foo", Duration::from_secs(5));
804    ///
805    /// // The item is in the queue, `try_remove` returns `Some(Expired("foo"))`.
806    /// let item = delay_queue.try_remove(&key);
807    /// assert_eq!(item.unwrap().into_inner(), "foo");
808    ///
809    /// // The item is not in the queue anymore, `try_remove` returns `None`.
810    /// let item = delay_queue.try_remove(&key);
811    /// assert!(item.is_none());
812    /// # }
813    /// ```
814    pub fn try_remove(&mut self, key: &Key) -> Option<Expired<T>> {
815        if self.slab.contains(key) {
816            Some(self.remove(key))
817        } else {
818            None
819        }
820    }
821
822    /// Sets the delay of the item associated with `key` to expire at `when`.
823    ///
824    /// This function is identical to `reset` but takes an `Instant` instead of
825    /// a `Duration`.
826    ///
827    /// The item remains in the queue but the delay is set to expire at `when`.
828    /// If `when` is in the past, then the item is immediately made available to
829    /// the caller.
830    ///
831    /// # Panics
832    ///
833    /// This function panics if `when` is too far in the future or if `key` is
834    /// not contained by the queue.
835    ///
836    /// # Examples
837    ///
838    /// Basic usage
839    ///
840    /// ```rust
841    /// use tokio::time::{Duration, Instant};
842    /// use tokio_util::time::DelayQueue;
843    ///
844    /// # #[tokio::main(flavor = "current_thread")]
845    /// # async fn main() {
846    /// let mut delay_queue = DelayQueue::new();
847    /// let key = delay_queue.insert("foo", Duration::from_secs(5));
848    ///
849    /// // "foo" is scheduled to be returned in 5 seconds
850    ///
851    /// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10));
852    ///
853    /// // "foo" is now scheduled to be returned in 10 seconds
854    /// # }
855    /// ```
856    #[track_caller]
857    pub fn reset_at(&mut self, key: &Key, when: Instant) {
858        self.remove_key(key);
859
860        // Normalize the deadline. Values cannot be set to expire in the past.
861        let when = self.normalize_deadline(when);
862
863        self.slab[*key].when = when;
864        self.slab[*key].expired = false;
865
866        self.insert_idx(when, *key);
867
868        let next_deadline = self.next_deadline();
869        if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) {
870            // This should awaken us if necessary (ie, if already expired)
871            delay.as_mut().reset(deadline);
872        }
873    }
874
875    /// Shrink the capacity of the slab, which `DelayQueue` uses internally for storage allocation.
876    /// This function is not guaranteed to, and in most cases, won't decrease the capacity of the slab
877    /// to the number of elements still contained in it, because elements cannot be moved to a different
878    /// index. To decrease the capacity to the size of the slab use [`compact`].
879    ///
880    /// This function can take O(n) time even when the capacity cannot be reduced or the allocation is
881    /// shrunk in place. Repeated calls run in O(1) though.
882    ///
883    /// [`compact`]: method@Self::compact
884    pub fn shrink_to_fit(&mut self) {
885        self.slab.shrink_to_fit();
886    }
887
888    /// Shrink the capacity of the slab, which `DelayQueue` uses internally for storage allocation,
889    /// to the number of elements that are contained in it.
890    ///
891    /// This methods runs in O(n).
892    ///
893    /// # Examples
894    ///
895    /// Basic usage
896    ///
897    /// ```rust
898    /// use tokio_util::time::DelayQueue;
899    /// use std::time::Duration;
900    ///
901    /// # #[tokio::main(flavor = "current_thread")]
902    /// # async fn main() {
903    /// let mut delay_queue = DelayQueue::with_capacity(10);
904    ///
905    /// let key1 = delay_queue.insert(5, Duration::from_secs(5));
906    /// let key2 = delay_queue.insert(10, Duration::from_secs(10));
907    /// let key3 = delay_queue.insert(15, Duration::from_secs(15));
908    ///
909    /// delay_queue.remove(&key2);
910    ///
911    /// delay_queue.compact();
912    /// assert_eq!(delay_queue.capacity(), 2);
913    /// # }
914    /// ```
915    pub fn compact(&mut self) {
916        self.slab.compact();
917    }
918
919    /// Gets the [`Key`] that [`poll_expired`] will pull out of the queue next, without
920    /// pulling it out or waiting for the deadline to expire.
921    ///
922    /// Entries that have already expired may be returned in any order, but it is
923    /// guaranteed that this method returns them in the same order as when items
924    /// are popped from the `DelayQueue`.
925    ///
926    /// # Examples
927    ///
928    /// Basic usage
929    ///
930    /// ```rust
931    /// use tokio_util::time::DelayQueue;
932    /// use std::time::Duration;
933    ///
934    /// # #[tokio::main(flavor = "current_thread")]
935    /// # async fn main() {
936    /// let mut delay_queue = DelayQueue::new();
937    ///
938    /// let key1 = delay_queue.insert("foo", Duration::from_secs(10));
939    /// let key2 = delay_queue.insert("bar", Duration::from_secs(5));
940    /// let key3 = delay_queue.insert("baz", Duration::from_secs(15));
941    ///
942    /// assert_eq!(delay_queue.peek().unwrap(), key2);
943    /// # }
944    /// ```
945    ///
946    /// [`Key`]: struct@Key
947    /// [`poll_expired`]: method@Self::poll_expired
948    pub fn peek(&self) -> Option<Key> {
949        use self::wheel::Stack;
950
951        self.expired.peek().or_else(|| self.wheel.peek())
952    }
953
954    /// Returns the next time to poll as determined by the wheel.
955    ///
956    /// Note that this does not include deadlines in the `expired` queue.
957    fn next_deadline(&self) -> Option<Instant> {
958        self.wheel
959            .poll_at()
960            .map(|poll_at| self.start + Duration::from_millis(poll_at))
961    }
962
963    /// Sets the delay of the item associated with `key` to expire after
964    /// `timeout`.
965    ///
966    /// This function is identical to `reset_at` but takes a `Duration` instead
967    /// of an `Instant`.
968    ///
969    /// The item remains in the queue but the delay is set to expire after
970    /// `timeout`. If `timeout` is zero, then the item is immediately made
971    /// available to the caller.
972    ///
973    /// # Panics
974    ///
975    /// This function panics if `timeout` is greater than the maximum supported
976    /// duration or if `key` is not contained by the queue.
977    ///
978    /// # Examples
979    ///
980    /// Basic usage
981    ///
982    /// ```rust
983    /// use tokio_util::time::DelayQueue;
984    /// use std::time::Duration;
985    ///
986    /// # #[tokio::main(flavor = "current_thread")]
987    /// # async fn main() {
988    /// let mut delay_queue = DelayQueue::new();
989    /// let key = delay_queue.insert("foo", Duration::from_secs(5));
990    ///
991    /// // "foo" is scheduled to be returned in 5 seconds
992    ///
993    /// delay_queue.reset(&key, Duration::from_secs(10));
994    ///
995    /// // "foo"is now scheduled to be returned in 10 seconds
996    /// # }
997    /// ```
998    #[track_caller]
999    pub fn reset(&mut self, key: &Key, timeout: Duration) {
1000        self.reset_at(key, Instant::now() + timeout);
1001    }
1002
1003    /// Clears the queue, removing all items.
1004    ///
1005    /// After calling `clear`, [`poll_expired`] will return `Ok(Ready(None))`.
1006    ///
1007    /// Note that this method has no effect on the allocated capacity.
1008    ///
1009    /// [`poll_expired`]: method@Self::poll_expired
1010    ///
1011    /// # Examples
1012    ///
1013    /// ```rust
1014    /// use tokio_util::time::DelayQueue;
1015    /// use std::time::Duration;
1016    ///
1017    /// # #[tokio::main(flavor = "current_thread")]
1018    /// # async fn main() {
1019    /// let mut delay_queue = DelayQueue::new();
1020    ///
1021    /// delay_queue.insert("foo", Duration::from_secs(5));
1022    ///
1023    /// assert!(!delay_queue.is_empty());
1024    ///
1025    /// delay_queue.clear();
1026    ///
1027    /// assert!(delay_queue.is_empty());
1028    /// # }
1029    /// ```
1030    pub fn clear(&mut self) {
1031        self.slab.clear();
1032        self.expired = Stack::default();
1033        self.wheel = Wheel::new();
1034        self.delay = None;
1035    }
1036
1037    /// Returns the number of elements the queue can hold without reallocating.
1038    ///
1039    /// # Examples
1040    ///
1041    /// ```rust
1042    /// use tokio_util::time::DelayQueue;
1043    ///
1044    /// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
1045    /// assert_eq!(delay_queue.capacity(), 10);
1046    /// ```
1047    pub fn capacity(&self) -> usize {
1048        self.slab.capacity()
1049    }
1050
1051    /// Returns the number of elements currently in the queue.
1052    ///
1053    /// # Examples
1054    ///
1055    /// ```rust
1056    /// use tokio_util::time::DelayQueue;
1057    /// use std::time::Duration;
1058    ///
1059    /// # #[tokio::main(flavor = "current_thread")]
1060    /// # async fn main() {
1061    /// let mut delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
1062    /// assert_eq!(delay_queue.len(), 0);
1063    /// delay_queue.insert(3, Duration::from_secs(5));
1064    /// assert_eq!(delay_queue.len(), 1);
1065    /// # }
1066    /// ```
1067    pub fn len(&self) -> usize {
1068        self.slab.len()
1069    }
1070
1071    /// Reserves capacity for at least `additional` more items to be queued
1072    /// without allocating.
1073    ///
1074    /// `reserve` does nothing if the queue already has sufficient capacity for
1075    /// `additional` more values. If more capacity is required, a new segment of
1076    /// memory will be allocated and all existing values will be copied into it.
1077    /// As such, if the queue is already very large, a call to `reserve` can end
1078    /// up being expensive.
1079    ///
1080    /// The queue may reserve more than `additional` extra space in order to
1081    /// avoid frequent reallocations.
1082    ///
1083    /// # Panics
1084    ///
1085    /// Panics if the new capacity exceeds the maximum number of entries the
1086    /// queue can contain.
1087    ///
1088    /// # Examples
1089    ///
1090    /// ```
1091    /// use tokio_util::time::DelayQueue;
1092    /// use std::time::Duration;
1093    ///
1094    /// # #[tokio::main(flavor = "current_thread")]
1095    /// # async fn main() {
1096    /// let mut delay_queue = DelayQueue::new();
1097    ///
1098    /// delay_queue.insert("hello", Duration::from_secs(10));
1099    /// delay_queue.reserve(10);
1100    ///
1101    /// assert!(delay_queue.capacity() >= 11);
1102    /// # }
1103    /// ```
1104    #[track_caller]
1105    pub fn reserve(&mut self, additional: usize) {
1106        assert!(
1107            self.slab.capacity() + additional <= MAX_ENTRIES,
1108            "max queue capacity exceeded"
1109        );
1110        self.slab.reserve(additional);
1111    }
1112
1113    /// Returns `true` if there are no items in the queue.
1114    ///
1115    /// Note that this function returns `false` even if all items have not yet
1116    /// expired and a call to `poll` will return `Poll::Pending`.
1117    ///
1118    /// # Examples
1119    ///
1120    /// ```
1121    /// use tokio_util::time::DelayQueue;
1122    /// use std::time::Duration;
1123    ///
1124    /// # #[tokio::main(flavor = "current_thread")]
1125    /// # async fn main() {
1126    /// let mut delay_queue = DelayQueue::new();
1127    /// assert!(delay_queue.is_empty());
1128    ///
1129    /// delay_queue.insert("hello", Duration::from_secs(5));
1130    /// assert!(!delay_queue.is_empty());
1131    /// # }
1132    /// ```
1133    pub fn is_empty(&self) -> bool {
1134        self.slab.is_empty()
1135    }
1136
1137    /// Polls the queue, returning the index of the next slot in the slab that
1138    /// should be returned.
1139    ///
1140    /// A slot should be returned when the associated deadline has been reached.
1141    fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Key>> {
1142        use self::wheel::Stack;
1143
1144        let expired = self.expired.pop(&mut self.slab);
1145
1146        if expired.is_some() {
1147            return Poll::Ready(expired);
1148        }
1149
1150        loop {
1151            if let Some(ref mut delay) = self.delay {
1152                if !delay.is_elapsed() {
1153                    ready!(Pin::new(&mut *delay).poll(cx));
1154                }
1155
1156                let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down);
1157
1158                self.wheel_now = now;
1159            }
1160
1161            // We poll the wheel to get the next value out before finding the next deadline.
1162            let wheel_idx = self.wheel.poll(self.wheel_now, &mut self.slab);
1163
1164            self.delay = self.next_deadline().map(|when| Box::pin(sleep_until(when)));
1165
1166            if let Some(idx) = wheel_idx {
1167                return Poll::Ready(Some(idx));
1168            }
1169
1170            if self.delay.is_none() {
1171                return Poll::Ready(None);
1172            }
1173        }
1174    }
1175
1176    fn normalize_deadline(&self, when: Instant) -> u64 {
1177        let when = if when < self.start {
1178            0
1179        } else {
1180            crate::time::ms(when - self.start, crate::time::Round::Up)
1181        };
1182
1183        cmp::max(when, self.wheel.elapsed())
1184    }
1185}
1186
1187// We never put `T` in a `Pin`...
1188impl<T> Unpin for DelayQueue<T> {}
1189
1190impl<T> Default for DelayQueue<T> {
1191    fn default() -> DelayQueue<T> {
1192        DelayQueue::new()
1193    }
1194}
1195
1196impl<T> futures_core::Stream for DelayQueue<T> {
1197    // DelayQueue seems much more specific, where a user may care that it
1198    // has reached capacity, so return those errors instead of panicking.
1199    type Item = Expired<T>;
1200
1201    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
1202        DelayQueue::poll_expired(self.get_mut(), cx)
1203    }
1204}
1205
1206impl<T> wheel::Stack for Stack<T> {
1207    type Owned = Key;
1208    type Borrowed = Key;
1209    type Store = SlabStorage<T>;
1210
1211    fn is_empty(&self) -> bool {
1212        self.head.is_none()
1213    }
1214
1215    fn push(&mut self, item: Self::Owned, store: &mut Self::Store) {
1216        // Ensure the entry is not already in a stack.
1217        debug_assert!(store[item].next.is_none());
1218        debug_assert!(store[item].prev.is_none());
1219
1220        // Remove the old head entry
1221        let old = self.head.take();
1222
1223        if let Some(idx) = old {
1224            store[idx].prev = Some(item);
1225        }
1226
1227        store[item].next = old;
1228        self.head = Some(item);
1229    }
1230
1231    fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned> {
1232        if let Some(key) = self.head {
1233            self.head = store[key].next;
1234
1235            if let Some(idx) = self.head {
1236                store[idx].prev = None;
1237            }
1238
1239            store[key].next = None;
1240            debug_assert!(store[key].prev.is_none());
1241
1242            Some(key)
1243        } else {
1244            None
1245        }
1246    }
1247
1248    fn peek(&self) -> Option<Self::Owned> {
1249        self.head
1250    }
1251
1252    #[track_caller]
1253    fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) {
1254        let key = *item;
1255        assert!(store.contains(item));
1256
1257        // Ensure that the entry is in fact contained by the stack
1258        debug_assert!({
1259            // This walks the full linked list even if an entry is found.
1260            let mut next = self.head;
1261            let mut contains = false;
1262
1263            while let Some(idx) = next {
1264                let data = &store[idx];
1265
1266                if idx == *item {
1267                    debug_assert!(!contains);
1268                    contains = true;
1269                }
1270
1271                next = data.next;
1272            }
1273
1274            contains
1275        });
1276
1277        if let Some(next) = store[key].next {
1278            store[next].prev = store[key].prev;
1279        }
1280
1281        if let Some(prev) = store[key].prev {
1282            store[prev].next = store[key].next;
1283        } else {
1284            self.head = store[key].next;
1285        }
1286
1287        store[key].next = None;
1288        store[key].prev = None;
1289    }
1290
1291    fn when(item: &Self::Borrowed, store: &Self::Store) -> u64 {
1292        store[*item].when
1293    }
1294}
1295
1296impl<T> Default for Stack<T> {
1297    fn default() -> Stack<T> {
1298        Stack {
1299            head: None,
1300            _p: PhantomData,
1301        }
1302    }
1303}
1304
1305impl Key {
1306    pub(crate) fn new(index: usize) -> Key {
1307        Key { index }
1308    }
1309}
1310
1311impl KeyInternal {
1312    pub(crate) fn new(index: usize) -> KeyInternal {
1313        KeyInternal { index }
1314    }
1315}
1316
1317impl From<Key> for KeyInternal {
1318    fn from(item: Key) -> Self {
1319        KeyInternal::new(item.index)
1320    }
1321}
1322
1323impl From<KeyInternal> for Key {
1324    fn from(item: KeyInternal) -> Self {
1325        Key::new(item.index)
1326    }
1327}
1328
1329impl<T> Expired<T> {
1330    /// Returns a reference to the inner value.
1331    pub fn get_ref(&self) -> &T {
1332        &self.data
1333    }
1334
1335    /// Returns a mutable reference to the inner value.
1336    pub fn get_mut(&mut self) -> &mut T {
1337        &mut self.data
1338    }
1339
1340    /// Consumes `self` and returns the inner value.
1341    pub fn into_inner(self) -> T {
1342        self.data
1343    }
1344
1345    /// Returns the deadline that the expiration was set to.
1346    pub fn deadline(&self) -> Instant {
1347        self.deadline
1348    }
1349
1350    /// Returns the key that the expiration is indexed by.
1351    pub fn key(&self) -> Key {
1352        self.key
1353    }
1354}