tokio_metrics/task.rs
1use futures_util::task::{ArcWake, AtomicWaker};
2use pin_project_lite::pin_project;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
6use std::sync::Arc;
7use std::task::{Context, Poll};
8use tokio_stream::Stream;
9
10#[cfg(feature = "rt")]
11use tokio::time::{Duration, Instant};
12
13use crate::derived_metrics::derived_metrics;
14#[cfg(not(feature = "rt"))]
15use std::time::{Duration, Instant};
16
17#[cfg(all(feature = "rt", feature = "metrics-rs-integration"))]
18pub(crate) mod metrics_rs_integration;
19
20/// Monitors key metrics of instrumented tasks.
21///
22/// ### Basic Usage
23/// A [`TaskMonitor`] tracks key [metrics][TaskMetrics] of async tasks that have been
24/// [instrumented][`TaskMonitor::instrument`] with the monitor.
25///
26/// In the below example, a [`TaskMonitor`] is [constructed][TaskMonitor::new] and used to
27/// [instrument][TaskMonitor::instrument] three worker tasks; meanwhile, a fourth task
28/// prints [metrics][TaskMetrics] in 500ms [intervals][TaskMonitor::intervals].
29/// ```
30/// use std::time::Duration;
31///
32/// #[tokio::main]
33/// async fn main() {
34/// // construct a metrics monitor
35/// let metrics_monitor = tokio_metrics::TaskMonitor::new();
36///
37/// // print task metrics every 500ms
38/// {
39/// let metrics_monitor = metrics_monitor.clone();
40/// tokio::spawn(async move {
41/// for interval in metrics_monitor.intervals() {
42/// // pretty-print the metric interval
43/// println!("{:?}", interval);
44/// // wait 500ms
45/// tokio::time::sleep(Duration::from_millis(500)).await;
46/// }
47/// });
48/// }
49///
50/// // instrument some tasks and await them
51/// // note that the same TaskMonitor can be used for multiple tasks
52/// tokio::join![
53/// metrics_monitor.instrument(do_work()),
54/// metrics_monitor.instrument(do_work()),
55/// metrics_monitor.instrument(do_work())
56/// ];
57/// }
58///
59/// async fn do_work() {
60/// for _ in 0..25 {
61/// tokio::task::yield_now().await;
62/// tokio::time::sleep(Duration::from_millis(100)).await;
63/// }
64/// }
65/// ```
66///
67/// ### What should I instrument?
68/// In most cases, you should construct a *distinct* [`TaskMonitor`] for each kind of key task.
69///
70/// #### Instrumenting a web application
71/// For instance, a web service should have a distinct [`TaskMonitor`] for each endpoint. Within
72/// each endpoint, it's prudent to additionally instrument major sub-tasks, each with their own
73/// distinct [`TaskMonitor`]s. [*Why are my tasks slow?*](#why-are-my-tasks-slow) explores a
74/// debugging scenario for a web service that takes this approach to instrumentation. This
75/// approach is exemplified in the below example:
76/// ```no_run
77/// // The unabridged version of this snippet is in the examples directory of this crate.
78///
79/// #[tokio::main]
80/// async fn main() {
81/// // construct a TaskMonitor for root endpoint
82/// let monitor_root = tokio_metrics::TaskMonitor::new();
83///
84/// // construct TaskMonitors for create_users endpoint
85/// let monitor_create_user = CreateUserMonitors {
86/// // monitor for the entire endpoint
87/// route: tokio_metrics::TaskMonitor::new(),
88/// // monitor for database insertion subtask
89/// insert: tokio_metrics::TaskMonitor::new(),
90/// };
91///
92/// // build our application with two instrumented endpoints
93/// let app = axum::Router::new()
94/// // `GET /` goes to `root`
95/// .route("/", axum::routing::get({
96/// let monitor = monitor_root.clone();
97/// move || monitor.instrument(async { "Hello, World!" })
98/// }))
99/// // `POST /users` goes to `create_user`
100/// .route("/users", axum::routing::post({
101/// let monitors = monitor_create_user.clone();
102/// let route = monitors.route.clone();
103/// move |payload| {
104/// route.instrument(create_user(payload, monitors))
105/// }
106/// }));
107///
108/// // print task metrics for each endpoint every 1s
109/// let metrics_frequency = std::time::Duration::from_secs(1);
110/// tokio::spawn(async move {
111/// let root_intervals = monitor_root.intervals();
112/// let create_user_route_intervals =
113/// monitor_create_user.route.intervals();
114/// let create_user_insert_intervals =
115/// monitor_create_user.insert.intervals();
116/// let create_user_intervals =
117/// create_user_route_intervals.zip(create_user_insert_intervals);
118///
119/// let intervals = root_intervals.zip(create_user_intervals);
120/// for (root_route, (create_user_route, create_user_insert)) in intervals {
121/// println!("root_route = {:#?}", root_route);
122/// println!("create_user_route = {:#?}", create_user_route);
123/// println!("create_user_insert = {:#?}", create_user_insert);
124/// tokio::time::sleep(metrics_frequency).await;
125/// }
126/// });
127///
128/// // run the server
129/// let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 3000));
130/// let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
131/// axum::serve(listener, app)
132/// .await
133/// .unwrap();
134/// }
135///
136/// async fn create_user(
137/// axum::Json(payload): axum::Json<CreateUser>,
138/// monitors: CreateUserMonitors,
139/// ) -> impl axum::response::IntoResponse {
140/// let user = User { id: 1337, username: payload.username, };
141/// // instrument inserting the user into the db:
142/// let _ = monitors.insert.instrument(insert_user(user.clone())).await;
143/// (axum::http::StatusCode::CREATED, axum::Json(user))
144/// }
145///
146/// /* definitions of CreateUserMonitors, CreateUser and User omitted for brevity */
147///
148/// #
149/// # #[derive(Clone)]
150/// # struct CreateUserMonitors {
151/// # // monitor for the entire endpoint
152/// # route: tokio_metrics::TaskMonitor,
153/// # // monitor for database insertion subtask
154/// # insert: tokio_metrics::TaskMonitor,
155/// # }
156/// #
157/// # #[derive(serde::Deserialize)] struct CreateUser { username: String, }
158/// # #[derive(Clone, serde::Serialize)] struct User { id: u64, username: String, }
159/// #
160/// // insert the user into the database
161/// async fn insert_user(_: User) {
162/// /* implementation details elided */
163/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
164/// }
165/// ```
166///
167/// ### Why are my tasks slow?
168/// **Scenario:** You track key, high-level metrics about the customer response time. An alarm warns
169/// you that P90 latency for an endpoint exceeds your targets. What is causing the increase?
170///
171/// #### Identifying the high-level culprits
172/// A set of tasks will appear to execute more slowly if:
173/// - they are taking longer to poll (i.e., they consume too much CPU time)
174/// - they are waiting longer to be polled (e.g., they're waiting longer in tokio's scheduling
175/// queues)
176/// - they are waiting longer on external events to complete (e.g., asynchronous network requests)
177///
178/// The culprits, at a high level, may be some combination of these sources of latency. Fortunately,
179/// you have instrumented the key tasks of each of your endpoints with distinct [`TaskMonitor`]s.
180/// Using the monitors on the endpoint experiencing elevated latency, you begin by answering:
181/// - [*Are my tasks taking longer to poll?*](#are-my-tasks-taking-longer-to-poll)
182/// - [*Are my tasks spending more time waiting to be polled?*](#are-my-tasks-spending-more-time-waiting-to-be-polled)
183/// - [*Are my tasks spending more time waiting on external events to complete?*](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
184///
185/// ##### Are my tasks taking longer to poll?
186/// - **Did [`mean_poll_duration`][TaskMetrics::mean_poll_duration] increase?**
187/// This metric reflects the mean poll duration. If it increased, it means that, on average,
188/// individual polls tended to take longer. However, this does not necessarily imply increased
189/// task latency: An increase in poll durations could be offset by fewer polls.
190/// - **Did [`slow_poll_ratio`][TaskMetrics::slow_poll_ratio] increase?**
191/// This metric reflects the proportion of polls that were 'slow'. If it increased, it means that
192/// a greater proportion of polls performed excessive computation before yielding. This does not
193/// necessarily imply increased task latency: An increase in the proportion of slow polls could be
194/// offset by fewer or faster polls.
195/// - **Did [`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration] increase?**
196/// This metric reflects the mean duration of slow polls. If it increased, it means that, on
197/// average, slow polls got slower. This does not necessarily imply increased task latency: An
198/// increase in average slow poll duration could be offset by fewer or faster polls.
199///
200/// If so, [*why are my tasks taking longer to poll?*](#why-are-my-tasks-taking-longer-to-poll)
201///
202/// ##### Are my tasks spending more time waiting to be polled?
203/// - **Did [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increase?**
204/// This metric reflects the mean delay between the instant a task is first instrumented and the
205/// instant it is first polled. If it increases, it means that, on average, tasks spent longer
206/// waiting to be initially run.
207/// - **Did [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration] increase?**
208/// This metric reflects the mean duration that tasks spent in the scheduled state. The
209/// 'scheduled' state of a task is the duration between the instant a task is awoken and the
210/// instant it is subsequently polled. If this metric increases, it means that, on average, tasks
211/// spent longer in tokio's queues before being polled.
212/// - **Did [`long_delay_ratio`][TaskMetrics::long_delay_ratio] increase?**
213/// This metric reflects the proportion of scheduling delays which were 'long'. If it increased,
214/// it means that a greater proportion of tasks experienced excessive delays before they could
215/// execute after being woken. This does not necessarily indicate an increase in latency, as this
216/// could be offset by fewer or faster task polls.
217/// - **Did [`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration] increase?**
218/// This metric reflects the mean duration of long delays. If it increased, it means that, on
219/// average, long delays got even longer. This does not necessarily imply increased task latency:
220/// an increase in average long delay duration could be offset by fewer or faster polls or more
221/// short schedules.
222///
223/// If so, [*why are my tasks spending more time waiting to be polled?*](#why-are-my-tasks-spending-more-time-waiting-to-be-polled)
224///
225/// ##### Are my tasks spending more time waiting on external events to complete?
226/// - **Did [`mean_idle_duration`][TaskMetrics::mean_idle_duration] increase?**
227/// This metric reflects the mean duration that tasks spent in the idle state. The idle state is
228/// the duration spanning the instant a task completes a poll, and the instant that it is next
229/// awoken. Tasks inhabit this state when they are waiting for task-external events to complete
230/// (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this metric increases,
231/// tasks, in aggregate, spent more time waiting for task-external events to complete.
232///
233/// If so, [*why are my tasks spending more time waiting on external events to complete?*](#why-are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
234///
235/// #### Digging deeper
236/// Having [established the high-level culprits](#identifying-the-high-level-culprits), you now
237/// search for further explanation...
238///
239/// ##### Why are my tasks taking longer to poll?
240/// You observed that [your tasks are taking longer to poll](#are-my-tasks-taking-longer-to-poll).
241/// The culprit is likely some combination of:
242/// - **Your tasks are accidentally blocking.** Common culprits include:
243/// 1. Using the Rust standard library's [filesystem](https://doc.rust-lang.org/std/fs/) or
244/// [networking](https://doc.rust-lang.org/std/net/) APIs.
245/// These APIs are synchronous; use tokio's [filesystem](https://docs.rs/tokio/latest/tokio/fs/)
246/// and [networking](https://docs.rs/tokio/latest/tokio/net/) APIs, instead.
247/// 3. Calling [`block_on`](https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.block_on).
248/// 4. Invoking `println!` or other synchronous logging routines.
249/// Invocations of `println!` involve acquiring an exclusive lock on stdout, followed by a
250/// synchronous write to stdout.
251/// 2. **Your tasks are computationally expensive.** Common culprits include:
252/// 1. TLS/cryptographic routines
253/// 2. doing a lot of processing on bytes
254/// 3. calling non-Tokio resources
255///
256/// ##### Why are my tasks spending more time waiting to be polled?
257/// You observed that [your tasks are spending more time waiting to be polled](#are-my-tasks-spending-more-time-waiting-to-be-polled)
258/// suggesting some combination of:
259/// - Your application is inflating the time elapsed between instrumentation and first poll.
260/// - Your tasks are being scheduled into tokio's global queue.
261/// - Other tasks are spending too long without yielding, thus backing up tokio's queues.
262///
263/// Start by asking: [*Is time-to-first-poll unusually high?*](#is-time-to-first-poll-unusually-high)
264///
265/// ##### Why are my tasks spending more time waiting on external events to complete?
266/// You observed that [your tasks are spending more time waiting waiting on external events to
267/// complete](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete). But what
268/// event? Fortunately, within the task experiencing increased idle times, you monitored several
269/// sub-tasks with distinct [`TaskMonitor`]s. For each of these sub-tasks, you [*you try to identify
270/// the performance culprits...*](#identifying-the-high-level-culprits)
271///
272/// #### Digging even deeper
273///
274/// ##### Is time-to-first-poll unusually high?
275/// Contrast these two metrics:
276/// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
277/// This metric reflects the mean delay between the instant a task is first instrumented and the
278/// instant it is *first* polled.
279/// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
280/// This metric reflects the mean delay between the instant when tasks were awoken and the
281/// instant they were subsequently polled.
282///
283/// If the former metric exceeds the latter (or increased unexpectedly more than the latter), then
284/// start by investigating [*if your application is artificially delaying the time-to-first-poll*](#is-my-application-delaying-the-time-to-first-poll).
285///
286/// Otherwise, investigate [*if other tasks are polling too long without yielding*](#are-other-tasks-polling-too-long-without-yielding).
287///
288/// ##### Is my application delaying the time-to-first-poll?
289/// You observed that [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increased, more
290/// than [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]. Your application may be
291/// needlessly inflating the time elapsed between instrumentation and first poll. Are you
292/// constructing (and instrumenting) tasks separately from awaiting or spawning them?
293///
294/// For instance, in the below example, the application induces 1 second delay between when `task`
295/// is instrumented and when it is awaited:
296/// ```rust
297/// #[tokio::main]
298/// async fn main() {
299/// use tokio::time::Duration;
300/// let monitor = tokio_metrics::TaskMonitor::new();
301///
302/// let task = monitor.instrument(async move {});
303///
304/// let one_sec = Duration::from_secs(1);
305/// tokio::time::sleep(one_sec).await;
306///
307/// let _ = tokio::spawn(task).await;
308///
309/// assert!(monitor.cumulative().total_first_poll_delay >= one_sec);
310/// }
311/// ```
312///
313/// Otherwise, [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] might be unusually high
314/// because [*your application is spawning key tasks into tokio's global queue...*](#is-my-application-spawning-more-tasks-into-tokio’s-global-queue)
315///
316/// ##### Is my application spawning more tasks into tokio's global queue?
317/// Tasks awoken from threads *not* managed by the tokio runtime are scheduled with a slower,
318/// global "injection" queue.
319///
320/// You may be notifying runtime tasks from off-runtime. For instance, Given the following:
321/// ```ignore
322/// #[tokio::main]
323/// async fn main() {
324/// for _ in 0..100 {
325/// let (tx, rx) = oneshot::channel();
326/// tokio::spawn(async move {
327/// tx.send(());
328/// })
329///
330/// rx.await;
331/// }
332/// }
333/// ```
334/// One would expect this to run efficiently, however, the main task is run *off* the main runtime
335/// and the spawned tasks are *on* runtime, which means the snippet will run much slower than:
336/// ```ignore
337/// #[tokio::main]
338/// async fn main() {
339/// tokio::spawn(async {
340/// for _ in 0..100 {
341/// let (tx, rx) = oneshot::channel();
342/// tokio::spawn(async move {
343/// tx.send(());
344/// })
345///
346/// rx.await;
347/// }
348/// }).await;
349/// }
350/// ```
351/// The slowdown is caused by a higher time between the `rx` task being notified (in `tx.send()`)
352/// and the task being polled.
353///
354/// ##### Are other tasks polling too long without yielding?
355/// You suspect that your tasks are slow because they're backed up in tokio's scheduling queues. For
356/// *each* of your application's [`TaskMonitor`]s you check to see [*if their associated tasks are
357/// taking longer to poll...*](#are-my-tasks-taking-longer-to-poll)
358///
359/// ### Limitations
360/// The [`TaskMetrics`] type uses [`u64`] to represent both event counters and durations (measured
361/// in nanoseconds). Consequently, event counters are accurate for ≤ [`u64::MAX`] events, and
362/// durations are accurate for ≤ [`u64::MAX`] nanoseconds.
363///
364/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::cumulative`] increase
365/// monotonically with each successive invocation of [`TaskMonitor::cumulative`]. Upon overflow,
366/// counters and durations wrap.
367///
368/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::intervals`] are
369/// calculated by computing the difference of metrics in successive invocations of
370/// [`TaskMonitor::cumulative`]. If, within a monitoring interval, an event occurs more than
371/// [`u64::MAX`] times, or a monitored duration exceeds [`u64::MAX`] nanoseconds, the metrics for
372/// that interval will overflow and not be accurate.
373///
374/// ##### Examples at the limits
375/// Consider the [`TaskMetrics::total_first_poll_delay`] metric. This metric accurately reflects
376/// delays between instrumentation and first-poll ≤ [`u64::MAX`] nanoseconds:
377/// ```
378/// use tokio::time::Duration;
379///
380/// #[tokio::main(flavor = "current_thread", start_paused = true)]
381/// async fn main() {
382/// let monitor = tokio_metrics::TaskMonitor::new();
383/// let mut interval = monitor.intervals();
384/// let mut next_interval = || interval.next().unwrap();
385///
386/// // construct and instrument a task, but do not `await` it
387/// let task = monitor.instrument(async {});
388///
389/// // this is the maximum duration representable by tokio_metrics
390/// let max_duration = Duration::from_nanos(u64::MAX);
391///
392/// // let's advance the clock by this amount and poll `task`
393/// let _ = tokio::time::advance(max_duration).await;
394/// task.await;
395///
396/// // durations ≤ `max_duration` are accurately reflected in this metric
397/// assert_eq!(next_interval().total_first_poll_delay, max_duration);
398/// assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
399/// }
400/// ```
401/// If the total delay between instrumentation and first poll exceeds [`u64::MAX`] nanoseconds,
402/// [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay] will overflow:
403/// ```
404/// # use tokio::time::Duration;
405/// #
406/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
407/// # async fn main() {
408/// # let monitor = tokio_metrics::TaskMonitor::new();
409/// #
410/// // construct and instrument a task, but do not `await` it
411/// let task_a = monitor.instrument(async {});
412/// let task_b = monitor.instrument(async {});
413///
414/// // this is the maximum duration representable by tokio_metrics
415/// let max_duration = Duration::from_nanos(u64::MAX);
416///
417/// // let's advance the clock by 1.5x this amount and await `task`
418/// let _ = tokio::time::advance(3 * (max_duration / 2)).await;
419/// task_a.await;
420/// task_b.await;
421///
422/// // the `total_first_poll_delay` has overflowed
423/// assert!(monitor.cumulative().total_first_poll_delay < max_duration);
424/// # }
425/// ```
426/// If *many* tasks are spawned, it will take far less than a [`u64::MAX`]-nanosecond delay to bring
427/// this metric to the precipice of overflow:
428/// ```
429/// # use tokio::time::Duration;
430/// #
431/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
432/// # async fn main() {
433/// # let monitor = tokio_metrics::TaskMonitor::new();
434/// # let mut interval = monitor.intervals();
435/// # let mut next_interval = || interval.next().unwrap();
436/// #
437/// // construct and instrument u16::MAX tasks, but do not `await` them
438/// let first_poll_count = u16::MAX as u64;
439/// let mut tasks = Vec::with_capacity(first_poll_count as usize);
440/// for _ in 0..first_poll_count { tasks.push(monitor.instrument(async {})); }
441///
442/// // this is the maximum duration representable by tokio_metrics
443/// let max_duration = u64::MAX;
444///
445/// // let's advance the clock justenough such that all of the time-to-first-poll
446/// // delays summed nearly equals `max_duration_nanos`, less some remainder...
447/// let iffy_delay = max_duration / (first_poll_count as u64);
448/// let small_remainder = max_duration % first_poll_count;
449/// let _ = tokio::time::advance(Duration::from_nanos(iffy_delay)).await;
450///
451/// // ...then poll all of the instrumented tasks:
452/// for task in tasks { task.await; }
453///
454/// // `total_first_poll_delay` is at the precipice of overflowing!
455/// assert_eq!(
456/// next_interval().total_first_poll_delay.as_nanos(),
457/// (max_duration - small_remainder) as u128
458/// );
459/// assert_eq!(
460/// monitor.cumulative().total_first_poll_delay.as_nanos(),
461/// (max_duration - small_remainder) as u128
462/// );
463/// # }
464/// ```
465/// Frequent, interval-sampled metrics will retain their accuracy, even if the cumulative
466/// metrics counter overflows at most once in the midst of an interval:
467/// ```
468/// # use tokio::time::Duration;
469/// # use tokio_metrics::TaskMonitor;
470/// #
471/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
472/// # async fn main() {
473/// # let monitor = TaskMonitor::new();
474/// # let mut interval = monitor.intervals();
475/// # let mut next_interval = || interval.next().unwrap();
476/// #
477/// let first_poll_count = u16::MAX as u64;
478/// let batch_size = first_poll_count / 3;
479///
480/// let max_duration_ns = u64::MAX;
481/// let iffy_delay_ns = max_duration_ns / first_poll_count;
482///
483/// // Instrument `batch_size` number of tasks, wait for `delay` nanoseconds,
484/// // then await the instrumented tasks.
485/// async fn run_batch(monitor: &TaskMonitor, batch_size: usize, delay: u64) {
486/// let mut tasks = Vec::with_capacity(batch_size);
487/// for _ in 0..batch_size { tasks.push(monitor.instrument(async {})); }
488/// let _ = tokio::time::advance(Duration::from_nanos(delay)).await;
489/// for task in tasks { task.await; }
490/// }
491///
492/// // this is how much `total_time_to_first_poll_ns` will
493/// // increase with each batch we run
494/// let batch_delay = iffy_delay_ns * batch_size;
495///
496/// // run batches 1, 2, and 3
497/// for i in 1..=3 {
498/// run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
499/// assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
500/// assert_eq!(i * batch_delay as u128, monitor.cumulative().total_first_poll_delay.as_nanos());
501/// }
502///
503/// /* now, the `total_time_to_first_poll_ns` counter is at the precipice of overflow */
504/// assert_eq!(monitor.cumulative().total_first_poll_delay.as_nanos(), max_duration_ns as u128);
505///
506/// // run batch 4
507/// run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
508/// // the interval counter remains accurate
509/// assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
510/// // but the cumulative counter has overflowed
511/// assert_eq!(batch_delay as u128 - 1, monitor.cumulative().total_first_poll_delay.as_nanos());
512/// # }
513/// ```
514/// If a cumulative metric overflows *more than once* in the midst of an interval,
515/// its interval-sampled counterpart will also overflow.
516#[derive(Clone, Debug)]
517pub struct TaskMonitor {
518 metrics: Arc<RawMetrics>,
519}
520
521/// Provides an interface for constructing a [`TaskMonitor`] with specialized configuration
522/// parameters.
523#[derive(Clone, Debug, Default)]
524pub struct TaskMonitorBuilder {
525 slow_poll_threshold: Option<Duration>,
526 long_delay_threshold: Option<Duration>,
527}
528
529impl TaskMonitorBuilder {
530 /// Creates a new [`TaskMonitorBuilder`].
531 pub fn new() -> Self {
532 Self {
533 slow_poll_threshold: None,
534 long_delay_threshold: None,
535 }
536 }
537
538 /// Specifies the threshold at which polls are considered 'slow'.
539 pub fn with_slow_poll_threshold(&mut self, threshold: Duration) -> &mut Self {
540 self.slow_poll_threshold = Some(threshold);
541
542 self
543 }
544
545 /// Specifies the threshold at which schedules are considered 'long'.
546 pub fn with_long_delay_threshold(&mut self, threshold: Duration) -> &mut Self {
547 self.long_delay_threshold = Some(threshold);
548
549 self
550 }
551
552 /// Consume the builder, producing a [`TaskMonitor`].
553 pub fn build(self) -> TaskMonitor {
554 TaskMonitor::create(
555 self.slow_poll_threshold
556 .unwrap_or(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD),
557 self.long_delay_threshold
558 .unwrap_or(TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD),
559 )
560 }
561}
562
563pin_project! {
564 /// An async task that has been instrumented with [`TaskMonitor::instrument`].
565 #[derive(Debug)]
566 pub struct Instrumented<T> {
567 // The task being instrumented
568 #[pin]
569 task: T,
570
571 // True when the task is polled for the first time
572 did_poll_once: bool,
573
574 // The instant, tracked as nanoseconds since `instrumented_at`, at which the future finished
575 // its last poll.
576 idled_at: u64,
577
578 // State shared between the task and its instrumented waker.
579 state: Arc<State>,
580 }
581
582 impl<T> PinnedDrop for Instrumented<T> {
583 fn drop(this: Pin<&mut Self>) {
584 this.state.metrics.dropped_count.fetch_add(1, SeqCst);
585 }
586 }
587}
588
589/// Key metrics of [instrumented][`TaskMonitor::instrument`] tasks.
590#[non_exhaustive]
591#[derive(Debug, Clone, Copy, Default)]
592pub struct TaskMetrics {
593 /// The number of tasks instrumented.
594 ///
595 /// ##### Examples
596 /// ```
597 /// #[tokio::main]
598 /// async fn main() {
599 /// let monitor = tokio_metrics::TaskMonitor::new();
600 /// let mut interval = monitor.intervals();
601 /// let mut next_interval = || interval.next().unwrap();
602 ///
603 /// // 0 tasks have been instrumented
604 /// assert_eq!(next_interval().instrumented_count, 0);
605 ///
606 /// monitor.instrument(async {});
607 ///
608 /// // 1 task has been instrumented
609 /// assert_eq!(next_interval().instrumented_count, 1);
610 ///
611 /// monitor.instrument(async {});
612 /// monitor.instrument(async {});
613 ///
614 /// // 2 tasks have been instrumented
615 /// assert_eq!(next_interval().instrumented_count, 2);
616 ///
617 /// // since the last interval was produced, 0 tasks have been instrumented
618 /// assert_eq!(next_interval().instrumented_count, 0);
619 /// }
620 /// ```
621 pub instrumented_count: u64,
622
623 /// The number of tasks dropped.
624 ///
625 /// ##### Examples
626 /// ```
627 /// #[tokio::main]
628 /// async fn main() {
629 /// let monitor = tokio_metrics::TaskMonitor::new();
630 /// let mut interval = monitor.intervals();
631 /// let mut next_interval = || interval.next().unwrap();
632 ///
633 /// // 0 tasks have been dropped
634 /// assert_eq!(next_interval().dropped_count, 0);
635 ///
636 /// let _task = monitor.instrument(async {});
637 ///
638 /// // 0 tasks have been dropped
639 /// assert_eq!(next_interval().dropped_count, 0);
640 ///
641 /// monitor.instrument(async {}).await;
642 /// drop(monitor.instrument(async {}));
643 ///
644 /// // 2 tasks have been dropped
645 /// assert_eq!(next_interval().dropped_count, 2);
646 ///
647 /// // since the last interval was produced, 0 tasks have been dropped
648 /// assert_eq!(next_interval().dropped_count, 0);
649 /// }
650 /// ```
651 pub dropped_count: u64,
652
653 /// The number of tasks polled for the first time.
654 ///
655 /// ##### Derived metrics
656 /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
657 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
658 /// are first polled.
659 ///
660 /// ##### Examples
661 /// In the below example, no tasks are instrumented or polled in the first sampling interval;
662 /// one task is instrumented (but not polled) in the second sampling interval; that task is
663 /// awaited to completion (and, thus, polled at least once) in the third sampling interval; no
664 /// additional tasks are polled for the first time within the fourth sampling interval:
665 /// ```
666 /// #[tokio::main]
667 /// async fn main() {
668 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
669 /// let mut interval = metrics_monitor.intervals();
670 /// let mut next_interval = || interval.next().unwrap();
671 ///
672 /// // no tasks have been constructed, instrumented, and polled at least once
673 /// assert_eq!(next_interval().first_poll_count, 0);
674 ///
675 /// let task = metrics_monitor.instrument(async {});
676 ///
677 /// // `task` has been constructed and instrumented, but has not yet been polled
678 /// assert_eq!(next_interval().first_poll_count, 0);
679 ///
680 /// // poll `task` to completion
681 /// task.await;
682 ///
683 /// // `task` has been constructed, instrumented, and polled at least once
684 /// assert_eq!(next_interval().first_poll_count, 1);
685 ///
686 /// // since the last interval was produced, 0 tasks have been constructed, instrumented and polled
687 /// assert_eq!(next_interval().first_poll_count, 0);
688 ///
689 /// }
690 /// ```
691 pub first_poll_count: u64,
692
693 /// The total duration elapsed between the instant tasks are instrumented, and the instant they
694 /// are first polled.
695 ///
696 /// ##### Derived metrics
697 /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
698 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
699 /// are first polled.
700 ///
701 /// ##### Examples
702 /// In the below example, 0 tasks have been instrumented or polled within the first sampling
703 /// interval, a total of 500ms elapse between the instrumentation and polling of tasks within
704 /// the second sampling interval, and a total of 350ms elapse between the instrumentation and
705 /// polling of tasks within the third sampling interval:
706 /// ```
707 /// use tokio::time::Duration;
708 ///
709 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
710 /// async fn main() {
711 /// let monitor = tokio_metrics::TaskMonitor::new();
712 /// let mut interval = monitor.intervals();
713 /// let mut next_interval = || interval.next().unwrap();
714 ///
715 /// // no tasks have yet been created, instrumented, or polled
716 /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
717 /// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
718 ///
719 /// // constructs and instruments a task, pauses a given duration, then awaits the task
720 /// async fn instrument_pause_await(monitor: &tokio_metrics::TaskMonitor, pause: Duration) {
721 /// let task = monitor.instrument(async move {});
722 /// tokio::time::sleep(pause).await;
723 /// task.await;
724 /// }
725 ///
726 /// // construct and await a task that pauses for 500ms between instrumentation and first poll
727 /// let task_a_pause_time = Duration::from_millis(500);
728 /// instrument_pause_await(&monitor, task_a_pause_time).await;
729 ///
730 /// assert_eq!(next_interval().total_first_poll_delay, task_a_pause_time);
731 /// assert_eq!(monitor.cumulative().total_first_poll_delay, task_a_pause_time);
732 ///
733 /// // construct and await a task that pauses for 250ms between instrumentation and first poll
734 /// let task_b_pause_time = Duration::from_millis(250);
735 /// instrument_pause_await(&monitor, task_b_pause_time).await;
736 ///
737 /// // construct and await a task that pauses for 100ms between instrumentation and first poll
738 /// let task_c_pause_time = Duration::from_millis(100);
739 /// instrument_pause_await(&monitor, task_c_pause_time).await;
740 ///
741 /// assert_eq!(
742 /// next_interval().total_first_poll_delay,
743 /// task_b_pause_time + task_c_pause_time
744 /// );
745 /// assert_eq!(
746 /// monitor.cumulative().total_first_poll_delay,
747 /// task_a_pause_time + task_b_pause_time + task_c_pause_time
748 /// );
749 /// }
750 /// ```
751 ///
752 /// ##### When is this metric recorded?
753 /// The delay between instrumentation and first poll is not recorded until the first poll
754 /// actually occurs:
755 /// ```
756 /// # use tokio::time::Duration;
757 /// #
758 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
759 /// # async fn main() {
760 /// # let monitor = tokio_metrics::TaskMonitor::new();
761 /// # let mut interval = monitor.intervals();
762 /// # let mut next_interval = || interval.next().unwrap();
763 /// #
764 /// // we construct and instrument a task, but do not `await` it
765 /// let task = monitor.instrument(async {});
766 ///
767 /// // let's sleep for 1s before we poll `task`
768 /// let one_sec = Duration::from_secs(1);
769 /// let _ = tokio::time::sleep(one_sec).await;
770 ///
771 /// // although 1s has now elapsed since the instrumentation of `task`,
772 /// // this is not reflected in `total_first_poll_delay`...
773 /// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
774 /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
775 ///
776 /// // ...and won't be until `task` is actually polled
777 /// task.await;
778 ///
779 /// // now, the 1s delay is reflected in `total_first_poll_delay`:
780 /// assert_eq!(next_interval().total_first_poll_delay, one_sec);
781 /// assert_eq!(monitor.cumulative().total_first_poll_delay, one_sec);
782 /// # }
783 /// ```
784 ///
785 /// ##### What if first-poll-delay is very large?
786 /// The first-poll-delay of *individual* tasks saturates at `u64::MAX` nanoseconds. However, if
787 /// the *total* first-poll-delay *across* monitored tasks exceeds `u64::MAX` nanoseconds, this
788 /// metric will wrap around:
789 /// ```
790 /// use tokio::time::Duration;
791 ///
792 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
793 /// async fn main() {
794 /// let monitor = tokio_metrics::TaskMonitor::new();
795 ///
796 /// // construct and instrument a task, but do not `await` it
797 /// let task = monitor.instrument(async {});
798 ///
799 /// // this is the maximum duration representable by tokio_metrics
800 /// let max_duration = Duration::from_nanos(u64::MAX);
801 ///
802 /// // let's advance the clock by double this amount and await `task`
803 /// let _ = tokio::time::advance(max_duration * 2).await;
804 /// task.await;
805 ///
806 /// // the time-to-first-poll of `task` saturates at `max_duration`
807 /// assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
808 ///
809 /// // ...but note that the metric *will* wrap around if more tasks are involved
810 /// let task = monitor.instrument(async {});
811 /// let _ = tokio::time::advance(Duration::from_nanos(1)).await;
812 /// task.await;
813 /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
814 /// }
815 /// ```
816 pub total_first_poll_delay: Duration,
817
818 /// The total number of times that tasks idled, waiting to be awoken.
819 ///
820 /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
821 /// task completes a poll, and the instant that it is next awoken.
822 ///
823 /// ##### Derived metrics
824 /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
825 /// The mean duration of idles.
826 ///
827 /// ##### Examples
828 /// ```
829 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
830 /// async fn main() {
831 /// let monitor = tokio_metrics::TaskMonitor::new();
832 /// let mut interval = monitor.intervals();
833 /// let mut next_interval = move || interval.next().unwrap();
834 /// let one_sec = std::time::Duration::from_secs(1);
835 ///
836 /// monitor.instrument(async {}).await;
837 ///
838 /// assert_eq!(next_interval().total_idled_count, 0);
839 /// assert_eq!(monitor.cumulative().total_idled_count, 0);
840 ///
841 /// monitor.instrument(async move {
842 /// tokio::time::sleep(one_sec).await;
843 /// }).await;
844 ///
845 /// assert_eq!(next_interval().total_idled_count, 1);
846 /// assert_eq!(monitor.cumulative().total_idled_count, 1);
847 ///
848 /// monitor.instrument(async {
849 /// tokio::time::sleep(one_sec).await;
850 /// tokio::time::sleep(one_sec).await;
851 /// }).await;
852 ///
853 /// assert_eq!(next_interval().total_idled_count, 2);
854 /// assert_eq!(monitor.cumulative().total_idled_count, 3);
855 /// }
856 /// ```
857 pub total_idled_count: u64,
858
859 /// The total duration that tasks idled.
860 ///
861 /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
862 /// task completes a poll, and the instant that it is next awoken.
863 ///
864 /// ##### Derived metrics
865 /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
866 /// The mean duration of idles.
867 ///
868 /// ##### Examples
869 /// ```
870 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
871 /// async fn main() {
872 /// let monitor = tokio_metrics::TaskMonitor::new();
873 /// let mut interval = monitor.intervals();
874 /// let mut next_interval = move || interval.next().unwrap();
875 /// let one_sec = std::time::Duration::from_secs(1);
876 /// let two_sec = std::time::Duration::from_secs(2);
877 ///
878 /// assert_eq!(next_interval().total_idle_duration.as_nanos(), 0);
879 /// assert_eq!(monitor.cumulative().total_idle_duration.as_nanos(), 0);
880 ///
881 /// monitor.instrument(async move {
882 /// tokio::time::sleep(one_sec).await;
883 /// }).await;
884 ///
885 /// assert_eq!(next_interval().total_idle_duration, one_sec);
886 /// assert_eq!(monitor.cumulative().total_idle_duration, one_sec);
887 ///
888 /// monitor.instrument(async move {
889 /// tokio::time::sleep(two_sec).await;
890 /// }).await;
891 ///
892 /// assert_eq!(next_interval().total_idle_duration, two_sec);
893 /// assert_eq!(monitor.cumulative().total_idle_duration, one_sec + two_sec);
894 /// }
895 /// ```
896 pub total_idle_duration: Duration,
897
    /// The longest single idle of any monitored task.
899 ///
900 /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
901 /// task completes a poll, and the instant that it is next awoken.
902 ///
903 /// ##### Examples
904 /// ```
905 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
906 /// async fn main() {
907 /// let monitor = tokio_metrics::TaskMonitor::new();
908 /// let mut interval = monitor.intervals();
909 /// let mut next_interval = move || interval.next().unwrap();
910 /// let one_sec = std::time::Duration::from_secs(1);
911 /// let two_sec = std::time::Duration::from_secs(2);
912 ///
913 /// assert_eq!(next_interval().max_idle_duration.as_nanos(), 0);
914 /// assert_eq!(monitor.cumulative().max_idle_duration.as_nanos(), 0);
915 ///
916 /// monitor.instrument(async move {
917 /// tokio::time::sleep(one_sec).await;
918 /// }).await;
919 ///
920 /// assert_eq!(next_interval().max_idle_duration, one_sec);
921 /// assert_eq!(monitor.cumulative().max_idle_duration, one_sec);
922 ///
923 /// monitor.instrument(async move {
924 /// tokio::time::sleep(two_sec).await;
925 /// }).await;
926 ///
927 /// assert_eq!(next_interval().max_idle_duration, two_sec);
928 /// assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
929 ///
930 /// monitor.instrument(async move {
931 /// tokio::time::sleep(one_sec).await;
932 /// }).await;
933 ///
934 /// assert_eq!(next_interval().max_idle_duration, one_sec);
935 /// assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
936 /// }
937 /// ```
938 pub max_idle_duration: Duration,
939
940 /// The total number of times that tasks were awoken (and then, presumably, scheduled for
941 /// execution).
942 ///
943 /// ##### Definition
944 /// This metric is equal to [`total_short_delay_count`][TaskMetrics::total_short_delay_count]
945 /// \+ [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
946 ///
947 /// ##### Derived metrics
948 /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
949 /// The mean duration that tasks spent waiting to be executed after awakening.
950 ///
951 /// ##### Examples
952 /// In the below example, a task yields to the scheduler a varying number of times between
953 /// sampling intervals; this metric is equal to the number of times the task yielded:
954 /// ```
955 /// #[tokio::main]
956 /// async fn main(){
957 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
958 ///
959 /// // [A] no tasks have been created, instrumented, and polled more than once
960 /// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
961 ///
962 /// // [B] a `task` is created and instrumented
963 /// let task = {
964 /// let monitor = metrics_monitor.clone();
965 /// metrics_monitor.instrument(async move {
966 /// let mut interval = monitor.intervals();
967 /// let mut next_interval = move || interval.next().unwrap();
968 ///
969 /// // [E] `task` has not yet yielded to the scheduler, and
970 /// // thus has not yet been scheduled since its first `poll`
971 /// assert_eq!(next_interval().total_scheduled_count, 0);
972 ///
973 /// tokio::task::yield_now().await; // yield to the scheduler
974 ///
975 /// // [F] `task` has yielded to the scheduler once (and thus been
976 /// // scheduled once) since the last sampling interval
977 /// assert_eq!(next_interval().total_scheduled_count, 1);
978 ///
979 /// tokio::task::yield_now().await; // yield to the scheduler
980 /// tokio::task::yield_now().await; // yield to the scheduler
981 /// tokio::task::yield_now().await; // yield to the scheduler
982 ///
983 /// // [G] `task` has yielded to the scheduler thrice (and thus been
984 /// // scheduled thrice) since the last sampling interval
985 /// assert_eq!(next_interval().total_scheduled_count, 3);
986 ///
987 /// tokio::task::yield_now().await; // yield to the scheduler
988 ///
989 /// next_interval
990 /// })
991 /// };
992 ///
993 /// // [C] `task` has not yet been polled at all
994 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
995 /// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
996 ///
997 /// // [D] poll `task` to completion
998 /// let mut next_interval = task.await;
999 ///
    ///     // [H] `task` has been scheduled 1 time since the last sample
1001 /// assert_eq!(next_interval().total_scheduled_count, 1);
1002 ///
    ///     // [I] `task` has been scheduled 0 times since the last sample
1004 /// assert_eq!(next_interval().total_scheduled_count, 0);
1005 ///
1006 /// // [J] `task` has yielded to the scheduler a total of five times
1007 /// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 5);
1008 /// }
1009 /// ```
1010 #[doc(alias = "total_delay_count")]
1011 pub total_scheduled_count: u64,
1012
1013 /// The total duration that tasks spent waiting to be polled after awakening.
1014 ///
1015 /// ##### Definition
1016 /// This metric is equal to [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration]
1017 /// \+ [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration].
1018 ///
1019 /// ##### Derived metrics
1020 /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
1021 /// The mean duration that tasks spent waiting to be executed after awakening.
1022 ///
1023 /// ##### Examples
1024 /// ```
1025 /// use tokio::time::Duration;
1026 ///
1027 /// #[tokio::main(flavor = "current_thread")]
1028 /// async fn main() {
1029 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1030 /// let mut interval = metrics_monitor.intervals();
1031 /// let mut next_interval = || interval.next().unwrap();
1032 ///
1033 /// // construct and instrument and spawn a task that yields endlessly
1034 /// tokio::spawn(metrics_monitor.instrument(async {
1035 /// loop { tokio::task::yield_now().await }
1036 /// }));
1037 ///
1038 /// tokio::task::yield_now().await;
1039 ///
1040 /// // block the executor for 1 second
1041 /// std::thread::sleep(Duration::from_millis(1000));
1042 ///
1043 /// tokio::task::yield_now().await;
1044 ///
    ///     // the spawned task will have spent approximately one second waiting to be polled
1046 /// let total_scheduled_duration = next_interval().total_scheduled_duration;
1047 /// assert!(total_scheduled_duration >= Duration::from_millis(1000));
1048 /// assert!(total_scheduled_duration <= Duration::from_millis(1100));
1049 /// }
1050 /// ```
1051 #[doc(alias = "total_delay_duration")]
1052 pub total_scheduled_duration: Duration,
1053
1054 /// The total number of times that tasks were polled.
1055 ///
1056 /// ##### Definition
1057 /// This metric is equal to [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count]
1058 /// \+ [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
1059 ///
1060 /// ##### Derived metrics
1061 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
1062 /// The mean duration of polls.
1063 ///
1064 /// ##### Examples
    /// In the below example, a task with multiple yield points is awaited to completion; this
1066 /// metric reflects the number of `await`s within each sampling interval:
1067 /// ```
1068 /// #[tokio::main]
1069 /// async fn main() {
1070 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1071 ///
1072 /// // [A] no tasks have been created, instrumented, and polled more than once
1073 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1074 ///
1075 /// // [B] a `task` is created and instrumented
1076 /// let task = {
1077 /// let monitor = metrics_monitor.clone();
1078 /// metrics_monitor.instrument(async move {
1079 /// let mut interval = monitor.intervals();
1080 /// let mut next_interval = move || interval.next().unwrap();
1081 ///
1082 /// // [E] task is in the midst of its first poll
1083 /// assert_eq!(next_interval().total_poll_count, 0);
1084 ///
1085 /// tokio::task::yield_now().await; // poll 1
1086 ///
1087 /// // [F] task has been polled 1 time
1088 /// assert_eq!(next_interval().total_poll_count, 1);
1089 ///
1090 /// tokio::task::yield_now().await; // poll 2
1091 /// tokio::task::yield_now().await; // poll 3
1092 /// tokio::task::yield_now().await; // poll 4
1093 ///
1094 /// // [G] task has been polled 3 times
1095 /// assert_eq!(next_interval().total_poll_count, 3);
1096 ///
1097 /// tokio::task::yield_now().await; // poll 5
1098 ///
1099 /// next_interval // poll 6
1100 /// })
1101 /// };
1102 ///
1103 /// // [C] `task` has not yet been polled at all
1104 /// assert_eq!(metrics_monitor.cumulative().total_poll_count, 0);
1105 ///
1106 /// // [D] poll `task` to completion
1107 /// let mut next_interval = task.await;
1108 ///
1109 /// // [H] `task` has been polled 2 times since the last sample
1110 /// assert_eq!(next_interval().total_poll_count, 2);
1111 ///
1112 /// // [I] `task` has been polled 0 times since the last sample
1113 /// assert_eq!(next_interval().total_poll_count, 0);
1114 ///
1115 /// // [J] `task` has been polled 6 times
1116 /// assert_eq!(metrics_monitor.cumulative().total_poll_count, 6);
1117 /// }
1118 /// ```
1119 pub total_poll_count: u64,
1120
1121 /// The total duration elapsed during polls.
1122 ///
1123 /// ##### Definition
1124 /// This metric is equal to [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration]
1125 /// \+ [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration].
1126 ///
1127 /// ##### Derived metrics
1128 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
1129 /// The mean duration of polls.
1130 ///
    /// ##### Examples
1132 /// ```
1133 /// use tokio::time::Duration;
1134 ///
1135 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
1136 /// async fn main() {
1137 /// let monitor = tokio_metrics::TaskMonitor::new();
1138 /// let mut interval = monitor.intervals();
1139 /// let mut next_interval = move || interval.next().unwrap();
1140 ///
1141 /// assert_eq!(next_interval().total_poll_duration, Duration::ZERO);
1142 ///
1143 /// monitor.instrument(async {
1144 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
1145 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
1146 /// () // poll 3 (0s)
1147 /// }).await;
1148 ///
1149 /// assert_eq!(next_interval().total_poll_duration, Duration::from_secs(2));
1150 /// }
1151 /// ```
1152 pub total_poll_duration: Duration,
1153
1154 /// The total number of times that polling tasks completed swiftly.
1155 ///
1156 /// Here, 'swiftly' is defined as completing in strictly less time than
1157 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1158 ///
1159 /// ##### Derived metrics
1160 /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**
1161 /// The mean duration of fast polls.
1162 ///
1163 /// ##### Examples
1164 /// In the below example, 0 polls occur within the first sampling interval, 3 fast polls occur
1165 /// within the second sampling interval, and 2 fast polls occur within the third sampling
1166 /// interval:
1167 /// ```
1168 /// use std::future::Future;
1169 /// use std::time::Duration;
1170 ///
1171 /// #[tokio::main]
1172 /// async fn main() {
1173 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1174 /// let mut interval = metrics_monitor.intervals();
1175 /// let mut next_interval = || interval.next().unwrap();
1176 ///
1177 /// // no tasks have been constructed, instrumented, or polled
1178 /// assert_eq!(next_interval().total_fast_poll_count, 0);
1179 ///
1180 /// let fast = Duration::ZERO;
1181 ///
1182 /// // this task completes in three fast polls
1183 /// let _ = metrics_monitor.instrument(async {
1184 /// spin_for(fast).await; // fast poll 1
1185 /// spin_for(fast).await; // fast poll 2
1186 /// spin_for(fast) // fast poll 3
1187 /// }).await;
1188 ///
1189 /// assert_eq!(next_interval().total_fast_poll_count, 3);
1190 ///
1191 /// // this task completes in two fast polls
1192 /// let _ = metrics_monitor.instrument(async {
1193 /// spin_for(fast).await; // fast poll 1
1194 /// spin_for(fast) // fast poll 2
1195 /// }).await;
1196 ///
1197 /// assert_eq!(next_interval().total_fast_poll_count, 2);
1198 /// }
1199 ///
1200 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1201 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1202 /// let start = tokio::time::Instant::now();
1203 /// while start.elapsed() <= duration {}
1204 /// tokio::task::yield_now()
1205 /// }
1206 /// ```
1207 pub total_fast_poll_count: u64,
1208
1209 /// The total duration of fast polls.
1210 ///
1211 /// Here, 'fast' is defined as completing in strictly less time than
1212 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1213 ///
1214 /// ##### Derived metrics
1215 /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**
1216 /// The mean duration of fast polls.
1217 ///
1218 /// ##### Examples
1219 /// In the below example, no tasks are polled in the first sampling interval; three fast polls
1220 /// consume a total of 3μs time in the second sampling interval; and two fast polls consume a
1221 /// total of 2μs time in the third sampling interval:
1222 /// ```
1223 /// use std::future::Future;
1224 /// use std::time::Duration;
1225 ///
1226 /// #[tokio::main]
1227 /// async fn main() {
1228 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1229 /// let mut interval = metrics_monitor.intervals();
1230 /// let mut next_interval = || interval.next().unwrap();
1231 ///
1232 /// // no tasks have been constructed, instrumented, or polled
1233 /// let interval = next_interval();
1234 /// assert_eq!(interval.total_fast_poll_duration, Duration::ZERO);
1235 ///
1236 /// let fast = Duration::from_micros(1);
1237 ///
1238 /// // this task completes in three fast polls
1239 /// let task_a_time = time(metrics_monitor.instrument(async {
1240 /// spin_for(fast).await; // fast poll 1
1241 /// spin_for(fast).await; // fast poll 2
1242 /// spin_for(fast) // fast poll 3
1243 /// })).await;
1244 ///
1245 /// let interval = next_interval();
1246 /// assert!(interval.total_fast_poll_duration >= fast * 3);
1247 /// assert!(interval.total_fast_poll_duration <= task_a_time);
1248 ///
1249 /// // this task completes in two fast polls
1250 /// let task_b_time = time(metrics_monitor.instrument(async {
1251 /// spin_for(fast).await; // fast poll 1
1252 /// spin_for(fast) // fast poll 2
1253 /// })).await;
1254 ///
1255 /// let interval = next_interval();
1256 /// assert!(interval.total_fast_poll_duration >= fast * 2);
1257 /// assert!(interval.total_fast_poll_duration <= task_b_time);
1258 /// }
1259 ///
1260 /// /// Produces the amount of time it took to await a given async task.
1261 /// async fn time(task: impl Future) -> Duration {
1262 /// let start = tokio::time::Instant::now();
1263 /// task.await;
1264 /// start.elapsed()
1265 /// }
1266 ///
1267 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1268 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1269 /// let start = tokio::time::Instant::now();
1270 /// while start.elapsed() <= duration {}
1271 /// tokio::task::yield_now()
1272 /// }
1273 /// ```
1274 pub total_fast_poll_duration: Duration,
1275
1276 /// The total number of times that polling tasks completed slowly.
1277 ///
1278 /// Here, 'slowly' is defined as completing in at least as much time as
1279 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1280 ///
1281 /// ##### Derived metrics
1282 /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**
1283 /// The mean duration of slow polls.
1284 ///
1285 /// ##### Examples
1286 /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1287 /// within the second sampling interval, and 2 slow polls occur within the third sampling
1288 /// interval:
1289 /// ```
1290 /// use std::future::Future;
1291 /// use std::time::Duration;
1292 ///
1293 /// #[tokio::main]
1294 /// async fn main() {
1295 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1296 /// let mut interval = metrics_monitor.intervals();
1297 /// let mut next_interval = || interval.next().unwrap();
1298 ///
1299 /// // no tasks have been constructed, instrumented, or polled
1300 /// assert_eq!(next_interval().total_slow_poll_count, 0);
1301 ///
1302 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1303 ///
1304 /// // this task completes in three slow polls
1305 /// let _ = metrics_monitor.instrument(async {
1306 /// spin_for(slow).await; // slow poll 1
1307 /// spin_for(slow).await; // slow poll 2
1308 /// spin_for(slow) // slow poll 3
1309 /// }).await;
1310 ///
1311 /// assert_eq!(next_interval().total_slow_poll_count, 3);
1312 ///
1313 /// // this task completes in two slow polls
1314 /// let _ = metrics_monitor.instrument(async {
1315 /// spin_for(slow).await; // slow poll 1
1316 /// spin_for(slow) // slow poll 2
1317 /// }).await;
1318 ///
1319 /// assert_eq!(next_interval().total_slow_poll_count, 2);
1320 /// }
1321 ///
1322 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1323 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1324 /// let start = tokio::time::Instant::now();
1325 /// while start.elapsed() <= duration {}
1326 /// tokio::task::yield_now()
1327 /// }
1328 /// ```
1329 pub total_slow_poll_count: u64,
1330
1331 /// The total duration of slow polls.
1332 ///
1333 /// Here, 'slowly' is defined as completing in at least as much time as
1334 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1335 ///
1336 /// ##### Derived metrics
1337 /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**
1338 /// The mean duration of slow polls.
1339 ///
1340 /// ##### Examples
1341 /// In the below example, no tasks are polled in the first sampling interval; three slow polls
1342 /// consume a total of
1343 /// 30 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD]
1344 /// time in the second sampling interval; and two slow polls consume a total of
1345 /// 20 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
1346 /// third sampling interval:
1347 /// ```
1348 /// use std::future::Future;
1349 /// use std::time::Duration;
1350 ///
1351 /// #[tokio::main]
1352 /// async fn main() {
1353 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1354 /// let mut interval = metrics_monitor.intervals();
1355 /// let mut next_interval = || interval.next().unwrap();
1356 ///
1357 /// // no tasks have been constructed, instrumented, or polled
1358 /// let interval = next_interval();
1359 /// assert_eq!(interval.total_slow_poll_duration, Duration::ZERO);
1360 ///
1361 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1362 ///
1363 /// // this task completes in three slow polls
1364 /// let task_a_time = time(metrics_monitor.instrument(async {
1365 /// spin_for(slow).await; // slow poll 1
1366 /// spin_for(slow).await; // slow poll 2
1367 /// spin_for(slow) // slow poll 3
1368 /// })).await;
1369 ///
1370 /// let interval = next_interval();
1371 /// assert!(interval.total_slow_poll_duration >= slow * 3);
1372 /// assert!(interval.total_slow_poll_duration <= task_a_time);
1373 ///
1374 /// // this task completes in two slow polls
1375 /// let task_b_time = time(metrics_monitor.instrument(async {
1376 /// spin_for(slow).await; // slow poll 1
1377 /// spin_for(slow) // slow poll 2
1378 /// })).await;
1379 ///
1380 /// let interval = next_interval();
1381 /// assert!(interval.total_slow_poll_duration >= slow * 2);
1382 /// assert!(interval.total_slow_poll_duration <= task_b_time);
1383 /// }
1384 ///
1385 /// /// Produces the amount of time it took to await a given async task.
1386 /// async fn time(task: impl Future) -> Duration {
1387 /// let start = tokio::time::Instant::now();
1388 /// task.await;
1389 /// start.elapsed()
1390 /// }
1391 ///
1392 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1393 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1394 /// let start = tokio::time::Instant::now();
1395 /// while start.elapsed() <= duration {}
1396 /// tokio::task::yield_now()
1397 /// }
1398 /// ```
1399 pub total_slow_poll_duration: Duration,
1400
    /// The total number of times that tasks experienced a short scheduling delay.
    ///
    /// A scheduling delay is short if the task takes strictly less than
    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
    /// scheduled.
1406 ///
1407 /// ##### Derived metrics
1408 /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**
1409 /// The mean duration of short scheduling delays.
1410 pub total_short_delay_count: u64,
1411
    /// The total duration of short scheduling delays.
    ///
    /// A scheduling delay is short if the task takes strictly less than
    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
    /// scheduled.
1417 ///
1418 /// ##### Derived metrics
1419 /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**
1420 /// The mean duration of short scheduling delays.
1421 pub total_short_delay_duration: Duration,
1422
    /// The total number of times that tasks experienced a long scheduling delay.
    ///
    /// A scheduling delay is long if the task takes
    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
    /// after being scheduled.
    ///
    /// ##### Derived metrics
    /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
    ///   The mean duration of long scheduling delays.
1432 pub total_long_delay_count: u64,
1433
    /// The total duration of long scheduling delays.
    ///
    /// A scheduling delay is long if the task takes
    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
    /// after being scheduled.
    ///
    /// ##### Derived metrics
    /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
    ///   The mean duration of long scheduling delays.
1443 pub total_long_delay_duration: Duration,
1444}
1445
/// Atomic metric counters shared between a monitor and the tasks it instruments.
///
/// Instrumented tasks record into this through `State::metrics` (an `Arc<RawMetrics>`).
/// Durations are stored as nanosecond counts in `AtomicU64`s so they can be updated
/// without locking.
#[derive(Debug)]
struct RawMetrics {
    /// A poll that takes at least this long is considered a slow poll; shorter polls are
    /// considered fast.
    slow_poll_threshold: Duration,

    /// A scheduling delay of at least this long is considered a long delay; shorter delays are
    /// considered short.
    long_delay_threshold: Duration,

    /// Total number of instrumented tasks.
    instrumented_count: AtomicU64,

    /// Total number of instrumented tasks polled at least once.
    first_poll_count: AtomicU64,

    /// Total number of times tasks entered the `idle` state.
    total_idled_count: AtomicU64,

    /// Total number of times tasks were scheduled (i.e. awoken and then waited to be polled).
    total_scheduled_count: AtomicU64,

    /// Total number of polls that completed in strictly less than `slow_poll_threshold`.
    total_fast_poll_count: AtomicU64,

    /// Total number of polls that took at least `slow_poll_threshold`.
    total_slow_poll_count: AtomicU64,

    /// Total number of scheduling delays that lasted at least `long_delay_threshold`.
    total_long_delay_count: AtomicU64,

    /// Total number of scheduling delays that lasted strictly less than `long_delay_threshold`.
    total_short_delay_count: AtomicU64,

    /// Total number of times instrumented tasks were dropped.
    dropped_count: AtomicU64,

    /// Total time, in nanoseconds, between instrumentation and first poll, summed across
    /// tasks. The public `total_first_poll_delay` docs state that this sum wraps around on
    /// overflow.
    total_first_poll_delay_ns: AtomicU64,

    /// Total time, in nanoseconds, that tasks spent in the `idle` state.
    total_idle_duration_ns: AtomicU64,

    /// The longest single time, in nanoseconds, a task spent in the `idle` state locally.
    /// This is used to track the local max between interval metric snapshots.
    local_max_idle_duration_ns: AtomicU64,

    /// The longest single time, in nanoseconds, a task spent in the `idle` state.
    /// NOTE(review): unlike `local_max_idle_duration_ns`, presumably never reset between
    /// interval snapshots — confirm against the snapshot code.
    global_max_idle_duration_ns: AtomicU64,

    /// Total time, in nanoseconds, that tasks spent waiting to be polled after being woken.
    total_scheduled_duration_ns: AtomicU64,

    /// Total time, in nanoseconds, spent in polls shorter than `slow_poll_threshold`.
    total_fast_poll_duration_ns: AtomicU64,

    /// Total time spent in polls that took at least `slow_poll_threshold`.
    /// NOTE(review): unlike its siblings this field has no `_ns` suffix, but it is presumably
    /// also a nanosecond count — confirm before renaming.
    total_slow_poll_duration: AtomicU64,

    /// Total time, in nanoseconds, of scheduling delays (waiting to be polled after being
    /// woken) that were strictly shorter than `long_delay_threshold`.
    total_short_delay_duration_ns: AtomicU64,

    /// Total time, in nanoseconds, of scheduling delays (waiting to be polled after being
    /// woken) that lasted at least `long_delay_threshold`.
    total_long_delay_duration_ns: AtomicU64,
}
1511
/// Per-task bookkeeping for a single future wrapped by `TaskMonitor::instrument`.
///
/// A fresh `State` is created (behind an `Arc`) every time a task is instrumented.
#[derive(Debug)]
struct State {
    /// Where metrics should be recorded. This is a clone of the monitor's `Arc<RawMetrics>`, so
    /// every task instrumented by the same monitor records into the same counters.
    metrics: Arc<RawMetrics>,

    /// Instant at which the task was instrumented. This is used to track the time to first poll.
    instrumented_at: Instant,

    /// The instant at which the future was last woken, tracked as nanoseconds since
    /// `instrumented_at` (a relative offset, so it can live in an `AtomicU64`).
    woke_at: AtomicU64,

    /// Waker to forward notifications to.
    // NOTE(review): presumably the waker of the task currently polling the instrumented
    // future — confirm against the `Instrumented::poll` / `ArcWake` implementation.
    waker: AtomicWaker,
}
1527
1528impl TaskMonitor {
1529 /// The default duration at which polls cross the threshold into being categorized as 'slow' is
1530 /// 50μs.
1531 #[cfg(not(test))]
1532 pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_micros(50);
1533 #[cfg(test)]
1534 #[allow(missing_docs)]
1535 pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_millis(500);
1536
1537 /// The default duration at which schedules cross the threshold into being categorized as 'long'
1538 /// is 50μs.
1539 #[cfg(not(test))]
1540 pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_micros(50);
1541 #[cfg(test)]
1542 #[allow(missing_docs)]
1543 pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_millis(500);
1544
1545 /// Constructs a new task monitor.
1546 ///
1547 /// Uses [`Self::DEFAULT_SLOW_POLL_THRESHOLD`] as the threshold at which polls will be
1548 /// considered 'slow'.
1549 ///
1550 /// Uses [`Self::DEFAULT_LONG_DELAY_THRESHOLD`] as the threshold at which scheduling will be
1551 /// considered 'long'.
1552 pub fn new() -> TaskMonitor {
1553 TaskMonitor::with_slow_poll_threshold(Self::DEFAULT_SLOW_POLL_THRESHOLD)
1554 }
1555
1556 /// Constructs a builder for a task monitor.
1557 pub fn builder() -> TaskMonitorBuilder {
1558 TaskMonitorBuilder::new()
1559 }
1560
1561 /// Constructs a new task monitor with a given threshold at which polls are considered 'slow'.
1562 ///
1563 /// ##### Selecting an appropriate threshold
1564 /// TODO. What advice can we give here?
1565 ///
1566 /// ##### Examples
1567 /// In the below example, low-threshold and high-threshold monitors are constructed and
1568 /// instrument identical tasks; the low-threshold monitor reports4 slow polls, and the
1569 /// high-threshold monitor reports only 2 slow polls:
1570 /// ```
1571 /// use std::future::Future;
1572 /// use std::time::Duration;
1573 /// use tokio_metrics::TaskMonitor;
1574 ///
1575 /// #[tokio::main]
1576 /// async fn main() {
1577 /// let lo_threshold = Duration::from_micros(10);
1578 /// let hi_threshold = Duration::from_millis(10);
1579 ///
1580 /// let lo_monitor = TaskMonitor::with_slow_poll_threshold(lo_threshold);
1581 /// let hi_monitor = TaskMonitor::with_slow_poll_threshold(hi_threshold);
1582 ///
1583 /// let make_task = || async {
1584 /// spin_for(lo_threshold).await; // faster poll 1
1585 /// spin_for(lo_threshold).await; // faster poll 2
1586 /// spin_for(hi_threshold).await; // slower poll 3
1587 /// spin_for(hi_threshold).await // slower poll 4
1588 /// };
1589 ///
1590 /// lo_monitor.instrument(make_task()).await;
1591 /// hi_monitor.instrument(make_task()).await;
1592 ///
1593 /// // the low-threshold monitor reported 4 slow polls:
1594 /// assert_eq!(lo_monitor.cumulative().total_slow_poll_count, 4);
1595 /// // the high-threshold monitor reported only 2 slow polls:
1596 /// assert_eq!(hi_monitor.cumulative().total_slow_poll_count, 2);
1597 /// }
1598 ///
1599 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1600 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1601 /// let start = tokio::time::Instant::now();
1602 /// while start.elapsed() <= duration {}
1603 /// tokio::task::yield_now()
1604 /// }
1605 /// ```
1606 pub fn with_slow_poll_threshold(slow_poll_cut_off: Duration) -> TaskMonitor {
1607 Self::create(slow_poll_cut_off, Self::DEFAULT_LONG_DELAY_THRESHOLD)
1608 }
1609
1610 fn create(slow_poll_cut_off: Duration, long_delay_cut_off: Duration) -> TaskMonitor {
1611 TaskMonitor {
1612 metrics: Arc::new(RawMetrics {
1613 slow_poll_threshold: slow_poll_cut_off,
1614 first_poll_count: AtomicU64::new(0),
1615 total_idled_count: AtomicU64::new(0),
1616 total_scheduled_count: AtomicU64::new(0),
1617 total_fast_poll_count: AtomicU64::new(0),
1618 total_slow_poll_count: AtomicU64::new(0),
1619 total_long_delay_count: AtomicU64::new(0),
1620 instrumented_count: AtomicU64::new(0),
1621 dropped_count: AtomicU64::new(0),
1622 total_first_poll_delay_ns: AtomicU64::new(0),
1623 total_scheduled_duration_ns: AtomicU64::new(0),
1624 local_max_idle_duration_ns: AtomicU64::new(0),
1625 global_max_idle_duration_ns: AtomicU64::new(0),
1626 total_idle_duration_ns: AtomicU64::new(0),
1627 total_fast_poll_duration_ns: AtomicU64::new(0),
1628 total_slow_poll_duration: AtomicU64::new(0),
1629 total_short_delay_duration_ns: AtomicU64::new(0),
1630 long_delay_threshold: long_delay_cut_off,
1631 total_short_delay_count: AtomicU64::new(0),
1632 total_long_delay_duration_ns: AtomicU64::new(0),
1633 }),
1634 }
1635 }
1636
1637 /// Produces the duration greater-than-or-equal-to at which polls are categorized as slow.
1638 ///
1639 /// ##### Examples
1640 /// In the below example, [`TaskMonitor`] is initialized with [`TaskMonitor::new`];
1641 /// consequently, its slow-poll threshold equals [`TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD`]:
1642 /// ```
1643 /// use tokio_metrics::TaskMonitor;
1644 ///
1645 /// #[tokio::main]
1646 /// async fn main() {
1647 /// let metrics_monitor = TaskMonitor::new();
1648 ///
1649 /// assert_eq!(
1650 /// metrics_monitor.slow_poll_threshold(),
1651 /// TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD
1652 /// );
1653 /// }
1654 /// ```
1655 pub fn slow_poll_threshold(&self) -> Duration {
1656 self.metrics.slow_poll_threshold
1657 }
1658
1659 /// Produces the duration greater-than-or-equal-to at which scheduling delays are categorized
1660 /// as long.
1661 pub fn long_delay_threshold(&self) -> Duration {
1662 self.metrics.long_delay_threshold
1663 }
1664
1665 /// Produces an instrumented façade around a given async task.
1666 ///
1667 /// ##### Examples
1668 /// Instrument an async task by passing it to [`TaskMonitor::instrument`]:
1669 /// ```
1670 /// #[tokio::main]
1671 /// async fn main() {
1672 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1673 ///
1674 /// // 0 tasks have been instrumented, much less polled
1675 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1676 ///
1677 /// // instrument a task and poll it to completion
1678 /// metrics_monitor.instrument(async {}).await;
1679 ///
1680 /// // 1 task has been instrumented and polled
1681 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 1);
1682 ///
1683 /// // instrument a task and poll it to completion
1684 /// metrics_monitor.instrument(async {}).await;
1685 ///
1686 /// // 2 tasks have been instrumented and polled
1687 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 2);
1688 /// }
1689 /// ```
1690 /// An aync task may be tracked by multiple [`TaskMonitor`]s; e.g.:
1691 /// ```
1692 /// #[tokio::main]
1693 /// async fn main() {
1694 /// let monitor_a = tokio_metrics::TaskMonitor::new();
1695 /// let monitor_b = tokio_metrics::TaskMonitor::new();
1696 ///
1697 /// // 0 tasks have been instrumented, much less polled
1698 /// assert_eq!(monitor_a.cumulative().first_poll_count, 0);
1699 /// assert_eq!(monitor_b.cumulative().first_poll_count, 0);
1700 ///
1701 /// // instrument a task and poll it to completion
1702 /// monitor_a.instrument(monitor_b.instrument(async {})).await;
1703 ///
1704 /// // 1 task has been instrumented and polled
1705 /// assert_eq!(monitor_a.cumulative().first_poll_count, 1);
1706 /// assert_eq!(monitor_b.cumulative().first_poll_count, 1);
1707 /// }
1708 /// ```
1709 /// It is also possible (but probably undesirable) to instrument an async task multiple times
1710 /// with the same [`TaskMonitor`]; e.g.:
1711 /// ```
1712 /// #[tokio::main]
1713 /// async fn main() {
1714 /// let monitor = tokio_metrics::TaskMonitor::new();
1715 ///
1716 /// // 0 tasks have been instrumented, much less polled
1717 /// assert_eq!(monitor.cumulative().first_poll_count, 0);
1718 ///
1719 /// // instrument a task and poll it to completion
1720 /// monitor.instrument(monitor.instrument(async {})).await;
1721 ///
1722 /// // 2 tasks have been instrumented and polled, supposedly
1723 /// assert_eq!(monitor.cumulative().first_poll_count, 2);
1724 /// }
1725 /// ```
1726 pub fn instrument<F>(&self, task: F) -> Instrumented<F> {
1727 self.metrics.instrumented_count.fetch_add(1, SeqCst);
1728 Instrumented {
1729 task,
1730 did_poll_once: false,
1731 idled_at: 0,
1732 state: Arc::new(State {
1733 metrics: self.metrics.clone(),
1734 instrumented_at: Instant::now(),
1735 woke_at: AtomicU64::new(0),
1736 waker: AtomicWaker::new(),
1737 }),
1738 }
1739 }
1740
1741 /// Produces [`TaskMetrics`] for the tasks instrumented by this [`TaskMonitor`], collected since
1742 /// the construction of [`TaskMonitor`].
1743 ///
1744 /// ##### See also
1745 /// - [`TaskMonitor::intervals`]:
1746 /// produces [`TaskMetrics`] for user-defined sampling intervals, instead of cumulatively
1747 ///
1748 /// ##### Examples
1749 /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1750 /// within the second sampling interval, and 2 slow polls occur within the third sampling
1751 /// interval; five slow polls occur across all sampling intervals:
1752 /// ```
1753 /// use std::future::Future;
1754 /// use std::time::Duration;
1755 ///
1756 /// #[tokio::main]
1757 /// async fn main() {
1758 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1759 ///
1760 /// // initialize a stream of sampling intervals
1761 /// let mut intervals = metrics_monitor.intervals();
1762 /// // each call of `next_interval` will produce metrics for the last sampling interval
1763 /// let mut next_interval = || intervals.next().unwrap();
1764 ///
1765 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1766 ///
1767 /// // this task completes in three slow polls
1768 /// let _ = metrics_monitor.instrument(async {
1769 /// spin_for(slow).await; // slow poll 1
1770 /// spin_for(slow).await; // slow poll 2
1771 /// spin_for(slow) // slow poll 3
1772 /// }).await;
1773 ///
1774 /// // in the previous sampling interval, there were 3 slow polls
1775 /// assert_eq!(next_interval().total_slow_poll_count, 3);
1776 /// assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 3);
1777 ///
1778 /// // this task completes in two slow polls
1779 /// let _ = metrics_monitor.instrument(async {
1780 /// spin_for(slow).await; // slow poll 1
1781 /// spin_for(slow) // slow poll 2
1782 /// }).await;
1783 ///
1784 /// // in the previous sampling interval, there were 2 slow polls
1785 /// assert_eq!(next_interval().total_slow_poll_count, 2);
1786 ///
1787 /// // across all sampling interval, there were a total of 5 slow polls
1788 /// assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1789 /// }
1790 ///
1791 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1792 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1793 /// let start = tokio::time::Instant::now();
1794 /// while start.elapsed() <= duration {}
1795 /// tokio::task::yield_now()
1796 /// }
1797 /// ```
1798 pub fn cumulative(&self) -> TaskMetrics {
1799 self.metrics.metrics()
1800 }
1801
1802 /// Produces an unending iterator of metric sampling intervals.
1803 ///
1804 /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1805 /// produced by [`TaskMonitor::intervals`]. The item type of this iterator is [`TaskMetrics`],
1806 /// which is a bundle of task metrics that describe *only* events occurring within that sampling
1807 /// interval.
1808 ///
1809 /// ##### Examples
1810 /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1811 /// within the second sampling interval, and 2 slow polls occur within the third sampling
1812 /// interval; five slow polls occur across all sampling intervals:
1813 /// ```
1814 /// use std::future::Future;
1815 /// use std::time::Duration;
1816 ///
1817 /// #[tokio::main]
1818 /// async fn main() {
1819 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1820 ///
1821 /// // initialize a stream of sampling intervals
1822 /// let mut intervals = metrics_monitor.intervals();
1823 /// // each call of `next_interval` will produce metrics for the last sampling interval
1824 /// let mut next_interval = || intervals.next().unwrap();
1825 ///
1826 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1827 ///
1828 /// // this task completes in three slow polls
1829 /// let _ = metrics_monitor.instrument(async {
1830 /// spin_for(slow).await; // slow poll 1
1831 /// spin_for(slow).await; // slow poll 2
1832 /// spin_for(slow) // slow poll 3
1833 /// }).await;
1834 ///
1835 /// // in the previous sampling interval, there were 3 slow polls
1836 /// assert_eq!(next_interval().total_slow_poll_count, 3);
1837 ///
1838 /// // this task completes in two slow polls
1839 /// let _ = metrics_monitor.instrument(async {
1840 /// spin_for(slow).await; // slow poll 1
1841 /// spin_for(slow) // slow poll 2
1842 /// }).await;
1843 ///
1844 /// // in the previous sampling interval, there were 2 slow polls
1845 /// assert_eq!(next_interval().total_slow_poll_count, 2);
1846 ///
1847 /// // across all sampling intervals, there were a total of 5 slow polls
1848 /// assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1849 /// }
1850 ///
1851 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1852 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1853 /// let start = tokio::time::Instant::now();
1854 /// while start.elapsed() <= duration {}
1855 /// tokio::task::yield_now()
1856 /// }
1857 /// ```
1858 pub fn intervals(&self) -> TaskIntervals {
1859 TaskIntervals {
1860 metrics: self.metrics.clone(),
1861 previous: None,
1862 }
1863 }
1864}
1865
1866impl RawMetrics {
1867 fn get_and_reset_local_max_idle_duration(&self) -> Duration {
1868 Duration::from_nanos(self.local_max_idle_duration_ns.swap(0, SeqCst))
1869 }
1870
1871 fn metrics(&self) -> TaskMetrics {
1872 let total_fast_poll_count = self.total_fast_poll_count.load(SeqCst);
1873 let total_slow_poll_count = self.total_slow_poll_count.load(SeqCst);
1874
1875 let total_fast_poll_duration =
1876 Duration::from_nanos(self.total_fast_poll_duration_ns.load(SeqCst));
1877 let total_slow_poll_duration =
1878 Duration::from_nanos(self.total_slow_poll_duration.load(SeqCst));
1879
1880 let total_poll_count = total_fast_poll_count + total_slow_poll_count;
1881 let total_poll_duration = total_fast_poll_duration + total_slow_poll_duration;
1882
1883 TaskMetrics {
1884 instrumented_count: self.instrumented_count.load(SeqCst),
1885 dropped_count: self.dropped_count.load(SeqCst),
1886
1887 total_poll_count,
1888 total_poll_duration,
1889 first_poll_count: self.first_poll_count.load(SeqCst),
1890 total_idled_count: self.total_idled_count.load(SeqCst),
1891 total_scheduled_count: self.total_scheduled_count.load(SeqCst),
1892 total_fast_poll_count: self.total_fast_poll_count.load(SeqCst),
1893 total_slow_poll_count: self.total_slow_poll_count.load(SeqCst),
1894 total_short_delay_count: self.total_short_delay_count.load(SeqCst),
1895 total_long_delay_count: self.total_long_delay_count.load(SeqCst),
1896 total_first_poll_delay: Duration::from_nanos(
1897 self.total_first_poll_delay_ns.load(SeqCst),
1898 ),
1899 max_idle_duration: Duration::from_nanos(self.global_max_idle_duration_ns.load(SeqCst)),
1900 total_idle_duration: Duration::from_nanos(self.total_idle_duration_ns.load(SeqCst)),
1901 total_scheduled_duration: Duration::from_nanos(
1902 self.total_scheduled_duration_ns.load(SeqCst),
1903 ),
1904 total_fast_poll_duration: Duration::from_nanos(
1905 self.total_fast_poll_duration_ns.load(SeqCst),
1906 ),
1907 total_slow_poll_duration: Duration::from_nanos(
1908 self.total_slow_poll_duration.load(SeqCst),
1909 ),
1910 total_short_delay_duration: Duration::from_nanos(
1911 self.total_short_delay_duration_ns.load(SeqCst),
1912 ),
1913 total_long_delay_duration: Duration::from_nanos(
1914 self.total_long_delay_duration_ns.load(SeqCst),
1915 ),
1916 }
1917 }
1918}
1919
1920impl Default for TaskMonitor {
1921 fn default() -> TaskMonitor {
1922 TaskMonitor::new()
1923 }
1924}
1925
1926derived_metrics!(
1927 [TaskMetrics] {
1928 stable {
1929 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
1930 /// are first polled.
1931 ///
1932 /// ##### Definition
1933 /// This metric is derived from [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay]
1934 /// ÷ [`first_poll_count`][TaskMetrics::first_poll_count].
1935 ///
1936 /// ##### Interpretation
1937 /// If this metric increases, it means that, on average, tasks spent longer waiting to be
1938 /// initially polled.
1939 ///
1940 /// ##### See also
1941 /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
1942 /// The mean duration that tasks spent waiting to be executed after awakening.
1943 ///
1944 /// ##### Examples
1945 /// In the below example, no tasks are instrumented or polled within the first sampling
1946 /// interval; in the second sampling interval, 500ms elapse between the instrumentation of a
1947 /// task and its first poll; in the third sampling interval, a mean of 750ms elapse between the
1948 /// instrumentation and first poll of two tasks:
1949 /// ```
1950 /// use std::time::Duration;
1951 ///
1952 /// #[tokio::main]
1953 /// async fn main() {
1954 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1955 /// let mut interval = metrics_monitor.intervals();
1956 /// let mut next_interval = || interval.next().unwrap();
1957 ///
1958 /// // no tasks have yet been created, instrumented, or polled
1959 /// assert_eq!(next_interval().mean_first_poll_delay(), Duration::ZERO);
1960 ///
1961 /// // constructs and instruments a task, pauses for `pause_time`, awaits the task, then
1962 /// // produces the total time it took to do all of the aforementioned
1963 /// async fn instrument_pause_await(
1964 /// metrics_monitor: &tokio_metrics::TaskMonitor,
1965 /// pause_time: Duration
1966 /// ) -> Duration
1967 /// {
1968 /// let before_instrumentation = tokio::time::Instant::now();
1969 /// let task = metrics_monitor.instrument(async move {});
1970 /// tokio::time::sleep(pause_time).await;
1971 /// task.await;
1972 /// before_instrumentation.elapsed()
1973 /// }
1974 ///
1975 /// // construct and await a task that pauses for 500ms between instrumentation and first poll
1976 /// let task_a_pause_time = Duration::from_millis(500);
1977 /// let task_a_total_time = instrument_pause_await(&metrics_monitor, task_a_pause_time).await;
1978 ///
1979 /// // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
1980 /// // pause time of 500ms, and less-than-or-equal-to the total runtime of `task_a`
1981 /// let mean_first_poll_delay = next_interval().mean_first_poll_delay();
1982 /// assert!(mean_first_poll_delay >= task_a_pause_time);
1983 /// assert!(mean_first_poll_delay <= task_a_total_time);
1984 ///
1985 /// // construct and await a task that pauses for 500ms between instrumentation and first poll
1986 /// let task_b_pause_time = Duration::from_millis(500);
1987 /// let task_b_total_time = instrument_pause_await(&metrics_monitor, task_b_pause_time).await;
1988 ///
1989 /// // construct and await a task that pauses for 1000ms between instrumentation and first poll
1990 /// let task_c_pause_time = Duration::from_millis(1000);
1991 /// let task_c_total_time = instrument_pause_await(&metrics_monitor, task_c_pause_time).await;
1992 ///
1993 /// // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
1994 /// // average pause time of 500ms, and less-than-or-equal-to the combined total runtime of
1995 /// // `task_b` and `task_c`
1996 /// let mean_first_poll_delay = next_interval().mean_first_poll_delay();
1997 /// assert!(mean_first_poll_delay >= (task_b_pause_time + task_c_pause_time) / 2);
1998 /// assert!(mean_first_poll_delay <= (task_b_total_time + task_c_total_time) / 2);
1999 /// }
2000 /// ```
2001 pub fn mean_first_poll_delay(&self) -> Duration {
2002 mean(self.total_first_poll_delay, self.first_poll_count)
2003 }
2004
2005 /// The mean duration of idles.
2006 ///
2007 /// ##### Definition
2008 /// This metric is derived from [`total_idle_duration`][TaskMetrics::total_idle_duration] ÷
2009 /// [`total_idled_count`][TaskMetrics::total_idled_count].
2010 ///
2011 /// ##### Interpretation
2012 /// The idle state is the duration spanning the instant a task completes a poll, and the instant
2013 /// that it is next awoken. Tasks inhabit this state when they are waiting for task-external
2014 /// events to complete (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this
2015 /// metric increases, it means that tasks, in aggregate, spent more time waiting for
2016 /// task-external events to complete.
2017 ///
2018 /// ##### Examples
2019 /// ```
2020 /// #[tokio::main]
2021 /// async fn main() {
2022 /// let monitor = tokio_metrics::TaskMonitor::new();
2023 /// let one_sec = std::time::Duration::from_secs(1);
2024 ///
2025 /// monitor.instrument(async move {
2026 /// tokio::time::sleep(one_sec).await;
2027 /// }).await;
2028 ///
2029 /// assert!(monitor.cumulative().mean_idle_duration() >= one_sec);
2030 /// }
2031 /// ```
2032 pub fn mean_idle_duration(&self) -> Duration {
2033 mean(self.total_idle_duration, self.total_idled_count)
2034 }
2035
2036 /// The mean duration that tasks spent waiting to be executed after awakening.
2037 ///
2038 /// ##### Definition
2039 /// This metric is derived from
2040 /// [`total_scheduled_duration`][TaskMetrics::total_scheduled_duration] ÷
2041 /// [`total_scheduled_count`][`TaskMetrics::total_scheduled_count`].
2042 ///
2043 /// ##### Interpretation
2044 /// If this metric increases, it means that, on average, tasks spent longer in the runtime's
2045 /// queues before being polled.
2046 ///
2047 /// ##### See also
2048 /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
2049 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
2050 /// are first polled.
2051 ///
2052 /// ##### Examples
2053 /// ```
2054 /// use tokio::time::Duration;
2055 ///
2056 /// #[tokio::main(flavor = "current_thread")]
2057 /// async fn main() {
2058 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2059 /// let mut interval = metrics_monitor.intervals();
2060 /// let mut next_interval = || interval.next().unwrap();
2061 ///
2062 /// // construct and instrument and spawn a task that yields endlessly
2063 /// tokio::spawn(metrics_monitor.instrument(async {
2064 /// loop { tokio::task::yield_now().await }
2065 /// }));
2066 ///
2067 /// tokio::task::yield_now().await;
2068 ///
2069 /// // block the executor for 1 second
2070 /// std::thread::sleep(Duration::from_millis(1000));
2071 ///
2072 /// // get the task to run twice
2073 /// // the first will have a 1 sec scheduling delay, the second will have almost none
2074 /// tokio::task::yield_now().await;
2075 /// tokio::task::yield_now().await;
2076 ///
            ///     // the spawned task waited ~1s once and almost no time once, so its mean
            ///     // scheduled duration is roughly 500ms
2078 /// let mean_scheduled_duration = next_interval().mean_scheduled_duration();
2079 /// assert!(mean_scheduled_duration >= Duration::from_millis(500), "{}", mean_scheduled_duration.as_secs_f64());
2080 /// assert!(mean_scheduled_duration <= Duration::from_millis(600), "{}", mean_scheduled_duration.as_secs_f64());
2081 /// }
2082 /// ```
2083 pub fn mean_scheduled_duration(&self) -> Duration {
2084 mean(self.total_scheduled_duration, self.total_scheduled_count)
2085 }
2086
2087 /// The mean duration of polls.
2088 ///
2089 /// ##### Definition
2090 /// This metric is derived from [`total_poll_duration`][TaskMetrics::total_poll_duration] ÷
2091 /// [`total_poll_count`][TaskMetrics::total_poll_count].
2092 ///
2093 /// ##### Interpretation
2094 /// If this metric increases, it means that, on average, individual polls are tending to take
2095 /// longer. However, this does not necessarily imply increased task latency: An increase in poll
2096 /// durations could be offset by fewer polls.
2097 ///
2098 /// ##### See also
2099 /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**
            ///   The ratio between the number of polls categorized as slow and the total number of polls.
2101 /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
2102 /// The mean duration of slow polls.
2103 ///
2104 /// ##### Examples
2105 /// ```
2106 /// use std::time::Duration;
2107 ///
2108 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
2109 /// async fn main() {
2110 /// let monitor = tokio_metrics::TaskMonitor::new();
2111 /// let mut interval = monitor.intervals();
2112 /// let mut next_interval = move || interval.next().unwrap();
2113 ///
2114 /// assert_eq!(next_interval().mean_poll_duration(), Duration::ZERO);
2115 ///
2116 /// monitor.instrument(async {
2117 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
2118 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
2119 /// () // poll 3 (0s)
2120 /// }).await;
2121 ///
2122 /// assert_eq!(next_interval().mean_poll_duration(), Duration::from_secs(2) / 3);
2123 /// }
2124 /// ```
2125 pub fn mean_poll_duration(&self) -> Duration {
2126 mean(self.total_poll_duration, self.total_poll_count)
2127 }
2128
            /// The ratio between the number of polls categorized as slow and the total number of polls.
2130 ///
2131 /// ##### Definition
2132 /// This metric is derived from [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count] ÷
2133 /// [`total_poll_count`][TaskMetrics::total_poll_count].
2134 ///
2135 /// ##### Interpretation
2136 /// If this metric increases, it means that a greater proportion of polls took excessively long
2137 /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2138 /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
            /// However, as a rule, tasks *should* yield to the scheduler frequently.
2140 ///
2141 /// ##### See also
2142 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
2143 /// The mean duration of polls.
2144 /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
2145 /// The mean duration of slow polls.
2146 ///
2147 /// ##### Examples
2148 /// Changes in this metric may be observed by varying the ratio of slow and slow fast within
2149 /// sampling intervals; for instance:
2150 /// ```
2151 /// use std::future::Future;
2152 /// use std::time::Duration;
2153 ///
2154 /// #[tokio::main]
2155 /// async fn main() {
2156 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2157 /// let mut interval = metrics_monitor.intervals();
2158 /// let mut next_interval = || interval.next().unwrap();
2159 ///
2160 /// // no tasks have been constructed, instrumented, or polled
2161 /// let interval = next_interval();
2162 /// assert_eq!(interval.total_fast_poll_count, 0);
2163 /// assert_eq!(interval.total_slow_poll_count, 0);
2164 /// assert!(interval.slow_poll_ratio().is_nan());
2165 ///
2166 /// let fast = Duration::ZERO;
2167 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
2168 ///
2169 /// // this task completes in three fast polls
2170 /// metrics_monitor.instrument(async {
2171 /// spin_for(fast).await; // fast poll 1
2172 /// spin_for(fast).await; // fast poll 2
2173 /// spin_for(fast); // fast poll 3
2174 /// }).await;
2175 ///
2176 /// // this task completes in two slow polls
2177 /// metrics_monitor.instrument(async {
2178 /// spin_for(slow).await; // slow poll 1
2179 /// spin_for(slow); // slow poll 2
2180 /// }).await;
2181 ///
2182 /// let interval = next_interval();
2183 /// assert_eq!(interval.total_fast_poll_count, 3);
2184 /// assert_eq!(interval.total_slow_poll_count, 2);
2185 /// assert_eq!(interval.slow_poll_ratio(), ratio(2., 3.));
2186 ///
2187 /// // this task completes in three slow polls
2188 /// metrics_monitor.instrument(async {
2189 /// spin_for(slow).await; // slow poll 1
2190 /// spin_for(slow).await; // slow poll 2
2191 /// spin_for(slow); // slow poll 3
2192 /// }).await;
2193 ///
2194 /// // this task completes in two fast polls
2195 /// metrics_monitor.instrument(async {
2196 /// spin_for(fast).await; // fast poll 1
2197 /// spin_for(fast); // fast poll 2
2198 /// }).await;
2199 ///
2200 /// let interval = next_interval();
2201 /// assert_eq!(interval.total_fast_poll_count, 2);
2202 /// assert_eq!(interval.total_slow_poll_count, 3);
2203 /// assert_eq!(interval.slow_poll_ratio(), ratio(3., 2.));
2204 /// }
2205 ///
2206 /// fn ratio(a: f64, b: f64) -> f64 {
2207 /// a / (a + b)
2208 /// }
2209 ///
2210 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2211 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2212 /// let start = tokio::time::Instant::now();
2213 /// while start.elapsed() <= duration {}
2214 /// tokio::task::yield_now()
2215 /// }
2216 /// ```
2217 pub fn slow_poll_ratio(&self) -> f64 {
2218 self.total_slow_poll_count as f64 / self.total_poll_count as f64
2219 }
2220
2221 /// The ratio of tasks exceeding [`long_delay_threshold`][TaskMonitor::long_delay_threshold].
2222 ///
2223 /// ##### Definition
2224 /// This metric is derived from [`total_long_delay_count`][TaskMetrics::total_long_delay_count] ÷
2225 /// [`total_scheduled_count`][TaskMetrics::total_scheduled_count].
2226 pub fn long_delay_ratio(&self) -> f64 {
2227 self.total_long_delay_count as f64 / self.total_scheduled_count as f64
2228 }
2229
2230 /// The mean duration of fast polls.
2231 ///
2232 /// ##### Definition
2233 /// This metric is derived from
2234 /// [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration] ÷
2235 /// [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count].
2236 ///
2237 /// ##### Examples
2238 /// In the below example, no tasks are polled in the first sampling interval; three fast polls
2239 /// consume a mean of
2240 /// ⅜ × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
2241 /// second sampling interval; and two fast polls consume a total of
2242 /// ½ × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
2243 /// third sampling interval:
2244 /// ```
2245 /// use std::future::Future;
2246 /// use std::time::Duration;
2247 ///
2248 /// #[tokio::main]
2249 /// async fn main() {
2250 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2251 /// let mut interval = metrics_monitor.intervals();
2252 /// let mut next_interval = || interval.next().unwrap();
2253 ///
2254 /// // no tasks have been constructed, instrumented, or polled
2255 /// assert_eq!(next_interval().mean_fast_poll_duration(), Duration::ZERO);
2256 ///
2257 /// let threshold = metrics_monitor.slow_poll_threshold();
2258 /// let fast_1 = 1 * Duration::from_micros(1);
2259 /// let fast_2 = 2 * Duration::from_micros(1);
2260 /// let fast_3 = 3 * Duration::from_micros(1);
2261 ///
2262 /// // this task completes in two fast polls
2263 /// let total_time = time(metrics_monitor.instrument(async {
2264 /// spin_for(fast_1).await; // fast poll 1
2265 /// spin_for(fast_2) // fast poll 2
2266 /// })).await;
2267 ///
2268 /// // `mean_fast_poll_duration` ≈ the mean of `fast_1` and `fast_2`
2269 /// let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2270 /// assert!(mean_fast_poll_duration >= (fast_1 + fast_2) / 2);
2271 /// assert!(mean_fast_poll_duration <= total_time / 2);
2272 ///
2273 /// // this task completes in three fast polls
2274 /// let total_time = time(metrics_monitor.instrument(async {
2275 /// spin_for(fast_1).await; // fast poll 1
2276 /// spin_for(fast_2).await; // fast poll 2
2277 /// spin_for(fast_3) // fast poll 3
2278 /// })).await;
2279 ///
2280 /// // `mean_fast_poll_duration` ≈ the mean of `fast_1`, `fast_2`, `fast_3`
2281 /// let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2282 /// assert!(mean_fast_poll_duration >= (fast_1 + fast_2 + fast_3) / 3);
2283 /// assert!(mean_fast_poll_duration <= total_time / 3);
2284 /// }
2285 ///
2286 /// /// Produces the amount of time it took to await a given task.
2287 /// async fn time(task: impl Future) -> Duration {
2288 /// let start = tokio::time::Instant::now();
2289 /// task.await;
2290 /// start.elapsed()
2291 /// }
2292 ///
2293 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2294 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2295 /// let start = tokio::time::Instant::now();
2296 /// while start.elapsed() <= duration {}
2297 /// tokio::task::yield_now()
2298 /// }
2299 /// ```
2300 pub fn mean_fast_poll_duration(&self) -> Duration {
2301 mean(self.total_fast_poll_duration, self.total_fast_poll_count)
2302 }
2303
2304 /// The mean duration of slow polls.
2305 ///
2306 /// ##### Definition
2307 /// This metric is derived from
2308 /// [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration] ÷
2309 /// [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
2310 ///
2311 /// ##### Interpretation
2312 /// If this metric increases, it means that a greater proportion of polls took excessively long
2313 /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2314 /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
2315 ///
2316 /// ##### See also
2317 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
2318 /// The mean duration of polls.
2319 /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**
2320 /// The ratio between the number polls categorized as slow and fast.
2321 ///
2322 /// ##### Interpretation
2323 /// If this metric increases, it means that, on average, slow polls got even slower. This does
2324 /// necessarily imply increased task latency: An increase in average slow poll duration could be
2325 /// offset by fewer or faster polls. However, as a rule, *should* yield to the scheduler
2326 /// frequently.
2327 ///
2328 /// ##### Examples
2329 /// In the below example, no tasks are polled in the first sampling interval; three slow polls
2330 /// consume a mean of
2331 /// 1.5 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
2332 /// second sampling interval; and two slow polls consume a total of
2333 /// 2 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
2334 /// third sampling interval:
2335 /// ```
2336 /// use std::future::Future;
2337 /// use std::time::Duration;
2338 ///
2339 /// #[tokio::main]
2340 /// async fn main() {
2341 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2342 /// let mut interval = metrics_monitor.intervals();
2343 /// let mut next_interval = || interval.next().unwrap();
2344 ///
2345 /// // no tasks have been constructed, instrumented, or polled
2346 /// assert_eq!(next_interval().mean_slow_poll_duration(), Duration::ZERO);
2347 ///
2348 /// let threshold = metrics_monitor.slow_poll_threshold();
2349 /// let slow_1 = 1 * threshold;
2350 /// let slow_2 = 2 * threshold;
2351 /// let slow_3 = 3 * threshold;
2352 ///
2353 /// // this task completes in two slow polls
2354 /// let total_time = time(metrics_monitor.instrument(async {
2355 /// spin_for(slow_1).await; // slow poll 1
2356 /// spin_for(slow_2) // slow poll 2
2357 /// })).await;
2358 ///
2359 /// // `mean_slow_poll_duration` ≈ the mean of `slow_1` and `slow_2`
2360 /// let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2361 /// assert!(mean_slow_poll_duration >= (slow_1 + slow_2) / 2);
2362 /// assert!(mean_slow_poll_duration <= total_time / 2);
2363 ///
2364 /// // this task completes in three slow polls
2365 /// let total_time = time(metrics_monitor.instrument(async {
2366 /// spin_for(slow_1).await; // slow poll 1
2367 /// spin_for(slow_2).await; // slow poll 2
2368 /// spin_for(slow_3) // slow poll 3
2369 /// })).await;
2370 ///
2371 /// // `mean_slow_poll_duration` ≈ the mean of `slow_1`, `slow_2`, `slow_3`
2372 /// let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2373 /// assert!(mean_slow_poll_duration >= (slow_1 + slow_2 + slow_3) / 3);
2374 /// assert!(mean_slow_poll_duration <= total_time / 3);
2375 /// }
2376 ///
2377 /// /// Produces the amount of time it took to await a given task.
2378 /// async fn time(task: impl Future) -> Duration {
2379 /// let start = tokio::time::Instant::now();
2380 /// task.await;
2381 /// start.elapsed()
2382 /// }
2383 ///
2384 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2385 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2386 /// let start = tokio::time::Instant::now();
2387 /// while start.elapsed() <= duration {}
2388 /// tokio::task::yield_now()
2389 /// }
2390 /// ```
2391 pub fn mean_slow_poll_duration(&self) -> Duration {
2392 mean(self.total_slow_poll_duration, self.total_slow_poll_count)
2393 }
2394
2395 /// The average time taken for a task with a short scheduling delay to be executed after being
2396 /// scheduled.
2397 ///
2398 /// ##### Definition
2399 /// This metric is derived from
2400 /// [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration] ÷
2401 /// [`total_short_delay_count`][TaskMetrics::total_short_delay_count].
2402 pub fn mean_short_delay_duration(&self) -> Duration {
2403 mean(
2404 self.total_short_delay_duration,
2405 self.total_short_delay_count,
2406 )
2407 }
2408
2409 /// The average scheduling delay for a task which takes a long time to start executing after
2410 /// being scheduled.
2411 ///
2412 /// ##### Definition
2413 /// This metric is derived from
2414 /// [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration] ÷
2415 /// [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
2416 pub fn mean_long_delay_duration(&self) -> Duration {
2417 mean(self.total_long_delay_duration, self.total_long_delay_count)
2418 }
2419 }
2420 unstable {}
2421 }
2422);
2423
2424impl<T: Future> Future for Instrumented<T> {
2425 type Output = T::Output;
2426
2427 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2428 instrument_poll(cx, self, Future::poll)
2429 }
2430}
2431
2432impl<T: Stream> Stream for Instrumented<T> {
2433 type Item = T::Item;
2434
2435 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2436 instrument_poll(cx, self, Stream::poll_next)
2437 }
2438}
2439
/// Polls the inner value of `instrumented` with `poll_fn` and records task metrics around the
/// call.
///
/// This is the shared implementation behind the `Future` and `Stream` impls for
/// `Instrumented<T>`. On every call it updates the shared counters in `state.metrics`:
///
/// - On the first poll only, it adds the delay since instrumentation to
///   `total_first_poll_delay_ns` and increments `first_poll_count`.
/// - If a wake (recorded by `State::on_wake`) happened after the previous inner poll finished,
///   it counts one idle period. It also adds the idle length to `total_idle_duration_ns` and
///   to the local and global maximum-idle trackers.
/// - If any wake was recorded since the previous call, including one that arrived during the
///   previous inner poll, it counts one scheduling event. The wake-to-poll delay is bucketed as
///   long (`>= long_delay_threshold`) or short.
/// - On every call, it times the inner poll and buckets the duration as slow
///   (`>= slow_poll_threshold`) or fast.
///
/// The caller's waker is first registered in `state.waker`. The inner value is then polled with
/// a `Context` whose waker is built from the task's `State`. As a result, wakes are timestamped
/// by `State::on_wake` before being forwarded to the caller's waker.
///
/// Each individual measurement is converted to `u64` nanoseconds and clamped to `u64::MAX`.
/// The running totals are plain atomic `fetch_add`s.
fn instrument_poll<T, Out>(
    cx: &mut Context<'_>,
    instrumented: Pin<&mut Instrumented<T>>,
    poll_fn: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Out>,
) -> Poll<Out> {
    // Taken before any bookkeeping; the reference point for first-poll and scheduling delays.
    let poll_start = Instant::now();
    let this = instrumented.project();
    // Offset, in ns since instrumentation, at which the previous inner poll finished
    // (written at the end of this function).
    let idled_at = this.idled_at;
    let state = this.state;
    let instrumented_at = state.instrumented_at;
    let metrics = &state.metrics;
    /* accounting for time-to-first-poll and tasks-count */
    // is this the first time this task has been polled?
    if !*this.did_poll_once {
        // if so, we need to do three things:
        /* 1. note that this task *has* been polled */
        *this.did_poll_once = true;

        /* 2. account for the time-to-first-poll of this task */
        // if the time-to-first-poll of this task exceeds `u64::MAX` ns,
        // round down to `u64::MAX` nanoseconds
        let elapsed = (poll_start - instrumented_at)
            .as_nanos()
            .try_into()
            .unwrap_or(u64::MAX);
        // add this duration to the monitor's `total_first_poll_delay_ns` accumulator
        metrics.total_first_poll_delay_ns.fetch_add(elapsed, SeqCst);

        /* 3. increment the count of tasks that have been polled at least once */
        state.metrics.first_poll_count.fetch_add(1, SeqCst);
    }
    /* accounting for time-idled and time-scheduled */
    // 1. take the offset (ns since instrumentation) at which `State::on_wake` last recorded a
    //    wake, resetting it to 0, the "not woken since the last poll" sentinel
    let woke_at = state.woke_at.swap(0, SeqCst);
    // The state of a future is *idling* in the interim between the instant
    // it completes a `poll`, and the instant it is next awoken. A wake recorded
    // during the previous inner poll (woke_at < idled_at) is not an idle period.
    if *idled_at < woke_at {
        // increment the counter of how many idles occurred
        metrics.total_idled_count.fetch_add(1, SeqCst);

        // compute the duration of the idle
        let idle_ns = woke_at - *idled_at;

        // update the max time tasks spent idling, both locally and
        // globally.
        metrics
            .local_max_idle_duration_ns
            .fetch_max(idle_ns, SeqCst);
        metrics
            .global_max_idle_duration_ns
            .fetch_max(idle_ns, SeqCst);
        // adjust the total elapsed time monitored tasks spent idling
        metrics.total_idle_duration_ns.fetch_add(idle_ns, SeqCst);
    }
    // if this task spent any time in the scheduled state after instrumentation,
    // and after first poll, `woke_at` will be greater than 0.
    if woke_at > 0 {
        // increment the counter of how many schedules occurred
        metrics.total_scheduled_count.fetch_add(1, SeqCst);

        // recall that the `woke_at` field is internally represented as
        // nanoseconds-since-instrumentation. here, for accounting purposes,
        // we need to instead represent it as a proper `Instant`.
        let woke_instant = instrumented_at + Duration::from_nanos(woke_at);

        // the duration this task spent scheduled is the time elapsed between
        // when this task was woken, and when it was polled.
        let scheduled_ns = (poll_start - woke_instant)
            .as_nanos()
            .try_into()
            .unwrap_or(u64::MAX);

        let scheduled = Duration::from_nanos(scheduled_ns);

        let (count_bucket, duration_bucket) = // was the scheduling delay long or short?
            if scheduled >= metrics.long_delay_threshold {
                (&metrics.total_long_delay_count, &metrics.total_long_delay_duration_ns)
            } else {
                (&metrics.total_short_delay_count, &metrics.total_short_delay_duration_ns)
            };
        // update the appropriate bucket
        count_bucket.fetch_add(1, SeqCst);
        duration_bucket.fetch_add(scheduled_ns, SeqCst);

        // add `scheduled_ns` to the Monitor's total
        metrics
            .total_scheduled_duration_ns
            .fetch_add(scheduled_ns, SeqCst);
    }
    // Register the caller's waker so that forwarded wakes reach it
    state.waker.register(cx.waker());
    // Get the instrumented waker (wakes go through `ArcWake for State`)
    let waker_ref = futures_util::task::waker_ref(state);
    let mut cx = Context::from_waker(&waker_ref);
    // Poll the task; only this call is counted as poll time
    let inner_poll_start = Instant::now();
    let ret = poll_fn(this.task, &mut cx);
    let inner_poll_end = Instant::now();
    /* idle time starts now */
    // stored as ns since instrumentation, to be compared with `woke_at` on the next poll
    *idled_at = (inner_poll_end - instrumented_at)
        .as_nanos()
        .try_into()
        .unwrap_or(u64::MAX);
    /* accounting for poll time */
    let inner_poll_duration = inner_poll_end - inner_poll_start;
    let inner_poll_ns: u64 = inner_poll_duration
        .as_nanos()
        .try_into()
        .unwrap_or(u64::MAX);
    let (count_bucket, duration_bucket) = // was this a slow or fast poll?
            if inner_poll_duration >= metrics.slow_poll_threshold {
                (&metrics.total_slow_poll_count, &metrics.total_slow_poll_duration)
            } else {
                (&metrics.total_fast_poll_count, &metrics.total_fast_poll_duration_ns)
            };
    // update the appropriate bucket
    count_bucket.fetch_add(1, SeqCst);
    duration_bucket.fetch_add(inner_poll_ns, SeqCst);
    ret
}
2560
2561impl State {
2562 fn on_wake(&self) {
2563 let woke_at: u64 = match self.instrumented_at.elapsed().as_nanos().try_into() {
2564 Ok(woke_at) => woke_at,
2565 // This is highly unlikely as it would mean the task ran for over
2566 // 500 years. If you ran your service for 500 years. If you are
2567 // reading this 500 years in the future, I'm sorry.
2568 Err(_) => return,
2569 };
2570
2571 // We don't actually care about the result
2572 let _ = self.woke_at.compare_exchange(0, woke_at, SeqCst, SeqCst);
2573 }
2574}
2575
2576impl ArcWake for State {
2577 fn wake_by_ref(arc_self: &Arc<State>) {
2578 arc_self.on_wake();
2579 arc_self.waker.wake();
2580 }
2581
2582 fn wake(self: Arc<State>) {
2583 self.on_wake();
2584 self.waker.wake();
2585 }
2586}
2587
/// Iterator returned by [`TaskMonitor::intervals`].
///
/// See that method's documentation for more details.
///
/// Each call to `next` takes a fresh cumulative snapshot of the monitor's metrics. It yields
/// the difference from the snapshot taken by the previous call; the first call yields the
/// cumulative snapshot itself. The iterator never ends, so `next` always returns `Some`.
#[derive(Debug)]
pub struct TaskIntervals {
    /// Raw counters of the monitor being sampled. Each `next` reads them and also calls
    /// `get_and_reset_local_max_idle_duration` on them. Because this is shared through an
    /// `Arc`, other samplers of the same monitor presumably observe that reset too (confirm
    /// against `TaskMonitor::intervals`).
    metrics: Arc<RawMetrics>,
    /// Cumulative snapshot taken by the previous `next` call; `None` until the first call.
    previous: Option<TaskMetrics>,
}
2596
2597impl TaskIntervals {
2598 fn probe(&mut self) -> TaskMetrics {
2599 let latest = self.metrics.metrics();
2600 let local_max_idle_duration = self.metrics.get_and_reset_local_max_idle_duration();
2601
2602 let next = if let Some(previous) = self.previous {
2603 TaskMetrics {
2604 instrumented_count: latest
2605 .instrumented_count
2606 .wrapping_sub(previous.instrumented_count),
2607 dropped_count: latest.dropped_count.wrapping_sub(previous.dropped_count),
2608 total_poll_count: latest
2609 .total_poll_count
2610 .wrapping_sub(previous.total_poll_count),
2611 total_poll_duration: sub(latest.total_poll_duration, previous.total_poll_duration),
2612 first_poll_count: latest
2613 .first_poll_count
2614 .wrapping_sub(previous.first_poll_count),
2615 total_idled_count: latest
2616 .total_idled_count
2617 .wrapping_sub(previous.total_idled_count),
2618 total_scheduled_count: latest
2619 .total_scheduled_count
2620 .wrapping_sub(previous.total_scheduled_count),
2621 total_fast_poll_count: latest
2622 .total_fast_poll_count
2623 .wrapping_sub(previous.total_fast_poll_count),
2624 total_short_delay_count: latest
2625 .total_short_delay_count
2626 .wrapping_sub(previous.total_short_delay_count),
2627 total_slow_poll_count: latest
2628 .total_slow_poll_count
2629 .wrapping_sub(previous.total_slow_poll_count),
2630 total_long_delay_count: latest
2631 .total_long_delay_count
2632 .wrapping_sub(previous.total_long_delay_count),
2633 total_first_poll_delay: sub(
2634 latest.total_first_poll_delay,
2635 previous.total_first_poll_delay,
2636 ),
2637 max_idle_duration: local_max_idle_duration,
2638 total_idle_duration: sub(latest.total_idle_duration, previous.total_idle_duration),
2639 total_scheduled_duration: sub(
2640 latest.total_scheduled_duration,
2641 previous.total_scheduled_duration,
2642 ),
2643 total_fast_poll_duration: sub(
2644 latest.total_fast_poll_duration,
2645 previous.total_fast_poll_duration,
2646 ),
2647 total_short_delay_duration: sub(
2648 latest.total_short_delay_duration,
2649 previous.total_short_delay_duration,
2650 ),
2651 total_slow_poll_duration: sub(
2652 latest.total_slow_poll_duration,
2653 previous.total_slow_poll_duration,
2654 ),
2655 total_long_delay_duration: sub(
2656 latest.total_long_delay_duration,
2657 previous.total_long_delay_duration,
2658 ),
2659 }
2660 } else {
2661 latest
2662 };
2663
2664 self.previous = Some(latest);
2665
2666 next
2667 }
2668}
2669
2670impl Iterator for TaskIntervals {
2671 type Item = TaskMetrics;
2672
2673 fn next(&mut self) -> Option<Self::Item> {
2674 Some(self.probe())
2675 }
2676}
2677
/// Converts `d` to whole nanoseconds as a `u64`.
///
/// Durations longer than `u64::MAX` nanoseconds are a logic error. This is checked in debug
/// builds; in release builds the result wraps modulo 2^64.
#[inline(always)]
fn to_nanos(d: Duration) -> u64 {
    debug_assert!(d <= Duration::from_nanos(u64::MAX));
    let whole_seconds_ns = d.as_secs().wrapping_mul(1_000_000_000);
    let fractional_ns = u64::from(d.subsec_nanos());
    whole_seconds_ns.wrapping_add(fractional_ns)
}
2685
2686#[inline(always)]
2687fn sub(a: Duration, b: Duration) -> Duration {
2688 let nanos = to_nanos(a).wrapping_sub(to_nanos(b));
2689 Duration::from_nanos(nanos)
2690}
2691
2692#[inline(always)]
2693fn mean(d: Duration, count: u64) -> Duration {
2694 if let Some(quotient) = to_nanos(d).checked_div(count) {
2695 Duration::from_nanos(quotient)
2696 } else {
2697 Duration::ZERO
2698 }
2699}