moka/common/
timer_wheel.rs

1// License and Copyright Notice:
2//
3// Some of the code and doc comments in this module were ported or copied from
4// a Java class `com.github.benmanes.caffeine.cache.TimerWheel` of Caffeine.
5// https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java
6//
7// The original code/comments from Caffeine are licensed under the Apache License,
8// Version 2.0 <https://github.com/ben-manes/caffeine/blob/master/LICENSE>
9//
10// Copyrights of the original code/comments are retained by their contributors.
11// For full authorship information, see the version control history of
12// https://github.com/ben-manes/caffeine/
13
14use std::{ptr::NonNull, time::Duration};
15
16use super::{
17    concurrent::{arc::MiniArc, entry_info::EntryInfo, DeqNodes},
18    deque::{DeqNode, Deque},
19    time::Instant,
20};
21
22use parking_lot::Mutex;
23
/// Number of buckets in each wheel, indexed by level. The last entry is the
/// single overflow bucket.
///
/// NOTE: `TimerWheel::bucket_indices` and `TimerEventsIter::next` derive a
/// bucket index with `ticks & (BUCKET_COUNTS[level] - 1)`, so every count here
/// has to stay a power of two.
const BUCKET_COUNTS: &[u64] = &[
    64, // roughly seconds
    64, // roughly minutes
    32, // roughly hours
    4,  // roughly days
    1,  // overflow (> ~6.5 days)
];

/// Index of the overflow wheel, i.e. the last entry of `BUCKET_COUNTS`.
const OVERFLOW_QUEUE_INDEX: usize = BUCKET_COUNTS.len() - 1;
/// Index of the last regular (non-overflow) wheel. `TimerWheel::bucket_indices`
/// uses it as the *inclusive* upper bound of its level scan.
const NUM_LEVELS: usize = OVERFLOW_QUEUE_INDEX - 1;
34
/// One day. Used below to derive the spans of the day-level and overflow wheels.
const DAY: Duration = Duration::from_secs(60 * 60 * 24);

/// Time spans in nanoseconds, indexed by level.
///
/// `SPANS[level]` is the width of one tick at `level` (see `SHIFT`), and
/// `TimerWheel::bucket_indices` places an event on `level` when its delay from
/// the last advance is less than `SPANS[level + 1]`.
///
/// The first four entries are rounded up to a power of two by
/// `aligned_duration`; the last two are `BUCKET_COUNTS[3]` times such a value.
const SPANS: &[u64] = &[
    aligned_duration(Duration::from_secs(1)),       // 1.07s
    aligned_duration(Duration::from_secs(60)),      // 1.14m
    aligned_duration(Duration::from_secs(60 * 60)), // 1.22h
    aligned_duration(DAY),                          // 1.63d
    BUCKET_COUNTS[3] * aligned_duration(DAY),       // 6.5d
    BUCKET_COUNTS[3] * aligned_duration(DAY),       // 6.5d
];
45
/// `SHIFT[level]` is the number of trailing zero bits of `SPANS[level]`.
///
/// Right-shifting a nanosecond timestamp by `SHIFT[level]` converts it to ticks
/// at that level; this is how `TimerWheel::bucket_indices` and
/// `TimerEventsIter::next` compute tick counts.
const SHIFT: &[u64] = &[
    SPANS[0].trailing_zeros() as u64,
    SPANS[1].trailing_zeros() as u64,
    SPANS[2].trailing_zeros() as u64,
    SPANS[3].trailing_zeros() as u64,
    SPANS[4].trailing_zeros() as u64,
];
53
/// Converts `duration` to nanoseconds and rounds the result up to the nearest
/// power of two. A value that already is a power of two is returned unchanged.
const fn aligned_duration(duration: Duration) -> u64 {
    // NOTE: `as_nanos()` yields a `u128`. `TryInto::try_into()` is not a const
    // fn, so the value is narrowed to `u64` with an `as` cast instead.
    let nanos = duration.as_nanos() as u64;
    nanos.next_power_of_two()
}
60
/// A timer node stored in a bucket of a timer wheel.
///
/// Every bucket holds exactly one `Sentinel` (pushed by `TimerWheel::enable`)
/// plus zero or more `Entry` nodes.
pub(crate) enum TimerNode<K> {
    /// A sentinel node that is used to mark the end of a timer wheel bucket.
    Sentinel,
    /// A timer entry that is holding Arc pointers to the data structures in a cache
    /// entry.
    Entry {
        /// The position (level and index) of the timer wheel bucket, or `None`
        /// once the position has been cleared with `unset_position`.
        pos: Option<(u8, u8)>,
        /// An Arc pointer to the `EntryInfo` of the cache entry (`ValueEntry`).
        /// The expiration time of the timer is read from here.
        entry_info: MiniArc<EntryInfo<K>>,
        /// An Arc pointer to the `DeqNodes` of the cache entry (`ValueEntry`).
        /// Used to clear the entry's pointer to this timer node.
        deq_nodes: MiniArc<Mutex<DeqNodes<K>>>,
    },
}
76
77impl<K> TimerNode<K> {
78    fn new(
79        entry_info: MiniArc<EntryInfo<K>>,
80        deq_nodes: MiniArc<Mutex<DeqNodes<K>>>,
81        level: usize,
82        index: usize,
83    ) -> Self {
84        Self::Entry {
85            pos: Some((level as u8, index as u8)),
86            entry_info,
87            deq_nodes,
88        }
89    }
90
91    /// Returns the position (level and index) of the timer wheel bucket.
92    fn position(&self) -> Option<(usize, usize)> {
93        if let Self::Entry { pos, .. } = &self {
94            pos.map(|(level, index)| (level as usize, index as usize))
95        } else {
96            unreachable!()
97        }
98    }
99
100    fn set_position(&mut self, level: usize, index: usize) {
101        if let Self::Entry { pos, .. } = self {
102            *pos = Some((level as u8, index as u8));
103        } else {
104            unreachable!()
105        }
106    }
107
108    fn unset_position(&mut self) {
109        if let Self::Entry { pos, .. } = self {
110            *pos = None;
111        } else {
112            unreachable!()
113        }
114    }
115
116    fn is_sentinel(&self) -> bool {
117        matches!(self, Self::Sentinel)
118    }
119
120    pub(crate) fn entry_info(&self) -> &MiniArc<EntryInfo<K>> {
121        if let Self::Entry { entry_info, .. } = &self {
122            entry_info
123        } else {
124            unreachable!()
125        }
126    }
127
128    fn unset_timer_node_in_deq_nodes(&self) {
129        if let Self::Entry { deq_nodes, .. } = &self {
130            deq_nodes.lock().set_timer_node(None);
131        } else {
132            unreachable!();
133        }
134    }
135}
136
/// A single bucket of a timer wheel: a deque of timer nodes.
type Bucket<K> = Deque<TimerNode<K>>;
138
/// The outcome of (re)scheduling an existing timer node, returned by
/// `TimerWheel::reschedule`.
#[must_use = "this `ReschedulingResult` may be an `Removed` variant, which should be handled"]
pub(crate) enum ReschedulingResult<K> {
    /// The timer event was rescheduled; the node is back in a bucket of the
    /// timer wheel.
    Rescheduled,
    /// The timer event was not rescheduled because the entry has no expiration
    /// time. Ownership of the node is handed to the caller.
    Removed(Box<DeqNode<TimerNode<K>>>),
}
146
/// A hierarchical timer wheel to add, remove, and fire expiration events in
/// amortized O(1) time.
///
/// The expiration events are deferred until the timer is advanced, which is
/// performed as part of the cache's housekeeping cycle.
///
/// A freshly created wheel is disabled (it has no buckets) until `enable` is
/// called.
pub(crate) struct TimerWheel<K> {
    /// The hierarchical timer wheels, indexed by `[level][bucket index]`.
    /// Empty while the timer wheel is disabled.
    wheels: Box<[Box<[Bucket<K>]>]>,
    /// The time when this `TimerWheel` was created. Bucket indices are derived
    /// from the nanoseconds elapsed since this instant.
    origin: Instant,
    /// The time when this `TimerWheel` was last advanced.
    current: Instant,
}
160
#[cfg(feature = "future")]
// TODO: https://github.com/moka-rs/moka/issues/54
#[allow(clippy::non_send_fields_in_send_ty)]
// Multi-threaded async runtimes require base_cache::Inner to be Send, but it will
// not be without this `unsafe impl`. This is because DeqNodes have NonNull
// pointers.
//
// NOTE(review): soundness presumably relies on the owner of the `TimerWheel`
// serializing every access to it and to the nodes it points to -- confirm
// against the callers.
unsafe impl<K> Send for TimerWheel<K> {}
168
169impl<K> TimerWheel<K> {
170    pub(crate) fn new(now: Instant) -> Self {
171        Self {
172            wheels: Box::default(), // Empty.
173            origin: now,
174            current: now,
175        }
176    }
177
178    pub(crate) fn is_enabled(&self) -> bool {
179        !self.wheels.is_empty()
180    }
181
182    pub(crate) fn enable(&mut self) {
183        assert!(!self.is_enabled());
184
185        // Populate each bucket with a queue having a sentinel node.
186        self.wheels = BUCKET_COUNTS
187            .iter()
188            .map(|b| {
189                (0..*b)
190                    .map(|_| {
191                        let mut deq = Deque::new(super::CacheRegion::Other);
192                        deq.push_back(Box::new(DeqNode::new(TimerNode::Sentinel)));
193                        deq
194                    })
195                    .collect::<Vec<_>>()
196                    .into_boxed_slice()
197            })
198            .collect::<Vec<_>>()
199            .into_boxed_slice();
200    }
201
202    /// Schedules a timer event for the node.
203    pub(crate) fn schedule(
204        &mut self,
205        entry_info: MiniArc<EntryInfo<K>>,
206        deq_nodes: MiniArc<Mutex<DeqNodes<K>>>,
207    ) -> Option<NonNull<DeqNode<TimerNode<K>>>> {
208        debug_assert!(self.is_enabled());
209
210        if let Some(t) = entry_info.expiration_time() {
211            let (level, index) = self.bucket_indices(t);
212            let node = Box::new(DeqNode::new(TimerNode::new(
213                entry_info, deq_nodes, level, index,
214            )));
215            let node = self.wheels[level][index].push_back(node);
216            Some(node)
217        } else {
218            None
219        }
220    }
221
222    fn schedule_existing_node(
223        &mut self,
224        mut node: NonNull<DeqNode<TimerNode<K>>>,
225    ) -> ReschedulingResult<K> {
226        debug_assert!(self.is_enabled());
227
228        // Since cache entry's ValueEntry has a pointer to this node, we must reuse
229        // the node.
230        //
231        // SAFETY on `node.as_mut()`: The self (`TimerWheel`) is the only owner of
232        // the node, and we have `&mut self` here. We are the only one who can mutate
233        // the node.
234        if let entry @ TimerNode::Entry { .. } = &mut unsafe { node.as_mut() }.element {
235            if let Some(t) = entry.entry_info().expiration_time() {
236                let (level, index) = self.bucket_indices(t);
237                entry.set_position(level, index);
238                let node = unsafe { Box::from_raw(node.as_ptr()) };
239                self.wheels[level][index].push_back(node);
240                ReschedulingResult::Rescheduled
241            } else {
242                entry.unset_position();
243                entry.unset_timer_node_in_deq_nodes();
244                ReschedulingResult::Removed(unsafe { Box::from_raw(node.as_ptr()) })
245            }
246        } else {
247            unreachable!()
248        }
249    }
250
251    /// Reschedules an active timer event for the node.
252    pub(crate) fn reschedule(
253        &mut self,
254        node: NonNull<DeqNode<TimerNode<K>>>,
255    ) -> ReschedulingResult<K> {
256        debug_assert!(self.is_enabled());
257        unsafe { self.unlink_timer(node) };
258        self.schedule_existing_node(node)
259    }
260
261    /// Removes a timer event for this node if present.
262    pub(crate) fn deschedule(&mut self, node: NonNull<DeqNode<TimerNode<K>>>) {
263        debug_assert!(self.is_enabled());
264        unsafe {
265            self.unlink_timer(node);
266            Self::drop_node(node);
267        }
268    }
269
270    /// Removes a timer event for this node if present.
271    ///
272    /// IMPORTANT: This method does not drop the node.
273    unsafe fn unlink_timer(&mut self, mut node: NonNull<DeqNode<TimerNode<K>>>) {
274        // SAFETY: The self (`TimerWheel`) is the only owner of the node, and we have
275        // `&mut self` here. We are the only one who can mutate the node.
276        let p = node.as_mut();
277        if let entry @ TimerNode::Entry { .. } = &mut p.element {
278            if let Some((level, index)) = entry.position() {
279                self.wheels[level][index].unlink(node);
280                entry.unset_position();
281            }
282        } else {
283            unreachable!();
284        }
285    }
286
287    unsafe fn drop_node(node: NonNull<DeqNode<TimerNode<K>>>) {
288        std::mem::drop(Box::from_raw(node.as_ptr()));
289    }
290
291    /// Advances the timer wheel to the current time, and returns an iterator over
292    /// timer events.
293    pub(crate) fn advance(
294        &mut self,
295        current_time: Instant,
296    ) -> impl Iterator<Item = TimerEvent<K>> + '_ {
297        debug_assert!(self.is_enabled());
298
299        let previous_time = self.current;
300        self.current = current_time;
301        TimerEventsIter::new(self, previous_time, current_time)
302    }
303
304    /// Returns a pointer to the timer event (cache entry) at the front of the queue.
305    /// Returns `None` if the front node is a sentinel.
306    fn pop_timer_node(&mut self, level: usize, index: usize) -> Option<Box<DeqNode<TimerNode<K>>>> {
307        let deque = &mut self.wheels[level][index];
308        if let Some(node) = deque.peek_front() {
309            if node.element.is_sentinel() {
310                return None;
311            }
312        }
313
314        deque.pop_front()
315    }
316
317    /// Reset the positions of the nodes in the queue at the given level and index.
318    /// When done, the sentinel is at the back of the queue.
319    fn reset_timer_node_positions(&mut self, level: usize, index: usize) {
320        let deque = &mut self.wheels[level][index];
321        debug_assert!(
322            deque.len() > 0,
323            "BUG: The queue is empty. level: {level}, index: {index}"
324        );
325
326        // Rotate the nodes in the queue until we see the sentinel at the back of the
327        // queue.
328        while !deque.peek_back().unwrap().element.is_sentinel() {
329            deque.move_front_to_back();
330        }
331    }
332
333    /// Returns the bucket indices to locate the bucket that the timer event
334    /// should be added to.
335    fn bucket_indices(&self, time: Instant) -> (usize, usize) {
336        let duration_nanos = self.duration_nanos_since_last_advanced(time);
337        let time_nanos = self.time_nanos(time);
338        for level in 0..=NUM_LEVELS {
339            if duration_nanos < SPANS[level + 1] {
340                let ticks = time_nanos >> SHIFT[level];
341                let index = ticks & (BUCKET_COUNTS[level] - 1);
342                return (level, index as usize);
343            }
344        }
345        (OVERFLOW_QUEUE_INDEX, 0)
346    }
347
348    // Returns nano-seconds between the given `time` and the time when this timer
349    // wheel was advanced. If the `time` is earlier than other, returns zero.
350    fn duration_nanos_since_last_advanced(&self, time: Instant) -> u64 {
351        // If `time` is earlier than `self.current`, use zero. This could happen
352        // when a user provided `Expiry` method returned zero or a very short
353        // duration.
354        time.saturating_duration_since(self.current).as_nanos() as u64
355    }
356
357    // Returns nano-seconds between the given `time` and `self.origin`, the time when
358    // this timer wheel was created.
359    //
360    // - If the `time` is earlier than other, returns zero.
361    // - If the `time` is later than `self.origin + u64::MAX`, returns `u64::MAX`,
362    //   which is ~584 years in nanoseconds.
363    //
364    fn time_nanos(&self, time: Instant) -> u64 {
365        let nanos_u128 = time
366            // If `time` is earlier than `self.origin`, use zero. This would never
367            // happen in practice as there should be some delay between the timer
368            // wheel was created and the first timer event is scheduled. But we will
369            // do this just in case.
370            .saturating_duration_since(self.origin)
371            .as_nanos();
372
373        // Convert an `u128` into an `u64`. If the value is too large, use `u64::MAX`
374        // (~584 years)
375        nanos_u128.try_into().unwrap_or(u64::MAX)
376    }
377}
378
/// A timer event, which is either an expired/rescheduled cache entry, or a
/// descheduled timer. `TimerWheel::advance` method returns an iterator over timer
/// events.
#[derive(Debug)]
pub(crate) enum TimerEvent<K> {
    /// This cache entry has expired. Carries the timer node that was popped
    /// from the timer wheel.
    Expired(Box<DeqNode<TimerNode<K>>>),
    // This cache entry has been rescheduled. Rescheduling includes moving a timer
    // from one wheel to another in a lower level of the hierarchy. (This variant
    // is mainly used for testing)
    //
    // In test builds the variant carries the `EntryInfo` of the rescheduled
    // entry so that tests can identify it; otherwise it carries a unit value.
    #[cfg(test)]
    Rescheduled(MiniArc<EntryInfo<K>>),
    #[cfg(not(test))]
    Rescheduled(()),
    /// This timer node (containing a cache entry) has been removed from the timer.
    /// (This variant is mainly used for testing)
    Descheduled,
}
397
/// An iterator over the timer events (expired, rescheduled or descheduled
/// cache entries) produced by advancing a `TimerWheel`.
///
/// The fields after `current_time` form the cursor of the walk over the
/// wheels; they are maintained by `Iterator::next`.
pub(crate) struct TimerEventsIter<'iter, K> {
    /// The timer wheel being advanced.
    timer_wheel: &'iter mut TimerWheel<K>,
    /// The wheel's `current` time before this advance. Restored into the wheel
    /// if the iterator is dropped before it is exhausted.
    previous_time: Instant,
    /// The time the wheel is being advanced to.
    current_time: Instant,
    /// Set once the iterator has nothing more to yield.
    is_done: bool,
    /// The wheel level currently being processed.
    level: usize,
    /// The (unmasked) bucket cursor within the current level. It is masked with
    /// `index_mask` before being used as a bucket index.
    index: u8,
    /// Exclusive upper bound for `index` on the current level.
    end_index: u8,
    /// `BUCKET_COUNTS[level] - 1` for the current level.
    index_mask: u64,
    /// `true` when the per-level state (`index`, `end_index`, `index_mask`) has
    /// to be computed before processing the level.
    is_new_level: bool,
    /// `true` when the current bucket has not been prepared yet, i.e. its
    /// sentinel still has to be rotated to the back of the queue.
    is_new_index: bool,
}
411
412impl<'iter, K> TimerEventsIter<'iter, K> {
413    fn new(
414        timer_wheel: &'iter mut TimerWheel<K>,
415        previous_time: Instant,
416        current_time: Instant,
417    ) -> Self {
418        Self {
419            timer_wheel,
420            previous_time,
421            current_time,
422            is_done: false,
423            level: 0,
424            index: 0,
425            end_index: 0,
426            index_mask: 0,
427            is_new_level: true,
428            is_new_index: true,
429        }
430    }
431}
432
433impl<K> Drop for TimerEventsIter<'_, K> {
434    fn drop(&mut self) {
435        if !self.is_done {
436            // This iterator was dropped before consuming all events. Reset the
437            // `current` to the time when the timer wheel was last successfully
438            // advanced.
439            self.timer_wheel.current = self.previous_time;
440        }
441    }
442}
443
444impl<K> Iterator for TimerEventsIter<'_, K> {
445    type Item = TimerEvent<K>;
446
447    /// NOTE: When necessary, this iterator will unset the timer node pointer in the
448    /// `ValueEntry`.
449    fn next(&mut self) -> Option<Self::Item> {
450        if self.is_done {
451            return None;
452        }
453
454        loop {
455            if self.is_new_level {
456                let previous_time_nanos = self.timer_wheel.time_nanos(self.previous_time);
457                let current_time_nanos = self.timer_wheel.time_nanos(self.current_time);
458                let previous_ticks = previous_time_nanos >> SHIFT[self.level];
459                let current_ticks = current_time_nanos >> SHIFT[self.level];
460
461                if current_ticks <= previous_ticks {
462                    self.is_done = true;
463                    return None;
464                }
465
466                self.index_mask = BUCKET_COUNTS[self.level] - 1;
467                self.index = (previous_ticks & self.index_mask) as u8;
468                let steps =
469                    (current_ticks - previous_ticks + 1).min(BUCKET_COUNTS[self.level]) as u8;
470                self.end_index = self.index + steps;
471
472                self.is_new_level = false;
473                self.is_new_index = true;
474
475                // dbg!(self.level, self.index, self.end_index);
476            }
477
478            let i = self.index & self.index_mask as u8;
479
480            if self.is_new_index {
481                // Move the sentinel to the back of the queue.
482                self.timer_wheel
483                    .reset_timer_node_positions(self.level, i as usize);
484
485                self.is_new_index = false;
486            }
487
488            // Pop the next timer event (cache entry) from the queue at the current
489            // level and index.
490            //
491            // We will repeat processing this level until we see the sentinel.
492            // (`pop_timer_node` will return `None` when it sees the sentinel)
493            if let Some(node) = self.timer_wheel.pop_timer_node(self.level, i as usize) {
494                let expiration_time = node.as_ref().element.entry_info().expiration_time();
495                if let Some(t) = expiration_time {
496                    if t <= self.current_time {
497                        // The cache entry has expired. Unset the timer node from
498                        // the ValueEntry and return the node.
499                        node.as_ref().element.unset_timer_node_in_deq_nodes();
500                        return Some(TimerEvent::Expired(node));
501                    }
502
503                    // The cache entry has not expired. Reschedule it.
504                    let node_p = NonNull::new(Box::into_raw(node)).expect("Got a null ptr");
505
506                    #[cfg(test)]
507                    // Get the entry info before rescheduling (mutating) the node to
508                    // avoid Stacked Borrows/Tree Borrows violations on `node_p`.
509                    let entry_info =
510                        MiniArc::clone(unsafe { node_p.as_ref() }.element.entry_info());
511
512                    match self.timer_wheel.schedule_existing_node(node_p) {
513                        ReschedulingResult::Rescheduled => {
514                            #[cfg(test)]
515                            return Some(TimerEvent::Rescheduled(entry_info));
516                            #[cfg(not(test))]
517                            return Some(TimerEvent::Rescheduled(()));
518                        }
519                        ReschedulingResult::Removed(node) => {
520                            // The timer event has been removed from the timer
521                            // wheel. Unset the timer node from the ValueEntry.
522                            node.as_ref().element.unset_timer_node_in_deq_nodes();
523                            return Some(TimerEvent::Descheduled);
524                        }
525                    }
526                }
527            } else {
528                // Done with the current queue. Move to the next index
529                // and/or next level.
530                self.index += 1;
531                self.is_new_index = true;
532
533                if self.index >= self.end_index {
534                    self.level += 1;
535                    // No more levels to process. We are done.
536                    if self.level >= BUCKET_COUNTS.len() {
537                        self.is_done = true;
538                        return None;
539                    }
540                    self.is_new_level = true;
541                }
542            }
543        }
544    }
545}
546
// Unit tests for the bucket selection (`bucket_indices`) and for advancing the
// timer wheel (`advance`), driven by a mock clock.
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use super::{TimerEvent, TimerWheel, SPANS};
    use crate::common::{
        concurrent::{arc::MiniArc, entry_info::EntryInfo, KeyHash},
        time::{Clock, Instant, Mock},
    };

    #[test]
    fn test_bucket_indices() {
        // Returns the bucket indices for a timer expiring `dur` after `now`.
        fn bi(timer: &TimerWheel<()>, now: Instant, dur: Duration) -> (usize, usize) {
            let t = now.saturating_add(dur);
            timer.bucket_indices(t)
        }

        let (clock, mock) = Clock::mock();
        let now = clock.now();

        let mut timer = TimerWheel::<()>::new(now);
        timer.enable();

        assert_eq!(timer.bucket_indices(now), (0, 0));

        // Level 0: 1.07s
        assert_eq!(bi(&timer, now, n2d(SPANS[0] - 1)), (0, 0));
        assert_eq!(bi(&timer, now, n2d(SPANS[0])), (0, 1));
        assert_eq!(bi(&timer, now, n2d(SPANS[0] * 63)), (0, 63));

        // Level 1: 1.14m
        assert_eq!(bi(&timer, now, n2d(SPANS[0] * 64)), (1, 1));
        assert_eq!(bi(&timer, now, n2d(SPANS[1])), (1, 1));
        assert_eq!(bi(&timer, now, n2d(SPANS[1] * 63 + SPANS[0] * 63)), (1, 63));

        // Level 2: 1.22h
        assert_eq!(bi(&timer, now, n2d(SPANS[1] * 64)), (2, 1));
        assert_eq!(bi(&timer, now, n2d(SPANS[2])), (2, 1));
        assert_eq!(
            bi(
                &timer,
                now,
                n2d(SPANS[2] * 31 + SPANS[1] * 63 + SPANS[0] * 63)
            ),
            (2, 31)
        );

        // Level 3: 1.63d
        assert_eq!(bi(&timer, now, n2d(SPANS[2] * 32)), (3, 1));
        assert_eq!(bi(&timer, now, n2d(SPANS[3])), (3, 1));
        assert_eq!(bi(&timer, now, n2d(SPANS[3] * 3)), (3, 3));

        // Overflow
        assert_eq!(bi(&timer, now, n2d(SPANS[3] * 4)), (4, 0));
        assert_eq!(bi(&timer, now, n2d(SPANS[4])), (4, 0));
        assert_eq!(bi(&timer, now, n2d(SPANS[4] * 100)), (4, 0));

        // Increment the clock by 5 ticks. (1 tick ~= 1.07s)
        // `timer.current` is set directly instead of calling `advance`.
        let now = advance_clock(&clock, &mock, n2d(SPANS[0] * 5));
        timer.current = now;

        // Level 0: 1.07s
        assert_eq!(bi(&timer, now, n2d(SPANS[0] - 1)), (0, 5));
        assert_eq!(bi(&timer, now, n2d(SPANS[0])), (0, 6));
        assert_eq!(bi(&timer, now, n2d(SPANS[0] * 63)), (0, 4));

        // Level 1: 1.14m
        assert_eq!(bi(&timer, now, n2d(SPANS[0] * 64)), (1, 1));
        assert_eq!(bi(&timer, now, n2d(SPANS[1])), (1, 1));
        assert_eq!(
            bi(&timer, now, n2d(SPANS[1] * 63 + SPANS[0] * (63 - 5))),
            (1, 63)
        );

        // Increment the clock by 61 ticks. (total 66 ticks)
        let now = advance_clock(&clock, &mock, n2d(SPANS[0] * 61));
        timer.current = now;

        // Level 0: 1.07s
        assert_eq!(bi(&timer, now, n2d(SPANS[0] - 1)), (0, 2));
        assert_eq!(bi(&timer, now, n2d(SPANS[0])), (0, 3));
        assert_eq!(bi(&timer, now, n2d(SPANS[0] * 63)), (0, 1));

        // Level 1: 1.14m
        assert_eq!(bi(&timer, now, n2d(SPANS[0] * 64)), (1, 2));
        assert_eq!(bi(&timer, now, n2d(SPANS[1])), (1, 2));
        assert_eq!(
            bi(&timer, now, n2d(SPANS[1] * 63 + SPANS[0] * (63 - 2))),
            (1, 0)
        );
    }

    #[test]
    fn test_advance() {
        // Schedules a timer for `key` that expires `ttl` after `now`, and
        // stores the resulting timer node in the entry's `DeqNodes`.
        fn schedule_timer(timer: &mut TimerWheel<u32>, key: u32, now: Instant, ttl: Duration) {
            let hash = key as u64;
            let key_hash = KeyHash::new(Arc::new(key), hash);
            let policy_weight = 0;
            let entry_info = MiniArc::new(EntryInfo::new(key_hash, now, policy_weight));
            entry_info.set_expiration_time(Some(now.saturating_add(ttl)));
            let deq_nodes = Default::default();
            let timer_node = timer.schedule(entry_info, MiniArc::clone(&deq_nodes));
            deq_nodes.lock().set_timer_node(timer_node);
        }

        // Returns the key of an `Expired` event; panics on any other event or
        // on `None`.
        fn expired_key(maybe_entry: Option<TimerEvent<u32>>) -> u32 {
            let entry = maybe_entry.expect("entry is none");
            match entry {
                TimerEvent::Expired(node) => *node.element.entry_info().key_hash().key,
                _ => panic!("Expected an expired entry. Got {entry:?}"),
            }
        }

        // Returns the key of a `Rescheduled` event; panics on any other event
        // or on `None`.
        fn rescheduled_key(maybe_entry: Option<TimerEvent<u32>>) -> u32 {
            let entry = maybe_entry.expect("entry is none");
            match entry {
                TimerEvent::Rescheduled(entry) => *entry.key_hash().key,
                _ => panic!("Expected a rescheduled entry. Got {entry:?}"),
            }
        }

        let (clock, mock) = Clock::mock();
        let now = advance_clock(&clock, &mock, s2d(10));

        let mut timer = TimerWheel::<u32>::new(now);
        timer.enable();

        // Add timers that will expire in some seconds.
        schedule_timer(&mut timer, 1, now, s2d(5));
        schedule_timer(&mut timer, 2, now, s2d(1));
        schedule_timer(&mut timer, 3, now, s2d(63));
        schedule_timer(&mut timer, 4, now, s2d(3));

        let now = advance_clock(&clock, &mock, s2d(4));
        let mut expired_entries = timer.advance(now);
        assert_eq!(expired_key(expired_entries.next()), 2);
        assert_eq!(expired_key(expired_entries.next()), 4);
        assert!(expired_entries.next().is_none());
        drop(expired_entries);

        let now = advance_clock(&clock, &mock, s2d(4));
        let mut expired_entries = timer.advance(now);
        assert_eq!(expired_key(expired_entries.next()), 1);
        assert!(expired_entries.next().is_none());
        drop(expired_entries);

        let now = advance_clock(&clock, &mock, s2d(64 - 8));
        let mut expired_entries = timer.advance(now);
        assert_eq!(expired_key(expired_entries.next()), 3);
        assert!(expired_entries.next().is_none());
        drop(expired_entries);

        // Add timers that will expire in some minutes.
        const MINUTES: u64 = 60;
        schedule_timer(&mut timer, 1, now, s2d(5 * MINUTES));
        #[allow(clippy::identity_op)]
        schedule_timer(&mut timer, 2, now, s2d(1 * MINUTES));
        schedule_timer(&mut timer, 3, now, s2d(63 * MINUTES));
        schedule_timer(&mut timer, 4, now, s2d(3 * MINUTES));

        let now = advance_clock(&clock, &mock, s2d(4 * MINUTES));
        let mut expired_entries = timer.advance(now);
        assert_eq!(expired_key(expired_entries.next()), 2);
        assert_eq!(expired_key(expired_entries.next()), 4);
        assert!(expired_entries.next().is_none());
        drop(expired_entries);

        let now = advance_clock(&clock, &mock, s2d(4 * MINUTES));
        let mut expired_entries = timer.advance(now);
        assert_eq!(expired_key(expired_entries.next()), 1);
        assert!(expired_entries.next().is_none());
        drop(expired_entries);

        let now = advance_clock(&clock, &mock, s2d((64 - 8) * MINUTES));
        let mut expired_entries = timer.advance(now);
        assert_eq!(expired_key(expired_entries.next()), 3);
        assert!(expired_entries.next().is_none());
        drop(expired_entries);

        // Add timers that will expire in some hours.
        const HOURS: u64 = 60 * 60;
        schedule_timer(&mut timer, 1, now, s2d(5 * HOURS));
        #[allow(clippy::identity_op)]
        schedule_timer(&mut timer, 2, now, s2d(1 * HOURS));
        schedule_timer(&mut timer, 3, now, s2d(31 * HOURS));
        schedule_timer(&mut timer, 4, now, s2d(3 * HOURS));

        // Key 1 (5 hours) has not expired after 4 hours; it is reported as
        // rescheduled rather than expired.
        let now = advance_clock(&clock, &mock, s2d(4 * HOURS));
        let mut expired_entries = timer.advance(now);
        assert_eq!(expired_key(expired_entries.next()), 2);
        assert_eq!(expired_key(expired_entries.next()), 4);
        assert_eq!(rescheduled_key(expired_entries.next()), 1);
        assert!(expired_entries.next().is_none());
        drop(expired_entries);

        let now = advance_clock(&clock, &mock, s2d(4 * HOURS));
        let mut expired_entries = timer.advance(now);
        assert_eq!(expired_key(expired_entries.next()), 1);
        assert!(expired_entries.next().is_none());
        drop(expired_entries);

        let now = advance_clock(&clock, &mock, s2d((32 - 8) * HOURS));
        let mut expired_entries = timer.advance(now);
        assert_eq!(expired_key(expired_entries.next()), 3);
        assert!(expired_entries.next().is_none());
        drop(expired_entries);

        // Add timers that will expire in a few days.
        const DAYS: u64 = 24 * 60 * 60;
        schedule_timer(&mut timer, 1, now, s2d(5 * DAYS));
        #[allow(clippy::identity_op)]
        schedule_timer(&mut timer, 2, now, s2d(1 * DAYS));
        schedule_timer(&mut timer, 3, now, s2d(2 * DAYS));
        // Longer than ~6.5 days, so this should be stored in the overflow area.
        schedule_timer(&mut timer, 4, now, s2d(8 * DAYS));

        let now = advance_clock(&clock, &mock, s2d(3 * DAYS));
        let mut expired_entries = timer.advance(now);
        assert_eq!(expired_key(expired_entries.next()), 2);
        assert_eq!(expired_key(expired_entries.next()), 3);
        assert!(expired_entries.next().is_none());
        drop(expired_entries);

        // Key 4 (8 days) has not expired after 6 days; it is reported as
        // rescheduled.
        let now = advance_clock(&clock, &mock, s2d(3 * DAYS));
        let mut expired_entries = timer.advance(now);
        assert_eq!(expired_key(expired_entries.next()), 1);
        assert_eq!(rescheduled_key(expired_entries.next()), 4);
        assert!(expired_entries.next().is_none());
        drop(expired_entries);

        let now = advance_clock(&clock, &mock, s2d(3 * DAYS));
        let mut expired_entries = timer.advance(now);
        assert_eq!(expired_key(expired_entries.next()), 4);
        assert!(expired_entries.next().is_none());
        drop(expired_entries);
    }

    //
    // Utility functions
    //

    /// Increments the mock clock by `duration` and returns the clock's new
    /// current time.
    fn advance_clock(clock: &Clock, mock: &Arc<Mock>, duration: Duration) -> Instant {
        mock.increment(duration);
        clock.now()
    }

    /// Convert nano-seconds to duration.
    fn n2d(nanos: u64) -> Duration {
        Duration::from_nanos(nanos)
    }

    /// Convert seconds to duration.
    fn s2d(secs: u64) -> Duration {
        Duration::from_secs(secs)
    }
}