tokio_metrics/runtime.rs
1use std::time::{Duration, Instant};
2use tokio::runtime;
3use crate::derived_metrics::derived_metrics;
4
5#[cfg(feature = "metrics-rs-integration")]
6pub(crate) mod metrics_rs_integration;
7
8/// Monitors key metrics of the tokio runtime.
9///
10/// ### Usage
11/// ```
12/// use std::time::Duration;
13/// use tokio_metrics::RuntimeMonitor;
14///
15/// #[tokio::main]
16/// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
17/// let handle = tokio::runtime::Handle::current();
18///
19/// // print runtime metrics every 500ms
20/// {
21/// let runtime_monitor = RuntimeMonitor::new(&handle);
22/// tokio::spawn(async move {
23/// for interval in runtime_monitor.intervals() {
24/// // pretty-print the metric interval
25/// println!("{:?}", interval);
26/// // wait 500ms
27/// tokio::time::sleep(Duration::from_millis(500)).await;
28/// }
29/// });
30/// }
31///
32/// // await some tasks
33/// tokio::join![
34/// do_work(),
35/// do_work(),
36/// do_work(),
37/// ];
38///
39/// Ok(())
40/// }
41///
42/// async fn do_work() {
43/// for _ in 0..25 {
44/// tokio::task::yield_now().await;
45/// tokio::time::sleep(Duration::from_millis(100)).await;
46/// }
47/// }
48/// ```
49#[derive(Debug)]
50pub struct RuntimeMonitor {
51 /// Handle to the runtime
52 runtime: runtime::RuntimeMetrics,
53}
54
// Generates the public `RuntimeMetrics` struct from two comma-separated field lists
// (a trailing comma is accepted in each):
//
// * `stable { ... }`   - fields that are always present;
// * `unstable { ... }` - fields that only exist when built with `--cfg tokio_unstable`.
//
// Every field keeps the attributes (including doc comments) written on it and
// additionally receives a `doc(cfg(..))` annotation when building under `docsrs`.
macro_rules! define_runtime_metrics {
    (
        stable {
            $(
                // Any number of attributes / doc comments, then `vis name: type`.
                $(#[$($attr:tt)*])*
                $access:vis $field:ident: $kind:ty
            ),*
            $(,)?
        }
        unstable {
            $(
                // Same shape as the stable list.
                $(#[$($gated_attr:tt)*])*
                $gated_access:vis $gated_field:ident: $gated_kind:ty
            ),*
            $(,)?
        }
    ) => {
        /// Key runtime metrics.
        #[non_exhaustive]
        #[derive(Default, Debug, Clone)]
        pub struct RuntimeMetrics {
            // Stable fields: emitted unconditionally.
            $(
                $(#[$($attr)*])*
                #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
                $access $field: $kind,
            )*
            // Unstable fields: compiled in only under `tokio_unstable`.
            $(
                $(#[$($gated_attr)*])*
                #[cfg(tokio_unstable)]
                #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
                $gated_access $gated_field: $gated_kind,
            )*
        }
    };
}
90
91define_runtime_metrics! {
92 stable {
93 /// The number of worker threads used by the runtime.
94 ///
95 /// This metric is static for a runtime.
96 ///
97 /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`].
98 /// When using the `current_thread` runtime, the return value is always `1`.
99 ///
100 /// The number of workers is set by configuring
101 /// [`worker_threads`][`tokio::runtime::Builder::worker_threads`] with
102 /// [`tokio::runtime::Builder`], or by parameterizing [`tokio::main`].
103 ///
104 /// ##### Examples
105 /// In the below example, the number of workers is set by parameterizing [`tokio::main`]:
106 /// ```
107 /// use tokio::runtime::Handle;
108 ///
109 /// #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
110 /// async fn main() {
111 /// let handle = tokio::runtime::Handle::current();
112 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
113 /// let mut intervals = monitor.intervals();
114 /// let mut next_interval = || intervals.next().unwrap();
115 ///
116 /// assert_eq!(next_interval().workers_count, 10);
117 /// }
118 /// ```
119 ///
120 /// [`tokio::main`]: https://docs.rs/tokio/latest/tokio/attr.main.html
121 ///
122 /// When using the `current_thread` runtime, the return value is always `1`; e.g.:
123 /// ```
124 /// use tokio::runtime::Handle;
125 ///
126 /// #[tokio::main(flavor = "current_thread")]
127 /// async fn main() {
128 /// let handle = tokio::runtime::Handle::current();
129 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
130 /// let mut intervals = monitor.intervals();
131 /// let mut next_interval = || intervals.next().unwrap();
132 ///
133 /// assert_eq!(next_interval().workers_count, 1);
134 /// }
135 /// ```
136 ///
137 /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`]; e.g.:
138 /// ```
139 /// use tokio::runtime::Handle;
140 ///
141 /// #[tokio::main]
142 /// async fn main() {
143 /// let handle = Handle::current();
144 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
145 /// let mut intervals = monitor.intervals();
146 /// let mut next_interval = || intervals.next().unwrap();
147 ///
148 /// assert_eq!(next_interval().workers_count, handle.metrics().num_workers());
149 /// }
150 /// ```
151 pub workers_count: usize,
152
153 /// The current number of alive tasks in the runtime.
154 ///
155 /// ##### Definition
156 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_alive_tasks`].
157 pub live_tasks_count: usize,
158
159 /// The number of times worker threads parked.
160 ///
161 /// The worker park count increases by one each time the worker parks the thread waiting for
162 /// new inbound events to process. This usually means the worker has processed all pending work
163 /// and is currently idle.
164 ///
165 /// ##### Definition
166 /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_park_count`]
167 /// across all worker threads.
168 ///
169 /// ##### See also
170 /// - [`RuntimeMetrics::max_park_count`]
171 /// - [`RuntimeMetrics::min_park_count`]
172 ///
173 /// ##### Examples
174 /// ```
175 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
176 /// async fn main() {
177 /// let handle = tokio::runtime::Handle::current();
178 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
179 /// let mut intervals = monitor.intervals();
180 /// let mut next_interval = || intervals.next().unwrap();
181 ///
182 /// let interval = next_interval(); // end of interval 1
183 /// assert_eq!(interval.total_park_count, 0);
184 ///
185 /// induce_parks().await;
186 ///
187 /// let interval = next_interval(); // end of interval 2
188 /// assert!(interval.total_park_count >= 1); // usually 1 or 2 parks
189 /// }
190 ///
191 /// async fn induce_parks() {
192 /// let _ = tokio::time::timeout(std::time::Duration::ZERO, async {
193 /// loop { tokio::task::yield_now().await; }
194 /// }).await;
195 /// }
196 /// ```
197 pub total_park_count: u64,
198
199 /// The maximum number of times any worker thread parked.
200 ///
201 /// ##### Definition
202 /// This metric is derived from the maximum of
203 /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
204 ///
205 /// ##### See also
206 /// - [`RuntimeMetrics::total_park_count`]
207 /// - [`RuntimeMetrics::min_park_count`]
208 pub max_park_count: u64,
209
210 /// The minimum number of times any worker thread parked.
211 ///
212 /// ##### Definition
        /// This metric is derived from the minimum of
214 /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
215 ///
216 /// ##### See also
217 /// - [`RuntimeMetrics::total_park_count`]
218 /// - [`RuntimeMetrics::max_park_count`]
219 pub min_park_count: u64,
220
221 /// The amount of time worker threads were busy.
222 ///
223 /// The worker busy duration increases whenever the worker is spending time processing work.
224 /// Using this value can indicate the total load of workers.
225 ///
226 /// ##### Definition
227 /// This metric is derived from the sum of
228 /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
229 ///
230 /// ##### See also
231 /// - [`RuntimeMetrics::min_busy_duration`]
232 /// - [`RuntimeMetrics::max_busy_duration`]
233 ///
234 /// ##### Examples
        /// In the below example, tasks spend a total of 4s busy:
236 /// ```
237 /// use tokio::time::Duration;
238 ///
239 /// fn main() {
240 /// let start = tokio::time::Instant::now();
241 ///
242 /// let rt = tokio::runtime::Builder::new_current_thread()
243 /// .enable_all()
244 /// .build()
245 /// .unwrap();
246 ///
247 /// let handle = rt.handle();
248 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
249 /// let mut intervals = monitor.intervals();
250 /// let mut next_interval = || intervals.next().unwrap();
251 ///
252 /// let delay_1s = Duration::from_secs(1);
253 /// let delay_3s = Duration::from_secs(3);
254 ///
255 /// rt.block_on(async {
256 /// // keep the main task busy for 1s
257 /// spin_for(delay_1s);
258 ///
        /// // spawn a task and keep it busy for 3s
260 /// let _ = tokio::spawn(async move {
261 /// spin_for(delay_3s);
262 /// }).await;
263 /// });
264 ///
265 /// // flush metrics
266 /// drop(rt);
267 ///
268 /// let elapsed = start.elapsed();
269 ///
270 /// let interval = next_interval(); // end of interval 2
271 /// assert!(interval.total_busy_duration >= delay_1s + delay_3s);
272 /// assert!(interval.total_busy_duration <= elapsed);
273 /// }
274 ///
275 /// fn time<F>(task: F) -> Duration
276 /// where
277 /// F: Fn() -> ()
278 /// {
279 /// let start = tokio::time::Instant::now();
280 /// task();
281 /// start.elapsed()
282 /// }
283 ///
284 /// /// Block the current thread for a given `duration`.
285 /// fn spin_for(duration: Duration) {
286 /// let start = tokio::time::Instant::now();
287 /// while start.elapsed() <= duration {}
288 /// }
289 /// ```
290 ///
291 /// Busy times may not accumulate as the above example suggests (FIXME: Why?); e.g., if we
292 /// remove the three second delay, the time spent busy falls to mere microseconds:
293 /// ```should_panic
294 /// use tokio::time::Duration;
295 ///
296 /// fn main() {
297 /// let rt = tokio::runtime::Builder::new_current_thread()
298 /// .enable_all()
299 /// .build()
300 /// .unwrap();
301 ///
302 /// let handle = rt.handle();
303 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
304 /// let mut intervals = monitor.intervals();
305 /// let mut next_interval = || intervals.next().unwrap();
306 ///
307 /// let delay_1s = Duration::from_secs(1);
308 ///
309 /// let elapsed = time(|| rt.block_on(async {
310 /// // keep the main task busy for 1s
311 /// spin_for(delay_1s);
312 /// }));
313 ///
314 /// // flush metrics
315 /// drop(rt);
316 ///
317 /// let interval = next_interval(); // end of interval 2
318 /// assert!(interval.total_busy_duration >= delay_1s); // FAIL
319 /// assert!(interval.total_busy_duration <= elapsed);
320 /// }
321 ///
322 /// fn time<F>(task: F) -> Duration
323 /// where
324 /// F: Fn() -> ()
325 /// {
326 /// let start = tokio::time::Instant::now();
327 /// task();
328 /// start.elapsed()
329 /// }
330 ///
331 /// /// Block the current thread for a given `duration`.
332 /// fn spin_for(duration: Duration) {
333 /// let start = tokio::time::Instant::now();
334 /// while start.elapsed() <= duration {}
335 /// }
336 /// ```
337 pub total_busy_duration: Duration,
338
339 /// The maximum amount of time a worker thread was busy.
340 ///
341 /// ##### Definition
342 /// This metric is derived from the maximum of
343 /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
344 ///
345 /// ##### See also
346 /// - [`RuntimeMetrics::total_busy_duration`]
347 /// - [`RuntimeMetrics::min_busy_duration`]
348 pub max_busy_duration: Duration,
349
350 /// The minimum amount of time a worker thread was busy.
351 ///
352 /// ##### Definition
353 /// This metric is derived from the minimum of
354 /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
355 ///
356 /// ##### See also
357 /// - [`RuntimeMetrics::total_busy_duration`]
358 /// - [`RuntimeMetrics::max_busy_duration`]
359 pub min_busy_duration: Duration,
360
361 /// The number of tasks currently scheduled in the runtime's global queue.
362 ///
363 /// Tasks that are spawned or notified from a non-runtime thread are scheduled using the
364 /// runtime's global queue. This metric returns the **current** number of tasks pending in
365 /// the global queue. As such, the returned value may increase or decrease as new tasks are
366 /// scheduled and processed.
367 ///
368 /// ##### Definition
369 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::global_queue_depth`].
370 ///
371 /// ##### Example
372 /// ```
373 /// # let current_thread = tokio::runtime::Builder::new_current_thread()
374 /// # .enable_all()
375 /// # .build()
376 /// # .unwrap();
377 /// #
378 /// # let multi_thread = tokio::runtime::Builder::new_multi_thread()
379 /// # .worker_threads(2)
380 /// # .enable_all()
381 /// # .build()
382 /// # .unwrap();
383 /// #
384 /// # for runtime in [current_thread, multi_thread] {
385 /// let handle = runtime.handle().clone();
386 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
387 /// let mut intervals = monitor.intervals();
388 /// let mut next_interval = || intervals.next().unwrap();
389 ///
390 /// let interval = next_interval(); // end of interval 1
391 /// # #[cfg(tokio_unstable)]
392 /// assert_eq!(interval.num_remote_schedules, 0);
393 ///
394 /// // spawn a system thread outside of the runtime
395 /// std::thread::spawn(move || {
396 /// // spawn two tasks from this non-runtime thread
397 /// handle.spawn(async {});
398 /// handle.spawn(async {});
399 /// }).join().unwrap();
400 ///
401 /// // flush metrics
402 /// drop(runtime);
403 ///
404 /// let interval = next_interval(); // end of interval 2
405 /// # #[cfg(tokio_unstable)]
406 /// assert_eq!(interval.num_remote_schedules, 2);
407 /// # }
408 /// ```
409 pub global_queue_depth: usize,
410
411 /// Total amount of time elapsed since observing runtime metrics.
412 pub elapsed: Duration,
413 }
414 unstable {
415 /// The average duration of a single invocation of poll on a task.
416 ///
417 /// This average is an exponentially-weighted moving average of the duration
418 /// of task polls on all runtime workers.
419 ///
420 /// ##### Examples
421 /// ```
422 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
423 /// async fn main() {
424 /// let handle = tokio::runtime::Handle::current();
425 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
426 /// let mut intervals = monitor.intervals();
427 /// let mut next_interval = || intervals.next().unwrap();
428 ///
429 /// let interval = next_interval();
430 /// println!("mean task poll duration is {:?}", interval.mean_poll_duration);
431 /// }
432 /// ```
433 pub mean_poll_duration: Duration,
434
435 /// The average duration of a single invocation of poll on a task on the
436 /// worker with the lowest value.
437 ///
438 /// This average is an exponentially-weighted moving average of the duration
439 /// of task polls on the runtime worker with the lowest value.
440 ///
441 /// ##### Examples
442 /// ```
443 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
444 /// async fn main() {
445 /// let handle = tokio::runtime::Handle::current();
446 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
447 /// let mut intervals = monitor.intervals();
448 /// let mut next_interval = || intervals.next().unwrap();
449 ///
450 /// let interval = next_interval();
451 /// println!("min mean task poll duration is {:?}", interval.mean_poll_duration_worker_min);
452 /// }
453 /// ```
454 pub mean_poll_duration_worker_min: Duration,
455
456 /// The average duration of a single invocation of poll on a task on the
457 /// worker with the highest value.
458 ///
459 /// This average is an exponentially-weighted moving average of the duration
460 /// of task polls on the runtime worker with the highest value.
461 ///
462 /// ##### Examples
463 /// ```
464 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
465 /// async fn main() {
466 /// let handle = tokio::runtime::Handle::current();
467 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
468 /// let mut intervals = monitor.intervals();
469 /// let mut next_interval = || intervals.next().unwrap();
470 ///
471 /// let interval = next_interval();
472 /// println!("max mean task poll duration is {:?}", interval.mean_poll_duration_worker_max);
473 /// }
474 /// ```
475 pub mean_poll_duration_worker_max: Duration,
476
477 /// A histogram of task polls since the previous probe grouped by poll
478 /// times.
479 ///
480 /// This metric must be explicitly enabled when creating the runtime with
481 /// [`enable_metrics_poll_time_histogram`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
482 /// Bucket sizes are fixed and configured at the runtime level. See
483 /// configuration options on
484 /// [`runtime::Builder`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
485 ///
486 /// ##### Examples
487 /// ```
488 /// use tokio::runtime::HistogramConfiguration;
489 /// use std::time::Duration;
490 ///
491 /// let config = HistogramConfiguration::linear(Duration::from_micros(50), 12);
492 ///
493 /// let rt = tokio::runtime::Builder::new_multi_thread()
494 /// .enable_metrics_poll_time_histogram()
495 /// .metrics_poll_time_histogram_configuration(config)
496 /// .build()
497 /// .unwrap();
498 ///
499 /// rt.block_on(async {
500 /// let handle = tokio::runtime::Handle::current();
501 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
502 /// let mut intervals = monitor.intervals();
503 /// let mut next_interval = || intervals.next().unwrap();
504 ///
505 /// let interval = next_interval();
506 /// println!("poll count histogram {:?}", interval.poll_time_histogram);
507 /// });
508 /// ```
509 pub poll_time_histogram: Vec<u64>,
510
511 /// The number of times worker threads unparked but performed no work before parking again.
512 ///
513 /// The worker no-op count increases by one each time the worker unparks the thread but finds
514 /// no new work and goes back to sleep. This indicates a false-positive wake up.
515 ///
516 /// ##### Definition
517 /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_noop_count`]
518 /// across all worker threads.
519 ///
520 /// ##### Examples
521 /// Unfortunately, there isn't a great way to reliably induce no-op parks, as they occur as
522 /// false-positive events under concurrency.
523 ///
524 /// The below example triggers fewer than two parks in the single-threaded runtime:
525 /// ```
526 /// #[tokio::main(flavor = "current_thread")]
527 /// async fn main() {
528 /// let handle = tokio::runtime::Handle::current();
529 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
530 /// let mut intervals = monitor.intervals();
531 /// let mut next_interval = || intervals.next().unwrap();
532 ///
533 /// assert_eq!(next_interval().total_park_count, 0);
534 ///
535 /// async {
536 /// tokio::time::sleep(std::time::Duration::from_millis(1)).await;
537 /// }.await;
538 ///
539 /// assert!(next_interval().total_park_count > 0);
540 /// }
541 /// ```
542 ///
543 /// The below example triggers fewer than two parks in the multi-threaded runtime:
544 /// ```
545 /// #[tokio::main(flavor = "multi_thread")]
546 /// async fn main() {
547 /// let handle = tokio::runtime::Handle::current();
548 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
549 /// let mut intervals = monitor.intervals();
550 /// let mut next_interval = || intervals.next().unwrap();
551 ///
552 /// assert_eq!(next_interval().total_noop_count, 0);
553 ///
554 /// async {
555 /// tokio::time::sleep(std::time::Duration::from_millis(1)).await;
556 /// }.await;
557 ///
558 /// assert!(next_interval().total_noop_count > 0);
559 /// }
560 /// ```
561 pub total_noop_count: u64,
562
563 /// The maximum number of times any worker thread unparked but performed no work before parking
564 /// again.
565 ///
566 /// ##### Definition
567 /// This metric is derived from the maximum of
568 /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads.
569 ///
570 /// ##### See also
571 /// - [`RuntimeMetrics::total_noop_count`]
572 /// - [`RuntimeMetrics::min_noop_count`]
573 pub max_noop_count: u64,
574
575 /// The minimum number of times any worker thread unparked but performed no work before parking
576 /// again.
577 ///
578 /// ##### Definition
579 /// This metric is derived from the minimum of
580 /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads.
581 ///
582 /// ##### See also
583 /// - [`RuntimeMetrics::total_noop_count`]
584 /// - [`RuntimeMetrics::max_noop_count`]
585 pub min_noop_count: u64,
586
587 /// The number of tasks worker threads stole from another worker thread.
588 ///
589 /// The worker steal count increases by the amount of stolen tasks each time the worker
590 /// has processed its scheduled queue and successfully steals more pending tasks from another
591 /// worker.
592 ///
593 /// This metric only applies to the **multi-threaded** runtime and will always return `0` when
594 /// using the current thread runtime.
595 ///
596 /// ##### Definition
597 /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`] for
598 /// all worker threads.
599 ///
600 /// ##### See also
601 /// - [`RuntimeMetrics::min_steal_count`]
602 /// - [`RuntimeMetrics::max_steal_count`]
603 ///
604 /// ##### Examples
605 /// In the below example, a blocking channel is used to backup one worker thread:
606 /// ```
607 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
608 /// async fn main() {
609 /// let handle = tokio::runtime::Handle::current();
610 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
611 /// let mut intervals = monitor.intervals();
612 /// let mut next_interval = || intervals.next().unwrap();
613 ///
614 /// let interval = next_interval(); // end of first sampling interval
615 /// assert_eq!(interval.total_steal_count, 0);
616 /// assert_eq!(interval.min_steal_count, 0);
617 /// assert_eq!(interval.max_steal_count, 0);
618 ///
619 /// // induce a steal
620 /// async {
621 /// let (tx, rx) = std::sync::mpsc::channel();
622 /// // Move to the runtime.
623 /// tokio::spawn(async move {
624 /// // Spawn the task that sends to the channel
625 /// tokio::spawn(async move {
626 /// tx.send(()).unwrap();
627 /// });
628 /// // Spawn a task that bumps the previous task out of the "next
629 /// // scheduled" slot.
630 /// tokio::spawn(async {});
631 /// // Blocking receive on the channel.
632 /// rx.recv().unwrap();
633 /// flush_metrics().await;
634 /// }).await.unwrap();
635 /// flush_metrics().await;
636 /// }.await;
637 ///
638 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
639 /// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
640 ///
641 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3
642 /// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
643 /// }
644 ///
645 /// async fn flush_metrics() {
646 /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
647 /// }
648 /// ```
649 pub total_steal_count: u64,
650
651 /// The maximum number of tasks any worker thread stole from another worker thread.
652 ///
653 /// ##### Definition
654 /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`]
655 /// across all worker threads.
656 ///
657 /// ##### See also
658 /// - [`RuntimeMetrics::total_steal_count`]
659 /// - [`RuntimeMetrics::min_steal_count`]
660 pub max_steal_count: u64,
661
662 /// The minimum number of tasks any worker thread stole from another worker thread.
663 ///
664 /// ##### Definition
665 /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`]
666 /// across all worker threads.
667 ///
668 /// ##### See also
669 /// - [`RuntimeMetrics::total_steal_count`]
670 /// - [`RuntimeMetrics::max_steal_count`]
671 pub min_steal_count: u64,
672
673 /// The number of times worker threads stole tasks from another worker thread.
674 ///
675 /// The worker steal operations increases by one each time the worker has processed its
676 /// scheduled queue and successfully steals more pending tasks from another worker.
677 ///
678 /// This metric only applies to the **multi-threaded** runtime and will always return `0` when
679 /// using the current thread runtime.
680 ///
681 /// ##### Definition
682 /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
683 /// for all worker threads.
684 ///
685 /// ##### See also
686 /// - [`RuntimeMetrics::min_steal_operations`]
687 /// - [`RuntimeMetrics::max_steal_operations`]
688 ///
689 /// ##### Examples
690 /// In the below example, a blocking channel is used to backup one worker thread:
691 /// ```
692 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
693 /// async fn main() {
694 /// let handle = tokio::runtime::Handle::current();
695 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
696 /// let mut intervals = monitor.intervals();
697 /// let mut next_interval = || intervals.next().unwrap();
698 ///
699 /// let interval = next_interval(); // end of first sampling interval
700 /// assert_eq!(interval.total_steal_operations, 0);
701 /// assert_eq!(interval.min_steal_operations, 0);
702 /// assert_eq!(interval.max_steal_operations, 0);
703 ///
704 /// // induce a steal
705 /// async {
706 /// let (tx, rx) = std::sync::mpsc::channel();
707 /// // Move to the runtime.
708 /// tokio::spawn(async move {
709 /// // Spawn the task that sends to the channel
710 /// tokio::spawn(async move {
711 /// tx.send(()).unwrap();
712 /// });
713 /// // Spawn a task that bumps the previous task out of the "next
714 /// // scheduled" slot.
715 /// tokio::spawn(async {});
        /// // Blocking receive on the channel.
717 /// rx.recv().unwrap();
718 /// flush_metrics().await;
719 /// }).await.unwrap();
720 /// flush_metrics().await;
721 /// }.await;
722 ///
723 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
724 /// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
725 ///
726 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3
727 /// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
728 /// }
729 ///
730 /// async fn flush_metrics() {
731 /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
732 /// }
733 /// ```
734 pub total_steal_operations: u64,
735
736 /// The maximum number of times any worker thread stole tasks from another worker thread.
737 ///
738 /// ##### Definition
739 /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
740 /// across all worker threads.
741 ///
742 /// ##### See also
743 /// - [`RuntimeMetrics::total_steal_operations`]
744 /// - [`RuntimeMetrics::min_steal_operations`]
745 pub max_steal_operations: u64,
746
747 /// The minimum number of times any worker thread stole tasks from another worker thread.
748 ///
749 /// ##### Definition
750 /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
751 /// across all worker threads.
752 ///
753 /// ##### See also
754 /// - [`RuntimeMetrics::total_steal_operations`]
755 /// - [`RuntimeMetrics::max_steal_operations`]
756 pub min_steal_operations: u64,
757
758 /// The number of tasks scheduled from **outside** of the runtime.
759 ///
760 /// The remote schedule count increases by one each time a task is woken from **outside** of
761 /// the runtime. This usually means that a task is spawned or notified from a non-runtime
762 /// thread and must be queued using the Runtime's global queue, which tends to be slower.
763 ///
764 /// ##### Definition
765 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::remote_schedule_count`].
766 ///
767 /// ##### Examples
768 /// In the below example, a remote schedule is induced by spawning a system thread, then
769 /// spawning a tokio task from that system thread:
770 /// ```
771 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
772 /// async fn main() {
773 /// let handle = tokio::runtime::Handle::current();
774 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
775 /// let mut intervals = monitor.intervals();
776 /// let mut next_interval = || intervals.next().unwrap();
777 ///
778 /// let interval = next_interval(); // end of first sampling interval
779 /// assert_eq!(interval.num_remote_schedules, 0);
780 ///
781 /// // spawn a non-runtime thread
782 /// std::thread::spawn(move || {
783 /// // spawn two tasks from this non-runtime thread
784 /// async move {
785 /// handle.spawn(async {}).await;
786 /// handle.spawn(async {}).await;
787 /// }
788 /// }).join().unwrap().await;
789 ///
790 /// let interval = next_interval(); // end of second sampling interval
791 /// assert_eq!(interval.num_remote_schedules, 2);
792 ///
793 /// let interval = next_interval(); // end of third sampling interval
794 /// assert_eq!(interval.num_remote_schedules, 0);
795 /// }
796 /// ```
797 pub num_remote_schedules: u64,
798
799 /// The number of tasks scheduled from worker threads.
800 ///
801 /// The local schedule count increases by one each time a task is woken from **inside** of the
802 /// runtime. This usually means that a task is spawned or notified from within a runtime thread
803 /// and will be queued on the worker-local queue.
804 ///
805 /// ##### Definition
806 /// This metric is derived from the sum of
807 /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] across all worker threads.
808 ///
809 /// ##### See also
810 /// - [`RuntimeMetrics::min_local_schedule_count`]
811 /// - [`RuntimeMetrics::max_local_schedule_count`]
812 ///
813 /// ##### Examples
814 /// ###### With `current_thread` runtime
815 /// In the below example, two tasks are spawned from the context of a third tokio task:
816 /// ```
817 /// #[tokio::main(flavor = "current_thread")]
818 /// async fn main() {
819 /// let handle = tokio::runtime::Handle::current();
820 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
821 /// let mut intervals = monitor.intervals();
822 /// let mut next_interval = || intervals.next().unwrap();
823 ///
        /// let interval = { flush_metrics().await; next_interval() }; // end interval 1
825 /// assert_eq!(interval.total_local_schedule_count, 0);
826 ///
827 /// let task = async {
828 /// tokio::spawn(async {}); // local schedule 1
829 /// tokio::spawn(async {}); // local schedule 2
830 /// };
831 ///
832 /// let handle = tokio::spawn(task); // local schedule 3
833 ///
834 /// let interval = { flush_metrics().await; next_interval() }; // end interval 2
835 /// assert_eq!(interval.total_local_schedule_count, 3);
836 ///
837 /// let _ = handle.await;
838 ///
839 /// let interval = { flush_metrics().await; next_interval() }; // end interval 3
840 /// assert_eq!(interval.total_local_schedule_count, 0);
841 /// }
842 ///
843 /// async fn flush_metrics() {
844 /// tokio::task::yield_now().await;
845 /// }
846 /// ```
847 ///
848 /// ###### With `multi_thread` runtime
849 /// In the below example, 100 tasks are spawned:
850 /// ```
851 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
852 /// async fn main() {
853 /// let handle = tokio::runtime::Handle::current();
854 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
855 /// let mut intervals = monitor.intervals();
856 /// let mut next_interval = || intervals.next().unwrap();
857 ///
858 /// let interval = next_interval(); // end of interval 1
859 /// assert_eq!(interval.total_local_schedule_count, 0);
860 ///
861 /// use std::sync::atomic::{AtomicBool, Ordering};
862 /// static SPINLOCK: AtomicBool = AtomicBool::new(true);
863 ///
864 /// // block the other worker thread
865 /// tokio::spawn(async {
866 /// while SPINLOCK.load(Ordering::SeqCst) {}
867 /// });
868 ///
869 /// // FIXME: why does this need to be in a `spawn`?
870 /// let _ = tokio::spawn(async {
871 /// // spawn 100 tasks
872 /// for _ in 0..100 {
873 /// tokio::spawn(async {});
874 /// }
875 /// // this spawns 1 more task
876 /// flush_metrics().await;
877 /// }).await;
878 ///
879 /// // unblock the other worker thread
880 /// SPINLOCK.store(false, Ordering::SeqCst);
881 ///
882 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
883 /// assert_eq!(interval.total_local_schedule_count, 100 + 1);
884 /// }
885 ///
886 /// async fn flush_metrics() {
887 /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
888 /// }
889 /// ```
890 pub total_local_schedule_count: u64,
891
892 /// The maximum number of tasks scheduled from any one worker thread.
893 ///
894 /// ##### Definition
895 /// This metric is derived from the maximum of
896 /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads.
897 ///
898 /// ##### See also
899 /// - [`RuntimeMetrics::total_local_schedule_count`]
900 /// - [`RuntimeMetrics::min_local_schedule_count`]
901 pub max_local_schedule_count: u64,
902
903 /// The minimum number of tasks scheduled from any one worker thread.
904 ///
905 /// ##### Definition
906 /// This metric is derived from the minimum of
907 /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads.
908 ///
909 /// ##### See also
910 /// - [`RuntimeMetrics::total_local_schedule_count`]
911 /// - [`RuntimeMetrics::max_local_schedule_count`]
912 pub min_local_schedule_count: u64,
913
914 /// The number of times worker threads saturated their local queues.
915 ///
        /// The worker overflow count increases by one each time the worker attempts to schedule a task
        /// locally, but its local queue is full. When this happens, half of the
        /// local queue is moved to the global queue.
919 ///
920 /// This metric only applies to the **multi-threaded** scheduler.
921 ///
922 /// ##### Definition
923 /// This metric is derived from the sum of
924 /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
925 ///
926 /// ##### See also
927 /// - [`RuntimeMetrics::min_overflow_count`]
928 /// - [`RuntimeMetrics::max_overflow_count`]
929 ///
930 /// ##### Examples
931 /// ```
932 /// #[tokio::main(flavor = "multi_thread", worker_threads = 1)]
933 /// async fn main() {
934 /// let handle = tokio::runtime::Handle::current();
935 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
936 /// let mut intervals = monitor.intervals();
937 /// let mut next_interval = || intervals.next().unwrap();
938 ///
939 /// let interval = next_interval(); // end of interval 1
940 /// assert_eq!(interval.total_overflow_count, 0);
941 ///
942 /// use std::sync::atomic::{AtomicBool, Ordering};
943 ///
944 /// // spawn a ton of tasks
945 /// let _ = tokio::spawn(async {
946 /// // we do this in a `tokio::spawn` because it is impossible to
947 /// // overflow the main task
948 /// for _ in 0..300 {
949 /// tokio::spawn(async {});
950 /// }
951 /// }).await;
952 ///
953 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
954 /// assert_eq!(interval.total_overflow_count, 1);
955 /// }
956 ///
957 /// async fn flush_metrics() {
958 /// let _ = tokio::time::sleep(std::time::Duration::from_millis(1)).await;
959 /// }
960 /// ```
961 pub total_overflow_count: u64,
962
963 /// The maximum number of times any one worker saturated its local queue.
964 ///
965 /// ##### Definition
966 /// This metric is derived from the maximum of
967 /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
968 ///
969 /// ##### See also
970 /// - [`RuntimeMetrics::total_overflow_count`]
971 /// - [`RuntimeMetrics::min_overflow_count`]
972 pub max_overflow_count: u64,
973
974 /// The minimum number of times any one worker saturated its local queue.
975 ///
976 /// ##### Definition
        /// This metric is derived from the minimum of
978 /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
979 ///
980 /// ##### See also
981 /// - [`RuntimeMetrics::total_overflow_count`]
982 /// - [`RuntimeMetrics::max_overflow_count`]
983 pub min_overflow_count: u64,
984
985 /// The number of tasks that have been polled across all worker threads.
986 ///
987 /// The worker poll count increases by one each time a worker polls a scheduled task.
988 ///
989 /// ##### Definition
990 /// This metric is derived from the sum of
991 /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
992 ///
993 /// ##### See also
994 /// - [`RuntimeMetrics::min_polls_count`]
995 /// - [`RuntimeMetrics::max_polls_count`]
996 ///
997 /// ##### Examples
998 /// In the below example, 42 tasks are spawned and polled:
999 /// ```
1000 /// #[tokio::main(flavor = "current_thread")]
1001 /// async fn main() {
1002 /// let handle = tokio::runtime::Handle::current();
1003 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1004 /// let mut intervals = monitor.intervals();
1005 /// let mut next_interval = || intervals.next().unwrap();
1006 ///
1007 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 1
1008 /// assert_eq!(interval.total_polls_count, 0);
1009 /// assert_eq!(interval.min_polls_count, 0);
1010 /// assert_eq!(interval.max_polls_count, 0);
1011 ///
1012 /// const N: u64 = 42;
1013 ///
1014 /// for _ in 0..N {
1015 /// let _ = tokio::spawn(async {}).await;
1016 /// }
1017 ///
1018 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
1019 /// assert_eq!(interval.total_polls_count, N);
1020 /// assert_eq!(interval.min_polls_count, N);
1021 /// assert_eq!(interval.max_polls_count, N);
1022 /// }
1023 ///
1024 /// async fn flush_metrics() {
1025 /// let _ = tokio::task::yield_now().await;
1026 /// }
1027 /// ```
1028 pub total_polls_count: u64,
1029
1030 /// The maximum number of tasks that have been polled in any worker thread.
1031 ///
1032 /// ##### Definition
1033 /// This metric is derived from the maximum of
1034 /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1035 ///
1036 /// ##### See also
1037 /// - [`RuntimeMetrics::total_polls_count`]
1038 /// - [`RuntimeMetrics::min_polls_count`]
1039 pub max_polls_count: u64,
1040
1041 /// The minimum number of tasks that have been polled in any worker thread.
1042 ///
1043 /// ##### Definition
1044 /// This metric is derived from the minimum of
1045 /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1046 ///
1047 /// ##### See also
1048 /// - [`RuntimeMetrics::total_polls_count`]
1049 /// - [`RuntimeMetrics::max_polls_count`]
1050 pub min_polls_count: u64,
1051
1052 /// The total number of tasks currently scheduled in workers' local queues.
1053 ///
1054 /// Tasks that are spawned or notified from within a runtime thread are scheduled using that
1055 /// worker's local queue. This metric returns the **current** number of tasks pending in all
1056 /// workers' local queues. As such, the returned value may increase or decrease as new tasks
1057 /// are scheduled and processed.
1058 ///
1059 /// ##### Definition
1060 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`].
1061 ///
1062 /// ##### See also
1063 /// - [`RuntimeMetrics::min_local_queue_depth`]
1064 /// - [`RuntimeMetrics::max_local_queue_depth`]
1065 ///
1066 /// ##### Example
1067 ///
1068 /// ###### With `current_thread` runtime
1069 /// The below example spawns 100 tasks:
1070 /// ```
1071 /// #[tokio::main(flavor = "current_thread")]
1072 /// async fn main() {
1073 /// const N: usize = 100;
1074 ///
1075 /// let handle = tokio::runtime::Handle::current();
1076 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1077 /// let mut intervals = monitor.intervals();
1078 /// let mut next_interval = || intervals.next().unwrap();
1079 ///
1080 /// let interval = next_interval(); // end of interval 1
1081 /// assert_eq!(interval.total_local_queue_depth, 0);
1082 ///
1083 ///
1084 /// for _ in 0..N {
1085 /// tokio::spawn(async {});
1086 /// }
1087 /// let interval = next_interval(); // end of interval 2
1088 /// assert_eq!(interval.total_local_queue_depth, N);
1089 /// }
1090 /// ```
1091 ///
1092 /// ###### With `multi_thread` runtime
1093 /// The below example spawns 100 tasks:
1094 /// ```
1095 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
1096 /// async fn main() {
1097 /// const N: usize = 100;
1098 ///
1099 /// let handle = tokio::runtime::Handle::current();
1100 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1101 /// let mut intervals = monitor.intervals();
1102 /// let mut next_interval = || intervals.next().unwrap();
1103 ///
1104 /// let interval = next_interval(); // end of interval 1
1105 /// assert_eq!(interval.total_local_queue_depth, 0);
1106 ///
1107 /// use std::sync::atomic::{AtomicBool, Ordering};
1108 /// static SPINLOCK_A: AtomicBool = AtomicBool::new(true);
1109 ///
1110 /// // block the other worker thread
1111 /// tokio::spawn(async {
1112 /// while SPINLOCK_A.load(Ordering::SeqCst) {}
1113 /// });
1114 ///
1115 /// static SPINLOCK_B: AtomicBool = AtomicBool::new(true);
1116 ///
1117 /// let _ = tokio::spawn(async {
1118 /// for _ in 0..N {
1119 /// tokio::spawn(async {
1120 /// while SPINLOCK_B.load(Ordering::SeqCst) {}
1121 /// });
1122 /// }
1123 /// }).await;
1124 ///
1125 /// // unblock the other worker thread
1126 /// SPINLOCK_A.store(false, Ordering::SeqCst);
1127 ///
1128 /// let interval = next_interval(); // end of interval 2
1129 /// assert_eq!(interval.total_local_queue_depth, N - 1);
1130 ///
1131 /// SPINLOCK_B.store(false, Ordering::SeqCst);
1132 /// }
1133 /// ```
1134 pub total_local_queue_depth: usize,
1135
        /// The maximum number of tasks currently scheduled in any worker's local queue.
1137 ///
1138 /// ##### Definition
1139 /// This metric is derived from the maximum of
1140 /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
1141 ///
1142 /// ##### See also
1143 /// - [`RuntimeMetrics::total_local_queue_depth`]
1144 /// - [`RuntimeMetrics::min_local_queue_depth`]
1145 pub max_local_queue_depth: usize,
1146
        /// The minimum number of tasks currently scheduled in any worker's local queue.
1148 ///
1149 /// ##### Definition
1150 /// This metric is derived from the minimum of
1151 /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
1152 ///
1153 /// ##### See also
1154 /// - [`RuntimeMetrics::total_local_queue_depth`]
1155 /// - [`RuntimeMetrics::max_local_queue_depth`]
1156 pub min_local_queue_depth: usize,
1157
1158 /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool.
1159 ///
1160 /// ##### Definition
1161 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::blocking_queue_depth`].
1162 pub blocking_queue_depth: usize,
1163
1164 /// The number of additional threads spawned by the runtime.
1165 ///
1166 /// ##### Definition
1167 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_blocking_threads`].
1168 pub blocking_threads_count: usize,
1169
        /// The number of idle threads, which have been spawned by the runtime for `spawn_blocking` calls.
1171 ///
1172 /// ##### Definition
1173 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_idle_blocking_threads`].
1174 pub idle_blocking_threads_count: usize,
1175
1176 /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets.
1177 ///
1178 /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget.
1179 ///
1180 /// The counter is monotonically increasing. It is never decremented or reset to zero.
1181 ///
1182 /// ##### Definition
1183 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::budget_forced_yield_count`].
1184 pub budget_forced_yield_count: u64,
1185
1186 /// Returns the number of ready events processed by the runtime’s I/O driver.
1187 ///
1188 /// ##### Definition
1189 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::io_driver_ready_count`].
1190 pub io_driver_ready_count: u64,
1191 }
1192}
1193
/// Declares a struct whose fields are split into a `stable` and an `unstable` group.
///
/// Fields listed under `stable` are always present. Fields listed under `unstable` are only
/// compiled in when the `tokio_unstable` cfg is set. Attributes written before the struct
/// (doc comments, derives, ...) are forwarded to the generated struct unchanged. Fields
/// themselves take no attributes or visibility: each is written as plain `name: Type`.
///
/// A trailing comma is accepted after the last field of each group, and an optional comma
/// may separate the `stable { .. }` group from the `unstable { .. }` group.
macro_rules! define_semi_stable {
    (
        $(#[$($struct_attr:tt)*])*
        $struct_vis:vis struct $struct_name:ident {
            stable {
                $($always_field:ident: $always_ty:ty),* $(,)?
            }
            $(,)?
            unstable {
                $($gated_field:ident: $gated_ty:ty),* $(,)?
            }
        }
    ) => {
        $(#[$($struct_attr)*])*
        $struct_vis struct $struct_name {
            // Always-available fields, in declaration order.
            $( $always_field: $always_ty, )*
            // Fields that exist only when built with `--cfg tokio_unstable`.
            $(
                #[cfg(tokio_unstable)]
                #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
                $gated_field: $gated_ty,
            )*
        }
    };
}
1222
define_semi_stable! {
    /// Snapshot of per-worker metrics
    ///
    /// Every `total_*` field stores the cumulative value most recently read from the runtime
    /// for this worker. `Worker::probe` subtracts the stored value from a fresh reading to
    /// obtain the change over the sampling interval, then stores the fresh reading.
    #[derive(Debug, Default)]
    struct Worker {
        stable {
            // Index of the worker; passed to every per-worker query on the runtime metrics.
            worker: usize,
            // Last reading of `worker_park_count`.
            total_park_count: u64,
            // Last reading of `worker_total_busy_duration`.
            total_busy_duration: Duration,
        }
        unstable {
            // Last reading of `worker_noop_count`.
            total_noop_count: u64,
            // Last reading of `worker_steal_count`.
            total_steal_count: u64,
            // Last reading of `worker_steal_operations`.
            total_steal_operations: u64,
            // Last reading of `worker_local_schedule_count`.
            total_local_schedule_count: u64,
            // Last reading of `worker_overflow_count`.
            total_overflow_count: u64,
            // Last reading of `worker_poll_count`.
            total_polls_count: u64,
            // Per-bucket counts of the poll-time histogram as of the last probe that saw
            // polls; empty when the histogram is not enabled on the runtime.
            poll_time_histogram: Vec<u64>,
        }
    }
}
1243
define_semi_stable! {
    /// Iterator returned by [`RuntimeMonitor::intervals`].
    ///
    /// See that method's documentation for more details.
    #[derive(Debug)]
    pub struct RuntimeIntervals {
        stable {
            // Handle to the metrics of the monitored runtime.
            runtime: runtime::RuntimeMetrics,
            // Instant at which the current sampling interval began; reset on every probe.
            started_at: Instant,
            // One snapshot per runtime worker, holding the readings from the previous probe.
            workers: Vec<Worker>,
        }
        unstable {
            // Last reading of the number of tasks scheduled from *outside* of the runtime
            num_remote_schedules: u64,
            // Last reading of the runtime's `budget_forced_yield_count`.
            budget_forced_yield_count: u64,
            // Last reading of the runtime's `io_driver_ready_count`.
            io_driver_ready_count: u64,
        }
    }
}
1263
1264impl RuntimeIntervals {
1265 fn probe(&mut self) -> RuntimeMetrics {
1266 let now = Instant::now();
1267
1268 let mut metrics = RuntimeMetrics {
1269 workers_count: self.runtime.num_workers(),
1270 live_tasks_count: self.runtime.num_alive_tasks(),
1271 elapsed: now - self.started_at,
1272 global_queue_depth: self.runtime.global_queue_depth(),
1273 min_park_count: u64::MAX,
1274 min_busy_duration: Duration::from_secs(1000000000),
1275 ..Default::default()
1276 };
1277
1278 #[cfg(tokio_unstable)]
1279 {
1280 let num_remote_schedules = self.runtime.remote_schedule_count();
1281 let budget_forced_yields = self.runtime.budget_forced_yield_count();
1282 let io_driver_ready_events = self.runtime.io_driver_ready_count();
1283
1284 metrics.num_remote_schedules = num_remote_schedules - self.num_remote_schedules;
1285 metrics.min_noop_count = u64::MAX;
1286 metrics.min_steal_count = u64::MAX;
1287 metrics.min_local_schedule_count = u64::MAX;
1288 metrics.min_overflow_count = u64::MAX;
1289 metrics.min_polls_count = u64::MAX;
1290 metrics.min_local_queue_depth = usize::MAX;
1291 metrics.mean_poll_duration_worker_min = Duration::MAX;
1292 metrics.poll_time_histogram = vec![0; self.runtime.poll_time_histogram_num_buckets()];
1293 metrics.budget_forced_yield_count =
1294 budget_forced_yields - self.budget_forced_yield_count;
1295 metrics.io_driver_ready_count = io_driver_ready_events - self.io_driver_ready_count;
1296
1297 self.num_remote_schedules = num_remote_schedules;
1298 self.budget_forced_yield_count = budget_forced_yields;
1299 self.io_driver_ready_count = io_driver_ready_events;
1300 }
1301 self.started_at = now;
1302
1303 for worker in &mut self.workers {
1304 worker.probe(&self.runtime, &mut metrics);
1305 }
1306
1307 #[cfg(tokio_unstable)]
1308 {
1309 if metrics.total_polls_count == 0 {
1310 debug_assert_eq!(metrics.mean_poll_duration, Duration::default());
1311
1312 metrics.mean_poll_duration_worker_max = Duration::default();
1313 metrics.mean_poll_duration_worker_min = Duration::default();
1314 }
1315 }
1316
1317 metrics
1318 }
1319}
1320
1321impl Iterator for RuntimeIntervals {
1322 type Item = RuntimeMetrics;
1323
1324 fn next(&mut self) -> Option<RuntimeMetrics> {
1325 Some(self.probe())
1326 }
1327}
1328
1329impl RuntimeMonitor {
1330 /// Creates a new [`RuntimeMonitor`].
1331 pub fn new(runtime: &runtime::Handle) -> RuntimeMonitor {
1332 let runtime = runtime.metrics();
1333
1334 RuntimeMonitor { runtime }
1335 }
1336
1337 /// Produces an unending iterator of [`RuntimeMetrics`].
1338 ///
1339 /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1340 /// produced by [`RuntimeMonitor::intervals`]. The item type of this iterator is [`RuntimeMetrics`],
1341 /// which is a bundle of runtime metrics that describe *only* changes occurring within that sampling
1342 /// interval.
1343 ///
1344 /// # Example
1345 ///
1346 /// ```
1347 /// use std::time::Duration;
1348 ///
1349 /// #[tokio::main]
1350 /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1351 /// let handle = tokio::runtime::Handle::current();
1352 /// // construct the runtime metrics monitor
1353 /// let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1354 ///
1355 /// // print runtime metrics every 500ms
1356 /// {
1357 /// tokio::spawn(async move {
1358 /// for interval in runtime_monitor.intervals() {
1359 /// // pretty-print the metric interval
1360 /// println!("{:?}", interval);
1361 /// // wait 500ms
1362 /// tokio::time::sleep(Duration::from_millis(500)).await;
1363 /// }
1364 /// });
1365 /// }
1366 ///
1367 /// // await some tasks
1368 /// tokio::join![
1369 /// do_work(),
1370 /// do_work(),
1371 /// do_work(),
1372 /// ];
1373 ///
1374 /// Ok(())
1375 /// }
1376 ///
1377 /// async fn do_work() {
1378 /// for _ in 0..25 {
1379 /// tokio::task::yield_now().await;
1380 /// tokio::time::sleep(Duration::from_millis(100)).await;
1381 /// }
1382 /// }
1383 /// ```
1384 pub fn intervals(&self) -> RuntimeIntervals {
1385 let started_at = Instant::now();
1386
1387 let workers = (0..self.runtime.num_workers())
1388 .map(|worker| Worker::new(worker, &self.runtime))
1389 .collect();
1390
1391 RuntimeIntervals {
1392 runtime: self.runtime.clone(),
1393 started_at,
1394 workers,
1395
1396 #[cfg(tokio_unstable)]
1397 num_remote_schedules: self.runtime.remote_schedule_count(),
1398 #[cfg(tokio_unstable)]
1399 budget_forced_yield_count: self.runtime.budget_forced_yield_count(),
1400 #[cfg(tokio_unstable)]
1401 io_driver_ready_count: self.runtime.io_driver_ready_count(),
1402 }
1403 }
1404}
1405
1406impl Worker {
1407 fn new(worker: usize, rt: &runtime::RuntimeMetrics) -> Worker {
1408 #[allow(unused_mut, clippy::needless_update)]
1409 let mut wrk = Worker {
1410 worker,
1411 total_park_count: rt.worker_park_count(worker),
1412 total_busy_duration: rt.worker_total_busy_duration(worker),
1413 ..Default::default()
1414 };
1415
1416 #[cfg(tokio_unstable)]
1417 {
1418 let poll_time_histogram = if rt.poll_time_histogram_enabled() {
1419 vec![0; rt.poll_time_histogram_num_buckets()]
1420 } else {
1421 vec![]
1422 };
1423 wrk.total_noop_count = rt.worker_noop_count(worker);
1424 wrk.total_steal_count = rt.worker_steal_count(worker);
1425 wrk.total_steal_operations = rt.worker_steal_operations(worker);
1426 wrk.total_local_schedule_count = rt.worker_local_schedule_count(worker);
1427 wrk.total_overflow_count = rt.worker_overflow_count(worker);
1428 wrk.total_polls_count = rt.worker_poll_count(worker);
1429 wrk.poll_time_histogram = poll_time_histogram;
1430 };
1431 wrk
1432 }
1433
1434 fn probe(&mut self, rt: &runtime::RuntimeMetrics, metrics: &mut RuntimeMetrics) {
1435 macro_rules! metric {
1436 ( $sum:ident, $max:ident, $min:ident, $probe:ident ) => {{
1437 let val = rt.$probe(self.worker);
1438 let delta = val - self.$sum;
1439 self.$sum = val;
1440
1441 metrics.$sum += delta;
1442
1443 if delta > metrics.$max {
1444 metrics.$max = delta;
1445 }
1446
1447 if delta < metrics.$min {
1448 metrics.$min = delta;
1449 }
1450 }};
1451 }
1452
1453 metric!(
1454 total_park_count,
1455 max_park_count,
1456 min_park_count,
1457 worker_park_count
1458 );
1459 metric!(
1460 total_busy_duration,
1461 max_busy_duration,
1462 min_busy_duration,
1463 worker_total_busy_duration
1464 );
1465
1466 #[cfg(tokio_unstable)]
1467 {
1468 let mut worker_polls_count = self.total_polls_count;
1469 let total_polls_count = metrics.total_polls_count;
1470
1471 metric!(
1472 total_noop_count,
1473 max_noop_count,
1474 min_noop_count,
1475 worker_noop_count
1476 );
1477 metric!(
1478 total_steal_count,
1479 max_steal_count,
1480 min_steal_count,
1481 worker_steal_count
1482 );
1483 metric!(
1484 total_steal_operations,
1485 max_steal_operations,
1486 min_steal_operations,
1487 worker_steal_operations
1488 );
1489 metric!(
1490 total_local_schedule_count,
1491 max_local_schedule_count,
1492 min_local_schedule_count,
1493 worker_local_schedule_count
1494 );
1495 metric!(
1496 total_overflow_count,
1497 max_overflow_count,
1498 min_overflow_count,
1499 worker_overflow_count
1500 );
1501 metric!(
1502 total_polls_count,
1503 max_polls_count,
1504 min_polls_count,
1505 worker_poll_count
1506 );
1507
1508 // Get the number of polls since last probe
1509 worker_polls_count = self.total_polls_count - worker_polls_count;
1510
1511 // Update the mean task poll duration if there were polls
1512 if worker_polls_count > 0 {
1513 let val = rt.worker_mean_poll_time(self.worker);
1514
1515 if val > metrics.mean_poll_duration_worker_max {
1516 metrics.mean_poll_duration_worker_max = val;
1517 }
1518
1519 if val < metrics.mean_poll_duration_worker_min {
1520 metrics.mean_poll_duration_worker_min = val;
1521 }
1522
1523 // First, scale the current value down
1524 let ratio = total_polls_count as f64 / metrics.total_polls_count as f64;
1525 let mut mean = metrics.mean_poll_duration.as_nanos() as f64 * ratio;
1526
1527 // Add the scaled current worker's mean poll duration
1528 let ratio = worker_polls_count as f64 / metrics.total_polls_count as f64;
1529 mean += val.as_nanos() as f64 * ratio;
1530
1531 metrics.mean_poll_duration = Duration::from_nanos(mean as u64);
1532 }
1533
1534 // Update the histogram counts if there were polls since last count
1535 if worker_polls_count > 0 {
1536 for (bucket, cell) in metrics.poll_time_histogram.iter_mut().enumerate() {
1537 let new = rt.poll_time_histogram_bucket_count(self.worker, bucket);
1538 let delta = new - self.poll_time_histogram[bucket];
1539 self.poll_time_histogram[bucket] = new;
1540
1541 *cell += delta;
1542 }
1543 }
1544
1545 // Local scheduled tasks is an absolute value
1546 let local_scheduled_tasks = rt.worker_local_queue_depth(self.worker);
1547 metrics.total_local_queue_depth += local_scheduled_tasks;
1548
1549 if local_scheduled_tasks > metrics.max_local_queue_depth {
1550 metrics.max_local_queue_depth = local_scheduled_tasks;
1551 }
1552
1553 if local_scheduled_tasks < metrics.min_local_queue_depth {
1554 metrics.min_local_queue_depth = local_scheduled_tasks;
1555 }
1556
1557 // Blocking queue depth is an absolute value too
1558 metrics.blocking_queue_depth = rt.blocking_queue_depth();
1559
1560 metrics.blocking_threads_count = rt.num_blocking_threads();
1561 metrics.idle_blocking_threads_count = rt.num_idle_blocking_threads();
1562 }
1563 }
1564}
1565
1566derived_metrics!(
1567 [RuntimeMetrics] {
1568 stable {
1569 /// Returns the ratio of the [`RuntimeMetrics::total_busy_duration`] to the [`RuntimeMetrics::elapsed`].
1570 pub fn busy_ratio(&self) -> f64 {
1571 self.total_busy_duration.as_nanos() as f64 / self.elapsed.as_nanos() as f64
1572 }
1573 }
1574 unstable {
1575 /// Returns the ratio of the [`RuntimeMetrics::total_polls_count`] to the [`RuntimeMetrics::total_noop_count`].
1576 pub fn mean_polls_per_park(&self) -> f64 {
1577 let total_park_count = self.total_park_count - self.total_noop_count;
1578 if total_park_count == 0 {
1579 0.0
1580 } else {
1581 self.total_polls_count as f64 / total_park_count as f64
1582 }
1583 }
1584 }
1585 }
1586);