tokio_metrics/runtime.rs
1use crate::derived_metrics::derived_metrics;
2#[cfg(tokio_unstable)]
3use std::ops::Range;
4use std::time::{Duration, Instant};
5use tokio::runtime;
6
7#[cfg(tokio_unstable)]
8mod poll_time_histogram;
9#[cfg(tokio_unstable)]
10pub use poll_time_histogram::{HistogramBucket, PollTimeHistogram};
11
12#[cfg(feature = "metrics-rs-integration")]
13pub(crate) mod metrics_rs_integration;
14
15/// Monitors key metrics of the tokio runtime.
16///
17/// ### Usage
18/// ```
19/// use std::time::Duration;
20/// use tokio_metrics::RuntimeMonitor;
21///
22/// #[tokio::main]
23/// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
24/// let handle = tokio::runtime::Handle::current();
25///
26/// // print runtime metrics every 500ms
27/// {
28/// let runtime_monitor = RuntimeMonitor::new(&handle);
29/// tokio::spawn(async move {
30/// for interval in runtime_monitor.intervals() {
31/// // pretty-print the metric interval
32/// println!("{:?}", interval);
33/// // wait 500ms
34/// tokio::time::sleep(Duration::from_millis(500)).await;
35/// }
36/// });
37/// }
38///
39/// // await some tasks
40/// tokio::join![
41/// do_work(),
42/// do_work(),
43/// do_work(),
44/// ];
45///
46/// Ok(())
47/// }
48///
49/// async fn do_work() {
50/// for _ in 0..25 {
51/// tokio::task::yield_now().await;
52/// tokio::time::sleep(Duration::from_millis(100)).await;
53/// }
54/// }
55/// ```
56#[derive(Debug)]
57pub struct RuntimeMonitor {
58 /// Handle to the monitored runtime's metrics (a [`tokio::runtime::RuntimeMetrics`]).
59 runtime: runtime::RuntimeMetrics,
60}
61
// Generates the public `RuntimeMetrics` struct from two comma-separated field lists.
//
// * `stable { .. }`   — fields that are always part of the struct.
// * `unstable { .. }` — fields that are additionally gated behind `#[cfg(tokio_unstable)]`.
//
// Every entry has the shape `<attributes / doc comments> <visibility> <name>: <type>`.
// A trailing comma is accepted (but not required) in both lists.
macro_rules! define_runtime_metrics {
    (
        stable {
            $( $(#[$($stable_meta:tt)*])* $stable_vis:vis $stable_field:ident : $stable_ty:ty ),* $(,)?
        }
        unstable {
            $( $(#[$($gated_meta:tt)*])* $gated_vis:vis $gated_field:ident : $gated_ty:ty ),* $(,)?
        }
    ) => {
        /// Key runtime metrics.
        #[non_exhaustive]
        #[cfg_attr(feature = "metrique-integration", metrique::unit_of_work::metrics(subfield_owned))]
        #[derive(Default, Debug, Clone)]
        pub struct RuntimeMetrics {
            // Stable fields: the caller's attributes are forwarded verbatim, followed by a
            // `doc(cfg(..))` annotation that is only applied when `cfg(docsrs)` is set.
            $(
                $(#[$($stable_meta)*])*
                #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
                $stable_vis $stable_field: $stable_ty,
            )*
            // Unstable fields: same forwarding, but the field only exists when the crate is
            // compiled with `--cfg tokio_unstable`.
            $(
                $(#[$($gated_meta)*])*
                #[cfg(tokio_unstable)]
                #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
                $gated_vis $gated_field: $gated_ty,
            )*
        }
    };
}
98
99define_runtime_metrics! {
100 stable {
101 /// The number of worker threads used by the runtime.
102 ///
103 /// This metric is static for a runtime.
104 ///
105 /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`].
106 /// When using the `current_thread` runtime, the return value is always `1`.
107 ///
108 /// The number of workers is set by configuring
109 /// [`worker_threads`][`tokio::runtime::Builder::worker_threads`] with
110 /// [`tokio::runtime::Builder`], or by parameterizing [`tokio::main`].
111 ///
112 /// ##### Examples
113 /// In the below example, the number of workers is set by parameterizing [`tokio::main`]:
114 /// ```
115 /// use tokio::runtime::Handle;
116 ///
117 /// #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
118 /// async fn main() {
119 /// let handle = tokio::runtime::Handle::current();
120 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
121 /// let mut intervals = monitor.intervals();
122 /// let mut next_interval = || intervals.next().unwrap();
123 ///
124 /// assert_eq!(next_interval().workers_count, 10);
125 /// }
126 /// ```
127 ///
128 /// [`tokio::main`]: https://docs.rs/tokio/latest/tokio/attr.main.html
129 ///
130 /// When using the `current_thread` runtime, the return value is always `1`; e.g.:
131 /// ```
132 /// use tokio::runtime::Handle;
133 ///
134 /// #[tokio::main(flavor = "current_thread")]
135 /// async fn main() {
136 /// let handle = tokio::runtime::Handle::current();
137 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
138 /// let mut intervals = monitor.intervals();
139 /// let mut next_interval = || intervals.next().unwrap();
140 ///
141 /// assert_eq!(next_interval().workers_count, 1);
142 /// }
143 /// ```
144 ///
145 /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`]; e.g.:
146 /// ```
147 /// use tokio::runtime::Handle;
148 ///
149 /// #[tokio::main]
150 /// async fn main() {
151 /// let handle = Handle::current();
152 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
153 /// let mut intervals = monitor.intervals();
154 /// let mut next_interval = || intervals.next().unwrap();
155 ///
156 /// assert_eq!(next_interval().workers_count, handle.metrics().num_workers());
157 /// }
158 /// ```
159 pub workers_count: usize,
160
161 /// The current number of alive tasks in the runtime.
162 ///
163 /// ##### Definition
164 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_alive_tasks`].
165 pub live_tasks_count: usize,
166
167 /// The number of times worker threads parked.
168 ///
169 /// The worker park count increases by one each time the worker parks the thread waiting for
170 /// new inbound events to process. This usually means the worker has processed all pending work
171 /// and is currently idle.
172 ///
173 /// ##### Definition
174 /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_park_count`]
175 /// across all worker threads.
176 ///
177 /// ##### See also
178 /// - [`RuntimeMetrics::max_park_count`]
179 /// - [`RuntimeMetrics::min_park_count`]
180 ///
181 /// ##### Examples
182 /// ```
183 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
184 /// async fn main() {
185 /// let handle = tokio::runtime::Handle::current();
186 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
187 /// let mut intervals = monitor.intervals();
188 /// let mut next_interval = || intervals.next().unwrap();
189 ///
190 /// let interval = next_interval(); // end of interval 1
191 /// assert_eq!(interval.total_park_count, 0);
192 ///
193 /// induce_parks().await;
194 ///
195 /// let interval = next_interval(); // end of interval 2
196 /// assert!(interval.total_park_count >= 1); // usually 1 or 2 parks
197 /// }
198 ///
199 /// async fn induce_parks() {
200 /// let _ = tokio::time::timeout(std::time::Duration::ZERO, async {
201 /// loop { tokio::task::yield_now().await; }
202 /// }).await;
203 /// }
204 /// ```
205 pub total_park_count: u64,
206
207 /// The maximum number of times any worker thread parked.
208 ///
209 /// ##### Definition
210 /// This metric is derived from the maximum of
211 /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
212 ///
213 /// ##### See also
214 /// - [`RuntimeMetrics::total_park_count`]
215 /// - [`RuntimeMetrics::min_park_count`]
216 pub max_park_count: u64,
217
218 /// The minimum number of times any worker thread parked.
219 ///
220 /// ##### Definition
221 /// This metric is derived from the minimum of
222 /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
223 ///
224 /// ##### See also
225 /// - [`RuntimeMetrics::total_park_count`]
226 /// - [`RuntimeMetrics::max_park_count`]
227 pub min_park_count: u64,
228
229 /// The amount of time worker threads were busy.
230 ///
231 /// The worker busy duration increases whenever the worker is spending time processing work.
232 /// Using this value can indicate the total load of workers.
233 ///
234 /// ##### Definition
235 /// This metric is derived from the sum of
236 /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
237 ///
238 /// ##### See also
239 /// - [`RuntimeMetrics::min_busy_duration`]
240 /// - [`RuntimeMetrics::max_busy_duration`]
241 ///
242 /// ##### Examples
243 /// In the below example, tasks spend a total of 4s busy:
244 /// ```
245 /// use tokio::time::Duration;
246 ///
247 /// fn main() {
248 /// let start = tokio::time::Instant::now();
249 ///
250 /// let rt = tokio::runtime::Builder::new_current_thread()
251 /// .enable_all()
252 /// .build()
253 /// .unwrap();
254 ///
255 /// let handle = rt.handle();
256 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
257 /// let mut intervals = monitor.intervals();
258 /// let mut next_interval = || intervals.next().unwrap();
259 ///
260 /// let delay_1s = Duration::from_secs(1);
261 /// let delay_3s = Duration::from_secs(3);
262 ///
263 /// rt.block_on(async {
264 /// // keep the main task busy for 1s
265 /// spin_for(delay_1s);
266 ///
267 /// // spawn a task and keep it busy for 3s
268 /// let _ = tokio::spawn(async move {
269 /// spin_for(delay_3s);
270 /// }).await;
271 /// });
272 ///
273 /// // flush metrics
274 /// drop(rt);
275 ///
276 /// let elapsed = start.elapsed();
277 ///
278 /// let interval = next_interval(); // end of interval 1
279 /// assert!(interval.total_busy_duration >= delay_1s + delay_3s);
280 /// assert!(interval.total_busy_duration <= elapsed);
281 /// }
282 ///
283 /// fn time<F>(task: F) -> Duration
284 /// where
285 /// F: Fn() -> ()
286 /// {
287 /// let start = tokio::time::Instant::now();
288 /// task();
289 /// start.elapsed()
290 /// }
291 ///
292 /// /// Block the current thread for a given `duration`.
293 /// fn spin_for(duration: Duration) {
294 /// let start = tokio::time::Instant::now();
295 /// while start.elapsed() <= duration {}
296 /// }
297 /// ```
298 ///
299 /// Busy times may not accumulate as the above example suggests (FIXME: Why?); e.g., if we
300 /// remove the three second delay, the time spent busy falls to mere microseconds:
301 /// ```should_panic
302 /// use tokio::time::Duration;
303 ///
304 /// fn main() {
305 /// let rt = tokio::runtime::Builder::new_current_thread()
306 /// .enable_all()
307 /// .build()
308 /// .unwrap();
309 ///
310 /// let handle = rt.handle();
311 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
312 /// let mut intervals = monitor.intervals();
313 /// let mut next_interval = || intervals.next().unwrap();
314 ///
315 /// let delay_1s = Duration::from_secs(1);
316 ///
317 /// let elapsed = time(|| rt.block_on(async {
318 /// // keep the main task busy for 1s
319 /// spin_for(delay_1s);
320 /// }));
321 ///
322 /// // flush metrics
323 /// drop(rt);
324 ///
325 /// let interval = next_interval(); // end of interval 1
326 /// assert!(interval.total_busy_duration >= delay_1s); // FAIL
327 /// assert!(interval.total_busy_duration <= elapsed);
328 /// }
329 ///
330 /// fn time<F>(task: F) -> Duration
331 /// where
332 /// F: Fn() -> ()
333 /// {
334 /// let start = tokio::time::Instant::now();
335 /// task();
336 /// start.elapsed()
337 /// }
338 ///
339 /// /// Block the current thread for a given `duration`.
340 /// fn spin_for(duration: Duration) {
341 /// let start = tokio::time::Instant::now();
342 /// while start.elapsed() <= duration {}
343 /// }
344 /// ```
345 pub total_busy_duration: Duration,
346
347 /// The maximum amount of time a worker thread was busy.
348 ///
349 /// ##### Definition
350 /// This metric is derived from the maximum of
351 /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
352 ///
353 /// ##### See also
354 /// - [`RuntimeMetrics::total_busy_duration`]
355 /// - [`RuntimeMetrics::min_busy_duration`]
356 pub max_busy_duration: Duration,
357
358 /// The minimum amount of time a worker thread was busy.
359 ///
360 /// ##### Definition
361 /// This metric is derived from the minimum of
362 /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
363 ///
364 /// ##### See also
365 /// - [`RuntimeMetrics::total_busy_duration`]
366 /// - [`RuntimeMetrics::max_busy_duration`]
367 pub min_busy_duration: Duration,
368
369 /// The number of tasks currently scheduled in the runtime's global queue.
370 ///
371 /// Tasks that are spawned or notified from a non-runtime thread are scheduled using the
372 /// runtime's global queue. This metric returns the **current** number of tasks pending in
373 /// the global queue. As such, the returned value may increase or decrease as new tasks are
374 /// scheduled and processed.
375 ///
376 /// ##### Definition
377 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::global_queue_depth`].
378 ///
379 /// ##### Example
380 /// ```
381 /// # let current_thread = tokio::runtime::Builder::new_current_thread()
382 /// # .enable_all()
383 /// # .build()
384 /// # .unwrap();
385 /// #
386 /// # let multi_thread = tokio::runtime::Builder::new_multi_thread()
387 /// # .worker_threads(2)
388 /// # .enable_all()
389 /// # .build()
390 /// # .unwrap();
391 /// #
392 /// # for runtime in [current_thread, multi_thread] {
393 /// let handle = runtime.handle().clone();
394 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
395 /// let mut intervals = monitor.intervals();
396 /// let mut next_interval = || intervals.next().unwrap();
397 ///
398 /// let interval = next_interval(); // end of interval 1
399 /// # #[cfg(tokio_unstable)]
400 /// assert_eq!(interval.num_remote_schedules, 0);
401 ///
402 /// // spawn a system thread outside of the runtime
403 /// std::thread::spawn(move || {
404 /// // spawn two tasks from this non-runtime thread
405 /// handle.spawn(async {});
406 /// handle.spawn(async {});
407 /// }).join().unwrap();
408 ///
409 /// // flush metrics
410 /// drop(runtime);
411 ///
412 /// let interval = next_interval(); // end of interval 2
413 /// # #[cfg(tokio_unstable)]
414 /// assert_eq!(interval.num_remote_schedules, 2);
415 /// # }
416 /// ```
417 pub global_queue_depth: usize,
418
419 /// Total amount of time elapsed since observing runtime metrics.
420 pub elapsed: Duration,
421 }
422 unstable {
423 /// The average duration of a single invocation of poll on a task.
424 ///
425 /// This average is an exponentially-weighted moving average of the duration
426 /// of task polls on all runtime workers.
427 ///
428 /// ##### Examples
429 /// ```
430 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
431 /// async fn main() {
432 /// let handle = tokio::runtime::Handle::current();
433 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
434 /// let mut intervals = monitor.intervals();
435 /// let mut next_interval = || intervals.next().unwrap();
436 ///
437 /// let interval = next_interval();
438 /// println!("mean task poll duration is {:?}", interval.mean_poll_duration);
439 /// }
440 /// ```
441 pub mean_poll_duration: Duration,
442
443 /// The average duration of a single invocation of poll on a task on the
444 /// worker with the lowest value.
445 ///
446 /// This average is an exponentially-weighted moving average of the duration
447 /// of task polls on the runtime worker with the lowest value.
448 ///
449 /// ##### Examples
450 /// ```
451 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
452 /// async fn main() {
453 /// let handle = tokio::runtime::Handle::current();
454 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
455 /// let mut intervals = monitor.intervals();
456 /// let mut next_interval = || intervals.next().unwrap();
457 ///
458 /// let interval = next_interval();
459 /// println!("min mean task poll duration is {:?}", interval.mean_poll_duration_worker_min);
460 /// }
461 /// ```
462 pub mean_poll_duration_worker_min: Duration,
463
464 /// The average duration of a single invocation of poll on a task on the
465 /// worker with the highest value.
466 ///
467 /// This average is an exponentially-weighted moving average of the duration
468 /// of task polls on the runtime worker with the highest value.
469 ///
470 /// ##### Examples
471 /// ```
472 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
473 /// async fn main() {
474 /// let handle = tokio::runtime::Handle::current();
475 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
476 /// let mut intervals = monitor.intervals();
477 /// let mut next_interval = || intervals.next().unwrap();
478 ///
479 /// let interval = next_interval();
480 /// println!("max mean task poll duration is {:?}", interval.mean_poll_duration_worker_max);
481 /// }
482 /// ```
483 pub mean_poll_duration_worker_max: Duration,
484
485 /// A histogram of task polls since the previous probe grouped by poll
486 /// times.
487 ///
488 /// Each bucket contains the configured [`Duration`] range and the count
489 /// of task polls that fell into that range during the interval. Use
490 /// [`PollTimeHistogram::as_counts`] to get just the raw counts as a
491 /// `Vec<u64>`.
492 ///
493 /// This metric must be explicitly enabled when creating the runtime with
494 /// [`enable_metrics_poll_time_histogram`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
495 /// Bucket sizes are fixed and configured at the runtime level. See
496 /// configuration options on
497 /// [`runtime::Builder`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
498 ///
499 /// ##### Examples
500 /// ```
501 /// use tokio::runtime::HistogramConfiguration;
502 /// use std::time::Duration;
503 ///
504 /// let config = HistogramConfiguration::linear(Duration::from_micros(50), 12);
505 ///
506 /// let rt = tokio::runtime::Builder::new_multi_thread()
507 /// .enable_metrics_poll_time_histogram()
508 /// .metrics_poll_time_histogram_configuration(config)
509 /// .build()
510 /// .unwrap();
511 ///
512 /// rt.block_on(async {
513 /// let handle = tokio::runtime::Handle::current();
514 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
515 /// let mut intervals = monitor.intervals();
516 /// let mut next_interval = || intervals.next().unwrap();
517 ///
518 /// let interval = next_interval();
519 /// for bucket in interval.poll_time_histogram.buckets() {
520 /// println!("{:?}..{:?} => {} polls", bucket.range_start(), bucket.range_end(), bucket.count());
521 /// }
522 /// });
523 /// ```
524 pub poll_time_histogram: PollTimeHistogram,
525
526 /// The number of times worker threads unparked but performed no work before parking again.
527 ///
528 /// The worker no-op count increases by one each time the worker unparks the thread but finds
529 /// no new work and goes back to sleep. This indicates a false-positive wake up.
530 ///
531 /// ##### Definition
532 /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_noop_count`]
533 /// across all worker threads.
534 ///
535 /// ##### Examples
536 /// Unfortunately, there isn't a great way to reliably induce no-op parks, as they occur as
537 /// false-positive events under concurrency.
538 ///
539 /// The below example triggers fewer than two parks in the single-threaded runtime:
540 /// ```
541 /// #[tokio::main(flavor = "current_thread")]
542 /// async fn main() {
543 /// let handle = tokio::runtime::Handle::current();
544 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
545 /// let mut intervals = monitor.intervals();
546 /// let mut next_interval = || intervals.next().unwrap();
547 ///
548 /// assert_eq!(next_interval().total_park_count, 0);
549 ///
550 /// async {
551 /// tokio::time::sleep(std::time::Duration::from_millis(1)).await;
552 /// }.await;
553 ///
554 /// assert!(next_interval().total_park_count > 0);
555 /// }
556 /// ```
557 ///
558 /// The below example triggers fewer than two parks in the multi-threaded runtime:
559 /// ```
560 /// #[tokio::main(flavor = "multi_thread")]
561 /// async fn main() {
562 /// let handle = tokio::runtime::Handle::current();
563 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
564 /// let mut intervals = monitor.intervals();
565 /// let mut next_interval = || intervals.next().unwrap();
566 ///
567 /// async {
568 /// tokio::time::sleep(std::time::Duration::from_millis(1)).await;
569 /// }.await;
570 ///
571 /// assert!(next_interval().total_noop_count > 0);
572 /// }
573 /// ```
574 pub total_noop_count: u64,
575
576 /// The maximum number of times any worker thread unparked but performed no work before parking
577 /// again.
578 ///
579 /// ##### Definition
580 /// This metric is derived from the maximum of
581 /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads.
582 ///
583 /// ##### See also
584 /// - [`RuntimeMetrics::total_noop_count`]
585 /// - [`RuntimeMetrics::min_noop_count`]
586 pub max_noop_count: u64,
587
588 /// The minimum number of times any worker thread unparked but performed no work before parking
589 /// again.
590 ///
591 /// ##### Definition
592 /// This metric is derived from the minimum of
593 /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads.
594 ///
595 /// ##### See also
596 /// - [`RuntimeMetrics::total_noop_count`]
597 /// - [`RuntimeMetrics::max_noop_count`]
598 pub min_noop_count: u64,
599
600 /// The number of tasks worker threads stole from another worker thread.
601 ///
602 /// The worker steal count increases by the amount of stolen tasks each time the worker
603 /// has processed its scheduled queue and successfully steals more pending tasks from another
604 /// worker.
605 ///
606 /// This metric only applies to the **multi-threaded** runtime and will always return `0` when
607 /// using the current thread runtime.
608 ///
609 /// ##### Definition
610 /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`] for
611 /// all worker threads.
612 ///
613 /// ##### See also
614 /// - [`RuntimeMetrics::min_steal_count`]
615 /// - [`RuntimeMetrics::max_steal_count`]
616 ///
617 /// ##### Examples
618 /// In the below example, a blocking channel is used to back up one worker thread:
619 /// ```
620 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
621 /// async fn main() {
622 /// let handle = tokio::runtime::Handle::current();
623 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
624 /// let mut intervals = monitor.intervals();
625 /// let mut next_interval = || intervals.next().unwrap();
626 ///
627 /// let interval = next_interval(); // end of first sampling interval
628 /// assert_eq!(interval.total_steal_count, 0);
629 /// assert_eq!(interval.min_steal_count, 0);
630 /// assert_eq!(interval.max_steal_count, 0);
631 ///
632 /// // induce a steal
633 /// async {
634 /// let (tx, rx) = std::sync::mpsc::channel();
635 /// // Move to the runtime.
636 /// tokio::spawn(async move {
637 /// // Spawn the task that sends to the channel
638 /// tokio::spawn(async move {
639 /// tx.send(()).unwrap();
640 /// });
641 /// // Spawn a task that bumps the previous task out of the "next
642 /// // scheduled" slot.
643 /// tokio::spawn(async {});
644 /// // Blocking receive on the channel.
645 /// rx.recv().unwrap();
646 /// flush_metrics().await;
647 /// }).await.unwrap();
648 /// flush_metrics().await;
649 /// }.await;
650 ///
651 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
652 /// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
653 ///
654 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3
655 /// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
656 /// }
657 ///
658 /// async fn flush_metrics() {
659 /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
660 /// }
661 /// ```
662 pub total_steal_count: u64,
663
664 /// The maximum number of tasks any worker thread stole from another worker thread.
665 ///
666 /// ##### Definition
667 /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`]
668 /// across all worker threads.
669 ///
670 /// ##### See also
671 /// - [`RuntimeMetrics::total_steal_count`]
672 /// - [`RuntimeMetrics::min_steal_count`]
673 pub max_steal_count: u64,
674
675 /// The minimum number of tasks any worker thread stole from another worker thread.
676 ///
677 /// ##### Definition
678 /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`]
679 /// across all worker threads.
680 ///
681 /// ##### See also
682 /// - [`RuntimeMetrics::total_steal_count`]
683 /// - [`RuntimeMetrics::max_steal_count`]
684 pub min_steal_count: u64,
685
686 /// The number of times worker threads stole tasks from another worker thread.
687 ///
688 /// The worker steal operations count increases by one each time the worker has processed its
689 /// scheduled queue and successfully steals more pending tasks from another worker.
690 ///
691 /// This metric only applies to the **multi-threaded** runtime and will always return `0` when
692 /// using the current thread runtime.
693 ///
694 /// ##### Definition
695 /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
696 /// for all worker threads.
697 ///
698 /// ##### See also
699 /// - [`RuntimeMetrics::min_steal_operations`]
700 /// - [`RuntimeMetrics::max_steal_operations`]
701 ///
702 /// ##### Examples
703 /// In the below example, a blocking channel is used to back up one worker thread:
704 /// ```
705 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
706 /// async fn main() {
707 /// let handle = tokio::runtime::Handle::current();
708 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
709 /// let mut intervals = monitor.intervals();
710 /// let mut next_interval = || intervals.next().unwrap();
711 ///
712 /// let interval = next_interval(); // end of first sampling interval
713 /// assert_eq!(interval.total_steal_operations, 0);
714 /// assert_eq!(interval.min_steal_operations, 0);
715 /// assert_eq!(interval.max_steal_operations, 0);
716 ///
717 /// // induce a steal
718 /// async {
719 /// let (tx, rx) = std::sync::mpsc::channel();
720 /// // Move to the runtime.
721 /// tokio::spawn(async move {
722 /// // Spawn the task that sends to the channel
723 /// tokio::spawn(async move {
724 /// tx.send(()).unwrap();
725 /// });
726 /// // Spawn a task that bumps the previous task out of the "next
727 /// // scheduled" slot.
728 /// tokio::spawn(async {});
729 /// // Blocking receive on the channel.
730 /// rx.recv().unwrap();
731 /// flush_metrics().await;
732 /// }).await.unwrap();
733 /// flush_metrics().await;
734 /// }.await;
735 ///
736 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
737 /// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
738 ///
739 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3
740 /// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
741 /// }
742 ///
743 /// async fn flush_metrics() {
744 /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
745 /// }
746 /// ```
747 pub total_steal_operations: u64,
748
749 /// The maximum number of times any worker thread stole tasks from another worker thread.
750 ///
751 /// ##### Definition
752 /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
753 /// across all worker threads.
754 ///
755 /// ##### See also
756 /// - [`RuntimeMetrics::total_steal_operations`]
757 /// - [`RuntimeMetrics::min_steal_operations`]
758 pub max_steal_operations: u64,
759
760 /// The minimum number of times any worker thread stole tasks from another worker thread.
761 ///
762 /// ##### Definition
763 /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
764 /// across all worker threads.
765 ///
766 /// ##### See also
767 /// - [`RuntimeMetrics::total_steal_operations`]
768 /// - [`RuntimeMetrics::max_steal_operations`]
769 pub min_steal_operations: u64,
770
771 /// The number of tasks scheduled from **outside** of the runtime.
772 ///
773 /// The remote schedule count increases by one each time a task is woken from **outside** of
774 /// the runtime. This usually means that a task is spawned or notified from a non-runtime
775 /// thread and must be queued using the Runtime's global queue, which tends to be slower.
776 ///
777 /// ##### Definition
778 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::remote_schedule_count`].
779 ///
780 /// ##### Examples
781 /// In the below example, a remote schedule is induced by spawning a system thread, then
782 /// spawning a tokio task from that system thread:
783 /// ```
784 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
785 /// async fn main() {
786 /// let handle = tokio::runtime::Handle::current();
787 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
788 /// let mut intervals = monitor.intervals();
789 /// let mut next_interval = || intervals.next().unwrap();
790 ///
791 /// let interval = next_interval(); // end of first sampling interval
792 /// assert_eq!(interval.num_remote_schedules, 0);
793 ///
794 /// // spawn a non-runtime thread
795 /// std::thread::spawn(move || {
796 /// // spawn two tasks from this non-runtime thread
797 /// async move {
798 /// handle.spawn(async {}).await;
799 /// handle.spawn(async {}).await;
800 /// }
801 /// }).join().unwrap().await;
802 ///
803 /// let interval = next_interval(); // end of second sampling interval
804 /// assert_eq!(interval.num_remote_schedules, 2);
805 ///
806 /// let interval = next_interval(); // end of third sampling interval
807 /// assert_eq!(interval.num_remote_schedules, 0);
808 /// }
809 /// ```
810 pub num_remote_schedules: u64,
811
812 /// The number of tasks scheduled from worker threads.
813 ///
814 /// The local schedule count increases by one each time a task is woken from **inside** of the
815 /// runtime. This usually means that a task is spawned or notified from within a runtime thread
816 /// and will be queued on the worker-local queue.
817 ///
818 /// ##### Definition
819 /// This metric is derived from the sum of
820 /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] across all worker threads.
821 ///
822 /// ##### See also
823 /// - [`RuntimeMetrics::min_local_schedule_count`]
824 /// - [`RuntimeMetrics::max_local_schedule_count`]
825 ///
826 /// ##### Examples
827 /// ###### With `current_thread` runtime
828 /// In the below example, two tasks are spawned from the context of a third tokio task:
829 /// ```
830 /// #[tokio::main(flavor = "current_thread")]
831 /// async fn main() {
832 /// let handle = tokio::runtime::Handle::current();
833 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
834 /// let mut intervals = monitor.intervals();
835 /// let mut next_interval = || intervals.next().unwrap();
836 ///
837 /// let interval = { flush_metrics().await; next_interval() }; // end interval 1
838 /// assert_eq!(interval.total_local_schedule_count, 0);
839 ///
840 /// let task = async {
841 /// tokio::spawn(async {}); // local schedule 1
842 /// tokio::spawn(async {}); // local schedule 2
843 /// };
844 ///
845 /// let handle = tokio::spawn(task); // local schedule 3
846 ///
847 /// let interval = { flush_metrics().await; next_interval() }; // end interval 2
848 /// assert_eq!(interval.total_local_schedule_count, 3);
849 ///
850 /// let _ = handle.await;
851 ///
852 /// let interval = { flush_metrics().await; next_interval() }; // end interval 3
853 /// assert_eq!(interval.total_local_schedule_count, 0);
854 /// }
855 ///
856 /// async fn flush_metrics() {
857 /// tokio::task::yield_now().await;
858 /// }
859 /// ```
860 ///
861 /// ###### With `multi_thread` runtime
862 /// In the below example, 100 tasks are spawned:
863 /// ```
864 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
865 /// async fn main() {
866 /// let handle = tokio::runtime::Handle::current();
867 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
868 /// let mut intervals = monitor.intervals();
869 /// let mut next_interval = || intervals.next().unwrap();
870 ///
871 /// let interval = next_interval(); // end of interval 1
872 /// assert_eq!(interval.total_local_schedule_count, 0);
873 ///
874 /// use std::sync::atomic::{AtomicBool, Ordering};
875 /// static SPINLOCK: AtomicBool = AtomicBool::new(true);
876 ///
877 /// // block the other worker thread
878 /// tokio::spawn(async {
879 /// while SPINLOCK.load(Ordering::SeqCst) {}
880 /// });
881 ///
882 /// // FIXME: why does this need to be in a `spawn`?
883 /// let _ = tokio::spawn(async {
884 /// // spawn 100 tasks
885 /// for _ in 0..100 {
886 /// tokio::spawn(async {});
887 /// }
888 /// // this spawns 1 more task
889 /// flush_metrics().await;
890 /// }).await;
891 ///
892 /// // unblock the other worker thread
893 /// SPINLOCK.store(false, Ordering::SeqCst);
894 ///
895 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
896 /// assert_eq!(interval.total_local_schedule_count, 100 + 1);
897 /// }
898 ///
899 /// async fn flush_metrics() {
900 /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
901 /// }
902 /// ```
903 pub total_local_schedule_count: u64,
904
905 /// The maximum number of tasks scheduled from any one worker thread.
906 ///
907 /// ##### Definition
908 /// This metric is derived from the maximum of
909 /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads.
910 ///
911 /// ##### See also
912 /// - [`RuntimeMetrics::total_local_schedule_count`]
913 /// - [`RuntimeMetrics::min_local_schedule_count`]
914 pub max_local_schedule_count: u64,
915
916 /// The minimum number of tasks scheduled from any one worker thread.
917 ///
918 /// ##### Definition
919 /// This metric is derived from the minimum of
920 /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads.
921 ///
922 /// ##### See also
923 /// - [`RuntimeMetrics::total_local_schedule_count`]
924 /// - [`RuntimeMetrics::max_local_schedule_count`]
925 pub min_local_schedule_count: u64,
926
927 /// The number of times worker threads saturated their local queues.
928 ///
929 /// The worker overflow count increases by one each time the worker attempts to schedule a task
930 /// locally, but its local queue is full. When this happens, half of the
931 /// local queue is moved to the global queue.
932 ///
933 /// This metric only applies to the **multi-threaded** scheduler.
934 ///
935 /// ##### Definition
936 /// This metric is derived from the sum of
937 /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
938 ///
939 /// ##### See also
940 /// - [`RuntimeMetrics::min_overflow_count`]
941 /// - [`RuntimeMetrics::max_overflow_count`]
942 ///
943 /// ##### Examples
944 /// ```
945 /// #[tokio::main(flavor = "multi_thread", worker_threads = 1)]
946 /// async fn main() {
947 /// let handle = tokio::runtime::Handle::current();
948 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
949 /// let mut intervals = monitor.intervals();
950 /// let mut next_interval = || intervals.next().unwrap();
951 ///
952 /// let interval = next_interval(); // end of interval 1
953 /// assert_eq!(interval.total_overflow_count, 0);
954 ///
955 /// use std::sync::atomic::{AtomicBool, Ordering};
956 ///
957 /// // spawn a ton of tasks
958 /// let _ = tokio::spawn(async {
959 /// // we do this in a `tokio::spawn` because it is impossible to
960 /// // overflow the main task
961 /// for _ in 0..300 {
962 /// tokio::spawn(async {});
963 /// }
964 /// }).await;
965 ///
966 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
967 /// assert_eq!(interval.total_overflow_count, 1);
968 /// }
969 ///
970 /// async fn flush_metrics() {
971 /// let _ = tokio::time::sleep(std::time::Duration::from_millis(1)).await;
972 /// }
973 /// ```
974 pub total_overflow_count: u64,
975
976 /// The maximum number of times any one worker saturated its local queue.
977 ///
978 /// ##### Definition
979 /// This metric is derived from the maximum of
980 /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
981 ///
982 /// ##### See also
983 /// - [`RuntimeMetrics::total_overflow_count`]
984 /// - [`RuntimeMetrics::min_overflow_count`]
985 pub max_overflow_count: u64,
986
987 /// The minimum number of times any one worker saturated its local queue.
988 ///
989 /// ##### Definition
990 /// This metric is derived from the minimum of
991 /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
992 ///
993 /// ##### See also
994 /// - [`RuntimeMetrics::total_overflow_count`]
995 /// - [`RuntimeMetrics::max_overflow_count`]
996 pub min_overflow_count: u64,
997
998 /// The number of tasks that have been polled across all worker threads.
999 ///
1000 /// The worker poll count increases by one each time a worker polls a scheduled task.
1001 ///
1002 /// ##### Definition
1003 /// This metric is derived from the sum of
1004 /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1005 ///
1006 /// ##### See also
1007 /// - [`RuntimeMetrics::min_polls_count`]
1008 /// - [`RuntimeMetrics::max_polls_count`]
1009 ///
1010 /// ##### Examples
1011 /// In the below example, 42 tasks are spawned and polled:
1012 /// ```
1013 /// #[tokio::main(flavor = "current_thread")]
1014 /// async fn main() {
1015 /// let handle = tokio::runtime::Handle::current();
1016 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1017 /// let mut intervals = monitor.intervals();
1018 /// let mut next_interval = || intervals.next().unwrap();
1019 ///
1020 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 1
1021 /// assert_eq!(interval.total_polls_count, 0);
1022 /// assert_eq!(interval.min_polls_count, 0);
1023 /// assert_eq!(interval.max_polls_count, 0);
1024 ///
1025 /// const N: u64 = 42;
1026 ///
1027 /// for _ in 0..N {
1028 /// let _ = tokio::spawn(async {}).await;
1029 /// }
1030 ///
1031 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
1032 /// assert_eq!(interval.total_polls_count, N);
1033 /// assert_eq!(interval.min_polls_count, N);
1034 /// assert_eq!(interval.max_polls_count, N);
1035 /// }
1036 ///
1037 /// async fn flush_metrics() {
1038 /// let _ = tokio::task::yield_now().await;
1039 /// }
1040 /// ```
1041 pub total_polls_count: u64,
1042
1043 /// The maximum number of tasks that have been polled in any worker thread.
1044 ///
1045 /// ##### Definition
1046 /// This metric is derived from the maximum of
1047 /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1048 ///
1049 /// ##### See also
1050 /// - [`RuntimeMetrics::total_polls_count`]
1051 /// - [`RuntimeMetrics::min_polls_count`]
1052 pub max_polls_count: u64,
1053
1054 /// The minimum number of tasks that have been polled in any worker thread.
1055 ///
1056 /// ##### Definition
1057 /// This metric is derived from the minimum of
1058 /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1059 ///
1060 /// ##### See also
1061 /// - [`RuntimeMetrics::total_polls_count`]
1062 /// - [`RuntimeMetrics::max_polls_count`]
1063 pub min_polls_count: u64,
1064
1065 /// The total number of tasks currently scheduled in workers' local queues.
1066 ///
1067 /// Tasks that are spawned or notified from within a runtime thread are scheduled using that
1068 /// worker's local queue. This metric returns the **current** number of tasks pending in all
1069 /// workers' local queues. As such, the returned value may increase or decrease as new tasks
1070 /// are scheduled and processed.
1071 ///
1072 /// ##### Definition
1073 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`].
1074 ///
1075 /// ##### See also
1076 /// - [`RuntimeMetrics::min_local_queue_depth`]
1077 /// - [`RuntimeMetrics::max_local_queue_depth`]
1078 ///
1079 /// ##### Example
1080 ///
1081 /// ###### With `current_thread` runtime
1082 /// The below example spawns 100 tasks:
1083 /// ```
1084 /// #[tokio::main(flavor = "current_thread")]
1085 /// async fn main() {
1086 /// const N: usize = 100;
1087 ///
1088 /// let handle = tokio::runtime::Handle::current();
1089 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1090 /// let mut intervals = monitor.intervals();
1091 /// let mut next_interval = || intervals.next().unwrap();
1092 ///
1093 /// let interval = next_interval(); // end of interval 1
1094 /// assert_eq!(interval.total_local_queue_depth, 0);
1095 ///
1096 ///
1097 /// for _ in 0..N {
1098 /// tokio::spawn(async {});
1099 /// }
1100 /// let interval = next_interval(); // end of interval 2
1101 /// assert_eq!(interval.total_local_queue_depth, N);
1102 /// }
1103 /// ```
1104 ///
1105 /// ###### With `multi_thread` runtime
1106 /// The below example spawns 100 tasks and observes them in the
1107 /// local queue:
1108 /// ```
1109 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
1110 /// async fn main() {
1111 /// use std::sync::mpsc;
1112 /// use tokio::sync::oneshot;
1113 ///
1114 /// const N: usize = 100;
1115 ///
1116 /// let handle = tokio::runtime::Handle::current();
1117 ///
1118 /// // block one worker so the other is the only one running
1119 /// let (block_tx, block_rx) = mpsc::channel::<()>();
1120 /// let (started_tx, started_rx) = oneshot::channel();
1121 /// tokio::spawn(async move {
1122 /// let _ = started_tx.send(());
1123 /// let _ = block_rx.recv();
1124 /// });
1125 /// let _ = started_rx.await;
1126 ///
1127 /// // spawn + sample from the free worker thread
1128 /// let (depth_tx, depth_rx) = oneshot::channel();
1129 /// tokio::spawn(async move {
1130 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1131 /// let mut intervals = monitor.intervals();
1132 /// let _ = intervals.next().unwrap(); // baseline
1133 ///
1134 /// for _ in 0..N {
1135 /// tokio::spawn(async {});
1136 /// }
1137 ///
1138 /// let depth = intervals.next().unwrap().total_local_queue_depth;
1139 /// let _ = depth_tx.send(depth);
1140 /// });
1141 ///
1142 /// let depth = depth_rx.await.unwrap();
1143 ///
1144 /// // Tokio may place one spawned task in a LIFO slot rather than the
1145 /// // local queue, which may not be reflected in `worker_local_queue_depth`,
1146 /// // so accept N or N - 1.
1147 /// assert!(depth == N || depth == N - 1, "depth = {depth}");
1148 ///
1149 /// let _ = block_tx.send(());
1150 /// }
1151 /// ```
1152 pub total_local_queue_depth: usize,
1153
1154 /// The maximum number of tasks currently scheduled in any worker's local queue.
1155 ///
1156 /// ##### Definition
1157 /// This metric is derived from the maximum of
1158 /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
1159 ///
1160 /// ##### See also
1161 /// - [`RuntimeMetrics::total_local_queue_depth`]
1162 /// - [`RuntimeMetrics::min_local_queue_depth`]
1163 pub max_local_queue_depth: usize,
1164
1165 /// The minimum number of tasks currently scheduled in any worker's local queue.
1166 ///
1167 /// ##### Definition
1168 /// This metric is derived from the minimum of
1169 /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
1170 ///
1171 /// ##### See also
1172 /// - [`RuntimeMetrics::total_local_queue_depth`]
1173 /// - [`RuntimeMetrics::max_local_queue_depth`]
1174 pub min_local_queue_depth: usize,
1175
1176 /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool.
1177 ///
1178 /// ##### Definition
1179 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::blocking_queue_depth`].
1180 pub blocking_queue_depth: usize,
1181
1182 /// The number of additional threads spawned by the runtime.
1183 ///
1184 /// ##### Definition
1185 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_blocking_threads`].
1186 pub blocking_threads_count: usize,
1187
1188 /// The number of idle threads, which have been spawned by the runtime for `spawn_blocking` calls.
1189 ///
1190 /// ##### Definition
1191 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_idle_blocking_threads`].
1192 pub idle_blocking_threads_count: usize,
1193
1194 /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets.
1195 ///
1196 /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget.
1197 ///
1198 /// The counter is monotonically increasing. It is never decremented or reset to zero.
1199 ///
1200 /// ##### Definition
1201 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::budget_forced_yield_count`].
1202 pub budget_forced_yield_count: u64,
1203
1204 /// Returns the number of ready events processed by the runtime’s I/O driver.
1205 ///
1206 /// ##### Definition
1207 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::io_driver_ready_count`].
1208 pub io_driver_ready_count: u64,
1209 }
1210}
1211
// Declares a struct whose fields are split into two groups:
//
// * `stable { .. }` fields are always part of the struct;
// * `unstable { .. }` fields are only compiled in when `--cfg tokio_unstable` is set
//   (and are annotated for docs.rs accordingly).
//
// Attributes (including doc comments) written before the `struct` keyword are forwarded
// to the generated struct. Fields themselves cannot carry attributes or doc comments;
// use plain `//` comments inside the field lists.
macro_rules! define_semi_stable {
    (
        $(#[$meta:meta])*
        $vis:vis struct $name:ident {
            stable {
                $($field:ident: $field_ty:ty),*
                $(,)?
            }
            $(,)?
            unstable {
                $($gated_field:ident: $gated_ty:ty),*
                $(,)?
            }
        }
    ) => {
        $(#[$meta])*
        $vis struct $name {
            // Always-present fields.
            $(
                $field: $field_ty,
            )*
            // Fields that only exist when built with `--cfg tokio_unstable`.
            $(
                #[cfg(tokio_unstable)]
                #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
                $gated_field: $gated_ty,
            )*
        }
    };
}
1240
define_semi_stable! {
    /// Snapshot of per-worker metrics
    ///
    /// Holds, for a single runtime worker, the cumulative counter values seen at the
    /// previous probe so that the next probe can report per-interval differences.
    #[derive(Debug, Default)]
    struct Worker {
        stable {
            // Index of the worker; passed to the `worker_*` accessors of the runtime metrics.
            worker: usize,
            // Cumulative `worker_park_count` value seen at the previous probe.
            total_park_count: u64,
            // Cumulative `worker_total_busy_duration` value seen at the previous probe.
            total_busy_duration: Duration,
        }
        unstable {
            // Cumulative `worker_noop_count` value seen at the previous probe.
            total_noop_count: u64,
            // Cumulative `worker_steal_count` value seen at the previous probe.
            total_steal_count: u64,
            // Cumulative `worker_steal_operations` value seen at the previous probe.
            total_steal_operations: u64,
            // Cumulative `worker_local_schedule_count` value seen at the previous probe.
            total_local_schedule_count: u64,
            // Cumulative `worker_overflow_count` value seen at the previous probe.
            total_overflow_count: u64,
            // Cumulative `worker_poll_count` value seen at the previous probe.
            total_polls_count: u64,
            // Per-bucket poll counts recorded at the previous probe, indexed by bucket.
            // Empty when the poll-time histogram is not enabled on the runtime.
            poll_time_histogram: Vec<u64>,
        }
    }
}
1261
define_semi_stable! {
    /// Iterator returned by [`RuntimeMonitor::intervals`].
    ///
    /// See that method's documentation for more details.
    #[derive(Debug)]
    pub struct RuntimeIntervals {
        stable {
            // Metrics handle of the monitored runtime.
            runtime: runtime::RuntimeMetrics,
            // Start of the current sampling interval; reset to "now" on every probe.
            started_at: Instant,
            // One snapshot per runtime worker, in worker-index order.
            workers: Vec<Worker>,
        }
        unstable {
            // Number of tasks scheduled from *outside* of the runtime
            // (cumulative value seen at the previous probe).
            num_remote_schedules: u64,
            // Cumulative `budget_forced_yield_count` value seen at the previous probe.
            budget_forced_yield_count: u64,
            // Cumulative `io_driver_ready_count` value seen at the previous probe.
            io_driver_ready_count: u64,
            // Cached bucket ranges, static config that doesn't change after runtime creation.
            bucket_ranges: Vec<Range<Duration>>,
        }
    }
}
1283
1284impl RuntimeIntervals {
1285 fn probe(&mut self) -> RuntimeMetrics {
1286 let now = Instant::now();
1287
1288 let mut metrics = RuntimeMetrics {
1289 workers_count: self.runtime.num_workers(),
1290 live_tasks_count: self.runtime.num_alive_tasks(),
1291 elapsed: now.saturating_duration_since(self.started_at),
1292 global_queue_depth: self.runtime.global_queue_depth(),
1293 min_park_count: u64::MAX,
1294 min_busy_duration: Duration::from_secs(1000000000),
1295 ..Default::default()
1296 };
1297
1298 #[cfg(tokio_unstable)]
1299 {
1300 let num_remote_schedules = self.runtime.remote_schedule_count();
1301 let budget_forced_yields = self.runtime.budget_forced_yield_count();
1302 let io_driver_ready_events = self.runtime.io_driver_ready_count();
1303
1304 metrics.num_remote_schedules = num_remote_schedules.saturating_sub(self.num_remote_schedules);
1305 metrics.min_noop_count = u64::MAX;
1306 metrics.min_steal_count = u64::MAX;
1307 metrics.min_local_schedule_count = u64::MAX;
1308 metrics.min_overflow_count = u64::MAX;
1309 metrics.min_polls_count = u64::MAX;
1310 metrics.min_local_queue_depth = usize::MAX;
1311 metrics.mean_poll_duration_worker_min = Duration::MAX;
1312 metrics.poll_time_histogram = PollTimeHistogram::new(
1313 self.bucket_ranges
1314 .iter()
1315 .map(|range| HistogramBucket::new(range.start, range.end, 0))
1316 .collect(),
1317 );
1318 metrics.budget_forced_yield_count =
1319 budget_forced_yields.saturating_sub(self.budget_forced_yield_count);
1320 metrics.io_driver_ready_count = io_driver_ready_events.saturating_sub(self.io_driver_ready_count);
1321
1322 self.num_remote_schedules = num_remote_schedules;
1323 self.budget_forced_yield_count = budget_forced_yields;
1324 self.io_driver_ready_count = io_driver_ready_events;
1325 }
1326 self.started_at = now;
1327
1328 for worker in &mut self.workers {
1329 worker.probe(&self.runtime, &mut metrics);
1330 }
1331
1332 #[cfg(tokio_unstable)]
1333 {
1334 if metrics.total_polls_count == 0 {
1335 debug_assert_eq!(metrics.mean_poll_duration, Duration::default());
1336
1337 metrics.mean_poll_duration_worker_max = Duration::default();
1338 metrics.mean_poll_duration_worker_min = Duration::default();
1339 }
1340 }
1341
1342 metrics
1343 }
1344}
1345
1346impl Iterator for RuntimeIntervals {
1347 type Item = RuntimeMetrics;
1348
1349 fn next(&mut self) -> Option<RuntimeMetrics> {
1350 Some(self.probe())
1351 }
1352}
1353
1354impl RuntimeMonitor {
1355 /// Creates a new [`RuntimeMonitor`].
1356 pub fn new(runtime: &runtime::Handle) -> RuntimeMonitor {
1357 let runtime = runtime.metrics();
1358
1359 RuntimeMonitor { runtime }
1360 }
1361
1362 /// Produces an unending iterator of [`RuntimeMetrics`].
1363 ///
1364 /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1365 /// produced by [`RuntimeMonitor::intervals`]. The item type of this iterator is [`RuntimeMetrics`],
1366 /// which is a bundle of runtime metrics that describe *only* changes occurring within that sampling
1367 /// interval.
1368 ///
1369 /// # Example
1370 ///
1371 /// ```
1372 /// use std::time::Duration;
1373 ///
1374 /// #[tokio::main]
1375 /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1376 /// let handle = tokio::runtime::Handle::current();
1377 /// // construct the runtime metrics monitor
1378 /// let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1379 ///
1380 /// // print runtime metrics every 500ms
1381 /// {
1382 /// tokio::spawn(async move {
1383 /// for interval in runtime_monitor.intervals() {
1384 /// // pretty-print the metric interval
1385 /// println!("{:?}", interval);
1386 /// // wait 500ms
1387 /// tokio::time::sleep(Duration::from_millis(500)).await;
1388 /// }
1389 /// });
1390 /// }
1391 ///
1392 /// // await some tasks
1393 /// tokio::join![
1394 /// do_work(),
1395 /// do_work(),
1396 /// do_work(),
1397 /// ];
1398 ///
1399 /// Ok(())
1400 /// }
1401 ///
1402 /// async fn do_work() {
1403 /// for _ in 0..25 {
1404 /// tokio::task::yield_now().await;
1405 /// tokio::time::sleep(Duration::from_millis(100)).await;
1406 /// }
1407 /// }
1408 /// ```
1409 pub fn intervals(&self) -> RuntimeIntervals {
1410 let started_at = Instant::now();
1411
1412 let workers = (0..self.runtime.num_workers())
1413 .map(|worker| Worker::new(worker, &self.runtime))
1414 .collect();
1415
1416 RuntimeIntervals {
1417 runtime: self.runtime.clone(),
1418 started_at,
1419 workers,
1420
1421 #[cfg(tokio_unstable)]
1422 num_remote_schedules: self.runtime.remote_schedule_count(),
1423 #[cfg(tokio_unstable)]
1424 budget_forced_yield_count: self.runtime.budget_forced_yield_count(),
1425 #[cfg(tokio_unstable)]
1426 io_driver_ready_count: self.runtime.io_driver_ready_count(),
1427 #[cfg(tokio_unstable)]
1428 bucket_ranges: (0..self.runtime.poll_time_histogram_num_buckets())
1429 .map(|i| self.runtime.poll_time_histogram_bucket_range(i))
1430 .collect(),
1431 }
1432 }
1433}
1434
1435impl Worker {
1436 fn new(worker: usize, rt: &runtime::RuntimeMetrics) -> Worker {
1437 #[allow(unused_mut, clippy::needless_update)]
1438 let mut wrk = Worker {
1439 worker,
1440 total_park_count: rt.worker_park_count(worker),
1441 total_busy_duration: rt.worker_total_busy_duration(worker),
1442 ..Default::default()
1443 };
1444
1445 #[cfg(tokio_unstable)]
1446 {
1447 let poll_time_histogram = if rt.poll_time_histogram_enabled() {
1448 vec![0; rt.poll_time_histogram_num_buckets()]
1449 } else {
1450 vec![]
1451 };
1452 wrk.total_noop_count = rt.worker_noop_count(worker);
1453 wrk.total_steal_count = rt.worker_steal_count(worker);
1454 wrk.total_steal_operations = rt.worker_steal_operations(worker);
1455 wrk.total_local_schedule_count = rt.worker_local_schedule_count(worker);
1456 wrk.total_overflow_count = rt.worker_overflow_count(worker);
1457 wrk.total_polls_count = rt.worker_poll_count(worker);
1458 wrk.poll_time_histogram = poll_time_histogram;
1459 };
1460 wrk
1461 }
1462
1463 fn probe(&mut self, rt: &runtime::RuntimeMetrics, metrics: &mut RuntimeMetrics) {
1464 macro_rules! metric {
1465 ( $sum:ident, $max:ident, $min:ident, $probe:ident ) => {{
1466 let val = rt.$probe(self.worker);
1467 let delta = val - self.$sum;
1468 self.$sum = val;
1469
1470 metrics.$sum += delta;
1471
1472 if delta > metrics.$max {
1473 metrics.$max = delta;
1474 }
1475
1476 if delta < metrics.$min {
1477 metrics.$min = delta;
1478 }
1479 }};
1480 }
1481
1482 metric!(
1483 total_park_count,
1484 max_park_count,
1485 min_park_count,
1486 worker_park_count
1487 );
1488 metric!(
1489 total_busy_duration,
1490 max_busy_duration,
1491 min_busy_duration,
1492 worker_total_busy_duration
1493 );
1494
1495 #[cfg(tokio_unstable)]
1496 {
1497 let mut worker_polls_count = self.total_polls_count;
1498 let total_polls_count = metrics.total_polls_count;
1499
1500 metric!(
1501 total_noop_count,
1502 max_noop_count,
1503 min_noop_count,
1504 worker_noop_count
1505 );
1506 metric!(
1507 total_steal_count,
1508 max_steal_count,
1509 min_steal_count,
1510 worker_steal_count
1511 );
1512 metric!(
1513 total_steal_operations,
1514 max_steal_operations,
1515 min_steal_operations,
1516 worker_steal_operations
1517 );
1518 metric!(
1519 total_local_schedule_count,
1520 max_local_schedule_count,
1521 min_local_schedule_count,
1522 worker_local_schedule_count
1523 );
1524 metric!(
1525 total_overflow_count,
1526 max_overflow_count,
1527 min_overflow_count,
1528 worker_overflow_count
1529 );
1530 metric!(
1531 total_polls_count,
1532 max_polls_count,
1533 min_polls_count,
1534 worker_poll_count
1535 );
1536
1537 // Get the number of polls since last probe
1538 worker_polls_count = self.total_polls_count.saturating_sub(worker_polls_count);
1539
1540 // Update the mean task poll duration if there were polls
1541 if worker_polls_count > 0 {
1542 let val = rt.worker_mean_poll_time(self.worker);
1543
1544 if val > metrics.mean_poll_duration_worker_max {
1545 metrics.mean_poll_duration_worker_max = val;
1546 }
1547
1548 if val < metrics.mean_poll_duration_worker_min {
1549 metrics.mean_poll_duration_worker_min = val;
1550 }
1551
1552 // First, scale the current value down
1553 let ratio = total_polls_count as f64 / metrics.total_polls_count as f64;
1554 let mut mean = metrics.mean_poll_duration.as_nanos() as f64 * ratio;
1555
1556 // Add the scaled current worker's mean poll duration
1557 let ratio = worker_polls_count as f64 / metrics.total_polls_count as f64;
1558 mean += val.as_nanos() as f64 * ratio;
1559
1560 metrics.mean_poll_duration = Duration::from_nanos(mean as u64);
1561 }
1562
1563 // Update the histogram counts if there were polls since last count
1564 if worker_polls_count > 0 {
1565 for (bucket, entry) in metrics.poll_time_histogram.buckets_mut().iter_mut().enumerate() {
1566 let new = rt.poll_time_histogram_bucket_count(self.worker, bucket);
1567 let delta = new.saturating_sub(self.poll_time_histogram[bucket]);
1568 self.poll_time_histogram[bucket] = new;
1569
1570 entry.add_count(delta);
1571 }
1572 }
1573
1574 // Local scheduled tasks is an absolute value
1575 let local_scheduled_tasks = rt.worker_local_queue_depth(self.worker);
1576 metrics.total_local_queue_depth = metrics.total_local_queue_depth.saturating_add(local_scheduled_tasks);
1577
1578 if local_scheduled_tasks > metrics.max_local_queue_depth {
1579 metrics.max_local_queue_depth = local_scheduled_tasks;
1580 }
1581
1582 if local_scheduled_tasks < metrics.min_local_queue_depth {
1583 metrics.min_local_queue_depth = local_scheduled_tasks;
1584 }
1585
1586 // Blocking queue depth is an absolute value too
1587 metrics.blocking_queue_depth = rt.blocking_queue_depth();
1588
1589 metrics.blocking_threads_count = rt.num_blocking_threads();
1590 metrics.idle_blocking_threads_count = rt.num_idle_blocking_threads();
1591 }
1592 }
1593}
1594
1595derived_metrics!(
1596 [RuntimeMetrics] {
1597 stable {
1598 /// Returns the ratio of the [`RuntimeMetrics::total_busy_duration`] to the [`RuntimeMetrics::elapsed`].
1599 pub fn busy_ratio(&self) -> f64 {
1600 self.total_busy_duration.as_nanos() as f64 / self.elapsed.as_nanos() as f64
1601 }
1602 }
1603 unstable {
1604 /// Returns the ratio of the [`RuntimeMetrics::total_polls_count`] to the [`RuntimeMetrics::total_noop_count`].
1605 pub fn mean_polls_per_park(&self) -> f64 {
1606 let total_park_count = self.total_park_count.saturating_sub(self.total_noop_count);
1607 if total_park_count == 0 {
1608 0.0
1609 } else {
1610 self.total_polls_count as f64 / total_park_count as f64
1611 }
1612 }
1613 }
1614 }
1615);
1616
// Tests for the `metrique` integration of `RuntimeMetrics`. They are only built for test
// runs with `--cfg tokio_unstable` and the `metrique-integration` feature enabled.
#[cfg(all(test, tokio_unstable, feature = "metrique-integration"))]
mod metrique_integration_tests {
    use super::*;
    use metrique::test_util::test_metric;

    /// Compile-time regression: if a field is added whose type doesn't
    /// implement `CloseValue`, this will fail to compile.
    ///
    /// At run time it additionally checks a hand-built `RuntimeMetrics` value: scalar fields
    /// are emitted under their field names and the poll-time histogram only emits
    /// observations for buckets with a non-zero count.
    #[test]
    fn metrique_integration_produces_expected_fields() {
        // Three buckets, the middle one empty; everything else keeps its default value.
        let metrics = RuntimeMetrics {
            workers_count: 4,
            total_park_count: 100,
            poll_time_histogram: PollTimeHistogram::new(vec![
                HistogramBucket::new(Duration::from_micros(0), Duration::from_micros(100), 10),
                HistogramBucket::new(Duration::from_micros(100), Duration::from_micros(200), 0),
                HistogramBucket::new(Duration::from_micros(200), Duration::from_micros(500), 3),
            ]),
            ..Default::default()
        };

        let entry = test_metric(metrics);

        // Stable fields
        assert_eq!(entry.metrics["workers_count"], 4);
        assert_eq!(entry.metrics["total_park_count"], 100);
        assert_eq!(entry.metrics["elapsed"].as_f64(), 0.0);
        assert_eq!(entry.metrics["total_busy_duration"].as_f64(), 0.0);
        assert_eq!(entry.metrics["global_queue_depth"].as_u64(), 0);

        // Unstable fields
        assert_eq!(entry.metrics["mean_poll_duration"].as_f64(), 0.0);
        assert_eq!(entry.metrics["total_steal_count"].as_u64(), 0);
        assert_eq!(entry.metrics["total_polls_count"].as_u64(), 0);

        // 2 non-zero buckets (count 10 and 3) should produce 2 observations
        let hist = &entry.metrics["poll_time_histogram"];
        assert_eq!(hist.distribution.len(), 2, "expected 2 non-zero buckets");

        // midpoint of 0..100µs = 50µs, count = 10
        match hist.distribution[0] {
            metrique::writer::Observation::Repeated { total, occurrences } => {
                assert_eq!(occurrences, 10);
                assert!((total - 500.0).abs() < 0.01, "expected 50 * 10 = 500, got {total}");
            }
            other => panic!("expected Repeated, got {other:?}"),
        }

        // midpoint of 200..500µs = 350µs, count = 3
        match hist.distribution[1] {
            metrique::writer::Observation::Repeated { total, occurrences } => {
                assert_eq!(occurrences, 3);
                assert!((total - 1050.0).abs() < 0.01, "expected 350 * 3 = 1050, got {total}");
            }
            other => panic!("expected Repeated, got {other:?}"),
        }
    }

    /// Collect `RuntimeMetrics` from a live Tokio runtime and verify the pipeline produces valid output.
    #[cfg(feature = "rt")]
    #[test]
    fn metrique_end_to_end() {
        // Single-threaded runtime with the poll-time histogram switched on.
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .enable_metrics_poll_time_histogram()
            .build()
            .unwrap();

        rt.block_on(async {
            let handle = tokio::runtime::Handle::current();
            let monitor = RuntimeMonitor::new(&handle);
            let mut intervals = monitor.intervals();

            // Discard the first sample so the work below lands in a fresh interval.
            let _ = intervals.next().unwrap();

            // Spawn tasks to create some work for the runtime to poll.
            // Retry for up to four intervals until one of them reports histogram counts.
            let mut metrics_with_polls = None;
            for _ in 0..4 {
                for _ in 0..25 {
                    tokio::spawn(async {
                        tokio::task::yield_now().await;
                    })
                    .await
                    .unwrap();
                }
                // Slow poll (>900µs) to land in the last histogram bucket.
                // The blocking sleep is deliberate: it makes a single poll take that long.
                tokio::spawn(async {
                    std::thread::sleep(Duration::from_millis(1));
                })
                .await
                .unwrap();

                let metrics = intervals.next().unwrap();
                let total_polls: u64 = metrics.poll_time_histogram.buckets().iter().map(|b| b.count()).sum();
                if total_polls > 0 {
                    metrics_with_polls = Some(metrics);
                    break;
                }
            }
            let metrics = metrics_with_polls.expect("expected polls to be recorded within 4 sampled intervals");

            // Record the expectations before `metrics` is moved into `test_metric`.
            let expected_workers_count = metrics.workers_count;
            let expected_non_zero_buckets = metrics
                .poll_time_histogram
                .buckets()
                .iter()
                .filter(|b| b.count() > 0)
                .count();

            let expected_total_polls: u64 = metrics.poll_time_histogram.buckets().iter().map(|b| b.count()).sum();
            assert!(expected_workers_count > 0);
            assert!(expected_total_polls > 0);

            let last_bucket = metrics.poll_time_histogram.buckets().last().unwrap();

            // Sanity check: Tokio's last histogram bucket ends at Duration::from_nanos(u64::MAX)
            assert_eq!(last_bucket.range_end(), Duration::from_nanos(u64::MAX));
            assert!(last_bucket.count() > 0, "expected slow poll to land in last bucket");
            let last_bucket_start_us = last_bucket.range_start().as_micros() as f64;
            let last_bucket_count = last_bucket.count();

            let entry = test_metric(metrics);

            assert_eq!(entry.metrics["workers_count"], expected_workers_count as u64);
            assert!(entry.metrics["elapsed"].as_f64() >= 0.0);
            assert!(entry.metrics["total_busy_duration"].as_f64() >= 0.0);

            // One observation per non-empty bucket, and together they cover every poll.
            let hist = &entry.metrics["poll_time_histogram"];
            assert_eq!(hist.distribution.len(), expected_non_zero_buckets);
            let observed_total_occurrences: u64 = hist
                .distribution
                .iter()
                .map(|obs| match obs {
                    metrique::writer::Observation::Repeated { occurrences, .. } => *occurrences,
                    other => panic!("expected Repeated, got {other:?}"),
                })
                .sum();
            assert_eq!(observed_total_occurrences, expected_total_polls);

            // The last observation corresponds to the last histogram bucket.
            // Verify it uses range_start as the representative value instead of a midpoint,
            // since the last bucket range_end is Duration::from_nanos(u64::MAX).
            let last_obs = hist.distribution.last().unwrap();
            match last_obs {
                metrique::writer::Observation::Repeated { total, occurrences } => {
                    assert_eq!(*occurrences, last_bucket_count);
                    let expected_total = last_bucket_start_us * last_bucket_count as f64;
                    assert!(
                        (total - expected_total).abs() < 0.01,
                        "last bucket should use range_start ({last_bucket_start_us}µs) as representative value, \
                         expected total={expected_total}, got {total}"
                    );
                }
                other => panic!("expected Repeated, got {other:?}"),
            }
        });
    }
}