tokio_metrics/runtime.rs
1use std::time::{Duration, Instant};
2use tokio::runtime;
3
4#[cfg(feature = "metrics-rs-integration")]
5pub(crate) mod metrics_rs_integration;
6
/// Monitors key metrics of the tokio runtime.
///
/// A monitor is created from a runtime [`Handle`][tokio::runtime::Handle] with
/// `RuntimeMonitor::new`, and its `intervals()` iterator yields one metrics
/// sample per call to `next()`, as shown in the example below.
///
/// ### Usage
/// ```
/// use std::time::Duration;
/// use tokio_metrics::RuntimeMonitor;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
///     let handle = tokio::runtime::Handle::current();
///
///     // print runtime metrics every 500ms
///     {
///         let runtime_monitor = RuntimeMonitor::new(&handle);
///         tokio::spawn(async move {
///             for interval in runtime_monitor.intervals() {
///                 // pretty-print the metric interval
///                 println!("{:?}", interval);
///                 // wait 500ms
///                 tokio::time::sleep(Duration::from_millis(500)).await;
///             }
///         });
///     }
///
///     // await some tasks
///     tokio::join![
///         do_work(),
///         do_work(),
///         do_work(),
///     ];
///
///     Ok(())
/// }
///
/// async fn do_work() {
///     for _ in 0..25 {
///         tokio::task::yield_now().await;
///         tokio::time::sleep(Duration::from_millis(100)).await;
///     }
/// }
/// ```
#[derive(Debug)]
pub struct RuntimeMonitor {
    /// Tokio's metrics handle ([`tokio::runtime::RuntimeMetrics`]) for the
    /// monitored runtime.
    runtime: runtime::RuntimeMetrics,
}
53
// Generates the public `RuntimeMetrics` struct from two field lists.
//
// * Fields listed under `stable { ... }` are always part of the struct.
// * Fields listed under `unstable { ... }` are additionally gated behind
//   `#[cfg(tokio_unstable)]`.
//
// Attributes written on a field (doc comments included) are forwarded to the
// generated field, and each list accepts an optional trailing comma.
macro_rules! define_runtime_metrics {
    (
        stable {
            $(
                $(#[$stable_meta:meta])*
                $stable_vis:vis $stable_field:ident: $stable_ty:ty
            ),*
            $(,)?
        }
        unstable {
            $(
                $(#[$gated_meta:meta])*
                $gated_vis:vis $gated_field:ident: $gated_ty:ty
            ),*
            $(,)?
        }
    ) => {
        /// Key runtime metrics.
        #[non_exhaustive]
        #[derive(Default, Debug, Clone)]
        pub struct RuntimeMetrics {
            // Always-available metrics.
            $(
                $(#[$stable_meta])*
                #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
                $stable_vis $stable_field: $stable_ty,
            )*
            // Metrics that only exist when built with `--cfg tokio_unstable`.
            $(
                $(#[$gated_meta])*
                #[cfg(tokio_unstable)]
                #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
                $gated_vis $gated_field: $gated_ty,
            )*
        }
    };
}
89
90define_runtime_metrics! {
91 stable {
92 /// The number of worker threads used by the runtime.
93 ///
94 /// This metric is static for a runtime.
95 ///
96 /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`].
97 /// When using the `current_thread` runtime, the return value is always `1`.
98 ///
99 /// The number of workers is set by configuring
100 /// [`worker_threads`][`tokio::runtime::Builder::worker_threads`] with
101 /// [`tokio::runtime::Builder`], or by parameterizing [`tokio::main`].
102 ///
103 /// ##### Examples
104 /// In the below example, the number of workers is set by parameterizing [`tokio::main`]:
105 /// ```
106 /// use tokio::runtime::Handle;
107 ///
108 /// #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
109 /// async fn main() {
110 /// let handle = tokio::runtime::Handle::current();
111 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
112 /// let mut intervals = monitor.intervals();
113 /// let mut next_interval = || intervals.next().unwrap();
114 ///
115 /// assert_eq!(next_interval().workers_count, 10);
116 /// }
117 /// ```
118 ///
119 /// [`tokio::main`]: https://docs.rs/tokio/latest/tokio/attr.main.html
120 ///
121 /// When using the `current_thread` runtime, the return value is always `1`; e.g.:
122 /// ```
123 /// use tokio::runtime::Handle;
124 ///
125 /// #[tokio::main(flavor = "current_thread")]
126 /// async fn main() {
127 /// let handle = tokio::runtime::Handle::current();
128 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
129 /// let mut intervals = monitor.intervals();
130 /// let mut next_interval = || intervals.next().unwrap();
131 ///
132 /// assert_eq!(next_interval().workers_count, 1);
133 /// }
134 /// ```
135 ///
136 /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`]; e.g.:
137 /// ```
138 /// use tokio::runtime::Handle;
139 ///
140 /// #[tokio::main]
141 /// async fn main() {
142 /// let handle = Handle::current();
143 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
144 /// let mut intervals = monitor.intervals();
145 /// let mut next_interval = || intervals.next().unwrap();
146 ///
147 /// assert_eq!(next_interval().workers_count, handle.metrics().num_workers());
148 /// }
149 /// ```
150 pub workers_count: usize,
151
152 /// The number of times worker threads parked.
153 ///
154 /// The worker park count increases by one each time the worker parks the thread waiting for
155 /// new inbound events to process. This usually means the worker has processed all pending work
156 /// and is currently idle.
157 ///
158 /// ##### Definition
159 /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_park_count`]
160 /// across all worker threads.
161 ///
162 /// ##### See also
163 /// - [`RuntimeMetrics::max_park_count`]
164 /// - [`RuntimeMetrics::min_park_count`]
165 ///
166 /// ##### Examples
167 /// ```
168 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
169 /// async fn main() {
170 /// let handle = tokio::runtime::Handle::current();
171 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
172 /// let mut intervals = monitor.intervals();
173 /// let mut next_interval = || intervals.next().unwrap();
174 ///
175 /// let interval = next_interval(); // end of interval 1
176 /// assert_eq!(interval.total_park_count, 0);
177 ///
178 /// induce_parks().await;
179 ///
180 /// let interval = next_interval(); // end of interval 2
181 /// assert!(interval.total_park_count >= 1); // usually 1 or 2 parks
182 /// }
183 ///
184 /// async fn induce_parks() {
185 /// let _ = tokio::time::timeout(std::time::Duration::ZERO, async {
186 /// loop { tokio::task::yield_now().await; }
187 /// }).await;
188 /// }
189 /// ```
190 pub total_park_count: u64,
191
192 /// The maximum number of times any worker thread parked.
193 ///
194 /// ##### Definition
195 /// This metric is derived from the maximum of
196 /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
197 ///
198 /// ##### See also
199 /// - [`RuntimeMetrics::total_park_count`]
200 /// - [`RuntimeMetrics::min_park_count`]
201 pub max_park_count: u64,
202
203 /// The minimum number of times any worker thread parked.
204 ///
205 /// ##### Definition
 /// This metric is derived from the minimum of
207 /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
208 ///
209 /// ##### See also
210 /// - [`RuntimeMetrics::total_park_count`]
211 /// - [`RuntimeMetrics::max_park_count`]
212 pub min_park_count: u64,
213
214 /// The amount of time worker threads were busy.
215 ///
216 /// The worker busy duration increases whenever the worker is spending time processing work.
217 /// Using this value can indicate the total load of workers.
218 ///
219 /// ##### Definition
220 /// This metric is derived from the sum of
221 /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
222 ///
223 /// ##### See also
224 /// - [`RuntimeMetrics::min_busy_duration`]
225 /// - [`RuntimeMetrics::max_busy_duration`]
226 ///
227 /// ##### Examples
 /// In the below example, tasks spend a total of 4s busy:
229 /// ```
230 /// use tokio::time::Duration;
231 ///
232 /// fn main() {
233 /// let start = tokio::time::Instant::now();
234 ///
235 /// let rt = tokio::runtime::Builder::new_current_thread()
236 /// .enable_all()
237 /// .build()
238 /// .unwrap();
239 ///
240 /// let handle = rt.handle();
241 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
242 /// let mut intervals = monitor.intervals();
243 /// let mut next_interval = || intervals.next().unwrap();
244 ///
245 /// let delay_1s = Duration::from_secs(1);
246 /// let delay_3s = Duration::from_secs(3);
247 ///
248 /// rt.block_on(async {
249 /// // keep the main task busy for 1s
250 /// spin_for(delay_1s);
251 ///
 /// // spawn a task and keep it busy for 3s
253 /// let _ = tokio::spawn(async move {
254 /// spin_for(delay_3s);
255 /// }).await;
256 /// });
257 ///
258 /// // flush metrics
259 /// drop(rt);
260 ///
261 /// let elapsed = start.elapsed();
262 ///
 /// let interval = next_interval(); // end of interval 1
264 /// assert!(interval.total_busy_duration >= delay_1s + delay_3s);
265 /// assert!(interval.total_busy_duration <= elapsed);
266 /// }
267 ///
268 /// fn time<F>(task: F) -> Duration
269 /// where
270 /// F: Fn() -> ()
271 /// {
272 /// let start = tokio::time::Instant::now();
273 /// task();
274 /// start.elapsed()
275 /// }
276 ///
277 /// /// Block the current thread for a given `duration`.
278 /// fn spin_for(duration: Duration) {
279 /// let start = tokio::time::Instant::now();
280 /// while start.elapsed() <= duration {}
281 /// }
282 /// ```
283 ///
284 /// Busy times may not accumulate as the above example suggests (FIXME: Why?); e.g., if we
285 /// remove the three second delay, the time spent busy falls to mere microseconds:
286 /// ```should_panic
287 /// use tokio::time::Duration;
288 ///
289 /// fn main() {
290 /// let rt = tokio::runtime::Builder::new_current_thread()
291 /// .enable_all()
292 /// .build()
293 /// .unwrap();
294 ///
295 /// let handle = rt.handle();
296 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
297 /// let mut intervals = monitor.intervals();
298 /// let mut next_interval = || intervals.next().unwrap();
299 ///
300 /// let delay_1s = Duration::from_secs(1);
301 ///
302 /// let elapsed = time(|| rt.block_on(async {
303 /// // keep the main task busy for 1s
304 /// spin_for(delay_1s);
305 /// }));
306 ///
307 /// // flush metrics
308 /// drop(rt);
309 ///
 /// let interval = next_interval(); // end of interval 1
311 /// assert!(interval.total_busy_duration >= delay_1s); // FAIL
312 /// assert!(interval.total_busy_duration <= elapsed);
313 /// }
314 ///
315 /// fn time<F>(task: F) -> Duration
316 /// where
317 /// F: Fn() -> ()
318 /// {
319 /// let start = tokio::time::Instant::now();
320 /// task();
321 /// start.elapsed()
322 /// }
323 ///
324 /// /// Block the current thread for a given `duration`.
325 /// fn spin_for(duration: Duration) {
326 /// let start = tokio::time::Instant::now();
327 /// while start.elapsed() <= duration {}
328 /// }
329 /// ```
330 pub total_busy_duration: Duration,
331
332 /// The maximum amount of time a worker thread was busy.
333 ///
334 /// ##### Definition
335 /// This metric is derived from the maximum of
336 /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
337 ///
338 /// ##### See also
339 /// - [`RuntimeMetrics::total_busy_duration`]
340 /// - [`RuntimeMetrics::min_busy_duration`]
341 pub max_busy_duration: Duration,
342
343 /// The minimum amount of time a worker thread was busy.
344 ///
345 /// ##### Definition
346 /// This metric is derived from the minimum of
347 /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads.
348 ///
349 /// ##### See also
350 /// - [`RuntimeMetrics::total_busy_duration`]
351 /// - [`RuntimeMetrics::max_busy_duration`]
352 pub min_busy_duration: Duration,
353
354 /// The number of tasks currently scheduled in the runtime's global queue.
355 ///
356 /// Tasks that are spawned or notified from a non-runtime thread are scheduled using the
357 /// runtime's global queue. This metric returns the **current** number of tasks pending in
358 /// the global queue. As such, the returned value may increase or decrease as new tasks are
359 /// scheduled and processed.
360 ///
361 /// ##### Definition
362 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::global_queue_depth`].
363 ///
364 /// ##### Example
365 /// ```
366 /// # let current_thread = tokio::runtime::Builder::new_current_thread()
367 /// # .enable_all()
368 /// # .build()
369 /// # .unwrap();
370 /// #
371 /// # let multi_thread = tokio::runtime::Builder::new_multi_thread()
372 /// # .worker_threads(2)
373 /// # .enable_all()
374 /// # .build()
375 /// # .unwrap();
376 /// #
377 /// # for runtime in [current_thread, multi_thread] {
378 /// let handle = runtime.handle().clone();
379 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
380 /// let mut intervals = monitor.intervals();
381 /// let mut next_interval = || intervals.next().unwrap();
382 ///
383 /// let interval = next_interval(); // end of interval 1
384 /// # #[cfg(tokio_unstable)]
385 /// assert_eq!(interval.num_remote_schedules, 0);
386 ///
387 /// // spawn a system thread outside of the runtime
388 /// std::thread::spawn(move || {
389 /// // spawn two tasks from this non-runtime thread
390 /// handle.spawn(async {});
391 /// handle.spawn(async {});
392 /// }).join().unwrap();
393 ///
394 /// // flush metrics
395 /// drop(runtime);
396 ///
397 /// let interval = next_interval(); // end of interval 2
398 /// # #[cfg(tokio_unstable)]
399 /// assert_eq!(interval.num_remote_schedules, 2);
400 /// # }
401 /// ```
402 pub global_queue_depth: usize,
403
404 /// Total amount of time elapsed since observing runtime metrics.
405 pub elapsed: Duration,
406 }
407 unstable {
408 /// The average duration of a single invocation of poll on a task.
409 ///
410 /// This average is an exponentially-weighted moving average of the duration
411 /// of task polls on all runtime workers.
412 ///
413 /// ##### Examples
414 /// ```
415 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
416 /// async fn main() {
417 /// let handle = tokio::runtime::Handle::current();
418 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
419 /// let mut intervals = monitor.intervals();
420 /// let mut next_interval = || intervals.next().unwrap();
421 ///
422 /// let interval = next_interval();
423 /// println!("mean task poll duration is {:?}", interval.mean_poll_duration);
424 /// }
425 /// ```
426 pub mean_poll_duration: Duration,
427
428 /// The average duration of a single invocation of poll on a task on the
429 /// worker with the lowest value.
430 ///
431 /// This average is an exponentially-weighted moving average of the duration
432 /// of task polls on the runtime worker with the lowest value.
433 ///
434 /// ##### Examples
435 /// ```
436 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
437 /// async fn main() {
438 /// let handle = tokio::runtime::Handle::current();
439 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
440 /// let mut intervals = monitor.intervals();
441 /// let mut next_interval = || intervals.next().unwrap();
442 ///
443 /// let interval = next_interval();
444 /// println!("min mean task poll duration is {:?}", interval.mean_poll_duration_worker_min);
445 /// }
446 /// ```
447 pub mean_poll_duration_worker_min: Duration,
448
449 /// The average duration of a single invocation of poll on a task on the
450 /// worker with the highest value.
451 ///
452 /// This average is an exponentially-weighted moving average of the duration
453 /// of task polls on the runtime worker with the highest value.
454 ///
455 /// ##### Examples
456 /// ```
457 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
458 /// async fn main() {
459 /// let handle = tokio::runtime::Handle::current();
460 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
461 /// let mut intervals = monitor.intervals();
462 /// let mut next_interval = || intervals.next().unwrap();
463 ///
464 /// let interval = next_interval();
465 /// println!("max mean task poll duration is {:?}", interval.mean_poll_duration_worker_max);
466 /// }
467 /// ```
468 pub mean_poll_duration_worker_max: Duration,
469
470 /// A histogram of task polls since the previous probe grouped by poll
471 /// times.
472 ///
473 /// This metric must be explicitly enabled when creating the runtime with
474 /// [`enable_metrics_poll_time_histogram`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
475 /// Bucket sizes are fixed and configured at the runtime level. See
476 /// configuration options on
477 /// [`runtime::Builder`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
478 ///
479 /// ##### Examples
480 /// ```
481 /// use tokio::runtime::HistogramConfiguration;
482 /// use std::time::Duration;
483 ///
484 /// let config = HistogramConfiguration::linear(Duration::from_micros(50), 12);
485 ///
486 /// let rt = tokio::runtime::Builder::new_multi_thread()
487 /// .enable_metrics_poll_time_histogram()
488 /// .metrics_poll_time_histogram_configuration(config)
489 /// .build()
490 /// .unwrap();
491 ///
492 /// rt.block_on(async {
493 /// let handle = tokio::runtime::Handle::current();
494 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
495 /// let mut intervals = monitor.intervals();
496 /// let mut next_interval = || intervals.next().unwrap();
497 ///
498 /// let interval = next_interval();
499 /// println!("poll count histogram {:?}", interval.poll_time_histogram);
500 /// });
501 /// ```
502 pub poll_time_histogram: Vec<u64>,
503
504 /// The number of times worker threads unparked but performed no work before parking again.
505 ///
506 /// The worker no-op count increases by one each time the worker unparks the thread but finds
507 /// no new work and goes back to sleep. This indicates a false-positive wake up.
508 ///
509 /// ##### Definition
510 /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_noop_count`]
511 /// across all worker threads.
512 ///
513 /// ##### Examples
514 /// Unfortunately, there isn't a great way to reliably induce no-op parks, as they occur as
515 /// false-positive events under concurrency.
516 ///
517 /// The below example triggers fewer than two parks in the single-threaded runtime:
518 /// ```
519 /// #[tokio::main(flavor = "current_thread")]
520 /// async fn main() {
521 /// let handle = tokio::runtime::Handle::current();
522 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
523 /// let mut intervals = monitor.intervals();
524 /// let mut next_interval = || intervals.next().unwrap();
525 ///
526 /// assert_eq!(next_interval().total_park_count, 0);
527 ///
528 /// async {
529 /// tokio::time::sleep(std::time::Duration::from_millis(1)).await;
530 /// }.await;
531 ///
532 /// assert!(next_interval().total_park_count > 0);
533 /// }
534 /// ```
535 ///
536 /// The below example triggers fewer than two parks in the multi-threaded runtime:
537 /// ```
538 /// #[tokio::main(flavor = "multi_thread")]
539 /// async fn main() {
540 /// let handle = tokio::runtime::Handle::current();
541 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
542 /// let mut intervals = monitor.intervals();
543 /// let mut next_interval = || intervals.next().unwrap();
544 ///
545 /// assert_eq!(next_interval().total_noop_count, 0);
546 ///
547 /// async {
548 /// tokio::time::sleep(std::time::Duration::from_millis(1)).await;
549 /// }.await;
550 ///
551 /// assert!(next_interval().total_noop_count > 0);
552 /// }
553 /// ```
554 pub total_noop_count: u64,
555
556 /// The maximum number of times any worker thread unparked but performed no work before parking
557 /// again.
558 ///
559 /// ##### Definition
560 /// This metric is derived from the maximum of
561 /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads.
562 ///
563 /// ##### See also
564 /// - [`RuntimeMetrics::total_noop_count`]
565 /// - [`RuntimeMetrics::min_noop_count`]
566 pub max_noop_count: u64,
567
568 /// The minimum number of times any worker thread unparked but performed no work before parking
569 /// again.
570 ///
571 /// ##### Definition
572 /// This metric is derived from the minimum of
573 /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads.
574 ///
575 /// ##### See also
576 /// - [`RuntimeMetrics::total_noop_count`]
577 /// - [`RuntimeMetrics::max_noop_count`]
578 pub min_noop_count: u64,
579
580 /// The number of tasks worker threads stole from another worker thread.
581 ///
582 /// The worker steal count increases by the amount of stolen tasks each time the worker
583 /// has processed its scheduled queue and successfully steals more pending tasks from another
584 /// worker.
585 ///
586 /// This metric only applies to the **multi-threaded** runtime and will always return `0` when
587 /// using the current thread runtime.
588 ///
589 /// ##### Definition
590 /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`] for
591 /// all worker threads.
592 ///
593 /// ##### See also
594 /// - [`RuntimeMetrics::min_steal_count`]
595 /// - [`RuntimeMetrics::max_steal_count`]
596 ///
597 /// ##### Examples
 /// In the below example, a blocking channel is used to back up one worker thread:
599 /// ```
600 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
601 /// async fn main() {
602 /// let handle = tokio::runtime::Handle::current();
603 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
604 /// let mut intervals = monitor.intervals();
605 /// let mut next_interval = || intervals.next().unwrap();
606 ///
607 /// let interval = next_interval(); // end of first sampling interval
608 /// assert_eq!(interval.total_steal_count, 0);
609 /// assert_eq!(interval.min_steal_count, 0);
610 /// assert_eq!(interval.max_steal_count, 0);
611 ///
612 /// // induce a steal
613 /// async {
614 /// let (tx, rx) = std::sync::mpsc::channel();
615 /// // Move to the runtime.
616 /// tokio::spawn(async move {
617 /// // Spawn the task that sends to the channel
618 /// tokio::spawn(async move {
619 /// tx.send(()).unwrap();
620 /// });
621 /// // Spawn a task that bumps the previous task out of the "next
622 /// // scheduled" slot.
623 /// tokio::spawn(async {});
624 /// // Blocking receive on the channel.
625 /// rx.recv().unwrap();
626 /// flush_metrics().await;
627 /// }).await.unwrap();
628 /// flush_metrics().await;
629 /// }.await;
630 ///
631 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
632 /// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
633 ///
634 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3
635 /// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count);
636 /// }
637 ///
638 /// async fn flush_metrics() {
639 /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
640 /// }
641 /// ```
642 pub total_steal_count: u64,
643
644 /// The maximum number of tasks any worker thread stole from another worker thread.
645 ///
646 /// ##### Definition
647 /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`]
648 /// across all worker threads.
649 ///
650 /// ##### See also
651 /// - [`RuntimeMetrics::total_steal_count`]
652 /// - [`RuntimeMetrics::min_steal_count`]
653 pub max_steal_count: u64,
654
655 /// The minimum number of tasks any worker thread stole from another worker thread.
656 ///
657 /// ##### Definition
658 /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`]
659 /// across all worker threads.
660 ///
661 /// ##### See also
662 /// - [`RuntimeMetrics::total_steal_count`]
663 /// - [`RuntimeMetrics::max_steal_count`]
664 pub min_steal_count: u64,
665
666 /// The number of times worker threads stole tasks from another worker thread.
667 ///
668 /// The worker steal operations increases by one each time the worker has processed its
669 /// scheduled queue and successfully steals more pending tasks from another worker.
670 ///
671 /// This metric only applies to the **multi-threaded** runtime and will always return `0` when
672 /// using the current thread runtime.
673 ///
674 /// ##### Definition
675 /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
676 /// for all worker threads.
677 ///
678 /// ##### See also
679 /// - [`RuntimeMetrics::min_steal_operations`]
680 /// - [`RuntimeMetrics::max_steal_operations`]
681 ///
682 /// ##### Examples
 /// In the below example, a blocking channel is used to back up one worker thread:
684 /// ```
685 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
686 /// async fn main() {
687 /// let handle = tokio::runtime::Handle::current();
688 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
689 /// let mut intervals = monitor.intervals();
690 /// let mut next_interval = || intervals.next().unwrap();
691 ///
692 /// let interval = next_interval(); // end of first sampling interval
693 /// assert_eq!(interval.total_steal_operations, 0);
694 /// assert_eq!(interval.min_steal_operations, 0);
695 /// assert_eq!(interval.max_steal_operations, 0);
696 ///
697 /// // induce a steal
698 /// async {
699 /// let (tx, rx) = std::sync::mpsc::channel();
700 /// // Move to the runtime.
701 /// tokio::spawn(async move {
702 /// // Spawn the task that sends to the channel
703 /// tokio::spawn(async move {
704 /// tx.send(()).unwrap();
705 /// });
706 /// // Spawn a task that bumps the previous task out of the "next
707 /// // scheduled" slot.
708 /// tokio::spawn(async {});
 /// // Blocking receive on the channel.
710 /// rx.recv().unwrap();
711 /// flush_metrics().await;
712 /// }).await.unwrap();
713 /// flush_metrics().await;
714 /// }.await;
715 ///
716 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
717 /// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
718 ///
719 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3
720 /// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations);
721 /// }
722 ///
723 /// async fn flush_metrics() {
724 /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
725 /// }
726 /// ```
727 pub total_steal_operations: u64,
728
729 /// The maximum number of times any worker thread stole tasks from another worker thread.
730 ///
731 /// ##### Definition
732 /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
733 /// across all worker threads.
734 ///
735 /// ##### See also
736 /// - [`RuntimeMetrics::total_steal_operations`]
737 /// - [`RuntimeMetrics::min_steal_operations`]
738 pub max_steal_operations: u64,
739
740 /// The minimum number of times any worker thread stole tasks from another worker thread.
741 ///
742 /// ##### Definition
743 /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
744 /// across all worker threads.
745 ///
746 /// ##### See also
747 /// - [`RuntimeMetrics::total_steal_operations`]
748 /// - [`RuntimeMetrics::max_steal_operations`]
749 pub min_steal_operations: u64,
750
751 /// The number of tasks scheduled from **outside** of the runtime.
752 ///
753 /// The remote schedule count increases by one each time a task is woken from **outside** of
754 /// the runtime. This usually means that a task is spawned or notified from a non-runtime
755 /// thread and must be queued using the Runtime's global queue, which tends to be slower.
756 ///
757 /// ##### Definition
758 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::remote_schedule_count`].
759 ///
760 /// ##### Examples
761 /// In the below example, a remote schedule is induced by spawning a system thread, then
762 /// spawning a tokio task from that system thread:
763 /// ```
764 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
765 /// async fn main() {
766 /// let handle = tokio::runtime::Handle::current();
767 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
768 /// let mut intervals = monitor.intervals();
769 /// let mut next_interval = || intervals.next().unwrap();
770 ///
771 /// let interval = next_interval(); // end of first sampling interval
772 /// assert_eq!(interval.num_remote_schedules, 0);
773 ///
774 /// // spawn a non-runtime thread
775 /// std::thread::spawn(move || {
776 /// // spawn two tasks from this non-runtime thread
777 /// async move {
778 /// handle.spawn(async {}).await;
779 /// handle.spawn(async {}).await;
780 /// }
781 /// }).join().unwrap().await;
782 ///
783 /// let interval = next_interval(); // end of second sampling interval
784 /// assert_eq!(interval.num_remote_schedules, 2);
785 ///
786 /// let interval = next_interval(); // end of third sampling interval
787 /// assert_eq!(interval.num_remote_schedules, 0);
788 /// }
789 /// ```
790 pub num_remote_schedules: u64,
791
792 /// The number of tasks scheduled from worker threads.
793 ///
794 /// The local schedule count increases by one each time a task is woken from **inside** of the
795 /// runtime. This usually means that a task is spawned or notified from within a runtime thread
796 /// and will be queued on the worker-local queue.
797 ///
798 /// ##### Definition
799 /// This metric is derived from the sum of
800 /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] across all worker threads.
801 ///
802 /// ##### See also
803 /// - [`RuntimeMetrics::min_local_schedule_count`]
804 /// - [`RuntimeMetrics::max_local_schedule_count`]
805 ///
806 /// ##### Examples
807 /// ###### With `current_thread` runtime
808 /// In the below example, two tasks are spawned from the context of a third tokio task:
809 /// ```
810 /// #[tokio::main(flavor = "current_thread")]
811 /// async fn main() {
812 /// let handle = tokio::runtime::Handle::current();
813 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
814 /// let mut intervals = monitor.intervals();
815 /// let mut next_interval = || intervals.next().unwrap();
816 ///
 /// let interval = { flush_metrics().await; next_interval() }; // end interval 1
818 /// assert_eq!(interval.total_local_schedule_count, 0);
819 ///
820 /// let task = async {
821 /// tokio::spawn(async {}); // local schedule 1
822 /// tokio::spawn(async {}); // local schedule 2
823 /// };
824 ///
825 /// let handle = tokio::spawn(task); // local schedule 3
826 ///
827 /// let interval = { flush_metrics().await; next_interval() }; // end interval 2
828 /// assert_eq!(interval.total_local_schedule_count, 3);
829 ///
830 /// let _ = handle.await;
831 ///
832 /// let interval = { flush_metrics().await; next_interval() }; // end interval 3
833 /// assert_eq!(interval.total_local_schedule_count, 0);
834 /// }
835 ///
836 /// async fn flush_metrics() {
837 /// tokio::task::yield_now().await;
838 /// }
839 /// ```
840 ///
841 /// ###### With `multi_thread` runtime
842 /// In the below example, 100 tasks are spawned:
843 /// ```
844 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
845 /// async fn main() {
846 /// let handle = tokio::runtime::Handle::current();
847 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
848 /// let mut intervals = monitor.intervals();
849 /// let mut next_interval = || intervals.next().unwrap();
850 ///
851 /// let interval = next_interval(); // end of interval 1
852 /// assert_eq!(interval.total_local_schedule_count, 0);
853 ///
854 /// use std::sync::atomic::{AtomicBool, Ordering};
855 /// static SPINLOCK: AtomicBool = AtomicBool::new(true);
856 ///
857 /// // block the other worker thread
858 /// tokio::spawn(async {
859 /// while SPINLOCK.load(Ordering::SeqCst) {}
860 /// });
861 ///
862 /// // FIXME: why does this need to be in a `spawn`?
863 /// let _ = tokio::spawn(async {
864 /// // spawn 100 tasks
865 /// for _ in 0..100 {
866 /// tokio::spawn(async {});
867 /// }
868 /// // this spawns 1 more task
869 /// flush_metrics().await;
870 /// }).await;
871 ///
872 /// // unblock the other worker thread
873 /// SPINLOCK.store(false, Ordering::SeqCst);
874 ///
875 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
876 /// assert_eq!(interval.total_local_schedule_count, 100 + 1);
877 /// }
878 ///
879 /// async fn flush_metrics() {
880 /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await;
881 /// }
882 /// ```
883 pub total_local_schedule_count: u64,
884
885 /// The maximum number of tasks scheduled from any one worker thread.
886 ///
887 /// ##### Definition
888 /// This metric is derived from the maximum of
889 /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads.
890 ///
891 /// ##### See also
892 /// - [`RuntimeMetrics::total_local_schedule_count`]
893 /// - [`RuntimeMetrics::min_local_schedule_count`]
894 pub max_local_schedule_count: u64,
895
896 /// The minimum number of tasks scheduled from any one worker thread.
897 ///
898 /// ##### Definition
899 /// This metric is derived from the minimum of
900 /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads.
901 ///
902 /// ##### See also
903 /// - [`RuntimeMetrics::total_local_schedule_count`]
904 /// - [`RuntimeMetrics::max_local_schedule_count`]
905 pub min_local_schedule_count: u64,
906
907 /// The number of times worker threads saturated their local queues.
908 ///
        /// The worker overflow count increases by one each time the worker attempts to schedule a task
910 /// locally, but its local queue is full. When this happens, half of the
911 /// local queue is moved to the global queue.
912 ///
913 /// This metric only applies to the **multi-threaded** scheduler.
914 ///
915 /// ##### Definition
916 /// This metric is derived from the sum of
917 /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
918 ///
919 /// ##### See also
920 /// - [`RuntimeMetrics::min_overflow_count`]
921 /// - [`RuntimeMetrics::max_overflow_count`]
922 ///
923 /// ##### Examples
924 /// ```
925 /// #[tokio::main(flavor = "multi_thread", worker_threads = 1)]
926 /// async fn main() {
927 /// let handle = tokio::runtime::Handle::current();
928 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
929 /// let mut intervals = monitor.intervals();
930 /// let mut next_interval = || intervals.next().unwrap();
931 ///
932 /// let interval = next_interval(); // end of interval 1
933 /// assert_eq!(interval.total_overflow_count, 0);
934 ///
935 /// use std::sync::atomic::{AtomicBool, Ordering};
936 ///
937 /// // spawn a ton of tasks
938 /// let _ = tokio::spawn(async {
939 /// // we do this in a `tokio::spawn` because it is impossible to
940 /// // overflow the main task
941 /// for _ in 0..300 {
942 /// tokio::spawn(async {});
943 /// }
944 /// }).await;
945 ///
946 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
947 /// assert_eq!(interval.total_overflow_count, 1);
948 /// }
949 ///
950 /// async fn flush_metrics() {
951 /// let _ = tokio::time::sleep(std::time::Duration::from_millis(1)).await;
952 /// }
953 /// ```
954 pub total_overflow_count: u64,
955
956 /// The maximum number of times any one worker saturated its local queue.
957 ///
958 /// ##### Definition
959 /// This metric is derived from the maximum of
960 /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
961 ///
962 /// ##### See also
963 /// - [`RuntimeMetrics::total_overflow_count`]
964 /// - [`RuntimeMetrics::min_overflow_count`]
965 pub max_overflow_count: u64,
966
967 /// The minimum number of times any one worker saturated its local queue.
968 ///
969 /// ##### Definition
        /// This metric is derived from the minimum of
971 /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
972 ///
973 /// ##### See also
974 /// - [`RuntimeMetrics::total_overflow_count`]
975 /// - [`RuntimeMetrics::max_overflow_count`]
976 pub min_overflow_count: u64,
977
978 /// The number of tasks that have been polled across all worker threads.
979 ///
980 /// The worker poll count increases by one each time a worker polls a scheduled task.
981 ///
982 /// ##### Definition
983 /// This metric is derived from the sum of
984 /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
985 ///
986 /// ##### See also
987 /// - [`RuntimeMetrics::min_polls_count`]
988 /// - [`RuntimeMetrics::max_polls_count`]
989 ///
990 /// ##### Examples
991 /// In the below example, 42 tasks are spawned and polled:
992 /// ```
993 /// #[tokio::main(flavor = "current_thread")]
994 /// async fn main() {
995 /// let handle = tokio::runtime::Handle::current();
996 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
997 /// let mut intervals = monitor.intervals();
998 /// let mut next_interval = || intervals.next().unwrap();
999 ///
1000 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 1
1001 /// assert_eq!(interval.total_polls_count, 0);
1002 /// assert_eq!(interval.min_polls_count, 0);
1003 /// assert_eq!(interval.max_polls_count, 0);
1004 ///
1005 /// const N: u64 = 42;
1006 ///
1007 /// for _ in 0..N {
1008 /// let _ = tokio::spawn(async {}).await;
1009 /// }
1010 ///
1011 /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
1012 /// assert_eq!(interval.total_polls_count, N);
1013 /// assert_eq!(interval.min_polls_count, N);
1014 /// assert_eq!(interval.max_polls_count, N);
1015 /// }
1016 ///
1017 /// async fn flush_metrics() {
1018 /// let _ = tokio::task::yield_now().await;
1019 /// }
1020 /// ```
1021 pub total_polls_count: u64,
1022
1023 /// The maximum number of tasks that have been polled in any worker thread.
1024 ///
1025 /// ##### Definition
1026 /// This metric is derived from the maximum of
1027 /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1028 ///
1029 /// ##### See also
1030 /// - [`RuntimeMetrics::total_polls_count`]
1031 /// - [`RuntimeMetrics::min_polls_count`]
1032 pub max_polls_count: u64,
1033
1034 /// The minimum number of tasks that have been polled in any worker thread.
1035 ///
1036 /// ##### Definition
1037 /// This metric is derived from the minimum of
1038 /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
1039 ///
1040 /// ##### See also
1041 /// - [`RuntimeMetrics::total_polls_count`]
1042 /// - [`RuntimeMetrics::max_polls_count`]
1043 pub min_polls_count: u64,
1044
1045 /// The total number of tasks currently scheduled in workers' local queues.
1046 ///
1047 /// Tasks that are spawned or notified from within a runtime thread are scheduled using that
1048 /// worker's local queue. This metric returns the **current** number of tasks pending in all
1049 /// workers' local queues. As such, the returned value may increase or decrease as new tasks
1050 /// are scheduled and processed.
1051 ///
1052 /// ##### Definition
1053 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`].
1054 ///
1055 /// ##### See also
1056 /// - [`RuntimeMetrics::min_local_queue_depth`]
1057 /// - [`RuntimeMetrics::max_local_queue_depth`]
1058 ///
1059 /// ##### Example
1060 ///
1061 /// ###### With `current_thread` runtime
1062 /// The below example spawns 100 tasks:
1063 /// ```
1064 /// #[tokio::main(flavor = "current_thread")]
1065 /// async fn main() {
1066 /// const N: usize = 100;
1067 ///
1068 /// let handle = tokio::runtime::Handle::current();
1069 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1070 /// let mut intervals = monitor.intervals();
1071 /// let mut next_interval = || intervals.next().unwrap();
1072 ///
1073 /// let interval = next_interval(); // end of interval 1
1074 /// assert_eq!(interval.total_local_queue_depth, 0);
1075 ///
1076 ///
1077 /// for _ in 0..N {
1078 /// tokio::spawn(async {});
1079 /// }
1080 /// let interval = next_interval(); // end of interval 2
1081 /// assert_eq!(interval.total_local_queue_depth, N);
1082 /// }
1083 /// ```
1084 ///
1085 /// ###### With `multi_thread` runtime
1086 /// The below example spawns 100 tasks:
1087 /// ```
1088 /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
1089 /// async fn main() {
1090 /// const N: usize = 100;
1091 ///
1092 /// let handle = tokio::runtime::Handle::current();
1093 /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1094 /// let mut intervals = monitor.intervals();
1095 /// let mut next_interval = || intervals.next().unwrap();
1096 ///
1097 /// let interval = next_interval(); // end of interval 1
1098 /// assert_eq!(interval.total_local_queue_depth, 0);
1099 ///
1100 /// use std::sync::atomic::{AtomicBool, Ordering};
1101 /// static SPINLOCK_A: AtomicBool = AtomicBool::new(true);
1102 ///
1103 /// // block the other worker thread
1104 /// tokio::spawn(async {
1105 /// while SPINLOCK_A.load(Ordering::SeqCst) {}
1106 /// });
1107 ///
1108 /// static SPINLOCK_B: AtomicBool = AtomicBool::new(true);
1109 ///
1110 /// let _ = tokio::spawn(async {
1111 /// for _ in 0..N {
1112 /// tokio::spawn(async {
1113 /// while SPINLOCK_B.load(Ordering::SeqCst) {}
1114 /// });
1115 /// }
1116 /// }).await;
1117 ///
1118 /// // unblock the other worker thread
1119 /// SPINLOCK_A.store(false, Ordering::SeqCst);
1120 ///
1121 /// let interval = next_interval(); // end of interval 2
1122 /// assert_eq!(interval.total_local_queue_depth, N - 1);
1123 ///
1124 /// SPINLOCK_B.store(false, Ordering::SeqCst);
1125 /// }
1126 /// ```
1127 pub total_local_queue_depth: usize,
1128
        /// The maximum number of tasks currently scheduled in any worker's local queue.
1130 ///
1131 /// ##### Definition
1132 /// This metric is derived from the maximum of
1133 /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
1134 ///
1135 /// ##### See also
1136 /// - [`RuntimeMetrics::total_local_queue_depth`]
1137 /// - [`RuntimeMetrics::min_local_queue_depth`]
1138 pub max_local_queue_depth: usize,
1139
        /// The minimum number of tasks currently scheduled in any worker's local queue.
1141 ///
1142 /// ##### Definition
1143 /// This metric is derived from the minimum of
1144 /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
1145 ///
1146 /// ##### See also
1147 /// - [`RuntimeMetrics::total_local_queue_depth`]
1148 /// - [`RuntimeMetrics::max_local_queue_depth`]
1149 pub min_local_queue_depth: usize,
1150
1151 /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool.
1152 ///
1153 /// ##### Definition
1154 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::blocking_queue_depth`].
1155 pub blocking_queue_depth: usize,
1156
1157 /// The current number of alive tasks in the runtime.
1158 ///
1159 /// ##### Definition
1160 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_alive_tasks`].
1161 pub live_tasks_count: usize,
1162
1163 /// The number of additional threads spawned by the runtime.
1164 ///
1165 /// ##### Definition
1166 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_blocking_threads`].
1167 pub blocking_threads_count: usize,
1168
        /// The number of idle threads, which have been spawned by the runtime for `spawn_blocking` calls.
1170 ///
1171 /// ##### Definition
1172 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_idle_blocking_threads`].
1173 pub idle_blocking_threads_count: usize,
1174
1175 /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets.
1176 ///
1177 /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget.
1178 ///
1179 /// The counter is monotonically increasing. It is never decremented or reset to zero.
1180 ///
1181 /// ##### Definition
1182 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::budget_forced_yield_count`].
1183 pub budget_forced_yield_count: u64,
1184
1185 /// Returns the number of ready events processed by the runtime’s I/O driver.
1186 ///
1187 /// ##### Definition
1188 /// This metric is derived from [`tokio::runtime::RuntimeMetrics::io_driver_ready_count`].
1189 pub io_driver_ready_count: u64,
1190 }
1191}
1192
// Declares a struct whose fields are split into two groups:
//
// - `stable { .. }`: fields that are always compiled in.
// - `unstable { .. }`: fields that only exist when the crate is built with
//   `--cfg tokio_unstable`.
//
// Fields are written as bare `name: Type` pairs. The matcher does not accept
// per-field attributes (including `///` doc comments) or visibility modifiers;
// attributes placed before `struct` are forwarded to the generated struct.
macro_rules! define_semi_stable {
    (
        $(#[$($meta:tt)*])*
        $struct_vis:vis struct $struct_name:ident {
            stable {
                $($field:ident: $field_ty:ty),*
                $(,)?
            }
            $(,)?
            unstable {
                $($gated_field:ident: $gated_ty:ty),*
                $(,)?
            }
        }
    ) => {
        $(#[$($meta)*])*
        $struct_vis struct $struct_name {
            // Always-present fields.
            $(
                $field: $field_ty,
            )*
            // Fields gated on `tokio_unstable`.
            $(
                #[cfg(tokio_unstable)]
                #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
                $gated_field: $gated_ty,
            )*
        }
    };
}
1221
define_semi_stable! {
    /// Snapshot of per-worker metrics
    ///
    /// Stores the values most recently read from the runtime for one worker, so
    /// that the next probe can report the change since the previous probe.
    #[derive(Debug, Default)]
    struct Worker {
        stable {
            // Index of the worker; passed to the per-worker runtime metric accessors.
            worker: usize,
            // Park count read at construction or at the most recent probe.
            total_park_count: u64,
            // Busy duration read at construction or at the most recent probe.
            total_busy_duration: Duration,
        }
        unstable {
            // Counter values read at construction or at the most recent probe.
            // NOTE: only plain `//` comments are usable on these fields; the
            // `define_semi_stable!` matcher does not accept field attributes.
            total_noop_count: u64,
            total_steal_count: u64,
            total_steal_operations: u64,
            total_local_schedule_count: u64,
            total_overflow_count: u64,
            total_polls_count: u64,
            // Per-bucket poll-time histogram counts used as the baseline when
            // computing per-interval bucket deltas.
            poll_time_histogram: Vec<u64>,
        }
    }
}
1242
define_semi_stable! {
    /// Iterator returned by [`RuntimeMonitor::intervals`].
    ///
    /// See that method's documentation for more details.
    #[derive(Debug)]
    pub struct RuntimeIntervals {
        stable {
            // Metrics handle of the monitored runtime (cloned from the monitor).
            runtime: runtime::RuntimeMetrics,
            // Start of the interval currently being measured; advanced on every probe.
            started_at: Instant,
            // One bookkeeping entry per runtime worker.
            workers: Vec<Worker>,
        }
        unstable {
            // The fields below hold the runtime-wide counter values seen at the
            // previous probe, used to compute per-interval deltas.

            // Number of tasks scheduled from *outside* of the runtime
            num_remote_schedules: u64,
            budget_forced_yield_count: u64,
            io_driver_ready_count: u64,
        }
    }
}
1262
1263impl RuntimeIntervals {
1264 fn probe(&mut self) -> RuntimeMetrics {
1265 let now = Instant::now();
1266
1267 let mut metrics = RuntimeMetrics {
1268 workers_count: self.runtime.num_workers(),
1269 elapsed: now - self.started_at,
1270 global_queue_depth: self.runtime.global_queue_depth(),
1271 min_park_count: u64::MAX,
1272 min_busy_duration: Duration::from_secs(1000000000),
1273 ..Default::default()
1274 };
1275
1276 #[cfg(tokio_unstable)]
1277 {
1278 let num_remote_schedules = self.runtime.remote_schedule_count();
1279 let budget_forced_yields = self.runtime.budget_forced_yield_count();
1280 let io_driver_ready_events = self.runtime.io_driver_ready_count();
1281
1282 metrics.num_remote_schedules = num_remote_schedules - self.num_remote_schedules;
1283 metrics.min_noop_count = u64::MAX;
1284 metrics.min_steal_count = u64::MAX;
1285 metrics.min_local_schedule_count = u64::MAX;
1286 metrics.min_overflow_count = u64::MAX;
1287 metrics.min_polls_count = u64::MAX;
1288 metrics.min_local_queue_depth = usize::MAX;
1289 metrics.mean_poll_duration_worker_min = Duration::MAX;
1290 metrics.poll_time_histogram = vec![0; self.runtime.poll_time_histogram_num_buckets()];
1291 metrics.budget_forced_yield_count =
1292 budget_forced_yields - self.budget_forced_yield_count;
1293 metrics.io_driver_ready_count = io_driver_ready_events - self.io_driver_ready_count;
1294
1295 self.num_remote_schedules = num_remote_schedules;
1296 self.budget_forced_yield_count = budget_forced_yields;
1297 self.io_driver_ready_count = io_driver_ready_events;
1298 }
1299 self.started_at = now;
1300
1301 for worker in &mut self.workers {
1302 worker.probe(&self.runtime, &mut metrics);
1303 }
1304
1305 #[cfg(tokio_unstable)]
1306 {
1307 if metrics.total_polls_count == 0 {
1308 debug_assert_eq!(metrics.mean_poll_duration, Duration::default());
1309
1310 metrics.mean_poll_duration_worker_max = Duration::default();
1311 metrics.mean_poll_duration_worker_min = Duration::default();
1312 }
1313 }
1314
1315 metrics
1316 }
1317}
1318
1319impl Iterator for RuntimeIntervals {
1320 type Item = RuntimeMetrics;
1321
1322 fn next(&mut self) -> Option<RuntimeMetrics> {
1323 Some(self.probe())
1324 }
1325}
1326
1327impl RuntimeMonitor {
1328 /// Creates a new [`RuntimeMonitor`].
1329 pub fn new(runtime: &runtime::Handle) -> RuntimeMonitor {
1330 let runtime = runtime.metrics();
1331
1332 RuntimeMonitor { runtime }
1333 }
1334
1335 /// Produces an unending iterator of [`RuntimeMetrics`].
1336 ///
1337 /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1338 /// produced by [`RuntimeMonitor::intervals`]. The item type of this iterator is [`RuntimeMetrics`],
1339 /// which is a bundle of runtime metrics that describe *only* changes occurring within that sampling
1340 /// interval.
1341 ///
1342 /// # Example
1343 ///
1344 /// ```
1345 /// use std::time::Duration;
1346 ///
1347 /// #[tokio::main]
1348 /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1349 /// let handle = tokio::runtime::Handle::current();
1350 /// // construct the runtime metrics monitor
1351 /// let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
1352 ///
1353 /// // print runtime metrics every 500ms
1354 /// {
1355 /// tokio::spawn(async move {
1356 /// for interval in runtime_monitor.intervals() {
1357 /// // pretty-print the metric interval
1358 /// println!("{:?}", interval);
1359 /// // wait 500ms
1360 /// tokio::time::sleep(Duration::from_millis(500)).await;
1361 /// }
1362 /// });
1363 /// }
1364 ///
1365 /// // await some tasks
1366 /// tokio::join![
1367 /// do_work(),
1368 /// do_work(),
1369 /// do_work(),
1370 /// ];
1371 ///
1372 /// Ok(())
1373 /// }
1374 ///
1375 /// async fn do_work() {
1376 /// for _ in 0..25 {
1377 /// tokio::task::yield_now().await;
1378 /// tokio::time::sleep(Duration::from_millis(100)).await;
1379 /// }
1380 /// }
1381 /// ```
1382 pub fn intervals(&self) -> RuntimeIntervals {
1383 let started_at = Instant::now();
1384
1385 let workers = (0..self.runtime.num_workers())
1386 .map(|worker| Worker::new(worker, &self.runtime))
1387 .collect();
1388
1389 RuntimeIntervals {
1390 runtime: self.runtime.clone(),
1391 started_at,
1392 workers,
1393
1394 #[cfg(tokio_unstable)]
1395 num_remote_schedules: self.runtime.remote_schedule_count(),
1396 #[cfg(tokio_unstable)]
1397 budget_forced_yield_count: self.runtime.budget_forced_yield_count(),
1398 #[cfg(tokio_unstable)]
1399 io_driver_ready_count: self.runtime.io_driver_ready_count(),
1400 }
1401 }
1402}
1403
1404impl Worker {
1405 fn new(worker: usize, rt: &runtime::RuntimeMetrics) -> Worker {
1406 #[allow(unused_mut, clippy::needless_update)]
1407 let mut wrk = Worker {
1408 worker,
1409 total_park_count: rt.worker_park_count(worker),
1410 total_busy_duration: rt.worker_total_busy_duration(worker),
1411 ..Default::default()
1412 };
1413
1414 #[cfg(tokio_unstable)]
1415 {
1416 let poll_time_histogram = if rt.poll_time_histogram_enabled() {
1417 vec![0; rt.poll_time_histogram_num_buckets()]
1418 } else {
1419 vec![]
1420 };
1421 wrk.total_noop_count = rt.worker_noop_count(worker);
1422 wrk.total_steal_count = rt.worker_steal_count(worker);
1423 wrk.total_steal_operations = rt.worker_steal_operations(worker);
1424 wrk.total_local_schedule_count = rt.worker_local_schedule_count(worker);
1425 wrk.total_overflow_count = rt.worker_overflow_count(worker);
1426 wrk.total_polls_count = rt.worker_poll_count(worker);
1427 wrk.poll_time_histogram = poll_time_histogram;
1428 };
1429 wrk
1430 }
1431
1432 fn probe(&mut self, rt: &runtime::RuntimeMetrics, metrics: &mut RuntimeMetrics) {
1433 macro_rules! metric {
1434 ( $sum:ident, $max:ident, $min:ident, $probe:ident ) => {{
1435 let val = rt.$probe(self.worker);
1436 let delta = val - self.$sum;
1437 self.$sum = val;
1438
1439 metrics.$sum += delta;
1440
1441 if delta > metrics.$max {
1442 metrics.$max = delta;
1443 }
1444
1445 if delta < metrics.$min {
1446 metrics.$min = delta;
1447 }
1448 }};
1449 }
1450
1451 metric!(
1452 total_park_count,
1453 max_park_count,
1454 min_park_count,
1455 worker_park_count
1456 );
1457 metric!(
1458 total_busy_duration,
1459 max_busy_duration,
1460 min_busy_duration,
1461 worker_total_busy_duration
1462 );
1463
1464 #[cfg(tokio_unstable)]
1465 {
1466 let mut worker_polls_count = self.total_polls_count;
1467 let total_polls_count = metrics.total_polls_count;
1468
1469 metric!(
1470 total_noop_count,
1471 max_noop_count,
1472 min_noop_count,
1473 worker_noop_count
1474 );
1475 metric!(
1476 total_steal_count,
1477 max_steal_count,
1478 min_steal_count,
1479 worker_steal_count
1480 );
1481 metric!(
1482 total_steal_operations,
1483 max_steal_operations,
1484 min_steal_operations,
1485 worker_steal_operations
1486 );
1487 metric!(
1488 total_local_schedule_count,
1489 max_local_schedule_count,
1490 min_local_schedule_count,
1491 worker_local_schedule_count
1492 );
1493 metric!(
1494 total_overflow_count,
1495 max_overflow_count,
1496 min_overflow_count,
1497 worker_overflow_count
1498 );
1499 metric!(
1500 total_polls_count,
1501 max_polls_count,
1502 min_polls_count,
1503 worker_poll_count
1504 );
1505
1506 // Get the number of polls since last probe
1507 worker_polls_count = self.total_polls_count - worker_polls_count;
1508
1509 // Update the mean task poll duration if there were polls
1510 if worker_polls_count > 0 {
1511 let val = rt.worker_mean_poll_time(self.worker);
1512
1513 if val > metrics.mean_poll_duration_worker_max {
1514 metrics.mean_poll_duration_worker_max = val;
1515 }
1516
1517 if val < metrics.mean_poll_duration_worker_min {
1518 metrics.mean_poll_duration_worker_min = val;
1519 }
1520
1521 // First, scale the current value down
1522 let ratio = total_polls_count as f64 / metrics.total_polls_count as f64;
1523 let mut mean = metrics.mean_poll_duration.as_nanos() as f64 * ratio;
1524
1525 // Add the scaled current worker's mean poll duration
1526 let ratio = worker_polls_count as f64 / metrics.total_polls_count as f64;
1527 mean += val.as_nanos() as f64 * ratio;
1528
1529 metrics.mean_poll_duration = Duration::from_nanos(mean as u64);
1530 }
1531
1532 // Update the histogram counts if there were polls since last count
1533 if worker_polls_count > 0 {
1534 for (bucket, cell) in metrics.poll_time_histogram.iter_mut().enumerate() {
1535 let new = rt.poll_time_histogram_bucket_count(self.worker, bucket);
1536 let delta = new - self.poll_time_histogram[bucket];
1537 self.poll_time_histogram[bucket] = new;
1538
1539 *cell += delta;
1540 }
1541 }
1542
1543 // Local scheduled tasks is an absolute value
1544 let local_scheduled_tasks = rt.worker_local_queue_depth(self.worker);
1545 metrics.total_local_queue_depth += local_scheduled_tasks;
1546
1547 if local_scheduled_tasks > metrics.max_local_queue_depth {
1548 metrics.max_local_queue_depth = local_scheduled_tasks;
1549 }
1550
1551 if local_scheduled_tasks < metrics.min_local_queue_depth {
1552 metrics.min_local_queue_depth = local_scheduled_tasks;
1553 }
1554
1555 // Blocking queue depth is an absolute value too
1556 metrics.blocking_queue_depth = rt.blocking_queue_depth();
1557
1558 #[allow(deprecated)]
1559 {
1560 // use the deprecated active_tasks_count here to support slightly older versions of Tokio,
1561 // it's the same.
1562 metrics.live_tasks_count = rt.active_tasks_count();
1563 }
1564 metrics.blocking_threads_count = rt.num_blocking_threads();
1565 metrics.idle_blocking_threads_count = rt.num_idle_blocking_threads();
1566 }
1567 }
1568}
1569
1570impl RuntimeMetrics {
1571 /// Returns the ratio of the [`RuntimeMetrics::total_polls_count`] to the [`RuntimeMetrics::total_noop_count`].
1572 #[cfg(tokio_unstable)]
1573 pub fn mean_polls_per_park(&self) -> f64 {
1574 let total_park_count = self.total_park_count - self.total_noop_count;
1575 if total_park_count == 0 {
1576 0.0
1577 } else {
1578 self.total_polls_count as f64 / total_park_count as f64
1579 }
1580 }
1581
1582 /// Returns the ratio of the [`RuntimeMetrics::total_busy_duration`] to the [`RuntimeMetrics::elapsed`].
1583 pub fn busy_ratio(&self) -> f64 {
1584 self.total_busy_duration.as_nanos() as f64 / self.elapsed.as_nanos() as f64
1585 }
1586}