Skip to main content

tokio_metrics/
runtime.rs

1use std::time::{Duration, Instant};
2use tokio::runtime;
3use crate::derived_metrics::derived_metrics;
4
5#[cfg(feature = "metrics-rs-integration")]
6pub(crate) mod metrics_rs_integration;
7
8/// Monitors key metrics of the tokio runtime.
9///
10/// ### Usage
11/// ```
12/// use std::time::Duration;
13/// use tokio_metrics::RuntimeMonitor;
14///
15/// #[tokio::main]
16/// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
17///     let handle = tokio::runtime::Handle::current();
18///
19///     // print runtime metrics every 500ms
20///     {
21///         let runtime_monitor = RuntimeMonitor::new(&handle);
22///         tokio::spawn(async move {
23///             for interval in runtime_monitor.intervals() {
24///                 // pretty-print the metric interval
25///                 println!("{:?}", interval);
26///                 // wait 500ms
27///                 tokio::time::sleep(Duration::from_millis(500)).await;
28///             }
29///         });
30///     }
31///
32///     // await some tasks
33///     tokio::join![
34///         do_work(),
35///         do_work(),
36///         do_work(),
37///     ];
38///
39///     Ok(())
40/// }
41///
42/// async fn do_work() {
43///     for _ in 0..25 {
44///         tokio::task::yield_now().await;
45///         tokio::time::sleep(Duration::from_millis(100)).await;
46///     }
47/// }
48/// ```
49#[derive(Debug)]
50pub struct RuntimeMonitor {
51    /// Handle to the runtime
52    runtime: runtime::RuntimeMetrics,
53}
54
55macro_rules! define_runtime_metrics {
56    (
57    stable {
58        $(
59            $(#[$($attributes:tt)*])*
60            $vis:vis $name:ident: $ty:ty
61        ),*
62        $(,)?
63    }
64    unstable {
65        $(
66            $(#[$($unstable_attributes:tt)*])*
67            $unstable_vis:vis $unstable_name:ident: $unstable_ty:ty
68        ),*
69        $(,)?
70    }
71    ) => {
72        /// Key runtime metrics.
73        #[non_exhaustive]
74        #[derive(Default, Debug, Clone)]
75        pub struct RuntimeMetrics {
76            $(
77                $(#[$($attributes)*])*
78                #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
79                $vis $name: $ty,
80            )*
81            $(
82                $(#[$($unstable_attributes)*])*
83                #[cfg(tokio_unstable)]
84                #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
85                $unstable_vis $unstable_name: $unstable_ty,
86            )*
87        }
88    };
89}
90
91define_runtime_metrics! {
92    stable {
93        /// The number of worker threads used by the runtime.
94        ///
95        /// This metric is static for a runtime.
96        ///
97        /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`].
98        /// When using the `current_thread` runtime, the return value is always `1`.
99        ///
100        /// The number of workers is set by configuring
101        /// [`worker_threads`][`tokio::runtime::Builder::worker_threads`] with
102        /// [`tokio::runtime::Builder`], or by parameterizing [`tokio::main`].
103        ///
104        /// ##### Examples
105        /// In the below example, the number of workers is set by parameterizing [`tokio::main`]:
106        /// ```
107        /// use tokio::runtime::Handle;
108        ///
109        /// #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
110        /// async fn main() {
111        ///     let handle = tokio::runtime::Handle::current();
112        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
113        ///     let mut intervals = monitor.intervals();
114        ///     let mut next_interval = || intervals.next().unwrap();
115        ///
116        ///     assert_eq!(next_interval().workers_count, 10);
117        /// }
118        /// ```
119        ///
120        /// [`tokio::main`]: https://docs.rs/tokio/latest/tokio/attr.main.html
121        ///
122        /// When using the `current_thread` runtime, the return value is always `1`; e.g.:
123        /// ```
124        /// use tokio::runtime::Handle;
125        ///
126        /// #[tokio::main(flavor = "current_thread")]
127        /// async fn main() {
128        ///     let handle = tokio::runtime::Handle::current();
129        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
130        ///     let mut intervals = monitor.intervals();
131        ///     let mut next_interval = || intervals.next().unwrap();
132        ///
133        ///     assert_eq!(next_interval().workers_count, 1);
134        /// }
135        /// ```
136        ///
137        /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`]; e.g.:
138        /// ```
139        /// use tokio::runtime::Handle;
140        ///
141        /// #[tokio::main]
142        /// async fn main() {
143        ///     let handle = Handle::current();
144        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
145        ///     let mut intervals = monitor.intervals();
146        ///     let mut next_interval = || intervals.next().unwrap();
147        ///
148        ///     assert_eq!(next_interval().workers_count, handle.metrics().num_workers());
149        /// }
150        /// ```
151        pub workers_count: usize,
152
153        /// The current number of alive tasks in the runtime.
154        ///
155        /// ##### Definition
156        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_alive_tasks`].
157        pub live_tasks_count: usize,
158
159        /// The number of times worker threads parked.
160        ///
161        /// The worker park count increases by one each time the worker parks the thread waiting for
162        /// new inbound events to process. This usually means the worker has processed all pending work
163        /// and is currently idle.
164        ///
165        /// ##### Definition
166        /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_park_count`]
167        /// across all worker threads.
168        ///
169        /// ##### See also
170        /// - [`RuntimeMetrics::max_park_count`]
171        /// - [`RuntimeMetrics::min_park_count`]
172        ///
173        /// ##### Examples
174        /// ```
175        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
176        /// async fn main() {
177        ///     let handle = tokio::runtime::Handle::current();
178        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
179        ///     let mut intervals = monitor.intervals();
180        ///     let mut next_interval = || intervals.next().unwrap();
181        ///
182        ///     let interval = next_interval(); // end of interval 1
183        ///     assert_eq!(interval.total_park_count, 0);
184        ///
185        ///     induce_parks().await;
186        ///
187        ///     let interval = next_interval(); // end of interval 2
188        ///     assert!(interval.total_park_count >= 1); // usually 1 or 2 parks
189        /// }
190        ///
191        /// async fn induce_parks() {
192        ///     let _ = tokio::time::timeout(std::time::Duration::ZERO, async {
193        ///         loop { tokio::task::yield_now().await; }
194        ///     }).await;
195        /// }
196        /// ```
197        pub total_park_count: u64,
198
199        /// The maximum number of times any worker thread parked.
200        ///
201        /// ##### Definition
202        /// This metric is derived from the maximum of
203        /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
204        ///
205        /// ##### See also
206        /// - [`RuntimeMetrics::total_park_count`]
207        /// - [`RuntimeMetrics::min_park_count`]
208        pub max_park_count: u64,
209
210        /// The minimum number of times any worker thread parked.
211        ///
212        /// ##### Definition
213        /// This metric is derived from the maximum of
214        /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
215        ///
216        /// ##### See also
217        /// - [`RuntimeMetrics::total_park_count`]
218        /// - [`RuntimeMetrics::max_park_count`]
219        pub min_park_count: u64,
220
221        /// The amount of time worker threads were busy.
222        ///
223        /// The worker busy duration increases whenever the worker is spending time processing work.
224        /// Using this value can indicate the total load of workers.
225        ///
226        /// ##### Definition
227        /// This metric is derived from the sum of
228        /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
229        ///
230        /// ##### See also
231        /// - [`RuntimeMetrics::min_busy_duration`]
232        /// - [`RuntimeMetrics::max_busy_duration`]
233        ///
234        /// ##### Examples
235        /// In the below example, tasks spend a total of 3s busy:
236        /// ```
237        /// use tokio::time::Duration;
238        ///
239        /// fn main() {
240        ///     let start = tokio::time::Instant::now();
241        ///
242        ///     let rt = tokio::runtime::Builder::new_current_thread()
243        ///         .enable_all()
244        ///         .build()
245        ///         .unwrap();
246        ///
247        ///     let handle = rt.handle();
248        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
249        ///     let mut intervals = monitor.intervals();
250        ///     let mut next_interval = || intervals.next().unwrap();
251        ///
252        ///     let delay_1s = Duration::from_secs(1);
253        ///     let delay_3s = Duration::from_secs(3);
254        ///
255        ///     rt.block_on(async {
256        ///         // keep the main task busy for 1s
257        ///         spin_for(delay_1s);
258        ///
259        ///         // spawn a task and keep it busy for 2s
260        ///         let _ = tokio::spawn(async move {
261        ///             spin_for(delay_3s);
262        ///         }).await;
263        ///     });
264        ///
265        ///     // flush metrics
266        ///     drop(rt);
267        ///
268        ///     let elapsed = start.elapsed();
269        ///
270        ///     let interval =  next_interval(); // end of interval 2
271        ///     assert!(interval.total_busy_duration >= delay_1s + delay_3s);
272        ///     assert!(interval.total_busy_duration <= elapsed);
273        /// }
274        ///
275        /// fn time<F>(task: F) -> Duration
276        /// where
277        ///     F: Fn() -> ()
278        /// {
279        ///     let start = tokio::time::Instant::now();
280        ///     task();
281        ///     start.elapsed()
282        /// }
283        ///
284        /// /// Block the current thread for a given `duration`.
285        /// fn spin_for(duration: Duration) {
286        ///     let start = tokio::time::Instant::now();
287        ///     while start.elapsed() <= duration {}
288        /// }
289        /// ```
290        ///
291        /// Busy times may not accumulate as the above example suggests (FIXME: Why?); e.g., if we
292        /// remove the three second delay, the time spent busy falls to mere microseconds:
293        /// ```should_panic
294        /// use tokio::time::Duration;
295        ///
296        /// fn main() {
297        ///     let rt = tokio::runtime::Builder::new_current_thread()
298        ///         .enable_all()
299        ///         .build()
300        ///         .unwrap();
301        ///
302        ///     let handle = rt.handle();
303        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
304        ///     let mut intervals = monitor.intervals();
305        ///     let mut next_interval = || intervals.next().unwrap();
306        ///
307        ///     let delay_1s = Duration::from_secs(1);
308        ///
309        ///     let elapsed = time(|| rt.block_on(async {
310        ///         // keep the main task busy for 1s
311        ///         spin_for(delay_1s);
312        ///     }));
313        ///
314        ///     // flush metrics
315        ///     drop(rt);
316        ///
317        ///     let interval =  next_interval(); // end of interval 2
318        ///     assert!(interval.total_busy_duration >= delay_1s); // FAIL
319        ///     assert!(interval.total_busy_duration <= elapsed);
320        /// }
321        ///
322        /// fn time<F>(task: F) -> Duration
323        /// where
324        ///     F: Fn() -> ()
325        /// {
326        ///     let start = tokio::time::Instant::now();
327        ///     task();
328        ///     start.elapsed()
329        /// }
330        ///
331        /// /// Block the current thread for a given `duration`.
332        /// fn spin_for(duration: Duration) {
333        ///     let start = tokio::time::Instant::now();
334        ///     while start.elapsed() <= duration {}
335        /// }
336        /// ```
337        pub total_busy_duration: Duration,
338
339        /// The maximum amount of time a worker thread was busy.
340        ///
341        /// ##### Definition
342        /// This metric is derived from the maximum of
343        /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
344        ///
345        /// ##### See also
346        /// - [`RuntimeMetrics::total_busy_duration`]
347        /// - [`RuntimeMetrics::min_busy_duration`]
348        pub max_busy_duration: Duration,
349
350        /// The minimum amount of time a worker thread was busy.
351        ///
352        /// ##### Definition
353        /// This metric is derived from the minimum of
354        /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
355        ///
356        /// ##### See also
357        /// - [`RuntimeMetrics::total_busy_duration`]
358        /// - [`RuntimeMetrics::max_busy_duration`]
359        pub min_busy_duration: Duration,
360
361        /// The number of tasks currently scheduled in the runtime's global queue.
362        ///
363        /// Tasks that are spawned or notified from a non-runtime thread are scheduled using the
364        /// runtime's global queue. This metric returns the **current** number of tasks pending in
365        /// the global queue. As such, the returned value may increase or decrease as new tasks are
366        /// scheduled and processed.
367        ///
368        /// ##### Definition
369        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::global_queue_depth`].
370        ///
371        /// ##### Example
372        /// ```
373        /// # let current_thread = tokio::runtime::Builder::new_current_thread()
374        /// #     .enable_all()
375        /// #     .build()
376        /// #     .unwrap();
377        /// #
378        /// # let multi_thread = tokio::runtime::Builder::new_multi_thread()
379        /// #     .worker_threads(2)
380        /// #     .enable_all()
381        /// #     .build()
382        /// #     .unwrap();
383        /// #
384        /// # for runtime in [current_thread, multi_thread] {
385        /// let handle = runtime.handle().clone();
386        /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
387        /// let mut intervals = monitor.intervals();
388        /// let mut next_interval = || intervals.next().unwrap();
389        ///
390        /// let interval = next_interval(); // end of interval 1
391        /// # #[cfg(tokio_unstable)]
392        /// assert_eq!(interval.num_remote_schedules, 0);
393        ///
394        /// // spawn a system thread outside of the runtime
395        /// std::thread::spawn(move || {
396        ///     // spawn two tasks from this non-runtime thread
397        ///     handle.spawn(async {});
398        ///     handle.spawn(async {});
399        /// }).join().unwrap();
400        ///
401        /// // flush metrics
402        /// drop(runtime);
403        ///
404        /// let interval = next_interval(); // end of interval 2
405        /// # #[cfg(tokio_unstable)]
406        /// assert_eq!(interval.num_remote_schedules, 2);
407        /// # }
408        /// ```
409        pub global_queue_depth: usize,
410
411        /// Total amount of time elapsed since observing runtime metrics.
412        pub elapsed: Duration,
413    }
414    unstable {
415        /// The average duration of a single invocation of poll on a task.
416        ///
417        /// This average is an exponentially-weighted moving average of the duration
418        /// of task polls on all runtime workers.
419        ///
420        /// ##### Examples
421        /// ```
422        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
423        /// async fn main() {
424        ///     let handle = tokio::runtime::Handle::current();
425        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
426        ///     let mut intervals = monitor.intervals();
427        ///     let mut next_interval = || intervals.next().unwrap();
428        ///
429        ///     let interval = next_interval();
430        ///     println!("mean task poll duration is {:?}", interval.mean_poll_duration);
431        /// }
432        /// ```
433        pub mean_poll_duration: Duration,
434
435        /// The average duration of a single invocation of poll on a task on the
436        /// worker with the lowest value.
437        ///
438        /// This average is an exponentially-weighted moving average of the duration
439        /// of task polls on the runtime worker with the lowest value.
440        ///
441        /// ##### Examples
442        /// ```
443        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
444        /// async fn main() {
445        ///     let handle = tokio::runtime::Handle::current();
446        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
447        ///     let mut intervals = monitor.intervals();
448        ///     let mut next_interval = || intervals.next().unwrap();
449        ///
450        ///     let interval = next_interval();
451        ///     println!("min mean task poll duration is {:?}", interval.mean_poll_duration_worker_min);
452        /// }
453        /// ```
454        pub mean_poll_duration_worker_min: Duration,
455
456        /// The average duration of a single invocation of poll on a task on the
457        /// worker with the highest value.
458        ///
459        /// This average is an exponentially-weighted moving average of the duration
460        /// of task polls on the runtime worker with the highest value.
461        ///
462        /// ##### Examples
463        /// ```
464        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
465        /// async fn main() {
466        ///     let handle = tokio::runtime::Handle::current();
467        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
468        ///     let mut intervals = monitor.intervals();
469        ///     let mut next_interval = || intervals.next().unwrap();
470        ///
471        ///     let interval = next_interval();
472        ///     println!("max mean task poll duration is {:?}", interval.mean_poll_duration_worker_max);
473        /// }
474        /// ```
475        pub mean_poll_duration_worker_max: Duration,
476
477        /// A histogram of task polls since the previous probe grouped by poll
478        /// times.
479        ///
480        /// This metric must be explicitly enabled when creating the runtime with
481        /// [`enable_metrics_poll_time_histogram`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
482        /// Bucket sizes are fixed and configured at the runtime level. See
483        /// configuration options on
484        /// [`runtime::Builder`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
485        ///
486        /// ##### Examples
487        /// ```
488        /// use tokio::runtime::HistogramConfiguration;
489        /// use std::time::Duration;
490        ///
491        /// let config = HistogramConfiguration::linear(Duration::from_micros(50), 12);
492        ///
493        /// let rt = tokio::runtime::Builder::new_multi_thread()
494        ///     .enable_metrics_poll_time_histogram()
495        ///     .metrics_poll_time_histogram_configuration(config)
496        ///     .build()
497        ///     .unwrap();
498        ///
499        /// rt.block_on(async {
500        ///     let handle = tokio::runtime::Handle::current();
501        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
502        ///     let mut intervals = monitor.intervals();
503        ///     let mut next_interval = || intervals.next().unwrap();
504        ///
505        ///     let interval = next_interval();
506        ///     println!("poll count histogram {:?}", interval.poll_time_histogram);
507        /// });
508        /// ```
509        pub poll_time_histogram: Vec<u64>,
510
511        /// The number of times worker threads unparked but performed no work before parking again.
512        ///
513        /// The worker no-op count increases by one each time the worker unparks the thread but finds
514        /// no new work and goes back to sleep. This indicates a false-positive wake up.
515        ///
516        /// ##### Definition
517        /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_noop_count`]
518        /// across all worker threads.
519        ///
520        /// ##### Examples
521        /// Unfortunately, there isn't a great way to reliably induce no-op parks, as they occur as
522        /// false-positive events under concurrency.
523        ///
524        /// The below example triggers fewer than two parks in the single-threaded runtime:
525        /// ```
526        /// #[tokio::main(flavor = "current_thread")]
527        /// async fn main() {
528        ///     let handle = tokio::runtime::Handle::current();
529        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
530        ///     let mut intervals = monitor.intervals();
531        ///     let mut next_interval = || intervals.next().unwrap();
532        ///
533        ///     assert_eq!(next_interval().total_park_count, 0);
534        ///
535        ///     async {
536        ///         tokio::time::sleep(std::time::Duration::from_millis(1)).await;
537        ///     }.await;
538        ///
539        ///     assert!(next_interval().total_park_count > 0);
540        /// }
541        /// ```
542        ///
543        /// The below example triggers fewer than two parks in the multi-threaded runtime:
544        /// ```
545        /// #[tokio::main(flavor = "multi_thread")]
546        /// async fn main() {
547        ///     let handle = tokio::runtime::Handle::current();
548        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
549        ///     let mut intervals = monitor.intervals();
550        ///     let mut next_interval = || intervals.next().unwrap();
551        ///
552        ///     assert_eq!(next_interval().total_noop_count, 0);
553        ///
554        ///     async {
555        ///         tokio::time::sleep(std::time::Duration::from_millis(1)).await;
556        ///     }.await;
557        ///
558        ///     assert!(next_interval().total_noop_count > 0);
559        /// }
560        /// ```
561        pub total_noop_count: u64,
562
563        /// The maximum number of times any worker thread unparked but performed no work before parking
564        /// again.
565        ///
566        /// ##### Definition
567        /// This metric is derived from the maximum of
568        /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads.
569        ///
570        /// ##### See also
571        /// - [`RuntimeMetrics::total_noop_count`]
572        /// - [`RuntimeMetrics::min_noop_count`]
573        pub max_noop_count: u64,
574
575        /// The minimum number of times any worker thread unparked but performed no work before parking
576        /// again.
577        ///
578        /// ##### Definition
579        /// This metric is derived from the minimum of
580        /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads.
581        ///
582        /// ##### See also
583        /// - [`RuntimeMetrics::total_noop_count`]
584        /// - [`RuntimeMetrics::max_noop_count`]
585        pub min_noop_count: u64,
586
587        /// The number of tasks worker threads stole from another worker thread.
588        ///
589        /// The worker steal count increases by the amount of stolen tasks each time the worker
590        /// has processed its scheduled queue and successfully steals more pending tasks from another
591        /// worker.
592        ///
593        /// This metric only applies to the **multi-threaded** runtime and will always return `0` when
594        /// using the current thread runtime.
595        ///
596        /// ##### Definition
597        /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`] for
598        /// all worker threads.
599        ///
600        /// ##### See also
601        /// - [`RuntimeMetrics::min_steal_count`]
602        /// - [`RuntimeMetrics::max_steal_count`]
603        ///
604        /// ##### Examples
605        /// In the below example, a blocking channel is used to backup one worker thread:
606        /// ```
607        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
608        /// async fn main() {
609        ///     let handle = tokio::runtime::Handle::current();
610        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
611        ///     let mut intervals = monitor.intervals();
612        ///     let mut next_interval = || intervals.next().unwrap();
613        ///
614        ///     let interval = next_interval(); // end of first sampling interval
615        ///     assert_eq!(interval.total_steal_count, 0);
616        ///     assert_eq!(interval.min_steal_count, 0);
617        ///     assert_eq!(interval.max_steal_count, 0);
618        ///
619        ///     // induce a steal
620        ///     async {
621        ///         let (tx, rx) = std::sync::mpsc::channel();
622        ///         // Move to the runtime.
623        ///         tokio::spawn(async move {
624        ///             // Spawn the task that sends to the channel
625        ///             tokio::spawn(async move {
626        ///                 tx.send(()).unwrap();
627        ///             });
628        ///             // Spawn a task that bumps the previous task out of the "next
629        ///             // scheduled" slot.
630        ///             tokio::spawn(async {});
631        ///             // Blocking receive on the channel.
632        ///             rx.recv().unwrap();
633        ///             flush_metrics().await;
634        ///         }).await.unwrap();
635        ///         flush_metrics().await;
636        ///     }.await;
637        ///
638        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
639        ///     println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
640        ///
641        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 3
642        ///     println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
643        /// }
644        ///
645        /// async fn flush_metrics() {
646        ///     let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
647        /// }
648        /// ```
649        pub total_steal_count: u64,
650
651        /// The maximum number of tasks any worker thread stole from another worker thread.
652        ///
653        /// ##### Definition
654        /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`]
655        /// across all worker threads.
656        ///
657        /// ##### See also
658        /// - [`RuntimeMetrics::total_steal_count`]
659        /// - [`RuntimeMetrics::min_steal_count`]
660        pub max_steal_count: u64,
661
662        /// The minimum number of tasks any worker thread stole from another worker thread.
663        ///
664        /// ##### Definition
665        /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`]
666        /// across all worker threads.
667        ///
668        /// ##### See also
669        /// - [`RuntimeMetrics::total_steal_count`]
670        /// - [`RuntimeMetrics::max_steal_count`]
671        pub min_steal_count: u64,
672
673        /// The number of times worker threads stole tasks from another worker thread.
674        ///
675        /// The worker steal operations increases by one each time the worker has processed its
676        /// scheduled queue and successfully steals more pending tasks from another worker.
677        ///
678        /// This metric only applies to the **multi-threaded** runtime and will always return `0` when
679        /// using the current thread runtime.
680        ///
681        /// ##### Definition
682        /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
683        /// for all worker threads.
684        ///
685        /// ##### See also
686        /// - [`RuntimeMetrics::min_steal_operations`]
687        /// - [`RuntimeMetrics::max_steal_operations`]
688        ///
689        /// ##### Examples
690        /// In the below example, a blocking channel is used to backup one worker thread:
691        /// ```
692        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
693        /// async fn main() {
694        ///     let handle = tokio::runtime::Handle::current();
695        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
696        ///     let mut intervals = monitor.intervals();
697        ///     let mut next_interval = || intervals.next().unwrap();
698        ///
699        ///     let interval = next_interval(); // end of first sampling interval
700        ///     assert_eq!(interval.total_steal_operations, 0);
701        ///     assert_eq!(interval.min_steal_operations, 0);
702        ///     assert_eq!(interval.max_steal_operations, 0);
703        ///
704        ///     // induce a steal
705        ///     async {
706        ///         let (tx, rx) = std::sync::mpsc::channel();
707        ///         // Move to the runtime.
708        ///         tokio::spawn(async move {
709        ///             // Spawn the task that sends to the channel
710        ///             tokio::spawn(async move {
711        ///                 tx.send(()).unwrap();
712        ///             });
713        ///             // Spawn a task that bumps the previous task out of the "next
714        ///             // scheduled" slot.
715        ///             tokio::spawn(async {});
716        ///             // Blocking receive on the channe.
717        ///             rx.recv().unwrap();
718        ///             flush_metrics().await;
719        ///         }).await.unwrap();
720        ///         flush_metrics().await;
721        ///     }.await;
722        ///
723        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
724        ///     println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
725        ///
726        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 3
727        ///     println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
728        /// }
729        ///
730        /// async fn flush_metrics() {
731        ///     let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
732        /// }
733        /// ```
734        pub total_steal_operations: u64,
735
736        /// The maximum number of times any worker thread stole tasks from another worker thread.
737        ///
738        /// ##### Definition
739        /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
740        /// across all worker threads.
741        ///
742        /// ##### See also
743        /// - [`RuntimeMetrics::total_steal_operations`]
744        /// - [`RuntimeMetrics::min_steal_operations`]
745        pub max_steal_operations: u64,
746
747        /// The minimum number of times any worker thread stole tasks from another worker thread.
748        ///
749        /// ##### Definition
750        /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
751        /// across all worker threads.
752        ///
753        /// ##### See also
754        /// - [`RuntimeMetrics::total_steal_operations`]
755        /// - [`RuntimeMetrics::max_steal_operations`]
756        pub min_steal_operations: u64,
757
758        /// The number of tasks scheduled from **outside** of the runtime.
759        ///
760        /// The remote schedule count increases by one each time a task is woken from **outside** of
761        /// the runtime. This usually means that a task is spawned or notified from a non-runtime
762        /// thread and must be queued using the Runtime's global queue, which tends to be slower.
763        ///
764        /// ##### Definition
765        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::remote_schedule_count`].
766        ///
767        /// ##### Examples
768        /// In the below example, a remote schedule is induced by spawning a system thread, then
769        /// spawning a tokio task from that system thread:
770        /// ```
771        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
772        /// async fn main() {
773        ///     let handle = tokio::runtime::Handle::current();
774        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
775        ///     let mut intervals = monitor.intervals();
776        ///     let mut next_interval = || intervals.next().unwrap();
777        ///
778        ///     let interval = next_interval(); // end of first sampling interval
779        ///     assert_eq!(interval.num_remote_schedules, 0);
780        ///
781        ///     // spawn a non-runtime thread
782        ///     std::thread::spawn(move || {
783        ///         // spawn two tasks from this non-runtime thread
784        ///         async move {
785        ///             handle.spawn(async {}).await;
786        ///             handle.spawn(async {}).await;
787        ///         }
788        ///     }).join().unwrap().await;
789        ///
790        ///     let interval = next_interval(); // end of second sampling interval
791        ///     assert_eq!(interval.num_remote_schedules, 2);
792        ///
793        ///     let interval = next_interval(); // end of third sampling interval
794        ///     assert_eq!(interval.num_remote_schedules, 0);
795        /// }
796        /// ```
797        pub num_remote_schedules: u64,
798
799        /// The number of tasks scheduled from worker threads.
800        ///
801        /// The local schedule count increases by one each time a task is woken from **inside** of the
802        /// runtime. This usually means that a task is spawned or notified from within a runtime thread
803        /// and will be queued on the worker-local queue.
804        ///
805        /// ##### Definition
806        /// This metric is derived from the sum of
807        /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] across all worker threads.
808        ///
809        /// ##### See also
810        /// - [`RuntimeMetrics::min_local_schedule_count`]
811        /// - [`RuntimeMetrics::max_local_schedule_count`]
812        ///
813        /// ##### Examples
814        /// ###### With `current_thread` runtime
815        /// In the below example, two tasks are spawned from the context of a third tokio task:
816        /// ```
817        /// #[tokio::main(flavor = "current_thread")]
818        /// async fn main() {
819        ///     let handle = tokio::runtime::Handle::current();
820        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
821        ///     let mut intervals = monitor.intervals();
822        ///     let mut next_interval = || intervals.next().unwrap();
823        ///
824        ///     let interval = { flush_metrics().await; next_interval() }; // end interval 2
825        ///     assert_eq!(interval.total_local_schedule_count, 0);
826        ///
827        ///     let task = async {
828        ///         tokio::spawn(async {}); // local schedule 1
829        ///         tokio::spawn(async {}); // local schedule 2
830        ///     };
831        ///
832        ///     let handle = tokio::spawn(task); // local schedule 3
833        ///
834        ///     let interval = { flush_metrics().await; next_interval() }; // end interval 2
835        ///     assert_eq!(interval.total_local_schedule_count, 3);
836        ///
837        ///     let _ = handle.await;
838        ///
839        ///     let interval = { flush_metrics().await; next_interval() }; // end interval 3
840        ///     assert_eq!(interval.total_local_schedule_count, 0);
841        /// }
842        ///
843        /// async fn flush_metrics() {
844        ///     tokio::task::yield_now().await;
845        /// }
846        /// ```
847        ///
848        /// ###### With `multi_thread` runtime
849        /// In the below example, 100 tasks are spawned:
850        /// ```
851        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
852        /// async fn main() {
853        ///     let handle = tokio::runtime::Handle::current();
854        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
855        ///     let mut intervals = monitor.intervals();
856        ///     let mut next_interval = || intervals.next().unwrap();
857        ///
858        ///     let interval = next_interval(); // end of interval 1
859        ///     assert_eq!(interval.total_local_schedule_count, 0);
860        ///
861        ///     use std::sync::atomic::{AtomicBool, Ordering};
862        ///     static SPINLOCK: AtomicBool = AtomicBool::new(true);
863        ///
864        ///     // block the other worker thread
865        ///     tokio::spawn(async {
866        ///         while SPINLOCK.load(Ordering::SeqCst) {}
867        ///     });
868        ///
869        ///     // FIXME: why does this need to be in a `spawn`?
870        ///     let _ = tokio::spawn(async {
871        ///         // spawn 100 tasks
872        ///         for _ in 0..100 {
873        ///             tokio::spawn(async {});
874        ///         }
875        ///         // this spawns 1 more task
876        ///         flush_metrics().await;
877        ///     }).await;
878        ///
879        ///     // unblock the other worker thread
880        ///     SPINLOCK.store(false, Ordering::SeqCst);
881        ///
882        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
883        ///     assert_eq!(interval.total_local_schedule_count, 100 + 1);
884        /// }
885        ///
886        /// async fn flush_metrics() {
887        ///     let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
888        /// }
889        /// ```
890        pub total_local_schedule_count: u64,
891
892        /// The maximum number of tasks scheduled from any one worker thread.
893        ///
894        /// ##### Definition
895        /// This metric is derived from the maximum of
896        /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads.
897        ///
898        /// ##### See also
899        /// - [`RuntimeMetrics::total_local_schedule_count`]
900        /// - [`RuntimeMetrics::min_local_schedule_count`]
901        pub max_local_schedule_count: u64,
902
903        /// The minimum number of tasks scheduled from any one worker thread.
904        ///
905        /// ##### Definition
906        /// This metric is derived from the minimum of
907        /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads.
908        ///
909        /// ##### See also
910        /// - [`RuntimeMetrics::total_local_schedule_count`]
911        /// - [`RuntimeMetrics::max_local_schedule_count`]
912        pub min_local_schedule_count: u64,
913
914        /// The number of times worker threads saturated their local queues.
915        ///
916        /// The worker steal count increases by one each time the worker attempts to schedule a task
917        /// locally, but its local queue is full. When this happens, half of the
918        /// local queue is moved to the global queue.
919        ///
920        /// This metric only applies to the **multi-threaded** scheduler.
921        ///
922        /// ##### Definition
923        /// This metric is derived from the sum of
924        /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
925        ///
926        /// ##### See also
927        /// - [`RuntimeMetrics::min_overflow_count`]
928        /// - [`RuntimeMetrics::max_overflow_count`]
929        ///
930        /// ##### Examples
931        /// ```
932        /// #[tokio::main(flavor = "multi_thread", worker_threads = 1)]
933        /// async fn main() {
934        ///     let handle = tokio::runtime::Handle::current();
935        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
936        ///     let mut intervals = monitor.intervals();
937        ///     let mut next_interval = || intervals.next().unwrap();
938        ///
939        ///     let interval = next_interval(); // end of interval 1
940        ///     assert_eq!(interval.total_overflow_count, 0);
941        ///
942        ///     use std::sync::atomic::{AtomicBool, Ordering};
943        ///
944        ///     // spawn a ton of tasks
945        ///     let _ = tokio::spawn(async {
946        ///         // we do this in a `tokio::spawn` because it is impossible to
947        ///         // overflow the main task
948        ///         for _ in 0..300 {
949        ///             tokio::spawn(async {});
950        ///         }
951        ///     }).await;
952        ///
953        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
954        ///     assert_eq!(interval.total_overflow_count, 1);
955        /// }
956        ///
957        /// async fn flush_metrics() {
958        ///     let _ = tokio::time::sleep(std::time::Duration::from_millis(1)).await;
959        /// }
960        /// ```
961        pub total_overflow_count: u64,
962
963        /// The maximum number of times any one worker saturated its local queue.
964        ///
965        /// ##### Definition
966        /// This metric is derived from the maximum of
967        /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
968        ///
969        /// ##### See also
970        /// - [`RuntimeMetrics::total_overflow_count`]
971        /// - [`RuntimeMetrics::min_overflow_count`]
972        pub max_overflow_count: u64,
973
974        /// The minimum number of times any one worker saturated its local queue.
975        ///
976        /// ##### Definition
977        /// This metric is derived from the maximum of
978        /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
979        ///
980        /// ##### See also
981        /// - [`RuntimeMetrics::total_overflow_count`]
982        /// - [`RuntimeMetrics::max_overflow_count`]
983        pub min_overflow_count: u64,
984
985        /// The number of tasks that have been polled across all worker threads.
986        ///
987        /// The worker poll count increases by one each time a worker polls a scheduled task.
988        ///
989        /// ##### Definition
990        /// This metric is derived from the sum of
991        /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
992        ///
993        /// ##### See also
994        /// - [`RuntimeMetrics::min_polls_count`]
995        /// - [`RuntimeMetrics::max_polls_count`]
996        ///
997        /// ##### Examples
998        /// In the below example, 42 tasks are spawned and polled:
999        /// ```
1000        /// #[tokio::main(flavor = "current_thread")]
1001        /// async fn main() {
1002        ///     let handle = tokio::runtime::Handle::current();
1003        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1004        ///     let mut intervals = monitor.intervals();
1005        ///     let mut next_interval = || intervals.next().unwrap();
1006        ///
1007        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 1
1008        ///     assert_eq!(interval.total_polls_count, 0);
1009        ///     assert_eq!(interval.min_polls_count, 0);
1010        ///     assert_eq!(interval.max_polls_count, 0);
1011        ///
1012        ///     const N: u64 = 42;
1013        ///
1014        ///     for _ in 0..N {
1015        ///         let _ = tokio::spawn(async {}).await;
1016        ///     }
1017        ///
1018        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
1019        ///     assert_eq!(interval.total_polls_count, N);
1020        ///     assert_eq!(interval.min_polls_count, N);
1021        ///     assert_eq!(interval.max_polls_count, N);
1022        /// }
1023        ///
1024        /// async fn flush_metrics() {
1025        ///     let _ = tokio::task::yield_now().await;
1026        /// }
1027        /// ```
1028        pub total_polls_count: u64,
1029
1030        /// The maximum number of tasks that have been polled in any worker thread.
1031        ///
1032        /// ##### Definition
1033        /// This metric is derived from the maximum of
1034        /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1035        ///
1036        /// ##### See also
1037        /// - [`RuntimeMetrics::total_polls_count`]
1038        /// - [`RuntimeMetrics::min_polls_count`]
1039        pub max_polls_count: u64,
1040
1041        /// The minimum number of tasks that have been polled in any worker thread.
1042        ///
1043        /// ##### Definition
1044        /// This metric is derived from the minimum of
1045        /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1046        ///
1047        /// ##### See also
1048        /// - [`RuntimeMetrics::total_polls_count`]
1049        /// - [`RuntimeMetrics::max_polls_count`]
1050        pub min_polls_count: u64,
1051
1052        /// The total number of tasks currently scheduled in workers' local queues.
1053        ///
1054        /// Tasks that are spawned or notified from within a runtime thread are scheduled using that
1055        /// worker's local queue. This metric returns the **current** number of tasks pending in all
1056        /// workers' local queues. As such, the returned value may increase or decrease as new tasks
1057        /// are scheduled and processed.
1058        ///
1059        /// ##### Definition
1060        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`].
1061        ///
1062        /// ##### See also
1063        /// - [`RuntimeMetrics::min_local_queue_depth`]
1064        /// - [`RuntimeMetrics::max_local_queue_depth`]
1065        ///
1066        /// ##### Example
1067        ///
1068        /// ###### With `current_thread` runtime
1069        /// The below example spawns 100 tasks:
1070        /// ```
1071        /// #[tokio::main(flavor = "current_thread")]
1072        /// async fn main() {
1073        ///     const N: usize = 100;
1074        ///
1075        ///     let handle = tokio::runtime::Handle::current();
1076        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1077        ///     let mut intervals = monitor.intervals();
1078        ///     let mut next_interval = || intervals.next().unwrap();
1079        ///
1080        ///     let interval =  next_interval(); // end of interval 1
1081        ///     assert_eq!(interval.total_local_queue_depth, 0);
1082        ///
1083        ///
1084        ///     for _ in 0..N {
1085        ///         tokio::spawn(async {});
1086        ///     }
1087        ///     let interval =  next_interval(); // end of interval 2
1088        ///     assert_eq!(interval.total_local_queue_depth, N);
1089        /// }
1090        /// ```
1091        ///
1092        /// ###### With `multi_thread` runtime
1093        /// The below example spawns 100 tasks:
1094        /// ```
1095        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
1096        /// async fn main() {
1097        ///     const N: usize = 100;
1098        ///
1099        ///     let handle = tokio::runtime::Handle::current();
1100        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1101        ///     let mut intervals = monitor.intervals();
1102        ///     let mut next_interval = || intervals.next().unwrap();
1103        ///
1104        ///     let interval =  next_interval(); // end of interval 1
1105        ///     assert_eq!(interval.total_local_queue_depth, 0);
1106        ///
1107        ///     use std::sync::atomic::{AtomicBool, Ordering};
1108        ///     static SPINLOCK_A: AtomicBool = AtomicBool::new(true);
1109        ///
1110        ///     // block the other worker thread
1111        ///     tokio::spawn(async {
1112        ///         while SPINLOCK_A.load(Ordering::SeqCst) {}
1113        ///     });
1114        ///
1115        ///     static SPINLOCK_B: AtomicBool = AtomicBool::new(true);
1116        ///
1117        ///     let _ = tokio::spawn(async {
1118        ///         for _ in 0..N {
1119        ///             tokio::spawn(async {
1120        ///                 while SPINLOCK_B.load(Ordering::SeqCst) {}
1121        ///             });
1122        ///         }
1123        ///     }).await;
1124        ///
1125        ///     // unblock the other worker thread
1126        ///     SPINLOCK_A.store(false, Ordering::SeqCst);
1127        ///
1128        ///     let interval =  next_interval(); // end of interval 2
1129        ///     assert_eq!(interval.total_local_queue_depth, N - 1);
1130        ///
1131        ///     SPINLOCK_B.store(false, Ordering::SeqCst);
1132        /// }
1133        /// ```
1134        pub total_local_queue_depth: usize,
1135
1136        /// The maximum number of tasks currently scheduled any worker's local queue.
1137        ///
1138        /// ##### Definition
1139        /// This metric is derived from the maximum of
1140        /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
1141        ///
1142        /// ##### See also
1143        /// - [`RuntimeMetrics::total_local_queue_depth`]
1144        /// - [`RuntimeMetrics::min_local_queue_depth`]
1145        pub max_local_queue_depth: usize,
1146
1147        /// The minimum number of tasks currently scheduled any worker's local queue.
1148        ///
1149        /// ##### Definition
1150        /// This metric is derived from the minimum of
1151        /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
1152        ///
1153        /// ##### See also
1154        /// - [`RuntimeMetrics::total_local_queue_depth`]
1155        /// - [`RuntimeMetrics::max_local_queue_depth`]
1156        pub min_local_queue_depth: usize,
1157
1158        /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool.
1159        ///
1160        /// ##### Definition
1161        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::blocking_queue_depth`].
1162        pub blocking_queue_depth: usize,
1163
1164        /// The number of additional threads spawned by the runtime.
1165        ///
1166        /// ##### Definition
1167        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_blocking_threads`].
1168        pub blocking_threads_count: usize,
1169
1170        /// The number of idle threads, which have spawned by the runtime for `spawn_blocking` calls.
1171        ///
1172        /// ##### Definition
1173        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_idle_blocking_threads`].
1174        pub idle_blocking_threads_count: usize,
1175
1176        /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets.
1177        ///
1178        /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget.
1179        ///
1180        /// The counter is monotonically increasing. It is never decremented or reset to zero.
1181        ///
1182        /// ##### Definition
1183        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::budget_forced_yield_count`].
1184        pub budget_forced_yield_count: u64,
1185
1186        /// Returns the number of ready events processed by the runtime’s I/O driver.
1187        ///
1188        /// ##### Definition
1189        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::io_driver_ready_count`].
1190        pub io_driver_ready_count: u64,
1191    }
1192}
1193
1194macro_rules! define_semi_stable {
1195    (
1196    $(#[$($attributes:tt)*])*
1197    $vis:vis struct $name:ident {
1198        stable {
1199            $($stable_name:ident: $stable_ty:ty),*
1200            $(,)?
1201        }
1202        $(,)?
1203        unstable {
1204            $($unstable_name:ident: $unstable_ty:ty),*
1205            $(,)?
1206        }
1207    }
1208    ) => {
1209        $(#[$($attributes)*])*
1210        $vis struct $name {
1211            $(
1212                $stable_name: $stable_ty,
1213            )*
1214            $(
1215                #[cfg(tokio_unstable)]
1216                #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
1217                $unstable_name: $unstable_ty,
1218            )*
1219        }
1220    };
1221}
1222
1223define_semi_stable! {
1224    /// Snapshot of per-worker metrics
1225    #[derive(Debug, Default)]
1226    struct Worker {
1227        stable {
1228            worker: usize,
1229            total_park_count: u64,
1230            total_busy_duration: Duration,
1231        }
1232        unstable {
1233            total_noop_count: u64,
1234            total_steal_count: u64,
1235            total_steal_operations: u64,
1236            total_local_schedule_count: u64,
1237            total_overflow_count: u64,
1238            total_polls_count: u64,
1239            poll_time_histogram: Vec<u64>,
1240        }
1241    }
1242}
1243
1244define_semi_stable! {
1245    /// Iterator returned by [`RuntimeMonitor::intervals`].
1246    ///
1247    /// See that method's documentation for more details.
1248    #[derive(Debug)]
1249    pub struct RuntimeIntervals {
1250        stable {
1251            runtime: runtime::RuntimeMetrics,
1252            started_at: Instant,
1253            workers: Vec<Worker>,
1254        }
1255        unstable {
1256            // Number of tasks scheduled from *outside* of the runtime
1257            num_remote_schedules: u64,
1258            budget_forced_yield_count: u64,
1259            io_driver_ready_count: u64,
1260        }
1261    }
1262}
1263
1264impl RuntimeIntervals {
1265    fn probe(&mut self) -> RuntimeMetrics {
1266        let now = Instant::now();
1267
1268        let mut metrics = RuntimeMetrics {
1269            workers_count: self.runtime.num_workers(),
1270            live_tasks_count: self.runtime.num_alive_tasks(),
1271            elapsed: now - self.started_at,
1272            global_queue_depth: self.runtime.global_queue_depth(),
1273            min_park_count: u64::MAX,
1274            min_busy_duration: Duration::from_secs(1000000000),
1275            ..Default::default()
1276        };
1277
1278        #[cfg(tokio_unstable)]
1279        {
1280            let num_remote_schedules = self.runtime.remote_schedule_count();
1281            let budget_forced_yields = self.runtime.budget_forced_yield_count();
1282            let io_driver_ready_events = self.runtime.io_driver_ready_count();
1283
1284            metrics.num_remote_schedules = num_remote_schedules - self.num_remote_schedules;
1285            metrics.min_noop_count = u64::MAX;
1286            metrics.min_steal_count = u64::MAX;
1287            metrics.min_local_schedule_count = u64::MAX;
1288            metrics.min_overflow_count = u64::MAX;
1289            metrics.min_polls_count = u64::MAX;
1290            metrics.min_local_queue_depth = usize::MAX;
1291            metrics.mean_poll_duration_worker_min = Duration::MAX;
1292            metrics.poll_time_histogram = vec![0; self.runtime.poll_time_histogram_num_buckets()];
1293            metrics.budget_forced_yield_count =
1294                budget_forced_yields - self.budget_forced_yield_count;
1295            metrics.io_driver_ready_count = io_driver_ready_events - self.io_driver_ready_count;
1296
1297            self.num_remote_schedules = num_remote_schedules;
1298            self.budget_forced_yield_count = budget_forced_yields;
1299            self.io_driver_ready_count = io_driver_ready_events;
1300        }
1301        self.started_at = now;
1302
1303        for worker in &mut self.workers {
1304            worker.probe(&self.runtime, &mut metrics);
1305        }
1306
1307        #[cfg(tokio_unstable)]
1308        {
1309            if metrics.total_polls_count == 0 {
1310                debug_assert_eq!(metrics.mean_poll_duration, Duration::default());
1311
1312                metrics.mean_poll_duration_worker_max = Duration::default();
1313                metrics.mean_poll_duration_worker_min = Duration::default();
1314            }
1315        }
1316
1317        metrics
1318    }
1319}
1320
1321impl Iterator for RuntimeIntervals {
1322    type Item = RuntimeMetrics;
1323
1324    fn next(&mut self) -> Option<RuntimeMetrics> {
1325        Some(self.probe())
1326    }
1327}
1328
1329impl RuntimeMonitor {
1330    /// Creates a new [`RuntimeMonitor`].
1331    pub fn new(runtime: &runtime::Handle) -> RuntimeMonitor {
1332        let runtime = runtime.metrics();
1333
1334        RuntimeMonitor { runtime }
1335    }
1336
1337    /// Produces an unending iterator of [`RuntimeMetrics`].
1338    ///
1339    /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1340    /// produced by [`RuntimeMonitor::intervals`]. The item type of this iterator is [`RuntimeMetrics`],
1341    /// which is a bundle of runtime metrics that describe *only* changes occurring within that sampling
1342    /// interval.
1343    ///
1344    /// # Example
1345    ///
1346    /// ```
1347    /// use std::time::Duration;
1348    ///
1349    /// #[tokio::main]
1350    /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1351    ///     let handle = tokio::runtime::Handle::current();
1352    ///     // construct the runtime metrics monitor
1353    ///     let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1354    ///
1355    ///     // print runtime metrics every 500ms
1356    ///     {
1357    ///         tokio::spawn(async move {
1358    ///             for interval in runtime_monitor.intervals() {
1359    ///                 // pretty-print the metric interval
1360    ///                 println!("{:?}", interval);
1361    ///                 // wait 500ms
1362    ///                 tokio::time::sleep(Duration::from_millis(500)).await;
1363    ///             }
1364    ///         });
1365    ///     }
1366    ///
1367    ///     // await some tasks
1368    ///     tokio::join![
1369    ///         do_work(),
1370    ///         do_work(),
1371    ///         do_work(),
1372    ///     ];
1373    ///
1374    ///     Ok(())
1375    /// }
1376    ///
1377    /// async fn do_work() {
1378    ///     for _ in 0..25 {
1379    ///         tokio::task::yield_now().await;
1380    ///         tokio::time::sleep(Duration::from_millis(100)).await;
1381    ///     }
1382    /// }
1383    /// ```
1384    pub fn intervals(&self) -> RuntimeIntervals {
1385        let started_at = Instant::now();
1386
1387        let workers = (0..self.runtime.num_workers())
1388            .map(|worker| Worker::new(worker, &self.runtime))
1389            .collect();
1390
1391        RuntimeIntervals {
1392            runtime: self.runtime.clone(),
1393            started_at,
1394            workers,
1395
1396            #[cfg(tokio_unstable)]
1397            num_remote_schedules: self.runtime.remote_schedule_count(),
1398            #[cfg(tokio_unstable)]
1399            budget_forced_yield_count: self.runtime.budget_forced_yield_count(),
1400            #[cfg(tokio_unstable)]
1401            io_driver_ready_count: self.runtime.io_driver_ready_count(),
1402        }
1403    }
1404}
1405
1406impl Worker {
1407    fn new(worker: usize, rt: &runtime::RuntimeMetrics) -> Worker {
1408        #[allow(unused_mut, clippy::needless_update)]
1409        let mut wrk = Worker {
1410            worker,
1411            total_park_count: rt.worker_park_count(worker),
1412            total_busy_duration: rt.worker_total_busy_duration(worker),
1413            ..Default::default()
1414        };
1415
1416        #[cfg(tokio_unstable)]
1417        {
1418            let poll_time_histogram = if rt.poll_time_histogram_enabled() {
1419                vec![0; rt.poll_time_histogram_num_buckets()]
1420            } else {
1421                vec![]
1422            };
1423            wrk.total_noop_count = rt.worker_noop_count(worker);
1424            wrk.total_steal_count = rt.worker_steal_count(worker);
1425            wrk.total_steal_operations = rt.worker_steal_operations(worker);
1426            wrk.total_local_schedule_count = rt.worker_local_schedule_count(worker);
1427            wrk.total_overflow_count = rt.worker_overflow_count(worker);
1428            wrk.total_polls_count = rt.worker_poll_count(worker);
1429            wrk.poll_time_histogram = poll_time_histogram;
1430        };
1431        wrk
1432    }
1433
1434    fn probe(&mut self, rt: &runtime::RuntimeMetrics, metrics: &mut RuntimeMetrics) {
1435        macro_rules! metric {
1436            ( $sum:ident, $max:ident, $min:ident, $probe:ident ) => {{
1437                let val = rt.$probe(self.worker);
1438                let delta = val - self.$sum;
1439                self.$sum = val;
1440
1441                metrics.$sum += delta;
1442
1443                if delta > metrics.$max {
1444                    metrics.$max = delta;
1445                }
1446
1447                if delta < metrics.$min {
1448                    metrics.$min = delta;
1449                }
1450            }};
1451        }
1452
1453        metric!(
1454            total_park_count,
1455            max_park_count,
1456            min_park_count,
1457            worker_park_count
1458        );
1459        metric!(
1460            total_busy_duration,
1461            max_busy_duration,
1462            min_busy_duration,
1463            worker_total_busy_duration
1464        );
1465
1466        #[cfg(tokio_unstable)]
1467        {
1468            let mut worker_polls_count = self.total_polls_count;
1469            let total_polls_count = metrics.total_polls_count;
1470
1471            metric!(
1472                total_noop_count,
1473                max_noop_count,
1474                min_noop_count,
1475                worker_noop_count
1476            );
1477            metric!(
1478                total_steal_count,
1479                max_steal_count,
1480                min_steal_count,
1481                worker_steal_count
1482            );
1483            metric!(
1484                total_steal_operations,
1485                max_steal_operations,
1486                min_steal_operations,
1487                worker_steal_operations
1488            );
1489            metric!(
1490                total_local_schedule_count,
1491                max_local_schedule_count,
1492                min_local_schedule_count,
1493                worker_local_schedule_count
1494            );
1495            metric!(
1496                total_overflow_count,
1497                max_overflow_count,
1498                min_overflow_count,
1499                worker_overflow_count
1500            );
1501            metric!(
1502                total_polls_count,
1503                max_polls_count,
1504                min_polls_count,
1505                worker_poll_count
1506            );
1507
1508            // Get the number of polls since last probe
1509            worker_polls_count = self.total_polls_count - worker_polls_count;
1510
1511            // Update the mean task poll duration if there were polls
1512            if worker_polls_count > 0 {
1513                let val = rt.worker_mean_poll_time(self.worker);
1514
1515                if val > metrics.mean_poll_duration_worker_max {
1516                    metrics.mean_poll_duration_worker_max = val;
1517                }
1518
1519                if val < metrics.mean_poll_duration_worker_min {
1520                    metrics.mean_poll_duration_worker_min = val;
1521                }
1522
1523                // First, scale the current value down
1524                let ratio = total_polls_count as f64 / metrics.total_polls_count as f64;
1525                let mut mean = metrics.mean_poll_duration.as_nanos() as f64 * ratio;
1526
1527                // Add the scaled current worker's mean poll duration
1528                let ratio = worker_polls_count as f64 / metrics.total_polls_count as f64;
1529                mean += val.as_nanos() as f64 * ratio;
1530
1531                metrics.mean_poll_duration = Duration::from_nanos(mean as u64);
1532            }
1533
1534            // Update the histogram counts if there were polls since last count
1535            if worker_polls_count > 0 {
1536                for (bucket, cell) in metrics.poll_time_histogram.iter_mut().enumerate() {
1537                    let new = rt.poll_time_histogram_bucket_count(self.worker, bucket);
1538                    let delta = new - self.poll_time_histogram[bucket];
1539                    self.poll_time_histogram[bucket] = new;
1540
1541                    *cell += delta;
1542                }
1543            }
1544
1545            // Local scheduled tasks is an absolute value
1546            let local_scheduled_tasks = rt.worker_local_queue_depth(self.worker);
1547            metrics.total_local_queue_depth += local_scheduled_tasks;
1548
1549            if local_scheduled_tasks > metrics.max_local_queue_depth {
1550                metrics.max_local_queue_depth = local_scheduled_tasks;
1551            }
1552
1553            if local_scheduled_tasks < metrics.min_local_queue_depth {
1554                metrics.min_local_queue_depth = local_scheduled_tasks;
1555            }
1556
1557            // Blocking queue depth is an absolute value too
1558            metrics.blocking_queue_depth = rt.blocking_queue_depth();
1559
1560            metrics.blocking_threads_count = rt.num_blocking_threads();
1561            metrics.idle_blocking_threads_count = rt.num_idle_blocking_threads();
1562        }
1563    }
1564}
1565
1566derived_metrics!(
1567    [RuntimeMetrics] {
1568        stable {
1569            /// Returns the ratio of the [`RuntimeMetrics::total_busy_duration`] to the [`RuntimeMetrics::elapsed`].
1570            pub fn busy_ratio(&self) -> f64 {
1571                self.total_busy_duration.as_nanos() as f64 / self.elapsed.as_nanos() as f64
1572            }
1573        }
1574        unstable {
1575            /// Returns the ratio of the [`RuntimeMetrics::total_polls_count`] to the [`RuntimeMetrics::total_noop_count`].
1576            pub fn mean_polls_per_park(&self) -> f64 {
1577                let total_park_count = self.total_park_count - self.total_noop_count;
1578                if total_park_count == 0 {
1579                    0.0
1580                } else {
1581                    self.total_polls_count as f64 / total_park_count as f64
1582                }
1583            }
1584        }
1585    }
1586);