tokio_metrics/
runtime.rs

1use std::time::{Duration, Instant};
2use tokio::runtime;
3
4#[cfg(feature = "metrics-rs-integration")]
5pub(crate) mod metrics_rs_integration;
6
7/// Monitors key metrics of the tokio runtime.
8///
9/// ### Usage
10/// ```
11/// use std::time::Duration;
12/// use tokio_metrics::RuntimeMonitor;
13///
14/// #[tokio::main]
15/// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
16///     let handle = tokio::runtime::Handle::current();
17///
18///     // print runtime metrics every 500ms
19///     {
20///         let runtime_monitor = RuntimeMonitor::new(&handle);
21///         tokio::spawn(async move {
22///             for interval in runtime_monitor.intervals() {
23///                 // pretty-print the metric interval
24///                 println!("{:?}", interval);
25///                 // wait 500ms
26///                 tokio::time::sleep(Duration::from_millis(500)).await;
27///             }
28///         });
29///     }
30///
31///     // await some tasks
32///     tokio::join![
33///         do_work(),
34///         do_work(),
35///         do_work(),
36///     ];
37///
38///     Ok(())
39/// }
40///
41/// async fn do_work() {
42///     for _ in 0..25 {
43///         tokio::task::yield_now().await;
44///         tokio::time::sleep(Duration::from_millis(100)).await;
45///     }
46/// }
47/// ```
#[derive(Debug)]
pub struct RuntimeMonitor {
    /// Handle to the runtime's metrics (a [`tokio::runtime::RuntimeMetrics`]),
    /// rather than to the runtime itself.
    runtime: runtime::RuntimeMetrics,
}
53
// Declares the public `RuntimeMetrics` struct from two comma-separated field
// lists (a trailing comma is optional in each list):
//
// * `stable { ... }`   - fields that are always part of the struct.
// * `unstable { ... }` - fields that only exist when the crate is compiled
//                        with `--cfg tokio_unstable`.
//
// Every attribute written on a field (doc comments included) is forwarded to
// the generated field unchanged, followed by the `docsrs` annotations below.
macro_rules! define_runtime_metrics {
    (
        stable {
            $(
                $(#[$($stable_meta:tt)*])*
                $stable_vis:vis $stable_field:ident : $stable_ty:ty
            ),*
            $(,)?
        }
        unstable {
            $(
                $(#[$($gated_meta:tt)*])*
                $gated_vis:vis $gated_field:ident : $gated_ty:ty
            ),*
            $(,)?
        }
    ) => {
        /// Key runtime metrics.
        #[non_exhaustive]
        #[derive(Default, Debug, Clone)]
        pub struct RuntimeMetrics {
            // Always-present fields.
            $(
                $(#[$($stable_meta)*])*
                #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
                $stable_vis $stable_field: $stable_ty,
            )*
            // Fields compiled in only under `cfg(tokio_unstable)`.
            $(
                $(#[$($gated_meta)*])*
                #[cfg(tokio_unstable)]
                #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
                $gated_vis $gated_field: $gated_ty,
            )*
        }
    };
}
89
90define_runtime_metrics! {
91    stable {
92        /// The number of worker threads used by the runtime.
93        ///
94        /// This metric is static for a runtime.
95        ///
96        /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`].
97        /// When using the `current_thread` runtime, the return value is always `1`.
98        ///
99        /// The number of workers is set by configuring
100        /// [`worker_threads`][`tokio::runtime::Builder::worker_threads`] with
101        /// [`tokio::runtime::Builder`], or by parameterizing [`tokio::main`].
102        ///
103        /// ##### Examples
104        /// In the below example, the number of workers is set by parameterizing [`tokio::main`]:
105        /// ```
106        /// use tokio::runtime::Handle;
107        ///
108        /// #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
109        /// async fn main() {
110        ///     let handle = tokio::runtime::Handle::current();
111        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
112        ///     let mut intervals = monitor.intervals();
113        ///     let mut next_interval = || intervals.next().unwrap();
114        ///
115        ///     assert_eq!(next_interval().workers_count, 10);
116        /// }
117        /// ```
118        ///
119        /// [`tokio::main`]: https://docs.rs/tokio/latest/tokio/attr.main.html
120        ///
121        /// When using the `current_thread` runtime, the return value is always `1`; e.g.:
122        /// ```
123        /// use tokio::runtime::Handle;
124        ///
125        /// #[tokio::main(flavor = "current_thread")]
126        /// async fn main() {
127        ///     let handle = tokio::runtime::Handle::current();
128        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
129        ///     let mut intervals = monitor.intervals();
130        ///     let mut next_interval = || intervals.next().unwrap();
131        ///
132        ///     assert_eq!(next_interval().workers_count, 1);
133        /// }
134        /// ```
135        ///
136        /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`]; e.g.:
137        /// ```
138        /// use tokio::runtime::Handle;
139        ///
140        /// #[tokio::main]
141        /// async fn main() {
142        ///     let handle = Handle::current();
143        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
144        ///     let mut intervals = monitor.intervals();
145        ///     let mut next_interval = || intervals.next().unwrap();
146        ///
147        ///     assert_eq!(next_interval().workers_count, handle.metrics().num_workers());
148        /// }
149        /// ```
150        pub workers_count: usize,
151
152        /// The number of times worker threads parked.
153        ///
154        /// The worker park count increases by one each time the worker parks the thread waiting for
155        /// new inbound events to process. This usually means the worker has processed all pending work
156        /// and is currently idle.
157        ///
158        /// ##### Definition
159        /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_park_count`]
160        /// across all worker threads.
161        ///
162        /// ##### See also
163        /// - [`RuntimeMetrics::max_park_count`]
164        /// - [`RuntimeMetrics::min_park_count`]
165        ///
166        /// ##### Examples
167        /// ```
168        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
169        /// async fn main() {
170        ///     let handle = tokio::runtime::Handle::current();
171        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
172        ///     let mut intervals = monitor.intervals();
173        ///     let mut next_interval = || intervals.next().unwrap();
174        ///
175        ///     let interval = next_interval(); // end of interval 1
176        ///     assert_eq!(interval.total_park_count, 0);
177        ///
178        ///     induce_parks().await;
179        ///
180        ///     let interval = next_interval(); // end of interval 2
181        ///     assert!(interval.total_park_count >= 1); // usually 1 or 2 parks
182        /// }
183        ///
184        /// async fn induce_parks() {
185        ///     let _ = tokio::time::timeout(std::time::Duration::ZERO, async {
186        ///         loop { tokio::task::yield_now().await; }
187        ///     }).await;
188        /// }
189        /// ```
190        pub total_park_count: u64,
191
192        /// The maximum number of times any worker thread parked.
193        ///
194        /// ##### Definition
195        /// This metric is derived from the maximum of
196        /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
197        ///
198        /// ##### See also
199        /// - [`RuntimeMetrics::total_park_count`]
200        /// - [`RuntimeMetrics::min_park_count`]
201        pub max_park_count: u64,
202
203        /// The minimum number of times any worker thread parked.
204        ///
205        /// ##### Definition
        /// This metric is derived from the minimum of
207        /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
208        ///
209        /// ##### See also
210        /// - [`RuntimeMetrics::total_park_count`]
211        /// - [`RuntimeMetrics::max_park_count`]
212        pub min_park_count: u64,
213
214        /// The amount of time worker threads were busy.
215        ///
216        /// The worker busy duration increases whenever the worker is spending time processing work.
217        /// Using this value can indicate the total load of workers.
218        ///
219        /// ##### Definition
220        /// This metric is derived from the sum of
221        /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
222        ///
223        /// ##### See also
224        /// - [`RuntimeMetrics::min_busy_duration`]
225        /// - [`RuntimeMetrics::max_busy_duration`]
226        ///
227        /// ##### Examples
        /// In the below example, tasks spend a total of 4s busy:
229        /// ```
230        /// use tokio::time::Duration;
231        ///
232        /// fn main() {
233        ///     let start = tokio::time::Instant::now();
234        ///
235        ///     let rt = tokio::runtime::Builder::new_current_thread()
236        ///         .enable_all()
237        ///         .build()
238        ///         .unwrap();
239        ///
240        ///     let handle = rt.handle();
241        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
242        ///     let mut intervals = monitor.intervals();
243        ///     let mut next_interval = || intervals.next().unwrap();
244        ///
245        ///     let delay_1s = Duration::from_secs(1);
246        ///     let delay_3s = Duration::from_secs(3);
247        ///
248        ///     rt.block_on(async {
249        ///         // keep the main task busy for 1s
250        ///         spin_for(delay_1s);
251        ///
        ///         // spawn a task and keep it busy for 3s
253        ///         let _ = tokio::spawn(async move {
254        ///             spin_for(delay_3s);
255        ///         }).await;
256        ///     });
257        ///
258        ///     // flush metrics
259        ///     drop(rt);
260        ///
261        ///     let elapsed = start.elapsed();
262        ///
263        ///     let interval =  next_interval(); // end of interval 2
264        ///     assert!(interval.total_busy_duration >= delay_1s + delay_3s);
265        ///     assert!(interval.total_busy_duration <= elapsed);
266        /// }
267        ///
268        /// fn time<F>(task: F) -> Duration
269        /// where
270        ///     F: Fn() -> ()
271        /// {
272        ///     let start = tokio::time::Instant::now();
273        ///     task();
274        ///     start.elapsed()
275        /// }
276        ///
277        /// /// Block the current thread for a given `duration`.
278        /// fn spin_for(duration: Duration) {
279        ///     let start = tokio::time::Instant::now();
280        ///     while start.elapsed() <= duration {}
281        /// }
282        /// ```
283        ///
284        /// Busy times may not accumulate as the above example suggests (FIXME: Why?); e.g., if we
285        /// remove the three second delay, the time spent busy falls to mere microseconds:
286        /// ```should_panic
287        /// use tokio::time::Duration;
288        ///
289        /// fn main() {
290        ///     let rt = tokio::runtime::Builder::new_current_thread()
291        ///         .enable_all()
292        ///         .build()
293        ///         .unwrap();
294        ///
295        ///     let handle = rt.handle();
296        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
297        ///     let mut intervals = monitor.intervals();
298        ///     let mut next_interval = || intervals.next().unwrap();
299        ///
300        ///     let delay_1s = Duration::from_secs(1);
301        ///
302        ///     let elapsed = time(|| rt.block_on(async {
303        ///         // keep the main task busy for 1s
304        ///         spin_for(delay_1s);
305        ///     }));
306        ///
307        ///     // flush metrics
308        ///     drop(rt);
309        ///
310        ///     let interval =  next_interval(); // end of interval 2
311        ///     assert!(interval.total_busy_duration >= delay_1s); // FAIL
312        ///     assert!(interval.total_busy_duration <= elapsed);
313        /// }
314        ///
315        /// fn time<F>(task: F) -> Duration
316        /// where
317        ///     F: Fn() -> ()
318        /// {
319        ///     let start = tokio::time::Instant::now();
320        ///     task();
321        ///     start.elapsed()
322        /// }
323        ///
324        /// /// Block the current thread for a given `duration`.
325        /// fn spin_for(duration: Duration) {
326        ///     let start = tokio::time::Instant::now();
327        ///     while start.elapsed() <= duration {}
328        /// }
329        /// ```
330        pub total_busy_duration: Duration,
331
332        /// The maximum amount of time a worker thread was busy.
333        ///
334        /// ##### Definition
335        /// This metric is derived from the maximum of
336        /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
337        ///
338        /// ##### See also
339        /// - [`RuntimeMetrics::total_busy_duration`]
340        /// - [`RuntimeMetrics::min_busy_duration`]
341        pub max_busy_duration: Duration,
342
343        /// The minimum amount of time a worker thread was busy.
344        ///
345        /// ##### Definition
346        /// This metric is derived from the minimum of
347        /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
348        ///
349        /// ##### See also
350        /// - [`RuntimeMetrics::total_busy_duration`]
351        /// - [`RuntimeMetrics::max_busy_duration`]
352        pub min_busy_duration: Duration,
353
354        /// The number of tasks currently scheduled in the runtime's global queue.
355        ///
356        /// Tasks that are spawned or notified from a non-runtime thread are scheduled using the
357        /// runtime's global queue. This metric returns the **current** number of tasks pending in
358        /// the global queue. As such, the returned value may increase or decrease as new tasks are
359        /// scheduled and processed.
360        ///
361        /// ##### Definition
362        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::global_queue_depth`].
363        ///
364        /// ##### Example
365        /// ```
366        /// # let current_thread = tokio::runtime::Builder::new_current_thread()
367        /// #     .enable_all()
368        /// #     .build()
369        /// #     .unwrap();
370        /// #
371        /// # let multi_thread = tokio::runtime::Builder::new_multi_thread()
372        /// #     .worker_threads(2)
373        /// #     .enable_all()
374        /// #     .build()
375        /// #     .unwrap();
376        /// #
377        /// # for runtime in [current_thread, multi_thread] {
378        /// let handle = runtime.handle().clone();
379        /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
380        /// let mut intervals = monitor.intervals();
381        /// let mut next_interval = || intervals.next().unwrap();
382        ///
383        /// let interval = next_interval(); // end of interval 1
384        /// # #[cfg(tokio_unstable)]
385        /// assert_eq!(interval.num_remote_schedules, 0);
386        ///
387        /// // spawn a system thread outside of the runtime
388        /// std::thread::spawn(move || {
389        ///     // spawn two tasks from this non-runtime thread
390        ///     handle.spawn(async {});
391        ///     handle.spawn(async {});
392        /// }).join().unwrap();
393        ///
394        /// // flush metrics
395        /// drop(runtime);
396        ///
397        /// let interval = next_interval(); // end of interval 2
398        /// # #[cfg(tokio_unstable)]
399        /// assert_eq!(interval.num_remote_schedules, 2);
400        /// # }
401        /// ```
402        pub global_queue_depth: usize,
403
404        /// Total amount of time elapsed since observing runtime metrics.
405        pub elapsed: Duration,
406    }
407    unstable {
408        /// The average duration of a single invocation of poll on a task.
409        ///
410        /// This average is an exponentially-weighted moving average of the duration
411        /// of task polls on all runtime workers.
412        ///
413        /// ##### Examples
414        /// ```
415        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
416        /// async fn main() {
417        ///     let handle = tokio::runtime::Handle::current();
418        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
419        ///     let mut intervals = monitor.intervals();
420        ///     let mut next_interval = || intervals.next().unwrap();
421        ///
422        ///     let interval = next_interval();
423        ///     println!("mean task poll duration is {:?}", interval.mean_poll_duration);
424        /// }
425        /// ```
426        pub mean_poll_duration: Duration,
427
428        /// The average duration of a single invocation of poll on a task on the
429        /// worker with the lowest value.
430        ///
431        /// This average is an exponentially-weighted moving average of the duration
432        /// of task polls on the runtime worker with the lowest value.
433        ///
434        /// ##### Examples
435        /// ```
436        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
437        /// async fn main() {
438        ///     let handle = tokio::runtime::Handle::current();
439        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
440        ///     let mut intervals = monitor.intervals();
441        ///     let mut next_interval = || intervals.next().unwrap();
442        ///
443        ///     let interval = next_interval();
444        ///     println!("min mean task poll duration is {:?}", interval.mean_poll_duration_worker_min);
445        /// }
446        /// ```
447        pub mean_poll_duration_worker_min: Duration,
448
449        /// The average duration of a single invocation of poll on a task on the
450        /// worker with the highest value.
451        ///
452        /// This average is an exponentially-weighted moving average of the duration
453        /// of task polls on the runtime worker with the highest value.
454        ///
455        /// ##### Examples
456        /// ```
457        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
458        /// async fn main() {
459        ///     let handle = tokio::runtime::Handle::current();
460        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
461        ///     let mut intervals = monitor.intervals();
462        ///     let mut next_interval = || intervals.next().unwrap();
463        ///
464        ///     let interval = next_interval();
465        ///     println!("max mean task poll duration is {:?}", interval.mean_poll_duration_worker_max);
466        /// }
467        /// ```
468        pub mean_poll_duration_worker_max: Duration,
469
470        /// A histogram of task polls since the previous probe grouped by poll
471        /// times.
472        ///
473        /// This metric must be explicitly enabled when creating the runtime with
474        /// [`enable_metrics_poll_time_histogram`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
475        /// Bucket sizes are fixed and configured at the runtime level. See
476        /// configuration options on
477        /// [`runtime::Builder`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
478        ///
479        /// ##### Examples
480        /// ```
481        /// use tokio::runtime::HistogramConfiguration;
482        /// use std::time::Duration;
483        ///
484        /// let config = HistogramConfiguration::linear(Duration::from_micros(50), 12);
485        ///
486        /// let rt = tokio::runtime::Builder::new_multi_thread()
487        ///     .enable_metrics_poll_time_histogram()
488        ///     .metrics_poll_time_histogram_configuration(config)
489        ///     .build()
490        ///     .unwrap();
491        ///
492        /// rt.block_on(async {
493        ///     let handle = tokio::runtime::Handle::current();
494        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
495        ///     let mut intervals = monitor.intervals();
496        ///     let mut next_interval = || intervals.next().unwrap();
497        ///
498        ///     let interval = next_interval();
499        ///     println!("poll count histogram {:?}", interval.poll_time_histogram);
500        /// });
501        /// ```
502        pub poll_time_histogram: Vec<u64>,
503
504        /// The number of times worker threads unparked but performed no work before parking again.
505        ///
506        /// The worker no-op count increases by one each time the worker unparks the thread but finds
507        /// no new work and goes back to sleep. This indicates a false-positive wake up.
508        ///
509        /// ##### Definition
510        /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_noop_count`]
511        /// across all worker threads.
512        ///
513        /// ##### Examples
514        /// Unfortunately, there isn't a great way to reliably induce no-op parks, as they occur as
515        /// false-positive events under concurrency.
516        ///
517        /// The below example triggers fewer than two parks in the single-threaded runtime:
518        /// ```
519        /// #[tokio::main(flavor = "current_thread")]
520        /// async fn main() {
521        ///     let handle = tokio::runtime::Handle::current();
522        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
523        ///     let mut intervals = monitor.intervals();
524        ///     let mut next_interval = || intervals.next().unwrap();
525        ///
526        ///     assert_eq!(next_interval().total_park_count, 0);
527        ///
528        ///     async {
529        ///         tokio::time::sleep(std::time::Duration::from_millis(1)).await;
530        ///     }.await;
531        ///
532        ///     assert!(next_interval().total_park_count > 0);
533        /// }
534        /// ```
535        ///
536        /// The below example triggers fewer than two parks in the multi-threaded runtime:
537        /// ```
538        /// #[tokio::main(flavor = "multi_thread")]
539        /// async fn main() {
540        ///     let handle = tokio::runtime::Handle::current();
541        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
542        ///     let mut intervals = monitor.intervals();
543        ///     let mut next_interval = || intervals.next().unwrap();
544        ///
545        ///     assert_eq!(next_interval().total_noop_count, 0);
546        ///
547        ///     async {
548        ///         tokio::time::sleep(std::time::Duration::from_millis(1)).await;
549        ///     }.await;
550        ///
551        ///     assert!(next_interval().total_noop_count > 0);
552        /// }
553        /// ```
554        pub total_noop_count: u64,
555
556        /// The maximum number of times any worker thread unparked but performed no work before parking
557        /// again.
558        ///
559        /// ##### Definition
560        /// This metric is derived from the maximum of
561        /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads.
562        ///
563        /// ##### See also
564        /// - [`RuntimeMetrics::total_noop_count`]
565        /// - [`RuntimeMetrics::min_noop_count`]
566        pub max_noop_count: u64,
567
568        /// The minimum number of times any worker thread unparked but performed no work before parking
569        /// again.
570        ///
571        /// ##### Definition
572        /// This metric is derived from the minimum of
573        /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads.
574        ///
575        /// ##### See also
576        /// - [`RuntimeMetrics::total_noop_count`]
577        /// - [`RuntimeMetrics::max_noop_count`]
578        pub min_noop_count: u64,
579
580        /// The number of tasks worker threads stole from another worker thread.
581        ///
582        /// The worker steal count increases by the amount of stolen tasks each time the worker
583        /// has processed its scheduled queue and successfully steals more pending tasks from another
584        /// worker.
585        ///
586        /// This metric only applies to the **multi-threaded** runtime and will always return `0` when
587        /// using the current thread runtime.
588        ///
589        /// ##### Definition
590        /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`] for
591        /// all worker threads.
592        ///
593        /// ##### See also
594        /// - [`RuntimeMetrics::min_steal_count`]
595        /// - [`RuntimeMetrics::max_steal_count`]
596        ///
597        /// ##### Examples
        /// In the below example, a blocking channel is used to back up one worker thread:
599        /// ```
600        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
601        /// async fn main() {
602        ///     let handle = tokio::runtime::Handle::current();
603        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
604        ///     let mut intervals = monitor.intervals();
605        ///     let mut next_interval = || intervals.next().unwrap();
606        ///
607        ///     let interval = next_interval(); // end of first sampling interval
608        ///     assert_eq!(interval.total_steal_count, 0);
609        ///     assert_eq!(interval.min_steal_count, 0);
610        ///     assert_eq!(interval.max_steal_count, 0);
611        ///
612        ///     // induce a steal
613        ///     async {
614        ///         let (tx, rx) = std::sync::mpsc::channel();
615        ///         // Move to the runtime.
616        ///         tokio::spawn(async move {
617        ///             // Spawn the task that sends to the channel
618        ///             tokio::spawn(async move {
619        ///                 tx.send(()).unwrap();
620        ///             });
621        ///             // Spawn a task that bumps the previous task out of the "next
622        ///             // scheduled" slot.
623        ///             tokio::spawn(async {});
624        ///             // Blocking receive on the channel.
625        ///             rx.recv().unwrap();
626        ///             flush_metrics().await;
627        ///         }).await.unwrap();
628        ///         flush_metrics().await;
629        ///     }.await;
630        ///
631        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
632        ///     println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
633        ///
634        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 3
635        ///     println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
636        /// }
637        ///
638        /// async fn flush_metrics() {
639        ///     let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
640        /// }
641        /// ```
642        pub total_steal_count: u64,
643
644        /// The maximum number of tasks any worker thread stole from another worker thread.
645        ///
646        /// ##### Definition
647        /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`]
648        /// across all worker threads.
649        ///
650        /// ##### See also
651        /// - [`RuntimeMetrics::total_steal_count`]
652        /// - [`RuntimeMetrics::min_steal_count`]
653        pub max_steal_count: u64,
654
655        /// The minimum number of tasks any worker thread stole from another worker thread.
656        ///
657        /// ##### Definition
658        /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`]
659        /// across all worker threads.
660        ///
661        /// ##### See also
662        /// - [`RuntimeMetrics::total_steal_count`]
663        /// - [`RuntimeMetrics::max_steal_count`]
664        pub min_steal_count: u64,
665
666        /// The number of times worker threads stole tasks from another worker thread.
667        ///
        /// The worker steal operation count increases by one each time the worker has processed its
669        /// scheduled queue and successfully steals more pending tasks from another worker.
670        ///
671        /// This metric only applies to the **multi-threaded** runtime and will always return `0` when
672        /// using the current thread runtime.
673        ///
674        /// ##### Definition
675        /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
676        /// for all worker threads.
677        ///
678        /// ##### See also
679        /// - [`RuntimeMetrics::min_steal_operations`]
680        /// - [`RuntimeMetrics::max_steal_operations`]
681        ///
682        /// ##### Examples
        /// In the below example, a blocking channel is used to back up one worker thread:
684        /// ```
685        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
686        /// async fn main() {
687        ///     let handle = tokio::runtime::Handle::current();
688        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
689        ///     let mut intervals = monitor.intervals();
690        ///     let mut next_interval = || intervals.next().unwrap();
691        ///
692        ///     let interval = next_interval(); // end of first sampling interval
693        ///     assert_eq!(interval.total_steal_operations, 0);
694        ///     assert_eq!(interval.min_steal_operations, 0);
695        ///     assert_eq!(interval.max_steal_operations, 0);
696        ///
697        ///     // induce a steal
698        ///     async {
699        ///         let (tx, rx) = std::sync::mpsc::channel();
700        ///         // Move to the runtime.
701        ///         tokio::spawn(async move {
702        ///             // Spawn the task that sends to the channel
703        ///             tokio::spawn(async move {
704        ///                 tx.send(()).unwrap();
705        ///             });
706        ///             // Spawn a task that bumps the previous task out of the "next
707        ///             // scheduled" slot.
708        ///             tokio::spawn(async {});
        ///             // Blocking receive on the channel.
710        ///             rx.recv().unwrap();
711        ///             flush_metrics().await;
712        ///         }).await.unwrap();
713        ///         flush_metrics().await;
714        ///     }.await;
715        ///
716        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
717        ///     println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
718        ///
719        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 3
720        ///     println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
721        /// }
722        ///
723        /// async fn flush_metrics() {
724        ///     let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
725        /// }
726        /// ```
727        pub total_steal_operations: u64,
728
729        /// The maximum number of times any worker thread stole tasks from another worker thread.
730        ///
731        /// ##### Definition
732        /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
733        /// across all worker threads.
734        ///
735        /// ##### See also
736        /// - [`RuntimeMetrics::total_steal_operations`]
737        /// - [`RuntimeMetrics::min_steal_operations`]
738        pub max_steal_operations: u64,
739
740        /// The minimum number of times any worker thread stole tasks from another worker thread.
741        ///
742        /// ##### Definition
743        /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
744        /// across all worker threads.
745        ///
746        /// ##### See also
747        /// - [`RuntimeMetrics::total_steal_operations`]
748        /// - [`RuntimeMetrics::max_steal_operations`]
749        pub min_steal_operations: u64,
750
751        /// The number of tasks scheduled from **outside** of the runtime.
752        ///
753        /// The remote schedule count increases by one each time a task is woken from **outside** of
754        /// the runtime. This usually means that a task is spawned or notified from a non-runtime
755        /// thread and must be queued using the Runtime's global queue, which tends to be slower.
756        ///
757        /// ##### Definition
758        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::remote_schedule_count`].
759        ///
760        /// ##### Examples
761        /// In the below example, a remote schedule is induced by spawning a system thread, then
762        /// spawning a tokio task from that system thread:
763        /// ```
764        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
765        /// async fn main() {
766        ///     let handle = tokio::runtime::Handle::current();
767        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
768        ///     let mut intervals = monitor.intervals();
769        ///     let mut next_interval = || intervals.next().unwrap();
770        ///
771        ///     let interval = next_interval(); // end of first sampling interval
772        ///     assert_eq!(interval.num_remote_schedules, 0);
773        ///
774        ///     // spawn a non-runtime thread
775        ///     std::thread::spawn(move || {
776        ///         // spawn two tasks from this non-runtime thread
777        ///         async move {
778        ///             handle.spawn(async {}).await;
779        ///             handle.spawn(async {}).await;
780        ///         }
781        ///     }).join().unwrap().await;
782        ///
783        ///     let interval = next_interval(); // end of second sampling interval
784        ///     assert_eq!(interval.num_remote_schedules, 2);
785        ///
786        ///     let interval = next_interval(); // end of third sampling interval
787        ///     assert_eq!(interval.num_remote_schedules, 0);
788        /// }
789        /// ```
790        pub num_remote_schedules: u64,
791
792        /// The number of tasks scheduled from worker threads.
793        ///
794        /// The local schedule count increases by one each time a task is woken from **inside** of the
795        /// runtime. This usually means that a task is spawned or notified from within a runtime thread
796        /// and will be queued on the worker-local queue.
797        ///
798        /// ##### Definition
799        /// This metric is derived from the sum of
800        /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] across all worker threads.
801        ///
802        /// ##### See also
803        /// - [`RuntimeMetrics::min_local_schedule_count`]
804        /// - [`RuntimeMetrics::max_local_schedule_count`]
805        ///
806        /// ##### Examples
807        /// ###### With `current_thread` runtime
808        /// In the below example, two tasks are spawned from the context of a third tokio task:
809        /// ```
810        /// #[tokio::main(flavor = "current_thread")]
811        /// async fn main() {
812        ///     let handle = tokio::runtime::Handle::current();
813        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
814        ///     let mut intervals = monitor.intervals();
815        ///     let mut next_interval = || intervals.next().unwrap();
816        ///
        ///     let interval = { flush_metrics().await; next_interval() }; // end interval 1
818        ///     assert_eq!(interval.total_local_schedule_count, 0);
819        ///
820        ///     let task = async {
821        ///         tokio::spawn(async {}); // local schedule 1
822        ///         tokio::spawn(async {}); // local schedule 2
823        ///     };
824        ///
825        ///     let handle = tokio::spawn(task); // local schedule 3
826        ///
827        ///     let interval = { flush_metrics().await; next_interval() }; // end interval 2
828        ///     assert_eq!(interval.total_local_schedule_count, 3);
829        ///
830        ///     let _ = handle.await;
831        ///
832        ///     let interval = { flush_metrics().await; next_interval() }; // end interval 3
833        ///     assert_eq!(interval.total_local_schedule_count, 0);
834        /// }
835        ///
836        /// async fn flush_metrics() {
837        ///     tokio::task::yield_now().await;
838        /// }
839        /// ```
840        ///
841        /// ###### With `multi_thread` runtime
842        /// In the below example, 100 tasks are spawned:
843        /// ```
844        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
845        /// async fn main() {
846        ///     let handle = tokio::runtime::Handle::current();
847        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
848        ///     let mut intervals = monitor.intervals();
849        ///     let mut next_interval = || intervals.next().unwrap();
850        ///
851        ///     let interval = next_interval(); // end of interval 1
852        ///     assert_eq!(interval.total_local_schedule_count, 0);
853        ///
854        ///     use std::sync::atomic::{AtomicBool, Ordering};
855        ///     static SPINLOCK: AtomicBool = AtomicBool::new(true);
856        ///
857        ///     // block the other worker thread
858        ///     tokio::spawn(async {
859        ///         while SPINLOCK.load(Ordering::SeqCst) {}
860        ///     });
861        ///
862        ///     // FIXME: why does this need to be in a `spawn`?
863        ///     let _ = tokio::spawn(async {
864        ///         // spawn 100 tasks
865        ///         for _ in 0..100 {
866        ///             tokio::spawn(async {});
867        ///         }
868        ///         // this spawns 1 more task
869        ///         flush_metrics().await;
870        ///     }).await;
871        ///
872        ///     // unblock the other worker thread
873        ///     SPINLOCK.store(false, Ordering::SeqCst);
874        ///
875        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
876        ///     assert_eq!(interval.total_local_schedule_count, 100 + 1);
877        /// }
878        ///
879        /// async fn flush_metrics() {
880        ///     let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
881        /// }
882        /// ```
883        pub total_local_schedule_count: u64,
884
885        /// The maximum number of tasks scheduled from any one worker thread.
886        ///
887        /// ##### Definition
888        /// This metric is derived from the maximum of
889        /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads.
890        ///
891        /// ##### See also
892        /// - [`RuntimeMetrics::total_local_schedule_count`]
893        /// - [`RuntimeMetrics::min_local_schedule_count`]
894        pub max_local_schedule_count: u64,
895
896        /// The minimum number of tasks scheduled from any one worker thread.
897        ///
898        /// ##### Definition
899        /// This metric is derived from the minimum of
900        /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads.
901        ///
902        /// ##### See also
903        /// - [`RuntimeMetrics::total_local_schedule_count`]
904        /// - [`RuntimeMetrics::max_local_schedule_count`]
905        pub min_local_schedule_count: u64,
906
907        /// The number of times worker threads saturated their local queues.
908        ///
        /// The worker overflow count increases by one each time the worker attempts to schedule a task
910        /// locally, but its local queue is full. When this happens, half of the
911        /// local queue is moved to the global queue.
912        ///
913        /// This metric only applies to the **multi-threaded** scheduler.
914        ///
915        /// ##### Definition
916        /// This metric is derived from the sum of
917        /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
918        ///
919        /// ##### See also
920        /// - [`RuntimeMetrics::min_overflow_count`]
921        /// - [`RuntimeMetrics::max_overflow_count`]
922        ///
923        /// ##### Examples
924        /// ```
925        /// #[tokio::main(flavor = "multi_thread", worker_threads = 1)]
926        /// async fn main() {
927        ///     let handle = tokio::runtime::Handle::current();
928        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
929        ///     let mut intervals = monitor.intervals();
930        ///     let mut next_interval = || intervals.next().unwrap();
931        ///
932        ///     let interval = next_interval(); // end of interval 1
933        ///     assert_eq!(interval.total_overflow_count, 0);
934        ///
935        ///     use std::sync::atomic::{AtomicBool, Ordering};
936        ///
937        ///     // spawn a ton of tasks
938        ///     let _ = tokio::spawn(async {
939        ///         // we do this in a `tokio::spawn` because it is impossible to
940        ///         // overflow the main task
941        ///         for _ in 0..300 {
942        ///             tokio::spawn(async {});
943        ///         }
944        ///     }).await;
945        ///
946        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
947        ///     assert_eq!(interval.total_overflow_count, 1);
948        /// }
949        ///
950        /// async fn flush_metrics() {
951        ///     let _ = tokio::time::sleep(std::time::Duration::from_millis(1)).await;
952        /// }
953        /// ```
954        pub total_overflow_count: u64,
955
956        /// The maximum number of times any one worker saturated its local queue.
957        ///
958        /// ##### Definition
959        /// This metric is derived from the maximum of
960        /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
961        ///
962        /// ##### See also
963        /// - [`RuntimeMetrics::total_overflow_count`]
964        /// - [`RuntimeMetrics::min_overflow_count`]
965        pub max_overflow_count: u64,
966
967        /// The minimum number of times any one worker saturated its local queue.
968        ///
969        /// ##### Definition
        /// This metric is derived from the minimum of
971        /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
972        ///
973        /// ##### See also
974        /// - [`RuntimeMetrics::total_overflow_count`]
975        /// - [`RuntimeMetrics::max_overflow_count`]
976        pub min_overflow_count: u64,
977
978        /// The number of tasks that have been polled across all worker threads.
979        ///
980        /// The worker poll count increases by one each time a worker polls a scheduled task.
981        ///
982        /// ##### Definition
983        /// This metric is derived from the sum of
984        /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
985        ///
986        /// ##### See also
987        /// - [`RuntimeMetrics::min_polls_count`]
988        /// - [`RuntimeMetrics::max_polls_count`]
989        ///
990        /// ##### Examples
991        /// In the below example, 42 tasks are spawned and polled:
992        /// ```
993        /// #[tokio::main(flavor = "current_thread")]
994        /// async fn main() {
995        ///     let handle = tokio::runtime::Handle::current();
996        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
997        ///     let mut intervals = monitor.intervals();
998        ///     let mut next_interval = || intervals.next().unwrap();
999        ///
1000        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 1
1001        ///     assert_eq!(interval.total_polls_count, 0);
1002        ///     assert_eq!(interval.min_polls_count, 0);
1003        ///     assert_eq!(interval.max_polls_count, 0);
1004        ///
1005        ///     const N: u64 = 42;
1006        ///
1007        ///     for _ in 0..N {
1008        ///         let _ = tokio::spawn(async {}).await;
1009        ///     }
1010        ///
1011        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
1012        ///     assert_eq!(interval.total_polls_count, N);
1013        ///     assert_eq!(interval.min_polls_count, N);
1014        ///     assert_eq!(interval.max_polls_count, N);
1015        /// }
1016        ///
1017        /// async fn flush_metrics() {
1018        ///     let _ = tokio::task::yield_now().await;
1019        /// }
1020        /// ```
1021        pub total_polls_count: u64,
1022
1023        /// The maximum number of tasks that have been polled in any worker thread.
1024        ///
1025        /// ##### Definition
1026        /// This metric is derived from the maximum of
1027        /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1028        ///
1029        /// ##### See also
1030        /// - [`RuntimeMetrics::total_polls_count`]
1031        /// - [`RuntimeMetrics::min_polls_count`]
1032        pub max_polls_count: u64,
1033
1034        /// The minimum number of tasks that have been polled in any worker thread.
1035        ///
1036        /// ##### Definition
1037        /// This metric is derived from the minimum of
1038        /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1039        ///
1040        /// ##### See also
1041        /// - [`RuntimeMetrics::total_polls_count`]
1042        /// - [`RuntimeMetrics::max_polls_count`]
1043        pub min_polls_count: u64,
1044
1045        /// The total number of tasks currently scheduled in workers' local queues.
1046        ///
1047        /// Tasks that are spawned or notified from within a runtime thread are scheduled using that
1048        /// worker's local queue. This metric returns the **current** number of tasks pending in all
1049        /// workers' local queues. As such, the returned value may increase or decrease as new tasks
1050        /// are scheduled and processed.
1051        ///
1052        /// ##### Definition
1053        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`].
1054        ///
1055        /// ##### See also
1056        /// - [`RuntimeMetrics::min_local_queue_depth`]
1057        /// - [`RuntimeMetrics::max_local_queue_depth`]
1058        ///
1059        /// ##### Example
1060        ///
1061        /// ###### With `current_thread` runtime
1062        /// The below example spawns 100 tasks:
1063        /// ```
1064        /// #[tokio::main(flavor = "current_thread")]
1065        /// async fn main() {
1066        ///     const N: usize = 100;
1067        ///
1068        ///     let handle = tokio::runtime::Handle::current();
1069        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1070        ///     let mut intervals = monitor.intervals();
1071        ///     let mut next_interval = || intervals.next().unwrap();
1072        ///
1073        ///     let interval =  next_interval(); // end of interval 1
1074        ///     assert_eq!(interval.total_local_queue_depth, 0);
1075        ///
1076        ///
1077        ///     for _ in 0..N {
1078        ///         tokio::spawn(async {});
1079        ///     }
1080        ///     let interval =  next_interval(); // end of interval 2
1081        ///     assert_eq!(interval.total_local_queue_depth, N);
1082        /// }
1083        /// ```
1084        ///
1085        /// ###### With `multi_thread` runtime
1086        /// The below example spawns 100 tasks:
1087        /// ```
1088        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
1089        /// async fn main() {
1090        ///     const N: usize = 100;
1091        ///
1092        ///     let handle = tokio::runtime::Handle::current();
1093        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1094        ///     let mut intervals = monitor.intervals();
1095        ///     let mut next_interval = || intervals.next().unwrap();
1096        ///
1097        ///     let interval =  next_interval(); // end of interval 1
1098        ///     assert_eq!(interval.total_local_queue_depth, 0);
1099        ///
1100        ///     use std::sync::atomic::{AtomicBool, Ordering};
1101        ///     static SPINLOCK_A: AtomicBool = AtomicBool::new(true);
1102        ///
1103        ///     // block the other worker thread
1104        ///     tokio::spawn(async {
1105        ///         while SPINLOCK_A.load(Ordering::SeqCst) {}
1106        ///     });
1107        ///
1108        ///     static SPINLOCK_B: AtomicBool = AtomicBool::new(true);
1109        ///
1110        ///     let _ = tokio::spawn(async {
1111        ///         for _ in 0..N {
1112        ///             tokio::spawn(async {
1113        ///                 while SPINLOCK_B.load(Ordering::SeqCst) {}
1114        ///             });
1115        ///         }
1116        ///     }).await;
1117        ///
1118        ///     // unblock the other worker thread
1119        ///     SPINLOCK_A.store(false, Ordering::SeqCst);
1120        ///
1121        ///     let interval =  next_interval(); // end of interval 2
1122        ///     assert_eq!(interval.total_local_queue_depth, N - 1);
1123        ///
1124        ///     SPINLOCK_B.store(false, Ordering::SeqCst);
1125        /// }
1126        /// ```
1127        pub total_local_queue_depth: usize,
1128
        /// The maximum number of tasks currently scheduled in any worker's local queue.
1130        ///
1131        /// ##### Definition
1132        /// This metric is derived from the maximum of
1133        /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
1134        ///
1135        /// ##### See also
1136        /// - [`RuntimeMetrics::total_local_queue_depth`]
1137        /// - [`RuntimeMetrics::min_local_queue_depth`]
1138        pub max_local_queue_depth: usize,
1139
        /// The minimum number of tasks currently scheduled in any worker's local queue.
1141        ///
1142        /// ##### Definition
1143        /// This metric is derived from the minimum of
1144        /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
1145        ///
1146        /// ##### See also
1147        /// - [`RuntimeMetrics::total_local_queue_depth`]
1148        /// - [`RuntimeMetrics::max_local_queue_depth`]
1149        pub min_local_queue_depth: usize,
1150
1151        /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool.
1152        ///
1153        /// ##### Definition
1154        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::blocking_queue_depth`].
1155        pub blocking_queue_depth: usize,
1156
1157        /// The current number of alive tasks in the runtime.
1158        ///
1159        /// ##### Definition
1160        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_alive_tasks`].
1161        pub live_tasks_count: usize,
1162
1163        /// The number of additional threads spawned by the runtime.
1164        ///
1165        /// ##### Definition
1166        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_blocking_threads`].
1167        pub blocking_threads_count: usize,
1168
        /// The number of idle threads that have been spawned by the runtime for `spawn_blocking` calls.
1170        ///
1171        /// ##### Definition
1172        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_idle_blocking_threads`].
1173        pub idle_blocking_threads_count: usize,
1174
1175        /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets.
1176        ///
1177        /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget.
1178        ///
1179        /// The counter is monotonically increasing. It is never decremented or reset to zero.
1180        ///
1181        /// ##### Definition
1182        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::budget_forced_yield_count`].
1183        pub budget_forced_yield_count: u64,
1184
1185        /// Returns the number of ready events processed by the runtime’s I/O driver.
1186        ///
1187        /// ##### Definition
1188        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::io_driver_ready_count`].
1189        pub io_driver_ready_count: u64,
1190    }
1191}
1192
// Declares a struct whose fields are split into two groups:
//
// * `stable { .. }`   - fields that are always part of the struct;
// * `unstable { .. }` - fields that only exist when the crate is compiled with
//   `--cfg tokio_unstable`.
//
// Any attributes (including doc comments) and the visibility written before the
// `struct` keyword are forwarded to the generated struct unchanged. Trailing
// commas inside each group, and a comma between the two groups, are optional.
macro_rules! define_semi_stable {
    (
        $(#[$($struct_attr:tt)*])*
        $visibility:vis struct $struct_name:ident {
            stable {
                $($always_field:ident: $always_ty:ty),* $(,)?
            }
            $(,)?
            unstable {
                $($gated_field:ident: $gated_ty:ty),* $(,)?
            }
        }
    ) => {
        $(#[$($struct_attr)*])*
        $visibility struct $struct_name {
            // Fields that are present in every build.
            $($always_field: $always_ty,)*
            // Fields that are compiled in only under `tokio_unstable`; the
            // `cfg_attr` annotates them accordingly when building docs.rs docs.
            $(
                #[cfg(tokio_unstable)]
                #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
                $gated_field: $gated_ty,
            )*
        }
    };
}
1221
1222define_semi_stable! {
1223    /// Snapshot of per-worker metrics
1224    #[derive(Debug, Default)]
1225    struct Worker {
1226        stable {
1227            worker: usize,
1228            total_park_count: u64,
1229            total_busy_duration: Duration,
1230        }
1231        unstable {
1232            total_noop_count: u64,
1233            total_steal_count: u64,
1234            total_steal_operations: u64,
1235            total_local_schedule_count: u64,
1236            total_overflow_count: u64,
1237            total_polls_count: u64,
1238            poll_time_histogram: Vec<u64>,
1239        }
1240    }
1241}
1242
1243define_semi_stable! {
1244    /// Iterator returned by [`RuntimeMonitor::intervals`].
1245    ///
1246    /// See that method's documentation for more details.
1247    #[derive(Debug)]
1248    pub struct RuntimeIntervals {
1249        stable {
1250            runtime: runtime::RuntimeMetrics,
1251            started_at: Instant,
1252            workers: Vec<Worker>,
1253        }
1254        unstable {
1255            // Number of tasks scheduled from *outside* of the runtime
1256            num_remote_schedules: u64,
1257            budget_forced_yield_count: u64,
1258            io_driver_ready_count: u64,
1259        }
1260    }
1261}
1262
1263impl RuntimeIntervals {
1264    fn probe(&mut self) -> RuntimeMetrics {
1265        let now = Instant::now();
1266
1267        let mut metrics = RuntimeMetrics {
1268            workers_count: self.runtime.num_workers(),
1269            elapsed: now - self.started_at,
1270            global_queue_depth: self.runtime.global_queue_depth(),
1271            min_park_count: u64::MAX,
1272            min_busy_duration: Duration::from_secs(1000000000),
1273            ..Default::default()
1274        };
1275
1276        #[cfg(tokio_unstable)]
1277        {
1278            let num_remote_schedules = self.runtime.remote_schedule_count();
1279            let budget_forced_yields = self.runtime.budget_forced_yield_count();
1280            let io_driver_ready_events = self.runtime.io_driver_ready_count();
1281
1282            metrics.num_remote_schedules = num_remote_schedules - self.num_remote_schedules;
1283            metrics.min_noop_count = u64::MAX;
1284            metrics.min_steal_count = u64::MAX;
1285            metrics.min_local_schedule_count = u64::MAX;
1286            metrics.min_overflow_count = u64::MAX;
1287            metrics.min_polls_count = u64::MAX;
1288            metrics.min_local_queue_depth = usize::MAX;
1289            metrics.mean_poll_duration_worker_min = Duration::MAX;
1290            metrics.poll_time_histogram = vec![0; self.runtime.poll_time_histogram_num_buckets()];
1291            metrics.budget_forced_yield_count =
1292                budget_forced_yields - self.budget_forced_yield_count;
1293            metrics.io_driver_ready_count = io_driver_ready_events - self.io_driver_ready_count;
1294
1295            self.num_remote_schedules = num_remote_schedules;
1296            self.budget_forced_yield_count = budget_forced_yields;
1297            self.io_driver_ready_count = io_driver_ready_events;
1298        }
1299        self.started_at = now;
1300
1301        for worker in &mut self.workers {
1302            worker.probe(&self.runtime, &mut metrics);
1303        }
1304
1305        #[cfg(tokio_unstable)]
1306        {
1307            if metrics.total_polls_count == 0 {
1308                debug_assert_eq!(metrics.mean_poll_duration, Duration::default());
1309
1310                metrics.mean_poll_duration_worker_max = Duration::default();
1311                metrics.mean_poll_duration_worker_min = Duration::default();
1312            }
1313        }
1314
1315        metrics
1316    }
1317}
1318
1319impl Iterator for RuntimeIntervals {
1320    type Item = RuntimeMetrics;
1321
1322    fn next(&mut self) -> Option<RuntimeMetrics> {
1323        Some(self.probe())
1324    }
1325}
1326
1327impl RuntimeMonitor {
1328    /// Creates a new [`RuntimeMonitor`].
1329    pub fn new(runtime: &runtime::Handle) -> RuntimeMonitor {
1330        let runtime = runtime.metrics();
1331
1332        RuntimeMonitor { runtime }
1333    }
1334
1335    /// Produces an unending iterator of [`RuntimeMetrics`].
1336    ///
1337    /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1338    /// produced by [`RuntimeMonitor::intervals`]. The item type of this iterator is [`RuntimeMetrics`],
1339    /// which is a bundle of runtime metrics that describe *only* changes occurring within that sampling
1340    /// interval.
1341    ///
1342    /// # Example
1343    ///
1344    /// ```
1345    /// use std::time::Duration;
1346    ///
1347    /// #[tokio::main]
1348    /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1349    ///     let handle = tokio::runtime::Handle::current();
1350    ///     // construct the runtime metrics monitor
1351    ///     let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1352    ///
1353    ///     // print runtime metrics every 500ms
1354    ///     {
1355    ///         tokio::spawn(async move {
1356    ///             for interval in runtime_monitor.intervals() {
1357    ///                 // pretty-print the metric interval
1358    ///                 println!("{:?}", interval);
1359    ///                 // wait 500ms
1360    ///                 tokio::time::sleep(Duration::from_millis(500)).await;
1361    ///             }
1362    ///         });
1363    ///     }
1364    ///
1365    ///     // await some tasks
1366    ///     tokio::join![
1367    ///         do_work(),
1368    ///         do_work(),
1369    ///         do_work(),
1370    ///     ];
1371    ///
1372    ///     Ok(())
1373    /// }
1374    ///
1375    /// async fn do_work() {
1376    ///     for _ in 0..25 {
1377    ///         tokio::task::yield_now().await;
1378    ///         tokio::time::sleep(Duration::from_millis(100)).await;
1379    ///     }
1380    /// }
1381    /// ```
1382    pub fn intervals(&self) -> RuntimeIntervals {
1383        let started_at = Instant::now();
1384
1385        let workers = (0..self.runtime.num_workers())
1386            .map(|worker| Worker::new(worker, &self.runtime))
1387            .collect();
1388
1389        RuntimeIntervals {
1390            runtime: self.runtime.clone(),
1391            started_at,
1392            workers,
1393
1394            #[cfg(tokio_unstable)]
1395            num_remote_schedules: self.runtime.remote_schedule_count(),
1396            #[cfg(tokio_unstable)]
1397            budget_forced_yield_count: self.runtime.budget_forced_yield_count(),
1398            #[cfg(tokio_unstable)]
1399            io_driver_ready_count: self.runtime.io_driver_ready_count(),
1400        }
1401    }
1402}
1403
1404impl Worker {
1405    fn new(worker: usize, rt: &runtime::RuntimeMetrics) -> Worker {
1406        #[allow(unused_mut, clippy::needless_update)]
1407        let mut wrk = Worker {
1408            worker,
1409            total_park_count: rt.worker_park_count(worker),
1410            total_busy_duration: rt.worker_total_busy_duration(worker),
1411            ..Default::default()
1412        };
1413
1414        #[cfg(tokio_unstable)]
1415        {
1416            let poll_time_histogram = if rt.poll_time_histogram_enabled() {
1417                vec![0; rt.poll_time_histogram_num_buckets()]
1418            } else {
1419                vec![]
1420            };
1421            wrk.total_noop_count = rt.worker_noop_count(worker);
1422            wrk.total_steal_count = rt.worker_steal_count(worker);
1423            wrk.total_steal_operations = rt.worker_steal_operations(worker);
1424            wrk.total_local_schedule_count = rt.worker_local_schedule_count(worker);
1425            wrk.total_overflow_count = rt.worker_overflow_count(worker);
1426            wrk.total_polls_count = rt.worker_poll_count(worker);
1427            wrk.poll_time_histogram = poll_time_histogram;
1428        };
1429        wrk
1430    }
1431
1432    fn probe(&mut self, rt: &runtime::RuntimeMetrics, metrics: &mut RuntimeMetrics) {
1433        macro_rules! metric {
1434            ( $sum:ident, $max:ident, $min:ident, $probe:ident ) => {{
1435                let val = rt.$probe(self.worker);
1436                let delta = val - self.$sum;
1437                self.$sum = val;
1438
1439                metrics.$sum += delta;
1440
1441                if delta > metrics.$max {
1442                    metrics.$max = delta;
1443                }
1444
1445                if delta < metrics.$min {
1446                    metrics.$min = delta;
1447                }
1448            }};
1449        }
1450
1451        metric!(
1452            total_park_count,
1453            max_park_count,
1454            min_park_count,
1455            worker_park_count
1456        );
1457        metric!(
1458            total_busy_duration,
1459            max_busy_duration,
1460            min_busy_duration,
1461            worker_total_busy_duration
1462        );
1463
1464        #[cfg(tokio_unstable)]
1465        {
1466            let mut worker_polls_count = self.total_polls_count;
1467            let total_polls_count = metrics.total_polls_count;
1468
1469            metric!(
1470                total_noop_count,
1471                max_noop_count,
1472                min_noop_count,
1473                worker_noop_count
1474            );
1475            metric!(
1476                total_steal_count,
1477                max_steal_count,
1478                min_steal_count,
1479                worker_steal_count
1480            );
1481            metric!(
1482                total_steal_operations,
1483                max_steal_operations,
1484                min_steal_operations,
1485                worker_steal_operations
1486            );
1487            metric!(
1488                total_local_schedule_count,
1489                max_local_schedule_count,
1490                min_local_schedule_count,
1491                worker_local_schedule_count
1492            );
1493            metric!(
1494                total_overflow_count,
1495                max_overflow_count,
1496                min_overflow_count,
1497                worker_overflow_count
1498            );
1499            metric!(
1500                total_polls_count,
1501                max_polls_count,
1502                min_polls_count,
1503                worker_poll_count
1504            );
1505
1506            // Get the number of polls since last probe
1507            worker_polls_count = self.total_polls_count - worker_polls_count;
1508
1509            // Update the mean task poll duration if there were polls
1510            if worker_polls_count > 0 {
1511                let val = rt.worker_mean_poll_time(self.worker);
1512
1513                if val > metrics.mean_poll_duration_worker_max {
1514                    metrics.mean_poll_duration_worker_max = val;
1515                }
1516
1517                if val < metrics.mean_poll_duration_worker_min {
1518                    metrics.mean_poll_duration_worker_min = val;
1519                }
1520
1521                // First, scale the current value down
1522                let ratio = total_polls_count as f64 / metrics.total_polls_count as f64;
1523                let mut mean = metrics.mean_poll_duration.as_nanos() as f64 * ratio;
1524
1525                // Add the scaled current worker's mean poll duration
1526                let ratio = worker_polls_count as f64 / metrics.total_polls_count as f64;
1527                mean += val.as_nanos() as f64 * ratio;
1528
1529                metrics.mean_poll_duration = Duration::from_nanos(mean as u64);
1530            }
1531
1532            // Update the histogram counts if there were polls since last count
1533            if worker_polls_count > 0 {
1534                for (bucket, cell) in metrics.poll_time_histogram.iter_mut().enumerate() {
1535                    let new = rt.poll_time_histogram_bucket_count(self.worker, bucket);
1536                    let delta = new - self.poll_time_histogram[bucket];
1537                    self.poll_time_histogram[bucket] = new;
1538
1539                    *cell += delta;
1540                }
1541            }
1542
1543            // Local scheduled tasks is an absolute value
1544            let local_scheduled_tasks = rt.worker_local_queue_depth(self.worker);
1545            metrics.total_local_queue_depth += local_scheduled_tasks;
1546
1547            if local_scheduled_tasks > metrics.max_local_queue_depth {
1548                metrics.max_local_queue_depth = local_scheduled_tasks;
1549            }
1550
1551            if local_scheduled_tasks < metrics.min_local_queue_depth {
1552                metrics.min_local_queue_depth = local_scheduled_tasks;
1553            }
1554
1555            // Blocking queue depth is an absolute value too
1556            metrics.blocking_queue_depth = rt.blocking_queue_depth();
1557
1558            #[allow(deprecated)]
1559            {
1560                // use the deprecated active_tasks_count here to support slightly older versions of Tokio,
1561                // it's the same.
1562                metrics.live_tasks_count = rt.active_tasks_count();
1563            }
1564            metrics.blocking_threads_count = rt.num_blocking_threads();
1565            metrics.idle_blocking_threads_count = rt.num_idle_blocking_threads();
1566        }
1567    }
1568}
1569
1570impl RuntimeMetrics {
1571    /// Returns the ratio of the [`RuntimeMetrics::total_polls_count`] to the [`RuntimeMetrics::total_noop_count`].
1572    #[cfg(tokio_unstable)]
1573    pub fn mean_polls_per_park(&self) -> f64 {
1574        let total_park_count = self.total_park_count - self.total_noop_count;
1575        if total_park_count == 0 {
1576            0.0
1577        } else {
1578            self.total_polls_count as f64 / total_park_count as f64
1579        }
1580    }
1581
1582    /// Returns the ratio of the [`RuntimeMetrics::total_busy_duration`] to the [`RuntimeMetrics::elapsed`].
1583    pub fn busy_ratio(&self) -> f64 {
1584        self.total_busy_duration.as_nanos() as f64 / self.elapsed.as_nanos() as f64
1585    }
1586}