Skip to main content

tokio_metrics/
runtime.rs

1use crate::derived_metrics::derived_metrics;
2#[cfg(tokio_unstable)]
3use std::ops::Range;
4use std::time::{Duration, Instant};
5use tokio::runtime;
6
7#[cfg(tokio_unstable)]
8mod poll_time_histogram;
9#[cfg(tokio_unstable)]
10pub use poll_time_histogram::{HistogramBucket, PollTimeHistogram};
11
12#[cfg(feature = "metrics-rs-integration")]
13pub(crate) mod metrics_rs_integration;
14
15/// Monitors key metrics of the tokio runtime.
16///
17/// ### Usage
18/// ```
19/// use std::time::Duration;
20/// use tokio_metrics::RuntimeMonitor;
21///
22/// #[tokio::main]
23/// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
24///     let handle = tokio::runtime::Handle::current();
25///
26///     // print runtime metrics every 500ms
27///     {
28///         let runtime_monitor = RuntimeMonitor::new(&handle);
29///         tokio::spawn(async move {
30///             for interval in runtime_monitor.intervals() {
31///                 // pretty-print the metric interval
32///                 println!("{:?}", interval);
33///                 // wait 500ms
34///                 tokio::time::sleep(Duration::from_millis(500)).await;
35///             }
36///         });
37///     }
38///
39///     // await some tasks
40///     tokio::join![
41///         do_work(),
42///         do_work(),
43///         do_work(),
44///     ];
45///
46///     Ok(())
47/// }
48///
49/// async fn do_work() {
50///     for _ in 0..25 {
51///         tokio::task::yield_now().await;
52///         tokio::time::sleep(Duration::from_millis(100)).await;
53///     }
54/// }
55/// ```
56#[derive(Debug)]
57pub struct RuntimeMonitor {
58    /// Handle to the runtime
59    runtime: runtime::RuntimeMetrics,
60}
61
/// Generates the public `RuntimeMetrics` struct from two lists of fields.
///
/// Fields listed under `stable` are always compiled in. Fields listed under
/// `unstable` are gated behind `#[cfg(tokio_unstable)]`. Every field keeps the
/// attributes (including doc comments) written at the invocation site, and
/// additionally receives a `docsrs`-only `doc(cfg(...))` annotation describing
/// the configuration it requires.
macro_rules! define_runtime_metrics {
    (
        stable {
            $(
                $(#[$($stable_attr:tt)*])*
                $stable_vis:vis $stable_field:ident: $stable_ty:ty
            ),*
            $(,)?
        }
        unstable {
            $(
                $(#[$($unstable_attr:tt)*])*
                $unstable_vis:vis $unstable_field:ident: $unstable_ty:ty
            ),*
            $(,)?
        }
    ) => {
        /// Key runtime metrics.
        #[non_exhaustive]
        #[cfg_attr(feature = "metrique-integration", metrique::unit_of_work::metrics(subfield_owned))]
        #[derive(Default, Debug, Clone)]
        pub struct RuntimeMetrics {
            // Fields that are always present.
            $(
                $(#[$($stable_attr)*])*
                #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
                $stable_vis $stable_field: $stable_ty,
            )*
            // Fields that only exist when compiled with `--cfg tokio_unstable`.
            $(
                $(#[$($unstable_attr)*])*
                #[cfg(tokio_unstable)]
                #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
                $unstable_vis $unstable_field: $unstable_ty,
            )*
        }
    };
}
98
99define_runtime_metrics! {
100    stable {
101        /// The number of worker threads used by the runtime.
102        ///
103        /// This metric is static for a runtime.
104        ///
105        /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`].
106        /// When using the `current_thread` runtime, the return value is always `1`.
107        ///
108        /// The number of workers is set by configuring
109        /// [`worker_threads`][`tokio::runtime::Builder::worker_threads`] with
110        /// [`tokio::runtime::Builder`], or by parameterizing [`tokio::main`].
111        ///
112        /// ##### Examples
113        /// In the below example, the number of workers is set by parameterizing [`tokio::main`]:
114        /// ```
115        /// use tokio::runtime::Handle;
116        ///
117        /// #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
118        /// async fn main() {
119        ///     let handle = tokio::runtime::Handle::current();
120        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
121        ///     let mut intervals = monitor.intervals();
122        ///     let mut next_interval = || intervals.next().unwrap();
123        ///
124        ///     assert_eq!(next_interval().workers_count, 10);
125        /// }
126        /// ```
127        ///
128        /// [`tokio::main`]: https://docs.rs/tokio/latest/tokio/attr.main.html
129        ///
130        /// When using the `current_thread` runtime, the return value is always `1`; e.g.:
131        /// ```
132        /// use tokio::runtime::Handle;
133        ///
134        /// #[tokio::main(flavor = "current_thread")]
135        /// async fn main() {
136        ///     let handle = tokio::runtime::Handle::current();
137        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
138        ///     let mut intervals = monitor.intervals();
139        ///     let mut next_interval = || intervals.next().unwrap();
140        ///
141        ///     assert_eq!(next_interval().workers_count, 1);
142        /// }
143        /// ```
144        ///
145        /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`]; e.g.:
146        /// ```
147        /// use tokio::runtime::Handle;
148        ///
149        /// #[tokio::main]
150        /// async fn main() {
151        ///     let handle = Handle::current();
152        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
153        ///     let mut intervals = monitor.intervals();
154        ///     let mut next_interval = || intervals.next().unwrap();
155        ///
156        ///     assert_eq!(next_interval().workers_count, handle.metrics().num_workers());
157        /// }
158        /// ```
159        pub workers_count: usize,
160
161        /// The current number of alive tasks in the runtime.
162        ///
163        /// ##### Definition
164        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_alive_tasks`].
165        pub live_tasks_count: usize,
166
167        /// The number of times worker threads parked.
168        ///
169        /// The worker park count increases by one each time the worker parks the thread waiting for
170        /// new inbound events to process. This usually means the worker has processed all pending work
171        /// and is currently idle.
172        ///
173        /// ##### Definition
174        /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_park_count`]
175        /// across all worker threads.
176        ///
177        /// ##### See also
178        /// - [`RuntimeMetrics::max_park_count`]
179        /// - [`RuntimeMetrics::min_park_count`]
180        ///
181        /// ##### Examples
182        /// ```
183        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
184        /// async fn main() {
185        ///     let handle = tokio::runtime::Handle::current();
186        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
187        ///     let mut intervals = monitor.intervals();
188        ///     let mut next_interval = || intervals.next().unwrap();
189        ///
190        ///     let interval = next_interval(); // end of interval 1
191        ///     assert_eq!(interval.total_park_count, 0);
192        ///
193        ///     induce_parks().await;
194        ///
195        ///     let interval = next_interval(); // end of interval 2
196        ///     assert!(interval.total_park_count >= 1); // usually 1 or 2 parks
197        /// }
198        ///
199        /// async fn induce_parks() {
200        ///     let _ = tokio::time::timeout(std::time::Duration::ZERO, async {
201        ///         loop { tokio::task::yield_now().await; }
202        ///     }).await;
203        /// }
204        /// ```
205        pub total_park_count: u64,
206
207        /// The maximum number of times any worker thread parked.
208        ///
209        /// ##### Definition
210        /// This metric is derived from the maximum of
211        /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
212        ///
213        /// ##### See also
214        /// - [`RuntimeMetrics::total_park_count`]
215        /// - [`RuntimeMetrics::min_park_count`]
216        pub max_park_count: u64,
217
218        /// The minimum number of times any worker thread parked.
219        ///
220        /// ##### Definition
221        /// This metric is derived from the minimum of
222        /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
223        ///
224        /// ##### See also
225        /// - [`RuntimeMetrics::total_park_count`]
226        /// - [`RuntimeMetrics::max_park_count`]
227        pub min_park_count: u64,
228
229        /// The amount of time worker threads were busy.
230        ///
231        /// The worker busy duration increases whenever the worker is spending time processing work.
232        /// Using this value can indicate the total load of workers.
233        ///
234        /// ##### Definition
235        /// This metric is derived from the sum of
236        /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
237        ///
238        /// ##### See also
239        /// - [`RuntimeMetrics::min_busy_duration`]
240        /// - [`RuntimeMetrics::max_busy_duration`]
241        ///
242        /// ##### Examples
243        /// In the below example, tasks spend a total of at least 4s busy:
244        /// ```
245        /// use tokio::time::Duration;
246        ///
247        /// fn main() {
248        ///     let start = tokio::time::Instant::now();
249        ///
250        ///     let rt = tokio::runtime::Builder::new_current_thread()
251        ///         .enable_all()
252        ///         .build()
253        ///         .unwrap();
254        ///
255        ///     let handle = rt.handle();
256        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
257        ///     let mut intervals = monitor.intervals();
258        ///     let mut next_interval = || intervals.next().unwrap();
259        ///
260        ///     let delay_1s = Duration::from_secs(1);
261        ///     let delay_3s = Duration::from_secs(3);
262        ///
263        ///     rt.block_on(async {
264        ///         // keep the main task busy for 1s
265        ///         spin_for(delay_1s);
266        ///
267        ///         // spawn a task and keep it busy for 3s
268        ///         let _ = tokio::spawn(async move {
269        ///             spin_for(delay_3s);
270        ///         }).await;
271        ///     });
272        ///
273        ///     // flush metrics
274        ///     drop(rt);
275        ///
276        ///     let elapsed = start.elapsed();
277        ///
278        ///     let interval =  next_interval(); // end of interval 2
279        ///     assert!(interval.total_busy_duration >= delay_1s + delay_3s);
280        ///     assert!(interval.total_busy_duration <= elapsed);
281        /// }
282        ///
283        /// fn time<F>(task: F) -> Duration
284        /// where
285        ///     F: Fn() -> ()
286        /// {
287        ///     let start = tokio::time::Instant::now();
288        ///     task();
289        ///     start.elapsed()
290        /// }
291        ///
292        /// /// Block the current thread for a given `duration`.
293        /// fn spin_for(duration: Duration) {
294        ///     let start = tokio::time::Instant::now();
295        ///     while start.elapsed() <= duration {}
296        /// }
297        /// ```
298        ///
299        /// Busy times may not accumulate as the above example suggests (FIXME: Why?); e.g., if we
300        /// remove the three second delay, the time spent busy falls to mere microseconds:
301        /// ```should_panic
302        /// use tokio::time::Duration;
303        ///
304        /// fn main() {
305        ///     let rt = tokio::runtime::Builder::new_current_thread()
306        ///         .enable_all()
307        ///         .build()
308        ///         .unwrap();
309        ///
310        ///     let handle = rt.handle();
311        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
312        ///     let mut intervals = monitor.intervals();
313        ///     let mut next_interval = || intervals.next().unwrap();
314        ///
315        ///     let delay_1s = Duration::from_secs(1);
316        ///
317        ///     let elapsed = time(|| rt.block_on(async {
318        ///         // keep the main task busy for 1s
319        ///         spin_for(delay_1s);
320        ///     }));
321        ///
322        ///     // flush metrics
323        ///     drop(rt);
324        ///
325        ///     let interval =  next_interval(); // end of interval 2
326        ///     assert!(interval.total_busy_duration >= delay_1s); // FAIL
327        ///     assert!(interval.total_busy_duration <= elapsed);
328        /// }
329        ///
330        /// fn time<F>(task: F) -> Duration
331        /// where
332        ///     F: Fn() -> ()
333        /// {
334        ///     let start = tokio::time::Instant::now();
335        ///     task();
336        ///     start.elapsed()
337        /// }
338        ///
339        /// /// Block the current thread for a given `duration`.
340        /// fn spin_for(duration: Duration) {
341        ///     let start = tokio::time::Instant::now();
342        ///     while start.elapsed() <= duration {}
343        /// }
344        /// ```
345        pub total_busy_duration: Duration,
346
347        /// The maximum amount of time a worker thread was busy.
348        ///
349        /// ##### Definition
350        /// This metric is derived from the maximum of
351        /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
352        ///
353        /// ##### See also
354        /// - [`RuntimeMetrics::total_busy_duration`]
355        /// - [`RuntimeMetrics::min_busy_duration`]
356        pub max_busy_duration: Duration,
357
358        /// The minimum amount of time a worker thread was busy.
359        ///
360        /// ##### Definition
361        /// This metric is derived from the minimum of
362        /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
363        ///
364        /// ##### See also
365        /// - [`RuntimeMetrics::total_busy_duration`]
366        /// - [`RuntimeMetrics::max_busy_duration`]
367        pub min_busy_duration: Duration,
368
369        /// The number of tasks currently scheduled in the runtime's global queue.
370        ///
371        /// Tasks that are spawned or notified from a non-runtime thread are scheduled using the
372        /// runtime's global queue. This metric returns the **current** number of tasks pending in
373        /// the global queue. As such, the returned value may increase or decrease as new tasks are
374        /// scheduled and processed.
375        ///
376        /// ##### Definition
377        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::global_queue_depth`].
378        ///
379        /// ##### Example
380        /// ```
381        /// # let current_thread = tokio::runtime::Builder::new_current_thread()
382        /// #     .enable_all()
383        /// #     .build()
384        /// #     .unwrap();
385        /// #
386        /// # let multi_thread = tokio::runtime::Builder::new_multi_thread()
387        /// #     .worker_threads(2)
388        /// #     .enable_all()
389        /// #     .build()
390        /// #     .unwrap();
391        /// #
392        /// # for runtime in [current_thread, multi_thread] {
393        /// let handle = runtime.handle().clone();
394        /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
395        /// let mut intervals = monitor.intervals();
396        /// let mut next_interval = || intervals.next().unwrap();
397        ///
398        /// let interval = next_interval(); // end of interval 1
399        /// # #[cfg(tokio_unstable)]
400        /// assert_eq!(interval.num_remote_schedules, 0);
401        ///
402        /// // spawn a system thread outside of the runtime
403        /// std::thread::spawn(move || {
404        ///     // spawn two tasks from this non-runtime thread
405        ///     handle.spawn(async {});
406        ///     handle.spawn(async {});
407        /// }).join().unwrap();
408        ///
409        /// // flush metrics
410        /// drop(runtime);
411        ///
412        /// let interval = next_interval(); // end of interval 2
413        /// # #[cfg(tokio_unstable)]
414        /// assert_eq!(interval.num_remote_schedules, 2);
415        /// # }
416        /// ```
417        pub global_queue_depth: usize,
418
419        /// Total amount of time elapsed since observing runtime metrics.
420        pub elapsed: Duration,
421    }
422    unstable {
423        /// The average duration of a single invocation of poll on a task.
424        ///
425        /// This average is an exponentially-weighted moving average of the duration
426        /// of task polls on all runtime workers.
427        ///
428        /// ##### Examples
429        /// ```
430        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
431        /// async fn main() {
432        ///     let handle = tokio::runtime::Handle::current();
433        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
434        ///     let mut intervals = monitor.intervals();
435        ///     let mut next_interval = || intervals.next().unwrap();
436        ///
437        ///     let interval = next_interval();
438        ///     println!("mean task poll duration is {:?}", interval.mean_poll_duration);
439        /// }
440        /// ```
441        pub mean_poll_duration: Duration,
442
443        /// The average duration of a single invocation of poll on a task on the
444        /// worker with the lowest value.
445        ///
446        /// This average is an exponentially-weighted moving average of the duration
447        /// of task polls on the runtime worker with the lowest value.
448        ///
449        /// ##### Examples
450        /// ```
451        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
452        /// async fn main() {
453        ///     let handle = tokio::runtime::Handle::current();
454        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
455        ///     let mut intervals = monitor.intervals();
456        ///     let mut next_interval = || intervals.next().unwrap();
457        ///
458        ///     let interval = next_interval();
459        ///     println!("min mean task poll duration is {:?}", interval.mean_poll_duration_worker_min);
460        /// }
461        /// ```
462        pub mean_poll_duration_worker_min: Duration,
463
464        /// The average duration of a single invocation of poll on a task on the
465        /// worker with the highest value.
466        ///
467        /// This average is an exponentially-weighted moving average of the duration
468        /// of task polls on the runtime worker with the highest value.
469        ///
470        /// ##### Examples
471        /// ```
472        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
473        /// async fn main() {
474        ///     let handle = tokio::runtime::Handle::current();
475        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
476        ///     let mut intervals = monitor.intervals();
477        ///     let mut next_interval = || intervals.next().unwrap();
478        ///
479        ///     let interval = next_interval();
480        ///     println!("max mean task poll duration is {:?}", interval.mean_poll_duration_worker_max);
481        /// }
482        /// ```
483        pub mean_poll_duration_worker_max: Duration,
484
485        /// A histogram of task polls since the previous probe grouped by poll
486        /// times.
487        ///
488        /// Each bucket contains the configured [`Duration`] range and the count
489        /// of task polls that fell into that range during the interval. Use
490        /// [`PollTimeHistogram::as_counts`] to get just the raw counts as a
491        /// `Vec<u64>`.
492        ///
493        /// This metric must be explicitly enabled when creating the runtime with
494        /// [`enable_metrics_poll_time_histogram`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
495        /// Bucket sizes are fixed and configured at the runtime level. See
496        /// configuration options on
497        /// [`runtime::Builder`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
498        ///
499        /// ##### Examples
500        /// ```
501        /// use tokio::runtime::HistogramConfiguration;
502        /// use std::time::Duration;
503        ///
504        /// let config = HistogramConfiguration::linear(Duration::from_micros(50), 12);
505        ///
506        /// let rt = tokio::runtime::Builder::new_multi_thread()
507        ///     .enable_metrics_poll_time_histogram()
508        ///     .metrics_poll_time_histogram_configuration(config)
509        ///     .build()
510        ///     .unwrap();
511        ///
512        /// rt.block_on(async {
513        ///     let handle = tokio::runtime::Handle::current();
514        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
515        ///     let mut intervals = monitor.intervals();
516        ///     let mut next_interval = || intervals.next().unwrap();
517        ///
518        ///     let interval = next_interval();
519        ///     for bucket in interval.poll_time_histogram.buckets() {
520        ///         println!("{:?}..{:?} => {} polls", bucket.range_start(), bucket.range_end(), bucket.count());
521        ///     }
522        /// });
523        /// ```
524        pub poll_time_histogram: PollTimeHistogram,
525
526        /// The number of times worker threads unparked but performed no work before parking again.
527        ///
528        /// The worker no-op count increases by one each time the worker unparks the thread but finds
529        /// no new work and goes back to sleep. This indicates a false-positive wake up.
530        ///
531        /// ##### Definition
532        /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_noop_count`]
533        /// across all worker threads.
534        ///
535        /// ##### Examples
536        /// Unfortunately, there isn't a great way to reliably induce no-op parks, as they occur as
537        /// false-positive events under concurrency.
538        ///
539        /// The below example triggers fewer than two parks in the single-threaded runtime:
540        /// ```
541        /// #[tokio::main(flavor = "current_thread")]
542        /// async fn main() {
543        ///     let handle = tokio::runtime::Handle::current();
544        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
545        ///     let mut intervals = monitor.intervals();
546        ///     let mut next_interval = || intervals.next().unwrap();
547        ///
548        ///     assert_eq!(next_interval().total_park_count, 0);
549        ///
550        ///     async {
551        ///         tokio::time::sleep(std::time::Duration::from_millis(1)).await;
552        ///     }.await;
553        ///
554        ///     assert!(next_interval().total_park_count > 0);
555        /// }
556        /// ```
557        ///
558        /// The below example triggers fewer than two parks in the multi-threaded runtime:
559        /// ```
560        /// #[tokio::main(flavor = "multi_thread")]
561        /// async fn main() {
562        ///     let handle = tokio::runtime::Handle::current();
563        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
564        ///     let mut intervals = monitor.intervals();
565        ///     let mut next_interval = || intervals.next().unwrap();
566        ///
567        ///     async {
568        ///         tokio::time::sleep(std::time::Duration::from_millis(1)).await;
569        ///     }.await;
570        ///
571        ///     assert!(next_interval().total_noop_count > 0);
572        /// }
573        /// ```
574        pub total_noop_count: u64,
575
576        /// The maximum number of times any worker thread unparked but performed no work before parking
577        /// again.
578        ///
579        /// ##### Definition
580        /// This metric is derived from the maximum of
581        /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads.
582        ///
583        /// ##### See also
584        /// - [`RuntimeMetrics::total_noop_count`]
585        /// - [`RuntimeMetrics::min_noop_count`]
586        pub max_noop_count: u64,
587
588        /// The minimum number of times any worker thread unparked but performed no work before parking
589        /// again.
590        ///
591        /// ##### Definition
592        /// This metric is derived from the minimum of
593        /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads.
594        ///
595        /// ##### See also
596        /// - [`RuntimeMetrics::total_noop_count`]
597        /// - [`RuntimeMetrics::max_noop_count`]
598        pub min_noop_count: u64,
599
600        /// The number of tasks worker threads stole from another worker thread.
601        ///
602        /// The worker steal count increases by the amount of stolen tasks each time the worker
603        /// has processed its scheduled queue and successfully steals more pending tasks from another
604        /// worker.
605        ///
606        /// This metric only applies to the **multi-threaded** runtime and will always return `0` when
607        /// using the current thread runtime.
608        ///
609        /// ##### Definition
610        /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`] for
611        /// all worker threads.
612        ///
613        /// ##### See also
614        /// - [`RuntimeMetrics::min_steal_count`]
615        /// - [`RuntimeMetrics::max_steal_count`]
616        ///
617        /// ##### Examples
618        /// In the below example, a blocking channel is used to backup one worker thread:
619        /// ```
620        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
621        /// async fn main() {
622        ///     let handle = tokio::runtime::Handle::current();
623        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
624        ///     let mut intervals = monitor.intervals();
625        ///     let mut next_interval = || intervals.next().unwrap();
626        ///
627        ///     let interval = next_interval(); // end of first sampling interval
628        ///     assert_eq!(interval.total_steal_count, 0);
629        ///     assert_eq!(interval.min_steal_count, 0);
630        ///     assert_eq!(interval.max_steal_count, 0);
631        ///
632        ///     // induce a steal
633        ///     async {
634        ///         let (tx, rx) = std::sync::mpsc::channel();
635        ///         // Move to the runtime.
636        ///         tokio::spawn(async move {
637        ///             // Spawn the task that sends to the channel
638        ///             tokio::spawn(async move {
639        ///                 tx.send(()).unwrap();
640        ///             });
641        ///             // Spawn a task that bumps the previous task out of the "next
642        ///             // scheduled" slot.
643        ///             tokio::spawn(async {});
644        ///             // Blocking receive on the channel.
645        ///             rx.recv().unwrap();
646        ///             flush_metrics().await;
647        ///         }).await.unwrap();
648        ///         flush_metrics().await;
649        ///     }.await;
650        ///
651        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
652        ///     println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
653        ///
654        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 3
655        ///     println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
656        /// }
657        ///
658        /// async fn flush_metrics() {
659        ///     let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
660        /// }
661        /// ```
662        pub total_steal_count: u64,
663
664        /// The maximum number of tasks any worker thread stole from another worker thread.
665        ///
666        /// ##### Definition
667        /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`]
668        /// across all worker threads.
669        ///
670        /// ##### See also
671        /// - [`RuntimeMetrics::total_steal_count`]
672        /// - [`RuntimeMetrics::min_steal_count`]
673        pub max_steal_count: u64,
674
675        /// The minimum number of tasks any worker thread stole from another worker thread.
676        ///
677        /// ##### Definition
678        /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`]
679        /// across all worker threads.
680        ///
681        /// ##### See also
682        /// - [`RuntimeMetrics::total_steal_count`]
683        /// - [`RuntimeMetrics::max_steal_count`]
684        pub min_steal_count: u64,
685
686        /// The number of times worker threads stole tasks from another worker thread.
687        ///
688        /// The worker steal operations count increases by one each time the worker has processed its
689        /// scheduled queue and successfully steals more pending tasks from another worker.
690        ///
691        /// This metric only applies to the **multi-threaded** runtime and will always return `0` when
692        /// using the current thread runtime.
693        ///
694        /// ##### Definition
695        /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
696        /// for all worker threads.
697        ///
698        /// ##### See also
699        /// - [`RuntimeMetrics::min_steal_operations`]
700        /// - [`RuntimeMetrics::max_steal_operations`]
701        ///
702        /// ##### Examples
703        /// In the below example, a blocking channel is used to backup one worker thread:
704        /// ```
705        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
706        /// async fn main() {
707        ///     let handle = tokio::runtime::Handle::current();
708        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
709        ///     let mut intervals = monitor.intervals();
710        ///     let mut next_interval = || intervals.next().unwrap();
711        ///
712        ///     let interval = next_interval(); // end of first sampling interval
713        ///     assert_eq!(interval.total_steal_operations, 0);
714        ///     assert_eq!(interval.min_steal_operations, 0);
715        ///     assert_eq!(interval.max_steal_operations, 0);
716        ///
717        ///     // induce a steal
718        ///     async {
719        ///         let (tx, rx) = std::sync::mpsc::channel();
720        ///         // Move to the runtime.
721        ///         tokio::spawn(async move {
722        ///             // Spawn the task that sends to the channel
723        ///             tokio::spawn(async move {
724        ///                 tx.send(()).unwrap();
725        ///             });
726        ///             // Spawn a task that bumps the previous task out of the "next
727        ///             // scheduled" slot.
728        ///             tokio::spawn(async {});
729        ///             // Blocking receive on the channel.
730        ///             rx.recv().unwrap();
731        ///             flush_metrics().await;
732        ///         }).await.unwrap();
733        ///         flush_metrics().await;
734        ///     }.await;
735        ///
736        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
737        ///     println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
738        ///
739        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 3
740        ///     println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
741        /// }
742        ///
743        /// async fn flush_metrics() {
744        ///     let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
745        /// }
746        /// ```
747        pub total_steal_operations: u64,
748
749        /// The maximum number of times any worker thread stole tasks from another worker thread.
750        ///
751        /// ##### Definition
752        /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
753        /// across all worker threads.
754        ///
755        /// ##### See also
756        /// - [`RuntimeMetrics::total_steal_operations`]
757        /// - [`RuntimeMetrics::min_steal_operations`]
758        pub max_steal_operations: u64,
759
760        /// The minimum number of times any worker thread stole tasks from another worker thread.
761        ///
762        /// ##### Definition
763        /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
764        /// across all worker threads.
765        ///
766        /// ##### See also
767        /// - [`RuntimeMetrics::total_steal_operations`]
768        /// - [`RuntimeMetrics::max_steal_operations`]
769        pub min_steal_operations: u64,
770
771        /// The number of tasks scheduled from **outside** of the runtime.
772        ///
773        /// The remote schedule count increases by one each time a task is woken from **outside** of
774        /// the runtime. This usually means that a task is spawned or notified from a non-runtime
775        /// thread and must be queued using the Runtime's global queue, which tends to be slower.
776        ///
777        /// ##### Definition
778        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::remote_schedule_count`].
779        ///
780        /// ##### Examples
781        /// In the below example, a remote schedule is induced by spawning a system thread, then
782        /// spawning a tokio task from that system thread:
783        /// ```
784        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
785        /// async fn main() {
786        ///     let handle = tokio::runtime::Handle::current();
787        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
788        ///     let mut intervals = monitor.intervals();
789        ///     let mut next_interval = || intervals.next().unwrap();
790        ///
791        ///     let interval = next_interval(); // end of first sampling interval
792        ///     assert_eq!(interval.num_remote_schedules, 0);
793        ///
794        ///     // spawn a non-runtime thread
795        ///     std::thread::spawn(move || {
796        ///         // spawn two tasks from this non-runtime thread
797        ///         async move {
798        ///             handle.spawn(async {}).await;
799        ///             handle.spawn(async {}).await;
800        ///         }
801        ///     }).join().unwrap().await;
802        ///
803        ///     let interval = next_interval(); // end of second sampling interval
804        ///     assert_eq!(interval.num_remote_schedules, 2);
805        ///
806        ///     let interval = next_interval(); // end of third sampling interval
807        ///     assert_eq!(interval.num_remote_schedules, 0);
808        /// }
809        /// ```
810        pub num_remote_schedules: u64,
811
812        /// The number of tasks scheduled from worker threads.
813        ///
814        /// The local schedule count increases by one each time a task is woken from **inside** of the
815        /// runtime. This usually means that a task is spawned or notified from within a runtime thread
816        /// and will be queued on the worker-local queue.
817        ///
818        /// ##### Definition
819        /// This metric is derived from the sum of
820        /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] across all worker threads.
821        ///
822        /// ##### See also
823        /// - [`RuntimeMetrics::min_local_schedule_count`]
824        /// - [`RuntimeMetrics::max_local_schedule_count`]
825        ///
826        /// ##### Examples
827        /// ###### With `current_thread` runtime
828        /// In the below example, two tasks are spawned from the context of a third tokio task:
829        /// ```
830        /// #[tokio::main(flavor = "current_thread")]
831        /// async fn main() {
832        ///     let handle = tokio::runtime::Handle::current();
833        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
834        ///     let mut intervals = monitor.intervals();
835        ///     let mut next_interval = || intervals.next().unwrap();
836        ///
        ///     let interval = { flush_metrics().await; next_interval() }; // end interval 1
838        ///     assert_eq!(interval.total_local_schedule_count, 0);
839        ///
840        ///     let task = async {
841        ///         tokio::spawn(async {}); // local schedule 1
842        ///         tokio::spawn(async {}); // local schedule 2
843        ///     };
844        ///
845        ///     let handle = tokio::spawn(task); // local schedule 3
846        ///
847        ///     let interval = { flush_metrics().await; next_interval() }; // end interval 2
848        ///     assert_eq!(interval.total_local_schedule_count, 3);
849        ///
850        ///     let _ = handle.await;
851        ///
852        ///     let interval = { flush_metrics().await; next_interval() }; // end interval 3
853        ///     assert_eq!(interval.total_local_schedule_count, 0);
854        /// }
855        ///
856        /// async fn flush_metrics() {
857        ///     tokio::task::yield_now().await;
858        /// }
859        /// ```
860        ///
861        /// ###### With `multi_thread` runtime
862        /// In the below example, 100 tasks are spawned:
863        /// ```
864        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
865        /// async fn main() {
866        ///     let handle = tokio::runtime::Handle::current();
867        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
868        ///     let mut intervals = monitor.intervals();
869        ///     let mut next_interval = || intervals.next().unwrap();
870        ///
871        ///     let interval = next_interval(); // end of interval 1
872        ///     assert_eq!(interval.total_local_schedule_count, 0);
873        ///
874        ///     use std::sync::atomic::{AtomicBool, Ordering};
875        ///     static SPINLOCK: AtomicBool = AtomicBool::new(true);
876        ///
877        ///     // block the other worker thread
878        ///     tokio::spawn(async {
879        ///         while SPINLOCK.load(Ordering::SeqCst) {}
880        ///     });
881        ///
882        ///     // FIXME: why does this need to be in a `spawn`?
883        ///     let _ = tokio::spawn(async {
884        ///         // spawn 100 tasks
885        ///         for _ in 0..100 {
886        ///             tokio::spawn(async {});
887        ///         }
888        ///         // this spawns 1 more task
889        ///         flush_metrics().await;
890        ///     }).await;
891        ///
892        ///     // unblock the other worker thread
893        ///     SPINLOCK.store(false, Ordering::SeqCst);
894        ///
895        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
896        ///     assert_eq!(interval.total_local_schedule_count, 100 + 1);
897        /// }
898        ///
899        /// async fn flush_metrics() {
900        ///     let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
901        /// }
902        /// ```
903        pub total_local_schedule_count: u64,
904
905        /// The maximum number of tasks scheduled from any one worker thread.
906        ///
907        /// ##### Definition
908        /// This metric is derived from the maximum of
909        /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads.
910        ///
911        /// ##### See also
912        /// - [`RuntimeMetrics::total_local_schedule_count`]
913        /// - [`RuntimeMetrics::min_local_schedule_count`]
914        pub max_local_schedule_count: u64,
915
916        /// The minimum number of tasks scheduled from any one worker thread.
917        ///
918        /// ##### Definition
919        /// This metric is derived from the minimum of
920        /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads.
921        ///
922        /// ##### See also
923        /// - [`RuntimeMetrics::total_local_schedule_count`]
924        /// - [`RuntimeMetrics::max_local_schedule_count`]
925        pub min_local_schedule_count: u64,
926
927        /// The number of times worker threads saturated their local queues.
928        ///
        /// The worker overflow count increases by one each time the worker attempts to schedule a task
930        /// locally, but its local queue is full. When this happens, half of the
931        /// local queue is moved to the global queue.
932        ///
933        /// This metric only applies to the **multi-threaded** scheduler.
934        ///
935        /// ##### Definition
936        /// This metric is derived from the sum of
937        /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
938        ///
939        /// ##### See also
940        /// - [`RuntimeMetrics::min_overflow_count`]
941        /// - [`RuntimeMetrics::max_overflow_count`]
942        ///
943        /// ##### Examples
944        /// ```
945        /// #[tokio::main(flavor = "multi_thread", worker_threads = 1)]
946        /// async fn main() {
947        ///     let handle = tokio::runtime::Handle::current();
948        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
949        ///     let mut intervals = monitor.intervals();
950        ///     let mut next_interval = || intervals.next().unwrap();
951        ///
952        ///     let interval = next_interval(); // end of interval 1
953        ///     assert_eq!(interval.total_overflow_count, 0);
954        ///
955        ///     use std::sync::atomic::{AtomicBool, Ordering};
956        ///
957        ///     // spawn a ton of tasks
958        ///     let _ = tokio::spawn(async {
959        ///         // we do this in a `tokio::spawn` because it is impossible to
960        ///         // overflow the main task
961        ///         for _ in 0..300 {
962        ///             tokio::spawn(async {});
963        ///         }
964        ///     }).await;
965        ///
966        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
967        ///     assert_eq!(interval.total_overflow_count, 1);
968        /// }
969        ///
970        /// async fn flush_metrics() {
971        ///     let _ = tokio::time::sleep(std::time::Duration::from_millis(1)).await;
972        /// }
973        /// ```
974        pub total_overflow_count: u64,
975
976        /// The maximum number of times any one worker saturated its local queue.
977        ///
978        /// ##### Definition
979        /// This metric is derived from the maximum of
980        /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
981        ///
982        /// ##### See also
983        /// - [`RuntimeMetrics::total_overflow_count`]
984        /// - [`RuntimeMetrics::min_overflow_count`]
985        pub max_overflow_count: u64,
986
987        /// The minimum number of times any one worker saturated its local queue.
988        ///
989        /// ##### Definition
        /// This metric is derived from the minimum of
991        /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
992        ///
993        /// ##### See also
994        /// - [`RuntimeMetrics::total_overflow_count`]
995        /// - [`RuntimeMetrics::max_overflow_count`]
996        pub min_overflow_count: u64,
997
998        /// The number of tasks that have been polled across all worker threads.
999        ///
1000        /// The worker poll count increases by one each time a worker polls a scheduled task.
1001        ///
1002        /// ##### Definition
1003        /// This metric is derived from the sum of
1004        /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1005        ///
1006        /// ##### See also
1007        /// - [`RuntimeMetrics::min_polls_count`]
1008        /// - [`RuntimeMetrics::max_polls_count`]
1009        ///
1010        /// ##### Examples
1011        /// In the below example, 42 tasks are spawned and polled:
1012        /// ```
1013        /// #[tokio::main(flavor = "current_thread")]
1014        /// async fn main() {
1015        ///     let handle = tokio::runtime::Handle::current();
1016        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1017        ///     let mut intervals = monitor.intervals();
1018        ///     let mut next_interval = || intervals.next().unwrap();
1019        ///
1020        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 1
1021        ///     assert_eq!(interval.total_polls_count, 0);
1022        ///     assert_eq!(interval.min_polls_count, 0);
1023        ///     assert_eq!(interval.max_polls_count, 0);
1024        ///
1025        ///     const N: u64 = 42;
1026        ///
1027        ///     for _ in 0..N {
1028        ///         let _ = tokio::spawn(async {}).await;
1029        ///     }
1030        ///
1031        ///     let interval = { flush_metrics().await; next_interval() }; // end of interval 2
1032        ///     assert_eq!(interval.total_polls_count, N);
1033        ///     assert_eq!(interval.min_polls_count, N);
1034        ///     assert_eq!(interval.max_polls_count, N);
1035        /// }
1036        ///
1037        /// async fn flush_metrics() {
1038        ///     let _ = tokio::task::yield_now().await;
1039        /// }
1040        /// ```
1041        pub total_polls_count: u64,
1042
1043        /// The maximum number of tasks that have been polled in any worker thread.
1044        ///
1045        /// ##### Definition
1046        /// This metric is derived from the maximum of
1047        /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1048        ///
1049        /// ##### See also
1050        /// - [`RuntimeMetrics::total_polls_count`]
1051        /// - [`RuntimeMetrics::min_polls_count`]
1052        pub max_polls_count: u64,
1053
1054        /// The minimum number of tasks that have been polled in any worker thread.
1055        ///
1056        /// ##### Definition
1057        /// This metric is derived from the minimum of
1058        /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1059        ///
1060        /// ##### See also
1061        /// - [`RuntimeMetrics::total_polls_count`]
1062        /// - [`RuntimeMetrics::max_polls_count`]
1063        pub min_polls_count: u64,
1064
1065        /// The total number of tasks currently scheduled in workers' local queues.
1066        ///
1067        /// Tasks that are spawned or notified from within a runtime thread are scheduled using that
1068        /// worker's local queue. This metric returns the **current** number of tasks pending in all
1069        /// workers' local queues. As such, the returned value may increase or decrease as new tasks
1070        /// are scheduled and processed.
1071        ///
1072        /// ##### Definition
1073        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`].
1074        ///
1075        /// ##### See also
1076        /// - [`RuntimeMetrics::min_local_queue_depth`]
1077        /// - [`RuntimeMetrics::max_local_queue_depth`]
1078        ///
1079        /// ##### Example
1080        ///
1081        /// ###### With `current_thread` runtime
1082        /// The below example spawns 100 tasks:
1083        /// ```
1084        /// #[tokio::main(flavor = "current_thread")]
1085        /// async fn main() {
1086        ///     const N: usize = 100;
1087        ///
1088        ///     let handle = tokio::runtime::Handle::current();
1089        ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1090        ///     let mut intervals = monitor.intervals();
1091        ///     let mut next_interval = || intervals.next().unwrap();
1092        ///
1093        ///     let interval =  next_interval(); // end of interval 1
1094        ///     assert_eq!(interval.total_local_queue_depth, 0);
1095        ///
1096        ///
1097        ///     for _ in 0..N {
1098        ///         tokio::spawn(async {});
1099        ///     }
1100        ///     let interval =  next_interval(); // end of interval 2
1101        ///     assert_eq!(interval.total_local_queue_depth, N);
1102        /// }
1103        /// ```
1104        ///
1105        /// ###### With `multi_thread` runtime
1106        /// The below example spawns 100 tasks and observes them in the
1107        /// local queue:
1108        /// ```
1109        /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
1110        /// async fn main() {
1111        ///     use std::sync::mpsc;
1112        ///     use tokio::sync::oneshot;
1113        ///
1114        ///     const N: usize = 100;
1115        /// 
1116        ///     let handle = tokio::runtime::Handle::current();
1117        ///
1118        ///     // block one worker so the other is the only one running
1119        ///     let (block_tx, block_rx) = mpsc::channel::<()>();
1120        ///     let (started_tx, started_rx) = oneshot::channel();
1121        ///     tokio::spawn(async move {
1122        ///         let _ = started_tx.send(());
1123        ///         let _ = block_rx.recv();
1124        ///     });
1125        ///     let _ = started_rx.await;
1126        ///
1127        ///     // spawn + sample from the free worker thread
1128        ///     let (depth_tx, depth_rx) = oneshot::channel();
1129        ///     tokio::spawn(async move {
1130        ///         let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1131        ///         let mut intervals = monitor.intervals();
1132        ///         let _ = intervals.next().unwrap(); // baseline
1133        ///
1134        ///         for _ in 0..N {
1135        ///             tokio::spawn(async {});
1136        ///         }
1137        ///
1138        ///         let depth = intervals.next().unwrap().total_local_queue_depth;
1139        ///         let _ = depth_tx.send(depth);
1140        ///     });
1141        ///
1142        ///     let depth = depth_rx.await.unwrap();
1143        ///
1144        ///     // Tokio may place one spawned task in a LIFO slot rather than the
1145        ///     // local queue, which may not be reflected in `worker_local_queue_depth`,
1146        ///     // so accept N or N - 1.
1147        ///     assert!(depth == N || depth == N - 1, "depth = {depth}");
1148        ///
1149        ///     let _ = block_tx.send(());
1150        /// }
1151        /// ```
1152        pub total_local_queue_depth: usize,
1153
        /// The maximum number of tasks currently scheduled in any worker's local queue.
1155        ///
1156        /// ##### Definition
1157        /// This metric is derived from the maximum of
1158        /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
1159        ///
1160        /// ##### See also
1161        /// - [`RuntimeMetrics::total_local_queue_depth`]
1162        /// - [`RuntimeMetrics::min_local_queue_depth`]
1163        pub max_local_queue_depth: usize,
1164
        /// The minimum number of tasks currently scheduled in any worker's local queue.
1166        ///
1167        /// ##### Definition
1168        /// This metric is derived from the minimum of
1169        /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
1170        ///
1171        /// ##### See also
1172        /// - [`RuntimeMetrics::total_local_queue_depth`]
1173        /// - [`RuntimeMetrics::max_local_queue_depth`]
1174        pub min_local_queue_depth: usize,
1175
1176        /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool.
1177        ///
1178        /// ##### Definition
1179        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::blocking_queue_depth`].
1180        pub blocking_queue_depth: usize,
1181
1182        /// The number of additional threads spawned by the runtime.
1183        ///
1184        /// ##### Definition
1185        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_blocking_threads`].
1186        pub blocking_threads_count: usize,
1187
        /// The number of idle threads that have been spawned by the runtime for `spawn_blocking` calls.
1189        ///
1190        /// ##### Definition
1191        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_idle_blocking_threads`].
1192        pub idle_blocking_threads_count: usize,
1193
1194        /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets.
1195        ///
1196        /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget.
1197        ///
1198        /// The counter is monotonically increasing. It is never decremented or reset to zero.
1199        ///
1200        /// ##### Definition
1201        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::budget_forced_yield_count`].
1202        pub budget_forced_yield_count: u64,
1203
1204        /// Returns the number of ready events processed by the runtime’s I/O driver.
1205        ///
1206        /// ##### Definition
1207        /// This metric is derived from [`tokio::runtime::RuntimeMetrics::io_driver_ready_count`].
1208        pub io_driver_ready_count: u64,
1209    }
1210}
1211
/// Defines a struct whose fields are split into two groups:
///
/// - `stable { ... }`: fields that are always compiled in;
/// - `unstable { ... }`: fields compiled only under `cfg(tokio_unstable)`, which also receive a
///   docs.rs `doc(cfg(...))` annotation.
///
/// Struct-level attributes (including `///` docs) are forwarded verbatim. Each field may also
/// carry its own outer attributes, including `///` doc comments, which desugar to
/// `#[doc = "..."]`. Attributes on an unstable field are emitted before the generated `cfg`
/// attributes. Trailing commas are accepted inside each group and between the two groups.
macro_rules! define_semi_stable {
    (
    $(#[$($attributes:tt)*])*
    $vis:vis struct $name:ident {
        stable {
            $(
                $(#[$stable_meta:meta])*
                $stable_name:ident: $stable_ty:ty
            ),*
            $(,)?
        }
        $(,)?
        unstable {
            $(
                $(#[$unstable_meta:meta])*
                $unstable_name:ident: $unstable_ty:ty
            ),*
            $(,)?
        }
    }
    ) => {
        $(#[$($attributes)*])*
        $vis struct $name {
            $(
                $(#[$stable_meta])*
                $stable_name: $stable_ty,
            )*
            $(
                $(#[$unstable_meta])*
                #[cfg(tokio_unstable)]
                #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
                $unstable_name: $unstable_ty,
            )*
        }
    };
}
1240
define_semi_stable! {
    /// Snapshot of per-worker metrics.
    ///
    /// Holds the raw cumulative values most recently read for one worker thread. In
    /// `Worker::probe`, each interval's delta is computed as the fresh reading minus the
    /// stored `total_*` value, and the fresh reading is then stored back here.
    #[derive(Debug, Default)]
    struct Worker {
        stable {
            // Index of the worker thread; passed to the `worker_*` accessors of
            // `tokio::runtime::RuntimeMetrics`.
            worker: usize,
            // Last cumulative value read from `worker_park_count`.
            total_park_count: u64,
            // Last cumulative value read from `worker_total_busy_duration`.
            total_busy_duration: Duration,
        }
        unstable {
            // Last cumulative values read from the matching `worker_*` accessors.
            // NOTE(review): assumed to follow the same delta-and-store pattern as the
            // stable counters in `Worker::probe` — confirm.
            total_noop_count: u64,
            total_steal_count: u64,
            total_steal_operations: u64,
            total_local_schedule_count: u64,
            total_overflow_count: u64,
            total_polls_count: u64,
            // One slot per poll-time histogram bucket when the histogram is enabled,
            // otherwise empty; starts zeroed in `Worker::new`.
            poll_time_histogram: Vec<u64>,
        }
    }
}
1261
define_semi_stable! {
    /// Iterator returned by [`RuntimeMonitor::intervals`].
    ///
    /// Each call to `next` samples the runtime and yields a [`RuntimeMetrics`] describing only
    /// the time since the previous sample. To compute those deltas, the iterator caches the
    /// cumulative values it read last time.
    ///
    /// See that method's documentation for more details.
    #[derive(Debug)]
    pub struct RuntimeIntervals {
        stable {
            // Metrics handle of the monitored runtime.
            runtime: runtime::RuntimeMetrics,
            // Start of the current sampling interval; reset to "now" on every probe.
            started_at: Instant,
            // One cached snapshot per worker thread; index `i` holds worker `i`.
            workers: Vec<Worker>,
        }
        unstable {
            // Number of tasks scheduled from *outside* of the runtime
            // (cumulative value as of the previous probe).
            num_remote_schedules: u64,
            // Cumulative budget-forced yield count as of the previous probe.
            budget_forced_yield_count: u64,
            // Cumulative I/O driver ready-event count as of the previous probe.
            io_driver_ready_count: u64,
            // Cached bucket ranges, static config that doesn't change after runtime creation.
            bucket_ranges: Vec<Range<Duration>>,
        }
    }
}
1283
1284impl RuntimeIntervals {
1285    fn probe(&mut self) -> RuntimeMetrics {
1286        let now = Instant::now();
1287
1288        let mut metrics = RuntimeMetrics {
1289            workers_count: self.runtime.num_workers(),
1290            live_tasks_count: self.runtime.num_alive_tasks(),
1291            elapsed: now.saturating_duration_since(self.started_at),
1292            global_queue_depth: self.runtime.global_queue_depth(),
1293            min_park_count: u64::MAX,
1294            min_busy_duration: Duration::from_secs(1000000000),
1295            ..Default::default()
1296        };
1297
1298        #[cfg(tokio_unstable)]
1299        {
1300            let num_remote_schedules = self.runtime.remote_schedule_count();
1301            let budget_forced_yields = self.runtime.budget_forced_yield_count();
1302            let io_driver_ready_events = self.runtime.io_driver_ready_count();
1303
1304            metrics.num_remote_schedules = num_remote_schedules.saturating_sub(self.num_remote_schedules);
1305            metrics.min_noop_count = u64::MAX;
1306            metrics.min_steal_count = u64::MAX;
1307            metrics.min_local_schedule_count = u64::MAX;
1308            metrics.min_overflow_count = u64::MAX;
1309            metrics.min_polls_count = u64::MAX;
1310            metrics.min_local_queue_depth = usize::MAX;
1311            metrics.mean_poll_duration_worker_min = Duration::MAX;
1312            metrics.poll_time_histogram = PollTimeHistogram::new(
1313                self.bucket_ranges
1314                    .iter()
1315                    .map(|range| HistogramBucket::new(range.start, range.end, 0))
1316                    .collect(),
1317            );
1318            metrics.budget_forced_yield_count =
1319                budget_forced_yields.saturating_sub(self.budget_forced_yield_count);
1320            metrics.io_driver_ready_count = io_driver_ready_events.saturating_sub(self.io_driver_ready_count);
1321
1322            self.num_remote_schedules = num_remote_schedules;
1323            self.budget_forced_yield_count = budget_forced_yields;
1324            self.io_driver_ready_count = io_driver_ready_events;
1325        }
1326        self.started_at = now;
1327
1328        for worker in &mut self.workers {
1329            worker.probe(&self.runtime, &mut metrics);
1330        }
1331
1332        #[cfg(tokio_unstable)]
1333        {
1334            if metrics.total_polls_count == 0 {
1335                debug_assert_eq!(metrics.mean_poll_duration, Duration::default());
1336
1337                metrics.mean_poll_duration_worker_max = Duration::default();
1338                metrics.mean_poll_duration_worker_min = Duration::default();
1339            }
1340        }
1341
1342        metrics
1343    }
1344}
1345
1346impl Iterator for RuntimeIntervals {
1347    type Item = RuntimeMetrics;
1348
1349    fn next(&mut self) -> Option<RuntimeMetrics> {
1350        Some(self.probe())
1351    }
1352}
1353
1354impl RuntimeMonitor {
1355    /// Creates a new [`RuntimeMonitor`].
1356    pub fn new(runtime: &runtime::Handle) -> RuntimeMonitor {
1357        let runtime = runtime.metrics();
1358
1359        RuntimeMonitor { runtime }
1360    }
1361
1362    /// Produces an unending iterator of [`RuntimeMetrics`].
1363    ///
1364    /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1365    /// produced by [`RuntimeMonitor::intervals`]. The item type of this iterator is [`RuntimeMetrics`],
1366    /// which is a bundle of runtime metrics that describe *only* changes occurring within that sampling
1367    /// interval.
1368    ///
1369    /// # Example
1370    ///
1371    /// ```
1372    /// use std::time::Duration;
1373    ///
1374    /// #[tokio::main]
1375    /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1376    ///     let handle = tokio::runtime::Handle::current();
1377    ///     // construct the runtime metrics monitor
1378    ///     let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1379    ///
1380    ///     // print runtime metrics every 500ms
1381    ///     {
1382    ///         tokio::spawn(async move {
1383    ///             for interval in runtime_monitor.intervals() {
1384    ///                 // pretty-print the metric interval
1385    ///                 println!("{:?}", interval);
1386    ///                 // wait 500ms
1387    ///                 tokio::time::sleep(Duration::from_millis(500)).await;
1388    ///             }
1389    ///         });
1390    ///     }
1391    ///
1392    ///     // await some tasks
1393    ///     tokio::join![
1394    ///         do_work(),
1395    ///         do_work(),
1396    ///         do_work(),
1397    ///     ];
1398    ///
1399    ///     Ok(())
1400    /// }
1401    ///
1402    /// async fn do_work() {
1403    ///     for _ in 0..25 {
1404    ///         tokio::task::yield_now().await;
1405    ///         tokio::time::sleep(Duration::from_millis(100)).await;
1406    ///     }
1407    /// }
1408    /// ```
1409    pub fn intervals(&self) -> RuntimeIntervals {
1410        let started_at = Instant::now();
1411
1412        let workers = (0..self.runtime.num_workers())
1413            .map(|worker| Worker::new(worker, &self.runtime))
1414            .collect();
1415
1416        RuntimeIntervals {
1417            runtime: self.runtime.clone(),
1418            started_at,
1419            workers,
1420
1421            #[cfg(tokio_unstable)]
1422            num_remote_schedules: self.runtime.remote_schedule_count(),
1423            #[cfg(tokio_unstable)]
1424            budget_forced_yield_count: self.runtime.budget_forced_yield_count(),
1425            #[cfg(tokio_unstable)]
1426            io_driver_ready_count: self.runtime.io_driver_ready_count(),
1427            #[cfg(tokio_unstable)]
1428            bucket_ranges: (0..self.runtime.poll_time_histogram_num_buckets())
1429                .map(|i| self.runtime.poll_time_histogram_bucket_range(i))
1430                .collect(),
1431        }
1432    }
1433}
1434
1435impl Worker {
1436    fn new(worker: usize, rt: &runtime::RuntimeMetrics) -> Worker {
1437        #[allow(unused_mut, clippy::needless_update)]
1438        let mut wrk = Worker {
1439            worker,
1440            total_park_count: rt.worker_park_count(worker),
1441            total_busy_duration: rt.worker_total_busy_duration(worker),
1442            ..Default::default()
1443        };
1444
1445        #[cfg(tokio_unstable)]
1446        {
1447            let poll_time_histogram = if rt.poll_time_histogram_enabled() {
1448                vec![0; rt.poll_time_histogram_num_buckets()]
1449            } else {
1450                vec![]
1451            };
1452            wrk.total_noop_count = rt.worker_noop_count(worker);
1453            wrk.total_steal_count = rt.worker_steal_count(worker);
1454            wrk.total_steal_operations = rt.worker_steal_operations(worker);
1455            wrk.total_local_schedule_count = rt.worker_local_schedule_count(worker);
1456            wrk.total_overflow_count = rt.worker_overflow_count(worker);
1457            wrk.total_polls_count = rt.worker_poll_count(worker);
1458            wrk.poll_time_histogram = poll_time_histogram;
1459        };
1460        wrk
1461    }
1462
1463    fn probe(&mut self, rt: &runtime::RuntimeMetrics, metrics: &mut RuntimeMetrics) {
1464        macro_rules! metric {
1465            ( $sum:ident, $max:ident, $min:ident, $probe:ident ) => {{
1466                let val = rt.$probe(self.worker);
1467                let delta = val - self.$sum;
1468                self.$sum = val;
1469
1470                metrics.$sum += delta;
1471
1472                if delta > metrics.$max {
1473                    metrics.$max = delta;
1474                }
1475
1476                if delta < metrics.$min {
1477                    metrics.$min = delta;
1478                }
1479            }};
1480        }
1481
1482        metric!(
1483            total_park_count,
1484            max_park_count,
1485            min_park_count,
1486            worker_park_count
1487        );
1488        metric!(
1489            total_busy_duration,
1490            max_busy_duration,
1491            min_busy_duration,
1492            worker_total_busy_duration
1493        );
1494
1495        #[cfg(tokio_unstable)]
1496        {
1497            let mut worker_polls_count = self.total_polls_count;
1498            let total_polls_count = metrics.total_polls_count;
1499
1500            metric!(
1501                total_noop_count,
1502                max_noop_count,
1503                min_noop_count,
1504                worker_noop_count
1505            );
1506            metric!(
1507                total_steal_count,
1508                max_steal_count,
1509                min_steal_count,
1510                worker_steal_count
1511            );
1512            metric!(
1513                total_steal_operations,
1514                max_steal_operations,
1515                min_steal_operations,
1516                worker_steal_operations
1517            );
1518            metric!(
1519                total_local_schedule_count,
1520                max_local_schedule_count,
1521                min_local_schedule_count,
1522                worker_local_schedule_count
1523            );
1524            metric!(
1525                total_overflow_count,
1526                max_overflow_count,
1527                min_overflow_count,
1528                worker_overflow_count
1529            );
1530            metric!(
1531                total_polls_count,
1532                max_polls_count,
1533                min_polls_count,
1534                worker_poll_count
1535            );
1536
1537            // Get the number of polls since last probe
1538            worker_polls_count = self.total_polls_count.saturating_sub(worker_polls_count);
1539
1540            // Update the mean task poll duration if there were polls
1541            if worker_polls_count > 0 {
1542                let val = rt.worker_mean_poll_time(self.worker);
1543
1544                if val > metrics.mean_poll_duration_worker_max {
1545                    metrics.mean_poll_duration_worker_max = val;
1546                }
1547
1548                if val < metrics.mean_poll_duration_worker_min {
1549                    metrics.mean_poll_duration_worker_min = val;
1550                }
1551
1552                // First, scale the current value down
1553                let ratio = total_polls_count as f64 / metrics.total_polls_count as f64;
1554                let mut mean = metrics.mean_poll_duration.as_nanos() as f64 * ratio;
1555
1556                // Add the scaled current worker's mean poll duration
1557                let ratio = worker_polls_count as f64 / metrics.total_polls_count as f64;
1558                mean += val.as_nanos() as f64 * ratio;
1559
1560                metrics.mean_poll_duration = Duration::from_nanos(mean as u64);
1561            }
1562
1563            // Update the histogram counts if there were polls since last count
1564            if worker_polls_count > 0 {
1565                for (bucket, entry) in metrics.poll_time_histogram.buckets_mut().iter_mut().enumerate() {
1566                    let new = rt.poll_time_histogram_bucket_count(self.worker, bucket);
1567                    let delta = new.saturating_sub(self.poll_time_histogram[bucket]);
1568                    self.poll_time_histogram[bucket] = new;
1569
1570                    entry.add_count(delta);
1571                }
1572            }
1573
1574            // Local scheduled tasks is an absolute value
1575            let local_scheduled_tasks = rt.worker_local_queue_depth(self.worker);
1576            metrics.total_local_queue_depth = metrics.total_local_queue_depth.saturating_add(local_scheduled_tasks);
1577
1578            if local_scheduled_tasks > metrics.max_local_queue_depth {
1579                metrics.max_local_queue_depth = local_scheduled_tasks;
1580            }
1581
1582            if local_scheduled_tasks < metrics.min_local_queue_depth {
1583                metrics.min_local_queue_depth = local_scheduled_tasks;
1584            }
1585
1586            // Blocking queue depth is an absolute value too
1587            metrics.blocking_queue_depth = rt.blocking_queue_depth();
1588
1589            metrics.blocking_threads_count = rt.num_blocking_threads();
1590            metrics.idle_blocking_threads_count = rt.num_idle_blocking_threads();
1591        }
1592    }
1593}
1594
1595derived_metrics!(
1596    [RuntimeMetrics] {
1597        stable {
1598            /// Returns the ratio of the [`RuntimeMetrics::total_busy_duration`] to the [`RuntimeMetrics::elapsed`].
1599            pub fn busy_ratio(&self) -> f64 {
1600                self.total_busy_duration.as_nanos() as f64 / self.elapsed.as_nanos() as f64
1601            }
1602        }
1603        unstable {
1604            /// Returns the ratio of the [`RuntimeMetrics::total_polls_count`] to the [`RuntimeMetrics::total_noop_count`].
1605            pub fn mean_polls_per_park(&self) -> f64 {
1606                let total_park_count = self.total_park_count.saturating_sub(self.total_noop_count);
1607                if total_park_count == 0 {
1608                    0.0
1609                } else {
1610                    self.total_polls_count as f64 / total_park_count as f64
1611                }
1612            }
1613        }
1614    }
1615);
1616
#[cfg(all(test, tokio_unstable, feature = "metrique-integration"))]
mod metrique_integration_tests {
    use super::*;
    use metrique::test_util::test_metric;

    /// Unpacks a `Repeated` observation into `(total, occurrences)`, panicking on
    /// any other observation kind.
    fn repeated(observation: &metrique::writer::Observation) -> (f64, u64) {
        match observation {
            metrique::writer::Observation::Repeated { total, occurrences } => {
                (*total, *occurrences)
            }
            other => panic!("expected Repeated, got {other:?}"),
        }
    }

    /// Compile-time regression: if a field is added whose type doesn't
    /// implement `CloseValue`, this will fail to compile.
    #[test]
    fn metrique_integration_produces_expected_fields() {
        let buckets = vec![
            HistogramBucket::new(Duration::from_micros(0), Duration::from_micros(100), 10),
            HistogramBucket::new(Duration::from_micros(100), Duration::from_micros(200), 0),
            HistogramBucket::new(Duration::from_micros(200), Duration::from_micros(500), 3),
        ];
        let entry = test_metric(RuntimeMetrics {
            workers_count: 4,
            total_park_count: 100,
            poll_time_histogram: PollTimeHistogram::new(buckets),
            ..Default::default()
        });

        // Explicitly set stable fields.
        assert_eq!(entry.metrics["workers_count"], 4);
        assert_eq!(entry.metrics["total_park_count"], 100);

        // Fields left at their defaults (stable and unstable alike) read back as zero.
        for name in ["elapsed", "total_busy_duration", "mean_poll_duration"] {
            assert_eq!(entry.metrics[name].as_f64(), 0.0);
        }
        for name in ["global_queue_depth", "total_steal_count", "total_polls_count"] {
            assert_eq!(entry.metrics[name].as_u64(), 0);
        }

        // Only the two non-empty buckets (counts 10 and 3) become observations.
        let hist = &entry.metrics["poll_time_histogram"];
        assert_eq!(hist.distribution.len(), 2, "expected 2 non-zero buckets");

        // 0..100µs: midpoint 50µs, 10 occurrences.
        let (total, occurrences) = repeated(&hist.distribution[0]);
        assert_eq!(occurrences, 10);
        assert!((total - 500.0).abs() < 0.01, "expected 50 * 10 = 500, got {total}");

        // 200..500µs: midpoint 350µs, 3 occurrences.
        let (total, occurrences) = repeated(&hist.distribution[1]);
        assert_eq!(occurrences, 3);
        assert!((total - 1050.0).abs() < 0.01, "expected 350 * 3 = 1050, got {total}");
    }

    /// Collect `RuntimeMetrics` from a live Tokio runtime and verify the pipeline produces valid output.
    #[cfg(feature = "rt")]
    #[test]
    fn metrique_end_to_end() {
        /// Total number of polls recorded across every bucket of `histogram`.
        fn recorded_polls(histogram: &PollTimeHistogram) -> u64 {
            histogram.buckets().iter().map(|bucket| bucket.count()).sum()
        }

        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .enable_metrics_poll_time_histogram()
            .build()
            .unwrap();

        runtime.block_on(async {
            let current = tokio::runtime::Handle::current();
            let monitor = RuntimeMonitor::new(&current);
            let mut intervals = monitor.intervals();

            // Throw away the first sample.
            let _ = intervals.next().unwrap();

            // Generate work and sample until an interval actually records polls.
            let mut sampled = None;
            for _attempt in 0..4 {
                for _ in 0..25 {
                    tokio::spawn(async {
                        tokio::task::yield_now().await;
                    })
                    .await
                    .unwrap();
                }
                // A poll longer than 900µs should land in the final histogram bucket.
                tokio::spawn(async {
                    std::thread::sleep(Duration::from_millis(1));
                })
                .await
                .unwrap();

                let sample = intervals.next().unwrap();
                if recorded_polls(&sample.poll_time_histogram) > 0 {
                    sampled = Some(sample);
                    break;
                }
            }
            let metrics =
                sampled.expect("expected polls to be recorded within 4 sampled intervals");

            let expected_workers_count = metrics.workers_count;
            let expected_non_zero_buckets = metrics
                .poll_time_histogram
                .buckets()
                .iter()
                .filter(|bucket| bucket.count() > 0)
                .count();
            let expected_total_polls = recorded_polls(&metrics.poll_time_histogram);
            assert!(expected_workers_count > 0);
            assert!(expected_total_polls > 0);

            let final_bucket = metrics.poll_time_histogram.buckets().last().unwrap();

            // Tokio's final bucket is open-ended: it ends at Duration::from_nanos(u64::MAX).
            assert_eq!(final_bucket.range_end(), Duration::from_nanos(u64::MAX));
            assert!(final_bucket.count() > 0, "expected slow poll to land in last bucket");
            let last_bucket_start_us = final_bucket.range_start().as_micros() as f64;
            let last_bucket_count = final_bucket.count();

            let entry = test_metric(metrics);

            assert_eq!(entry.metrics["workers_count"], expected_workers_count as u64);
            assert!(entry.metrics["elapsed"].as_f64() >= 0.0);
            assert!(entry.metrics["total_busy_duration"].as_f64() >= 0.0);

            let hist = &entry.metrics["poll_time_histogram"];
            assert_eq!(hist.distribution.len(), expected_non_zero_buckets);
            let observed_total_occurrences: u64 = hist
                .distribution
                .iter()
                .map(|observation| repeated(observation).1)
                .sum();
            assert_eq!(observed_total_occurrences, expected_total_polls);

            // The last observation maps to the open-ended final bucket, which must be
            // represented by its range_start rather than a midpoint.
            let (total, occurrences) = repeated(hist.distribution.last().unwrap());
            assert_eq!(occurrences, last_bucket_count);
            let expected_total = last_bucket_start_us * last_bucket_count as f64;
            assert!(
                (total - expected_total).abs() < 0.01,
                "last bucket should use range_start ({last_bucket_start_us}µs) as representative value, \
                 expected total={expected_total}, got {total}"
            );
        });
    }
}