1use std::{ptr::NonNull, time::Duration};
15
16use super::{
17 concurrent::{arc::MiniArc, entry_info::EntryInfo, DeqNodes},
18 deque::{DeqNode, Deque},
19 time::Instant,
20};
21
22use parking_lot::Mutex;
23
24const BUCKET_COUNTS: &[u64] = &[
25 64, 64, 32, 4, 1, ];
31
32const OVERFLOW_QUEUE_INDEX: usize = BUCKET_COUNTS.len() - 1;
33const NUM_LEVELS: usize = OVERFLOW_QUEUE_INDEX - 1;
34
35const DAY: Duration = Duration::from_secs(60 * 60 * 24);
36
37const SPANS: &[u64] = &[
38 aligned_duration(Duration::from_secs(1)), aligned_duration(Duration::from_secs(60)), aligned_duration(Duration::from_secs(60 * 60)), aligned_duration(DAY), BUCKET_COUNTS[3] * aligned_duration(DAY), BUCKET_COUNTS[3] * aligned_duration(DAY), ];
45
46const SHIFT: &[u64] = &[
47 SPANS[0].trailing_zeros() as u64,
48 SPANS[1].trailing_zeros() as u64,
49 SPANS[2].trailing_zeros() as u64,
50 SPANS[3].trailing_zeros() as u64,
51 SPANS[4].trailing_zeros() as u64,
52];
53
54const fn aligned_duration(duration: Duration) -> u64 {
56 (duration.as_nanos() as u64).next_power_of_two()
59}
60
61pub(crate) enum TimerNode<K> {
63 Sentinel,
65 Entry {
68 pos: Option<(u8, u8)>,
70 entry_info: MiniArc<EntryInfo<K>>,
72 deq_nodes: MiniArc<Mutex<DeqNodes<K>>>,
74 },
75}
76
77impl<K> TimerNode<K> {
78 fn new(
79 entry_info: MiniArc<EntryInfo<K>>,
80 deq_nodes: MiniArc<Mutex<DeqNodes<K>>>,
81 level: usize,
82 index: usize,
83 ) -> Self {
84 Self::Entry {
85 pos: Some((level as u8, index as u8)),
86 entry_info,
87 deq_nodes,
88 }
89 }
90
91 fn position(&self) -> Option<(usize, usize)> {
93 if let Self::Entry { pos, .. } = &self {
94 pos.map(|(level, index)| (level as usize, index as usize))
95 } else {
96 unreachable!()
97 }
98 }
99
100 fn set_position(&mut self, level: usize, index: usize) {
101 if let Self::Entry { pos, .. } = self {
102 *pos = Some((level as u8, index as u8));
103 } else {
104 unreachable!()
105 }
106 }
107
108 fn unset_position(&mut self) {
109 if let Self::Entry { pos, .. } = self {
110 *pos = None;
111 } else {
112 unreachable!()
113 }
114 }
115
116 fn is_sentinel(&self) -> bool {
117 matches!(self, Self::Sentinel)
118 }
119
120 pub(crate) fn entry_info(&self) -> &MiniArc<EntryInfo<K>> {
121 if let Self::Entry { entry_info, .. } = &self {
122 entry_info
123 } else {
124 unreachable!()
125 }
126 }
127
128 fn unset_timer_node_in_deq_nodes(&self) {
129 if let Self::Entry { deq_nodes, .. } = &self {
130 deq_nodes.lock().set_timer_node(None);
131 } else {
132 unreachable!();
133 }
134 }
135}
136
137type Bucket<K> = Deque<TimerNode<K>>;
138
139#[must_use = "this `ReschedulingResult` may be an `Removed` variant, which should be handled"]
140pub(crate) enum ReschedulingResult<K> {
141 Rescheduled,
143 Removed(Box<DeqNode<TimerNode<K>>>),
145}
146
147pub(crate) struct TimerWheel<K> {
153 wheels: Box<[Box<[Bucket<K>]>]>,
155 origin: Instant,
157 current: Instant,
159}
160
161#[cfg(feature = "future")]
162#[allow(clippy::non_send_fields_in_send_ty)]
164unsafe impl<K> Send for TimerWheel<K> {}
168
169impl<K> TimerWheel<K> {
170 pub(crate) fn new(now: Instant) -> Self {
171 Self {
172 wheels: Box::default(), origin: now,
174 current: now,
175 }
176 }
177
178 pub(crate) fn is_enabled(&self) -> bool {
179 !self.wheels.is_empty()
180 }
181
182 pub(crate) fn enable(&mut self) {
183 assert!(!self.is_enabled());
184
185 self.wheels = BUCKET_COUNTS
187 .iter()
188 .map(|b| {
189 (0..*b)
190 .map(|_| {
191 let mut deq = Deque::new(super::CacheRegion::Other);
192 deq.push_back(Box::new(DeqNode::new(TimerNode::Sentinel)));
193 deq
194 })
195 .collect::<Vec<_>>()
196 .into_boxed_slice()
197 })
198 .collect::<Vec<_>>()
199 .into_boxed_slice();
200 }
201
202 pub(crate) fn schedule(
204 &mut self,
205 entry_info: MiniArc<EntryInfo<K>>,
206 deq_nodes: MiniArc<Mutex<DeqNodes<K>>>,
207 ) -> Option<NonNull<DeqNode<TimerNode<K>>>> {
208 debug_assert!(self.is_enabled());
209
210 if let Some(t) = entry_info.expiration_time() {
211 let (level, index) = self.bucket_indices(t);
212 let node = Box::new(DeqNode::new(TimerNode::new(
213 entry_info, deq_nodes, level, index,
214 )));
215 let node = self.wheels[level][index].push_back(node);
216 Some(node)
217 } else {
218 None
219 }
220 }
221
222 fn schedule_existing_node(
223 &mut self,
224 mut node: NonNull<DeqNode<TimerNode<K>>>,
225 ) -> ReschedulingResult<K> {
226 debug_assert!(self.is_enabled());
227
228 if let entry @ TimerNode::Entry { .. } = &mut unsafe { node.as_mut() }.element {
235 if let Some(t) = entry.entry_info().expiration_time() {
236 let (level, index) = self.bucket_indices(t);
237 entry.set_position(level, index);
238 let node = unsafe { Box::from_raw(node.as_ptr()) };
239 self.wheels[level][index].push_back(node);
240 ReschedulingResult::Rescheduled
241 } else {
242 entry.unset_position();
243 entry.unset_timer_node_in_deq_nodes();
244 ReschedulingResult::Removed(unsafe { Box::from_raw(node.as_ptr()) })
245 }
246 } else {
247 unreachable!()
248 }
249 }
250
251 pub(crate) fn reschedule(
253 &mut self,
254 node: NonNull<DeqNode<TimerNode<K>>>,
255 ) -> ReschedulingResult<K> {
256 debug_assert!(self.is_enabled());
257 unsafe { self.unlink_timer(node) };
258 self.schedule_existing_node(node)
259 }
260
261 pub(crate) fn deschedule(&mut self, node: NonNull<DeqNode<TimerNode<K>>>) {
263 debug_assert!(self.is_enabled());
264 unsafe {
265 self.unlink_timer(node);
266 Self::drop_node(node);
267 }
268 }
269
270 unsafe fn unlink_timer(&mut self, mut node: NonNull<DeqNode<TimerNode<K>>>) {
274 let p = node.as_mut();
277 if let entry @ TimerNode::Entry { .. } = &mut p.element {
278 if let Some((level, index)) = entry.position() {
279 self.wheels[level][index].unlink(node);
280 entry.unset_position();
281 }
282 } else {
283 unreachable!();
284 }
285 }
286
287 unsafe fn drop_node(node: NonNull<DeqNode<TimerNode<K>>>) {
288 std::mem::drop(Box::from_raw(node.as_ptr()));
289 }
290
291 pub(crate) fn advance(
294 &mut self,
295 current_time: Instant,
296 ) -> impl Iterator<Item = TimerEvent<K>> + '_ {
297 debug_assert!(self.is_enabled());
298
299 let previous_time = self.current;
300 self.current = current_time;
301 TimerEventsIter::new(self, previous_time, current_time)
302 }
303
304 fn pop_timer_node(&mut self, level: usize, index: usize) -> Option<Box<DeqNode<TimerNode<K>>>> {
307 let deque = &mut self.wheels[level][index];
308 if let Some(node) = deque.peek_front() {
309 if node.element.is_sentinel() {
310 return None;
311 }
312 }
313
314 deque.pop_front()
315 }
316
317 fn reset_timer_node_positions(&mut self, level: usize, index: usize) {
320 let deque = &mut self.wheels[level][index];
321 debug_assert!(
322 deque.len() > 0,
323 "BUG: The queue is empty. level: {level}, index: {index}"
324 );
325
326 while !deque.peek_back().unwrap().element.is_sentinel() {
329 deque.move_front_to_back();
330 }
331 }
332
333 fn bucket_indices(&self, time: Instant) -> (usize, usize) {
336 let duration_nanos = self.duration_nanos_since_last_advanced(time);
337 let time_nanos = self.time_nanos(time);
338 for level in 0..=NUM_LEVELS {
339 if duration_nanos < SPANS[level + 1] {
340 let ticks = time_nanos >> SHIFT[level];
341 let index = ticks & (BUCKET_COUNTS[level] - 1);
342 return (level, index as usize);
343 }
344 }
345 (OVERFLOW_QUEUE_INDEX, 0)
346 }
347
348 fn duration_nanos_since_last_advanced(&self, time: Instant) -> u64 {
351 time.saturating_duration_since(self.current).as_nanos() as u64
355 }
356
357 fn time_nanos(&self, time: Instant) -> u64 {
365 let nanos_u128 = time
366 .saturating_duration_since(self.origin)
371 .as_nanos();
372
373 nanos_u128.try_into().unwrap_or(u64::MAX)
376 }
377}
378
379#[derive(Debug)]
383pub(crate) enum TimerEvent<K> {
384 Expired(Box<DeqNode<TimerNode<K>>>),
386 #[cfg(test)]
390 Rescheduled(MiniArc<EntryInfo<K>>),
391 #[cfg(not(test))]
392 Rescheduled(()),
393 Descheduled,
396}
397
398pub(crate) struct TimerEventsIter<'iter, K> {
400 timer_wheel: &'iter mut TimerWheel<K>,
401 previous_time: Instant,
402 current_time: Instant,
403 is_done: bool,
404 level: usize,
405 index: u8,
406 end_index: u8,
407 index_mask: u64,
408 is_new_level: bool,
409 is_new_index: bool,
410}
411
412impl<'iter, K> TimerEventsIter<'iter, K> {
413 fn new(
414 timer_wheel: &'iter mut TimerWheel<K>,
415 previous_time: Instant,
416 current_time: Instant,
417 ) -> Self {
418 Self {
419 timer_wheel,
420 previous_time,
421 current_time,
422 is_done: false,
423 level: 0,
424 index: 0,
425 end_index: 0,
426 index_mask: 0,
427 is_new_level: true,
428 is_new_index: true,
429 }
430 }
431}
432
433impl<K> Drop for TimerEventsIter<'_, K> {
434 fn drop(&mut self) {
435 if !self.is_done {
436 self.timer_wheel.current = self.previous_time;
440 }
441 }
442}
443
444impl<K> Iterator for TimerEventsIter<'_, K> {
445 type Item = TimerEvent<K>;
446
447 fn next(&mut self) -> Option<Self::Item> {
450 if self.is_done {
451 return None;
452 }
453
454 loop {
455 if self.is_new_level {
456 let previous_time_nanos = self.timer_wheel.time_nanos(self.previous_time);
457 let current_time_nanos = self.timer_wheel.time_nanos(self.current_time);
458 let previous_ticks = previous_time_nanos >> SHIFT[self.level];
459 let current_ticks = current_time_nanos >> SHIFT[self.level];
460
461 if current_ticks <= previous_ticks {
462 self.is_done = true;
463 return None;
464 }
465
466 self.index_mask = BUCKET_COUNTS[self.level] - 1;
467 self.index = (previous_ticks & self.index_mask) as u8;
468 let steps =
469 (current_ticks - previous_ticks + 1).min(BUCKET_COUNTS[self.level]) as u8;
470 self.end_index = self.index + steps;
471
472 self.is_new_level = false;
473 self.is_new_index = true;
474
475 }
477
478 let i = self.index & self.index_mask as u8;
479
480 if self.is_new_index {
481 self.timer_wheel
483 .reset_timer_node_positions(self.level, i as usize);
484
485 self.is_new_index = false;
486 }
487
488 if let Some(node) = self.timer_wheel.pop_timer_node(self.level, i as usize) {
494 let expiration_time = node.as_ref().element.entry_info().expiration_time();
495 if let Some(t) = expiration_time {
496 if t <= self.current_time {
497 node.as_ref().element.unset_timer_node_in_deq_nodes();
500 return Some(TimerEvent::Expired(node));
501 }
502
503 let node_p = NonNull::new(Box::into_raw(node)).expect("Got a null ptr");
505
506 #[cfg(test)]
507 let entry_info =
510 MiniArc::clone(unsafe { node_p.as_ref() }.element.entry_info());
511
512 match self.timer_wheel.schedule_existing_node(node_p) {
513 ReschedulingResult::Rescheduled => {
514 #[cfg(test)]
515 return Some(TimerEvent::Rescheduled(entry_info));
516 #[cfg(not(test))]
517 return Some(TimerEvent::Rescheduled(()));
518 }
519 ReschedulingResult::Removed(node) => {
520 node.as_ref().element.unset_timer_node_in_deq_nodes();
523 return Some(TimerEvent::Descheduled);
524 }
525 }
526 }
527 } else {
528 self.index += 1;
531 self.is_new_index = true;
532
533 if self.index >= self.end_index {
534 self.level += 1;
535 if self.level >= BUCKET_COUNTS.len() {
537 self.is_done = true;
538 return None;
539 }
540 self.is_new_level = true;
541 }
542 }
543 }
544 }
545}
546
547#[cfg(test)]
548mod tests {
549 use std::{sync::Arc, time::Duration};
550
551 use super::{TimerEvent, TimerWheel, SPANS};
552 use crate::common::{
553 concurrent::{arc::MiniArc, entry_info::EntryInfo, KeyHash},
554 time::{Clock, Instant, Mock},
555 };
556
557 #[test]
558 fn test_bucket_indices() {
559 fn bi(timer: &TimerWheel<()>, now: Instant, dur: Duration) -> (usize, usize) {
560 let t = now.saturating_add(dur);
561 timer.bucket_indices(t)
562 }
563
564 let (clock, mock) = Clock::mock();
565 let now = clock.now();
566
567 let mut timer = TimerWheel::<()>::new(now);
568 timer.enable();
569
570 assert_eq!(timer.bucket_indices(now), (0, 0));
571
572 assert_eq!(bi(&timer, now, n2d(SPANS[0] - 1)), (0, 0));
574 assert_eq!(bi(&timer, now, n2d(SPANS[0])), (0, 1));
575 assert_eq!(bi(&timer, now, n2d(SPANS[0] * 63)), (0, 63));
576
577 assert_eq!(bi(&timer, now, n2d(SPANS[0] * 64)), (1, 1));
579 assert_eq!(bi(&timer, now, n2d(SPANS[1])), (1, 1));
580 assert_eq!(bi(&timer, now, n2d(SPANS[1] * 63 + SPANS[0] * 63)), (1, 63));
581
582 assert_eq!(bi(&timer, now, n2d(SPANS[1] * 64)), (2, 1));
584 assert_eq!(bi(&timer, now, n2d(SPANS[2])), (2, 1));
585 assert_eq!(
586 bi(
587 &timer,
588 now,
589 n2d(SPANS[2] * 31 + SPANS[1] * 63 + SPANS[0] * 63)
590 ),
591 (2, 31)
592 );
593
594 assert_eq!(bi(&timer, now, n2d(SPANS[2] * 32)), (3, 1));
596 assert_eq!(bi(&timer, now, n2d(SPANS[3])), (3, 1));
597 assert_eq!(bi(&timer, now, n2d(SPANS[3] * 3)), (3, 3));
598
599 assert_eq!(bi(&timer, now, n2d(SPANS[3] * 4)), (4, 0));
601 assert_eq!(bi(&timer, now, n2d(SPANS[4])), (4, 0));
602 assert_eq!(bi(&timer, now, n2d(SPANS[4] * 100)), (4, 0));
603
604 let now = advance_clock(&clock, &mock, n2d(SPANS[0] * 5));
606 timer.current = now;
607
608 assert_eq!(bi(&timer, now, n2d(SPANS[0] - 1)), (0, 5));
610 assert_eq!(bi(&timer, now, n2d(SPANS[0])), (0, 6));
611 assert_eq!(bi(&timer, now, n2d(SPANS[0] * 63)), (0, 4));
612
613 assert_eq!(bi(&timer, now, n2d(SPANS[0] * 64)), (1, 1));
615 assert_eq!(bi(&timer, now, n2d(SPANS[1])), (1, 1));
616 assert_eq!(
617 bi(&timer, now, n2d(SPANS[1] * 63 + SPANS[0] * (63 - 5))),
618 (1, 63)
619 );
620
621 let now = advance_clock(&clock, &mock, n2d(SPANS[0] * 61));
623 timer.current = now;
624
625 assert_eq!(bi(&timer, now, n2d(SPANS[0] - 1)), (0, 2));
627 assert_eq!(bi(&timer, now, n2d(SPANS[0])), (0, 3));
628 assert_eq!(bi(&timer, now, n2d(SPANS[0] * 63)), (0, 1));
629
630 assert_eq!(bi(&timer, now, n2d(SPANS[0] * 64)), (1, 2));
632 assert_eq!(bi(&timer, now, n2d(SPANS[1])), (1, 2));
633 assert_eq!(
634 bi(&timer, now, n2d(SPANS[1] * 63 + SPANS[0] * (63 - 2))),
635 (1, 0)
636 );
637 }
638
639 #[test]
640 fn test_advance() {
641 fn schedule_timer(timer: &mut TimerWheel<u32>, key: u32, now: Instant, ttl: Duration) {
642 let hash = key as u64;
643 let key_hash = KeyHash::new(Arc::new(key), hash);
644 let policy_weight = 0;
645 let entry_info = MiniArc::new(EntryInfo::new(key_hash, now, policy_weight));
646 entry_info.set_expiration_time(Some(now.saturating_add(ttl)));
647 let deq_nodes = Default::default();
648 let timer_node = timer.schedule(entry_info, MiniArc::clone(&deq_nodes));
649 deq_nodes.lock().set_timer_node(timer_node);
650 }
651
652 fn expired_key(maybe_entry: Option<TimerEvent<u32>>) -> u32 {
653 let entry = maybe_entry.expect("entry is none");
654 match entry {
655 TimerEvent::Expired(node) => *node.element.entry_info().key_hash().key,
656 _ => panic!("Expected an expired entry. Got {entry:?}"),
657 }
658 }
659
660 fn rescheduled_key(maybe_entry: Option<TimerEvent<u32>>) -> u32 {
661 let entry = maybe_entry.expect("entry is none");
662 match entry {
663 TimerEvent::Rescheduled(entry) => *entry.key_hash().key,
664 _ => panic!("Expected a rescheduled entry. Got {entry:?}"),
665 }
666 }
667
668 let (clock, mock) = Clock::mock();
669 let now = advance_clock(&clock, &mock, s2d(10));
670
671 let mut timer = TimerWheel::<u32>::new(now);
672 timer.enable();
673
674 schedule_timer(&mut timer, 1, now, s2d(5));
676 schedule_timer(&mut timer, 2, now, s2d(1));
677 schedule_timer(&mut timer, 3, now, s2d(63));
678 schedule_timer(&mut timer, 4, now, s2d(3));
679
680 let now = advance_clock(&clock, &mock, s2d(4));
681 let mut expired_entries = timer.advance(now);
682 assert_eq!(expired_key(expired_entries.next()), 2);
683 assert_eq!(expired_key(expired_entries.next()), 4);
684 assert!(expired_entries.next().is_none());
685 drop(expired_entries);
686
687 let now = advance_clock(&clock, &mock, s2d(4));
688 let mut expired_entries = timer.advance(now);
689 assert_eq!(expired_key(expired_entries.next()), 1);
690 assert!(expired_entries.next().is_none());
691 drop(expired_entries);
692
693 let now = advance_clock(&clock, &mock, s2d(64 - 8));
694 let mut expired_entries = timer.advance(now);
695 assert_eq!(expired_key(expired_entries.next()), 3);
696 assert!(expired_entries.next().is_none());
697 drop(expired_entries);
698
699 const MINUTES: u64 = 60;
701 schedule_timer(&mut timer, 1, now, s2d(5 * MINUTES));
702 #[allow(clippy::identity_op)]
703 schedule_timer(&mut timer, 2, now, s2d(1 * MINUTES));
704 schedule_timer(&mut timer, 3, now, s2d(63 * MINUTES));
705 schedule_timer(&mut timer, 4, now, s2d(3 * MINUTES));
706
707 let now = advance_clock(&clock, &mock, s2d(4 * MINUTES));
708 let mut expired_entries = timer.advance(now);
709 assert_eq!(expired_key(expired_entries.next()), 2);
710 assert_eq!(expired_key(expired_entries.next()), 4);
711 assert!(expired_entries.next().is_none());
712 drop(expired_entries);
713
714 let now = advance_clock(&clock, &mock, s2d(4 * MINUTES));
715 let mut expired_entries = timer.advance(now);
716 assert_eq!(expired_key(expired_entries.next()), 1);
717 assert!(expired_entries.next().is_none());
718 drop(expired_entries);
719
720 let now = advance_clock(&clock, &mock, s2d((64 - 8) * MINUTES));
721 let mut expired_entries = timer.advance(now);
722 assert_eq!(expired_key(expired_entries.next()), 3);
723 assert!(expired_entries.next().is_none());
724 drop(expired_entries);
725
726 const HOURS: u64 = 60 * 60;
728 schedule_timer(&mut timer, 1, now, s2d(5 * HOURS));
729 #[allow(clippy::identity_op)]
730 schedule_timer(&mut timer, 2, now, s2d(1 * HOURS));
731 schedule_timer(&mut timer, 3, now, s2d(31 * HOURS));
732 schedule_timer(&mut timer, 4, now, s2d(3 * HOURS));
733
734 let now = advance_clock(&clock, &mock, s2d(4 * HOURS));
735 let mut expired_entries = timer.advance(now);
736 assert_eq!(expired_key(expired_entries.next()), 2);
737 assert_eq!(expired_key(expired_entries.next()), 4);
738 assert_eq!(rescheduled_key(expired_entries.next()), 1);
739 assert!(expired_entries.next().is_none());
740 drop(expired_entries);
741
742 let now = advance_clock(&clock, &mock, s2d(4 * HOURS));
743 let mut expired_entries = timer.advance(now);
744 assert_eq!(expired_key(expired_entries.next()), 1);
745 assert!(expired_entries.next().is_none());
746 drop(expired_entries);
747
748 let now = advance_clock(&clock, &mock, s2d((32 - 8) * HOURS));
749 let mut expired_entries = timer.advance(now);
750 assert_eq!(expired_key(expired_entries.next()), 3);
751 assert!(expired_entries.next().is_none());
752 drop(expired_entries);
753
754 const DAYS: u64 = 24 * 60 * 60;
756 schedule_timer(&mut timer, 1, now, s2d(5 * DAYS));
757 #[allow(clippy::identity_op)]
758 schedule_timer(&mut timer, 2, now, s2d(1 * DAYS));
759 schedule_timer(&mut timer, 3, now, s2d(2 * DAYS));
760 schedule_timer(&mut timer, 4, now, s2d(8 * DAYS));
762
763 let now = advance_clock(&clock, &mock, s2d(3 * DAYS));
764 let mut expired_entries = timer.advance(now);
765 assert_eq!(expired_key(expired_entries.next()), 2);
766 assert_eq!(expired_key(expired_entries.next()), 3);
767 assert!(expired_entries.next().is_none());
768 drop(expired_entries);
769
770 let now = advance_clock(&clock, &mock, s2d(3 * DAYS));
771 let mut expired_entries = timer.advance(now);
772 assert_eq!(expired_key(expired_entries.next()), 1);
773 assert_eq!(rescheduled_key(expired_entries.next()), 4);
774 assert!(expired_entries.next().is_none());
775 drop(expired_entries);
776
777 let now = advance_clock(&clock, &mock, s2d(3 * DAYS));
778 let mut expired_entries = timer.advance(now);
779 assert_eq!(expired_key(expired_entries.next()), 4);
780 assert!(expired_entries.next().is_none());
781 drop(expired_entries);
782 }
783
784 fn advance_clock(clock: &Clock, mock: &Arc<Mock>, duration: Duration) -> Instant {
789 mock.increment(duration);
790 clock.now()
791 }
792
793 fn n2d(nanos: u64) -> Duration {
795 Duration::from_nanos(nanos)
796 }
797
798 fn s2d(secs: u64) -> Duration {
800 Duration::from_secs(secs)
801 }
802}