Skip to main content

tokio_metrics/
task.rs

1use futures_util::task::{ArcWake, AtomicWaker};
2use pin_project_lite::pin_project;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
6use std::sync::Arc;
7use std::task::{Context, Poll};
8use tokio_stream::Stream;
9
10#[cfg(feature = "rt")]
11use tokio::time::{Duration, Instant};
12
13use crate::derived_metrics::derived_metrics;
14#[cfg(not(feature = "rt"))]
15use std::time::{Duration, Instant};
16
17#[cfg(all(feature = "rt", feature = "metrics-rs-integration"))]
18pub(crate) mod metrics_rs_integration;
19
20/// Monitors key metrics of instrumented tasks.
21///
22/// ### Basic Usage
23/// A [`TaskMonitor`] tracks key [metrics][TaskMetrics] of async tasks that have been
24/// [instrumented][`TaskMonitor::instrument`] with the monitor.
25///
26/// In the below example, a [`TaskMonitor`] is [constructed][TaskMonitor::new] and used to
27/// [instrument][TaskMonitor::instrument] three worker tasks; meanwhile, a fourth task
28/// prints [metrics][TaskMetrics] in 500ms [intervals][TaskMonitor::intervals].
29/// ```
30/// use std::time::Duration;
31///
32/// #[tokio::main]
33/// async fn main() {
34///     // construct a metrics monitor
35///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
36///
37///     // print task metrics every 500ms
38///     {
39///         let metrics_monitor = metrics_monitor.clone();
40///         tokio::spawn(async move {
41///             for interval in metrics_monitor.intervals() {
42///                 // pretty-print the metric interval
43///                 println!("{:?}", interval);
44///                 // wait 500ms
45///                 tokio::time::sleep(Duration::from_millis(500)).await;
46///             }
47///         });
48///     }
49///
50///     // instrument some tasks and await them
51///     // note that the same TaskMonitor can be used for multiple tasks
52///     tokio::join![
53///         metrics_monitor.instrument(do_work()),
54///         metrics_monitor.instrument(do_work()),
55///         metrics_monitor.instrument(do_work())
56///     ];
57/// }
58///
59/// async fn do_work() {
60///     for _ in 0..25 {
61///         tokio::task::yield_now().await;
62///         tokio::time::sleep(Duration::from_millis(100)).await;
63///     }
64/// }
65/// ```
66///
67/// ### What should I instrument?
68/// In most cases, you should construct a *distinct* [`TaskMonitor`] for each kind of key task.
69///
70/// #### Instrumenting a web application
71/// For instance, a web service should have a distinct [`TaskMonitor`] for each endpoint. Within
72/// each endpoint, it's prudent to additionally instrument major sub-tasks, each with their own
73/// distinct [`TaskMonitor`]s. [*Why are my tasks slow?*](#why-are-my-tasks-slow) explores a
74/// debugging scenario for a web service that takes this approach to instrumentation. This
75/// approach is exemplified in the below example:
76/// ```no_run
77/// // The unabridged version of this snippet is in the examples directory of this crate.
78///
79/// #[tokio::main]
80/// async fn main() {
81///     // construct a TaskMonitor for root endpoint
82///     let monitor_root = tokio_metrics::TaskMonitor::new();
83///
84///     // construct TaskMonitors for create_users endpoint
85///     let monitor_create_user = CreateUserMonitors {
86///         // monitor for the entire endpoint
87///         route: tokio_metrics::TaskMonitor::new(),
88///         // monitor for database insertion subtask
89///         insert: tokio_metrics::TaskMonitor::new(),
90///     };
91///
92///     // build our application with two instrumented endpoints
93///     let app = axum::Router::new()
94///         // `GET /` goes to `root`
95///         .route("/", axum::routing::get({
96///             let monitor = monitor_root.clone();
97///             move || monitor.instrument(async { "Hello, World!" })
98///         }))
99///         // `POST /users` goes to `create_user`
100///         .route("/users", axum::routing::post({
101///             let monitors = monitor_create_user.clone();
102///             let route = monitors.route.clone();
103///             move |payload| {
104///                 route.instrument(create_user(payload, monitors))
105///             }
106///         }));
107///
108///     // print task metrics for each endpoint every 1s
109///     let metrics_frequency = std::time::Duration::from_secs(1);
110///     tokio::spawn(async move {
111///         let root_intervals = monitor_root.intervals();
112///         let create_user_route_intervals =
113///             monitor_create_user.route.intervals();
114///         let create_user_insert_intervals =
115///             monitor_create_user.insert.intervals();
116///         let create_user_intervals =
117///             create_user_route_intervals.zip(create_user_insert_intervals);
118///
119///         let intervals = root_intervals.zip(create_user_intervals);
120///         for (root_route, (create_user_route, create_user_insert)) in intervals {
121///             println!("root_route = {:#?}", root_route);
122///             println!("create_user_route = {:#?}", create_user_route);
123///             println!("create_user_insert = {:#?}", create_user_insert);
124///             tokio::time::sleep(metrics_frequency).await;
125///         }
126///     });
127///
128///     // run the server
129///     let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 3000));
130///     let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
131///     axum::serve(listener, app)
132///         .await
133///         .unwrap();
134/// }
135///
136/// async fn create_user(
137///     axum::Json(payload): axum::Json<CreateUser>,
138///     monitors: CreateUserMonitors,
139/// ) -> impl axum::response::IntoResponse {
140///     let user = User { id: 1337, username: payload.username, };
141///     // instrument inserting the user into the db:
142///     let _ = monitors.insert.instrument(insert_user(user.clone())).await;
143///     (axum::http::StatusCode::CREATED, axum::Json(user))
144/// }
145///
146/// /* definitions of CreateUserMonitors, CreateUser and User omitted for brevity */
147///
148/// #
149/// # #[derive(Clone)]
150/// # struct CreateUserMonitors {
151/// #     // monitor for the entire endpoint
152/// #     route: tokio_metrics::TaskMonitor,
153/// #     // monitor for database insertion subtask
154/// #     insert: tokio_metrics::TaskMonitor,
155/// # }
156/// #
157/// # #[derive(serde::Deserialize)] struct CreateUser { username: String, }
158/// # #[derive(Clone, serde::Serialize)] struct User { id: u64, username: String, }
159/// #
160/// // insert the user into the database
161/// async fn insert_user(_: User) {
162///     /* implementation details elided */
163///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
164/// }
165/// ```
166///
167/// ### Why are my tasks slow?
168/// **Scenario:** You track key, high-level metrics about the customer response time. An alarm warns
169/// you that P90 latency for an endpoint exceeds your targets. What is causing the increase?
170///
171/// #### Identifying the high-level culprits
172/// A set of tasks will appear to execute more slowly if:
173/// - they are taking longer to poll (i.e., they consume too much CPU time)
174/// - they are waiting longer to be polled (e.g., they're waiting longer in tokio's scheduling
175///   queues)
176/// - they are waiting longer on external events to complete (e.g., asynchronous network requests)
177///
178/// The culprits, at a high level, may be some combination of these sources of latency. Fortunately,
179/// you have instrumented the key tasks of each of your endpoints with distinct [`TaskMonitor`]s.
180/// Using the monitors on the endpoint experiencing elevated latency, you begin by answering:
181/// - [*Are my tasks taking longer to poll?*](#are-my-tasks-taking-longer-to-poll)
182/// - [*Are my tasks spending more time waiting to be polled?*](#are-my-tasks-spending-more-time-waiting-to-be-polled)
183/// - [*Are my tasks spending more time waiting on external events to complete?*](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
184///
185/// ##### Are my tasks taking longer to poll?
186/// - **Did [`mean_poll_duration`][TaskMetrics::mean_poll_duration] increase?**
187///   This metric reflects the mean poll duration. If it increased, it means that, on average,
188///   individual polls tended to take longer. However, this does not necessarily imply increased
189///   task latency: An increase in poll durations could be offset by fewer polls.
190/// - **Did [`slow_poll_ratio`][TaskMetrics::slow_poll_ratio] increase?**
191///   This metric reflects the proportion of polls that were 'slow'. If it increased, it means that
192///   a greater proportion of polls performed excessive computation before yielding. This does not
193///   necessarily imply increased task latency: An increase in the proportion of slow polls could be
194///   offset by fewer or faster polls.
195/// - **Did [`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration] increase?**
196///   This metric reflects the mean duration of slow polls. If it increased, it means that, on
197///   average, slow polls got slower. This does not necessarily imply increased task latency: An
198///   increase in average slow poll duration could be offset by fewer or faster polls.
199///
200/// If so, [*why are my tasks taking longer to poll?*](#why-are-my-tasks-taking-longer-to-poll)
201///
202/// ##### Are my tasks spending more time waiting to be polled?
203/// - **Did [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increase?**
204///   This metric reflects the mean delay between the instant a task is first instrumented and the
205///   instant it is first polled. If it increases, it means that, on average, tasks spent longer
206///   waiting to be initially run.
207/// - **Did [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration] increase?**
208///   This metric reflects the mean duration that tasks spent in the scheduled state. The
209///   'scheduled' state of a task is the duration between the instant a task is awoken and the
210///   instant it is subsequently polled. If this metric increases, it means that, on average, tasks
211///   spent longer in tokio's queues before being polled.
212/// - **Did [`long_delay_ratio`][TaskMetrics::long_delay_ratio] increase?**
213///   This metric reflects the proportion of scheduling delays which were 'long'. If it increased,
214///   it means that a greater proportion of tasks experienced excessive delays before they could
215///   execute after being woken. This does not necessarily indicate an increase in latency, as this
216///   could be offset by fewer or faster task polls.
217/// - **Did [`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration] increase?**
218///   This metric reflects the mean duration of long delays. If it increased, it means that, on
219///   average, long delays got even longer. This does not necessarily imply increased task latency:
220///   an increase in average long delay duration could be offset by fewer or faster polls or more
221///   short schedules.
222///
223/// If so, [*why are my tasks spending more time waiting to be polled?*](#why-are-my-tasks-spending-more-time-waiting-to-be-polled)
224///
225/// ##### Are my tasks spending more time waiting on external events to complete?
226/// - **Did [`mean_idle_duration`][TaskMetrics::mean_idle_duration] increase?**
227///   This metric reflects the mean duration that tasks spent in the idle state. The idle state is
228///   the duration spanning the instant a task completes a poll, and the instant that it is next
229///   awoken. Tasks inhabit this state when they are waiting for task-external events to complete
230///   (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this metric increases,
231///   tasks, in aggregate, spent more time waiting for task-external events to complete.
232///
233/// If so, [*why are my tasks spending more time waiting on external events to complete?*](#why-are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
234///
235/// #### Digging deeper
236/// Having [established the high-level culprits](#identifying-the-high-level-culprits), you now
237/// search for further explanation...
238///
239/// ##### Why are my tasks taking longer to poll?
240/// You observed that [your tasks are taking longer to poll](#are-my-tasks-taking-longer-to-poll).
241/// The culprit is likely some combination of:
242/// - **Your tasks are accidentally blocking.** Common culprits include:
243///     1. Using the Rust standard library's [filesystem](https://doc.rust-lang.org/std/fs/) or
244///        [networking](https://doc.rust-lang.org/std/net/) APIs.
245///        These APIs are synchronous; use tokio's [filesystem](https://docs.rs/tokio/latest/tokio/fs/)
246///        and [networking](https://docs.rs/tokio/latest/tokio/net/) APIs, instead.
///     2. Calling [`block_on`](https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.block_on).
///     3. Invoking `println!` or other synchronous logging routines.
///        Invocations of `println!` involve acquiring an exclusive lock on stdout, followed by a
///        synchronous write to stdout.
/// - **Your tasks are computationally expensive.** Common culprits include:
252///     1. TLS/cryptographic routines
253///     2. doing a lot of processing on bytes
254///     3. calling non-Tokio resources
255///
256/// ##### Why are my tasks spending more time waiting to be polled?
257/// You observed that [your tasks are spending more time waiting to be polled](#are-my-tasks-spending-more-time-waiting-to-be-polled)
258/// suggesting some combination of:
259/// - Your application is inflating the time elapsed between instrumentation and first poll.
260/// - Your tasks are being scheduled into tokio's global queue.
261/// - Other tasks are spending too long without yielding, thus backing up tokio's queues.
262///
263/// Start by asking: [*Is time-to-first-poll unusually high?*](#is-time-to-first-poll-unusually-high)
264///
265/// ##### Why are my tasks spending more time waiting on external events to complete?
/// You observed that [your tasks are spending more time waiting on external events to
/// complete](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete). But what
/// event? Fortunately, within the task experiencing increased idle times, you monitored several
/// sub-tasks with distinct [`TaskMonitor`]s. For each of these sub-tasks, [*you try to identify
/// the performance culprits...*](#identifying-the-high-level-culprits)
271///
272/// #### Digging even deeper
273///
274/// ##### Is time-to-first-poll unusually high?
275/// Contrast these two metrics:
276/// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
277///   This metric reflects the mean delay between the instant a task is first instrumented and the
278///   instant it is *first* polled.
279/// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
280///   This metric reflects the mean delay between the instant when tasks were awoken and the
281///   instant they were subsequently polled.
282///
283/// If the former metric exceeds the latter (or increased unexpectedly more than the latter), then
284/// start by investigating [*if your application is artificially delaying the time-to-first-poll*](#is-my-application-delaying-the-time-to-first-poll).
285///
286/// Otherwise, investigate [*if other tasks are polling too long without yielding*](#are-other-tasks-polling-too-long-without-yielding).
287///
288/// ##### Is my application delaying the time-to-first-poll?
289/// You observed that [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increased, more
290/// than [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]. Your application may be
291/// needlessly inflating the time elapsed between instrumentation and first poll. Are you
292/// constructing (and instrumenting) tasks separately from awaiting or spawning them?
293///
/// For instance, in the below example, the application induces a 1-second delay between when
/// `task` is instrumented and when it is awaited:
296/// ```rust
297/// #[tokio::main]
298/// async fn main() {
299///     use tokio::time::Duration;
300///     let monitor = tokio_metrics::TaskMonitor::new();
301///
302///     let task = monitor.instrument(async move {});
303///
304///     let one_sec = Duration::from_secs(1);
305///     tokio::time::sleep(one_sec).await;
306///
307///     let _ = tokio::spawn(task).await;
308///
309///     assert!(monitor.cumulative().total_first_poll_delay >= one_sec);
310/// }
311/// ```
312///
313/// Otherwise, [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] might be unusually high
/// because [*your application is spawning key tasks into tokio's global queue...*](#is-my-application-spawning-more-tasks-into-tokios-global-queue)
315///
316/// ##### Is my application spawning more tasks into tokio's global queue?
317/// Tasks awoken from threads *not* managed by the tokio runtime are scheduled with a slower,
318/// global "injection" queue.
319///
/// You may be notifying runtime tasks from off-runtime. For instance, given the following:
321/// ```ignore
322/// #[tokio::main]
323/// async fn main() {
324///     for _ in 0..100 {
325///         let (tx, rx) = oneshot::channel();
326///         tokio::spawn(async move {
327///             tx.send(());
328///         })
329///
330///         rx.await;
331///     }
332/// }
333/// ```
/// One would expect this to run efficiently; however, the main task runs *off* the runtime while
/// the spawned tasks run *on* the runtime, which means the snippet will run much slower than:
336/// ```ignore
337/// #[tokio::main]
338/// async fn main() {
339///     tokio::spawn(async {
340///         for _ in 0..100 {
341///             let (tx, rx) = oneshot::channel();
342///             tokio::spawn(async move {
343///                 tx.send(());
344///             })
345///
346///             rx.await;
347///         }
348///     }).await;
349/// }
350/// ```
351/// The slowdown is caused by a higher time between the `rx` task being notified (in `tx.send()`)
352/// and the task being polled.
353///
354/// ##### Are other tasks polling too long without yielding?
355/// You suspect that your tasks are slow because they're backed up in tokio's scheduling queues. For
356/// *each* of your application's [`TaskMonitor`]s you check to see [*if their associated tasks are
357/// taking longer to poll...*](#are-my-tasks-taking-longer-to-poll)
358///
359/// ### Limitations
360/// The [`TaskMetrics`] type uses [`u64`] to represent both event counters and durations (measured
361/// in nanoseconds). Consequently, event counters are accurate for ≤ [`u64::MAX`] events, and
362/// durations are accurate for ≤ [`u64::MAX`] nanoseconds.
363///
364/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::cumulative`] increase
365/// monotonically with each successive invocation of [`TaskMonitor::cumulative`]. Upon overflow,
366/// counters and durations wrap.
367///
368/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::intervals`] are
369/// calculated by computing the difference of metrics in successive invocations of
370/// [`TaskMonitor::cumulative`]. If, within a monitoring interval, an event occurs more than
371/// [`u64::MAX`] times, or a monitored duration exceeds [`u64::MAX`] nanoseconds, the metrics for
372/// that interval will overflow and not be accurate.
373///
374/// ##### Examples at the limits
375/// Consider the [`TaskMetrics::total_first_poll_delay`] metric. This metric accurately reflects
376/// delays between instrumentation and first-poll ≤ [`u64::MAX`] nanoseconds:
377/// ```
378/// use tokio::time::Duration;
379///
380/// #[tokio::main(flavor = "current_thread", start_paused = true)]
381/// async fn main() {
382///     let monitor = tokio_metrics::TaskMonitor::new();
383///     let mut interval = monitor.intervals();
384///     let mut next_interval = || interval.next().unwrap();
385///
386///     // construct and instrument a task, but do not `await` it
387///     let task = monitor.instrument(async {});
388///
389///     // this is the maximum duration representable by tokio_metrics
390///     let max_duration = Duration::from_nanos(u64::MAX);
391///
392///     // let's advance the clock by this amount and poll `task`
393///     let _ = tokio::time::advance(max_duration).await;
394///     task.await;
395///
396///     // durations ≤ `max_duration` are accurately reflected in this metric
397///     assert_eq!(next_interval().total_first_poll_delay, max_duration);
398///     assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
399/// }
400/// ```
401/// If the total delay between instrumentation and first poll exceeds [`u64::MAX`] nanoseconds,
402/// [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay] will overflow:
403/// ```
404/// # use tokio::time::Duration;
405/// #
406/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
407/// # async fn main() {
408/// #    let monitor = tokio_metrics::TaskMonitor::new();
409/// #
410///  // construct and instrument a task, but do not `await` it
411///  let task_a = monitor.instrument(async {});
412///  let task_b = monitor.instrument(async {});
413///
414///  // this is the maximum duration representable by tokio_metrics
415///  let max_duration = Duration::from_nanos(u64::MAX);
416///
///  // let's advance the clock by 1.5x this amount and await both tasks
418///  let _ = tokio::time::advance(3 * (max_duration / 2)).await;
419///  task_a.await;
420///  task_b.await;
421///
422///  // the `total_first_poll_delay` has overflowed
423///  assert!(monitor.cumulative().total_first_poll_delay < max_duration);
424/// # }
425/// ```
426/// If *many* tasks are spawned, it will take far less than a [`u64::MAX`]-nanosecond delay to bring
427/// this metric to the precipice of overflow:
428/// ```
429/// # use tokio::time::Duration;
430/// #
431/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
432/// # async fn main() {
433/// #     let monitor = tokio_metrics::TaskMonitor::new();
434/// #     let mut interval = monitor.intervals();
435/// #     let mut next_interval = || interval.next().unwrap();
436/// #
437/// // construct and instrument u16::MAX tasks, but do not `await` them
438/// let first_poll_count = u16::MAX as u64;
439/// let mut tasks = Vec::with_capacity(first_poll_count as usize);
440/// for _ in 0..first_poll_count { tasks.push(monitor.instrument(async {})); }
441///
442/// // this is the maximum duration representable by tokio_metrics
443/// let max_duration = u64::MAX;
444///
/// // let's advance the clock just enough such that all of the time-to-first-poll
/// // delays summed nearly equals `max_duration`, less some remainder...
447/// let iffy_delay = max_duration / (first_poll_count as u64);
448/// let small_remainder = max_duration % first_poll_count;
449/// let _ = tokio::time::advance(Duration::from_nanos(iffy_delay)).await;
450///
451/// // ...then poll all of the instrumented tasks:
452/// for task in tasks { task.await; }
453///
454/// // `total_first_poll_delay` is at the precipice of overflowing!
455/// assert_eq!(
456///     next_interval().total_first_poll_delay.as_nanos(),
457///     (max_duration - small_remainder) as u128
458/// );
459/// assert_eq!(
460///     monitor.cumulative().total_first_poll_delay.as_nanos(),
461///     (max_duration - small_remainder) as u128
462/// );
463/// # }
464/// ```
/// Frequent, interval-sampled metrics will retain their accuracy, provided the cumulative
/// metrics counter overflows at most once in the midst of an interval:
467/// ```
468/// # use tokio::time::Duration;
469/// # use tokio_metrics::TaskMonitor;
470/// #
471/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
472/// # async fn main() {
473/// #     let monitor = TaskMonitor::new();
474/// #     let mut interval = monitor.intervals();
475/// #     let mut next_interval = || interval.next().unwrap();
476/// #
477///  let first_poll_count = u16::MAX as u64;
478///  let batch_size = first_poll_count / 3;
479///
480///  let max_duration_ns = u64::MAX;
481///  let iffy_delay_ns = max_duration_ns / first_poll_count;
482///
483///  // Instrument `batch_size` number of tasks, wait for `delay` nanoseconds,
484///  // then await the instrumented tasks.
485///  async fn run_batch(monitor: &TaskMonitor, batch_size: usize, delay: u64) {
486///      let mut tasks = Vec::with_capacity(batch_size);
487///      for _ in 0..batch_size { tasks.push(monitor.instrument(async {})); }
488///      let _ = tokio::time::advance(Duration::from_nanos(delay)).await;
489///      for task in tasks { task.await; }
490///  }
491///
///  // this is how much `total_first_poll_delay` will
///  // increase with each batch we run
494///  let batch_delay = iffy_delay_ns * batch_size;
495///
496///  // run batches 1, 2, and 3
497///  for i in 1..=3 {
498///      run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
499///      assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
500///      assert_eq!(i * batch_delay as u128, monitor.cumulative().total_first_poll_delay.as_nanos());
501///  }
502///
///  /* now, the `total_first_poll_delay` counter is at the precipice of overflow */
504///  assert_eq!(monitor.cumulative().total_first_poll_delay.as_nanos(), max_duration_ns as u128);
505///
506///  // run batch 4
507///  run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
508///  // the interval counter remains accurate
509///  assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
510///  // but the cumulative counter has overflowed
511///  assert_eq!(batch_delay as u128 - 1, monitor.cumulative().total_first_poll_delay.as_nanos());
512/// # }
513/// ```
514/// If a cumulative metric overflows *more than once* in the midst of an interval,
515/// its interval-sampled counterpart will also overflow.
516#[derive(Clone, Debug)]
517pub struct TaskMonitor {
518    metrics: Arc<RawMetrics>,
519}
520
/// Provides an interface for constructing a [`TaskMonitor`] with specialized configuration
/// parameters.
///
/// Every threshold starts out unset. When the builder is turned into a [`TaskMonitor`], any
/// threshold that was never set is replaced by the matching `TaskMonitor::DEFAULT_*` constant.
#[derive(Clone, Debug, Default)]
pub struct TaskMonitorBuilder {
    // Threshold at which polls are considered 'slow'; `None` means "use the default".
    slow_poll_threshold: Option<Duration>,
    // Threshold at which schedules are considered 'long'; `None` means "use the default".
    long_delay_threshold: Option<Duration>,
}
528
529impl TaskMonitorBuilder {
530    /// Creates a new [`TaskMonitorBuilder`].
531    pub fn new() -> Self {
532        Self {
533            slow_poll_threshold: None,
534            long_delay_threshold: None,
535        }
536    }
537
538    /// Specifies the threshold at which polls are considered 'slow'.
539    pub fn with_slow_poll_threshold(&mut self, threshold: Duration) -> &mut Self {
540        self.slow_poll_threshold = Some(threshold);
541
542        self
543    }
544
545    /// Specifies the threshold at which schedules are considered 'long'.
546    pub fn with_long_delay_threshold(&mut self, threshold: Duration) -> &mut Self {
547        self.long_delay_threshold = Some(threshold);
548
549        self
550    }
551
552    /// Consume the builder, producing a [`TaskMonitor`].
553    pub fn build(self) -> TaskMonitor {
554        TaskMonitor::create(
555            self.slow_poll_threshold
556                .unwrap_or(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD),
557            self.long_delay_threshold
558                .unwrap_or(TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD),
559        )
560    }
561}
562
563pin_project! {
564    /// An async task that has been instrumented with [`TaskMonitor::instrument`].
565    #[derive(Debug)]
566    pub struct Instrumented<T> {
567        // The task being instrumented
568        #[pin]
569        task: T,
570
571        // True when the task is polled for the first time
572        did_poll_once: bool,
573
574        // The instant, tracked as nanoseconds since `instrumented_at`, at which the future finished
575        // its last poll.
576        idled_at: u64,
577
578        // State shared between the task and its instrumented waker.
579        state: Arc<State>,
580    }
581
582    impl<T> PinnedDrop for Instrumented<T> {
583        fn drop(this: Pin<&mut Self>) {
584            this.state.metrics.dropped_count.fetch_add(1, SeqCst);
585        }
586    }
587}
588
589/// Key metrics of [instrumented][`TaskMonitor::instrument`] tasks.
590#[non_exhaustive]
591#[derive(Debug, Clone, Copy, Default)]
592pub struct TaskMetrics {
593    /// The number of tasks instrumented.
594    ///
595    /// ##### Examples
596    /// ```
597    /// #[tokio::main]
598    /// async fn main() {
599    ///     let monitor = tokio_metrics::TaskMonitor::new();
600    ///     let mut interval = monitor.intervals();
601    ///     let mut next_interval = || interval.next().unwrap();
602    ///
603    ///     // 0 tasks have been instrumented
604    ///     assert_eq!(next_interval().instrumented_count, 0);
605    ///
606    ///     monitor.instrument(async {});
607    ///
608    ///     // 1 task has been instrumented
609    ///     assert_eq!(next_interval().instrumented_count, 1);
610    ///
611    ///     monitor.instrument(async {});
612    ///     monitor.instrument(async {});
613    ///
614    ///     // 2 tasks have been instrumented
615    ///     assert_eq!(next_interval().instrumented_count, 2);
616    ///
617    ///     // since the last interval was produced, 0 tasks have been instrumented
618    ///     assert_eq!(next_interval().instrumented_count, 0);
619    /// }
620    /// ```
621    pub instrumented_count: u64,
622
623    /// The number of tasks dropped.
624    ///
625    /// ##### Examples
626    /// ```
627    /// #[tokio::main]
628    /// async fn main() {
629    ///     let monitor = tokio_metrics::TaskMonitor::new();
630    ///     let mut interval = monitor.intervals();
631    ///     let mut next_interval = || interval.next().unwrap();
632    ///
633    ///     // 0 tasks have been dropped
634    ///     assert_eq!(next_interval().dropped_count, 0);
635    ///
636    ///     let _task = monitor.instrument(async {});
637    ///
638    ///     // 0 tasks have been dropped
639    ///     assert_eq!(next_interval().dropped_count, 0);
640    ///
641    ///     monitor.instrument(async {}).await;
642    ///     drop(monitor.instrument(async {}));
643    ///
644    ///     // 2 tasks have been dropped
645    ///     assert_eq!(next_interval().dropped_count, 2);
646    ///
647    ///     // since the last interval was produced, 0 tasks have been dropped
648    ///     assert_eq!(next_interval().dropped_count, 0);
649    /// }
650    /// ```
651    pub dropped_count: u64,
652
653    /// The number of tasks polled for the first time.
654    ///
655    /// ##### Derived metrics
656    /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
657    ///   The mean duration elapsed between the instant tasks are instrumented, and the instant they
658    ///   are first polled.
659    ///
660    /// ##### Examples
661    /// In the below example, no tasks are instrumented or polled in the first sampling interval;
662    /// one task is instrumented (but not polled) in the second sampling interval; that task is
663    /// awaited to completion (and, thus, polled at least once) in the third sampling interval; no
664    /// additional tasks are polled for the first time within the fourth sampling interval:
665    /// ```
666    /// #[tokio::main]
667    /// async fn main() {
668    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
669    ///     let mut interval = metrics_monitor.intervals();
670    ///     let mut next_interval = || interval.next().unwrap();
671    ///
672    ///     // no tasks have been constructed, instrumented, and polled at least once
673    ///     assert_eq!(next_interval().first_poll_count, 0);
674    ///
675    ///     let task = metrics_monitor.instrument(async {});
676    ///
677    ///     // `task` has been constructed and instrumented, but has not yet been polled
678    ///     assert_eq!(next_interval().first_poll_count, 0);
679    ///
680    ///     // poll `task` to completion
681    ///     task.await;
682    ///
683    ///     // `task` has been constructed, instrumented, and polled at least once
684    ///     assert_eq!(next_interval().first_poll_count, 1);
685    ///
686    ///     // since the last interval was produced, 0 tasks have been constructed, instrumented and polled
687    ///     assert_eq!(next_interval().first_poll_count, 0);
688    ///
689    /// }
690    /// ```
691    pub first_poll_count: u64,
692
693    /// The total duration elapsed between the instant tasks are instrumented, and the instant they
694    /// are first polled.
695    ///
696    /// ##### Derived metrics
697    /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
698    ///   The mean duration elapsed between the instant tasks are instrumented, and the instant they
699    ///   are first polled.
700    ///
701    /// ##### Examples
702    /// In the below example, 0 tasks have been instrumented or polled within the first sampling
703    /// interval, a total of 500ms elapse between the instrumentation and polling of tasks within
704    /// the second sampling interval, and a total of 350ms elapse between the instrumentation and
705    /// polling of tasks within the third sampling interval:
706    /// ```
707    /// use tokio::time::Duration;
708    ///
709    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
710    /// async fn main() {
711    ///     let monitor = tokio_metrics::TaskMonitor::new();
712    ///     let mut interval = monitor.intervals();
713    ///     let mut next_interval = || interval.next().unwrap();
714    ///
715    ///     // no tasks have yet been created, instrumented, or polled
716    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
717    ///     assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
718    ///
719    ///     // constructs and instruments a task, pauses a given duration, then awaits the task
720    ///     async fn instrument_pause_await(monitor: &tokio_metrics::TaskMonitor, pause: Duration) {
721    ///         let task = monitor.instrument(async move {});
722    ///         tokio::time::sleep(pause).await;
723    ///         task.await;
724    ///     }
725    ///
726    ///     // construct and await a task that pauses for 500ms between instrumentation and first poll
727    ///     let task_a_pause_time = Duration::from_millis(500);
728    ///     instrument_pause_await(&monitor, task_a_pause_time).await;
729    ///
730    ///     assert_eq!(next_interval().total_first_poll_delay, task_a_pause_time);
731    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, task_a_pause_time);
732    ///
733    ///     // construct and await a task that pauses for 250ms between instrumentation and first poll
734    ///     let task_b_pause_time = Duration::from_millis(250);
735    ///     instrument_pause_await(&monitor, task_b_pause_time).await;
736    ///
737    ///     // construct and await a task that pauses for 100ms between instrumentation and first poll
738    ///     let task_c_pause_time = Duration::from_millis(100);
739    ///     instrument_pause_await(&monitor, task_c_pause_time).await;
740    ///
741    ///     assert_eq!(
742    ///         next_interval().total_first_poll_delay,
743    ///         task_b_pause_time + task_c_pause_time
744    ///     );
745    ///     assert_eq!(
746    ///         monitor.cumulative().total_first_poll_delay,
747    ///         task_a_pause_time + task_b_pause_time + task_c_pause_time
748    ///     );
749    /// }
750    /// ```
751    ///
752    /// ##### When is this metric recorded?
753    /// The delay between instrumentation and first poll is not recorded until the first poll
754    /// actually occurs:
755    /// ```
756    /// # use tokio::time::Duration;
757    /// #
758    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
759    /// # async fn main() {
760    /// #     let monitor = tokio_metrics::TaskMonitor::new();
761    /// #     let mut interval = monitor.intervals();
762    /// #     let mut next_interval = || interval.next().unwrap();
763    /// #
764    /// // we construct and instrument a task, but do not `await` it
765    /// let task = monitor.instrument(async {});
766    ///
767    /// // let's sleep for 1s before we poll `task`
768    /// let one_sec = Duration::from_secs(1);
769    /// let _ = tokio::time::sleep(one_sec).await;
770    ///
771    /// // although 1s has now elapsed since the instrumentation of `task`,
772    /// // this is not reflected in `total_first_poll_delay`...
773    /// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
774    /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
775    ///
776    /// // ...and won't be until `task` is actually polled
777    /// task.await;
778    ///
779    /// // now, the 1s delay is reflected in `total_first_poll_delay`:
780    /// assert_eq!(next_interval().total_first_poll_delay, one_sec);
781    /// assert_eq!(monitor.cumulative().total_first_poll_delay, one_sec);
782    /// # }
783    /// ```
784    ///
785    /// ##### What if first-poll-delay is very large?
786    /// The first-poll-delay of *individual* tasks saturates at `u64::MAX` nanoseconds. However, if
787    /// the *total* first-poll-delay *across* monitored tasks exceeds `u64::MAX` nanoseconds, this
788    /// metric will wrap around:
789    /// ```
790    /// use tokio::time::Duration;
791    ///
792    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
793    /// async fn main() {
794    ///     let monitor = tokio_metrics::TaskMonitor::new();
795    ///
796    ///     // construct and instrument a task, but do not `await` it
797    ///     let task = monitor.instrument(async {});
798    ///
799    ///     // this is the maximum duration representable by tokio_metrics
800    ///     let max_duration = Duration::from_nanos(u64::MAX);
801    ///
802    ///     // let's advance the clock by double this amount and await `task`
803    ///     let _ = tokio::time::advance(max_duration * 2).await;
804    ///     task.await;
805    ///
806    ///     // the time-to-first-poll of `task` saturates at `max_duration`
807    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
808    ///
809    ///     // ...but note that the metric *will* wrap around if more tasks are involved
810    ///     let task = monitor.instrument(async {});
811    ///     let _ = tokio::time::advance(Duration::from_nanos(1)).await;
812    ///     task.await;
813    ///     assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
814    /// }
815    /// ```
816    pub total_first_poll_delay: Duration,
817
818    /// The total number of times that tasks idled, waiting to be awoken.
819    ///
820    /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
821    /// task completes a poll, and the instant that it is next awoken.
822    ///
823    /// ##### Derived metrics
824    /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
825    ///   The mean duration of idles.
826    ///
827    /// ##### Examples
828    /// ```
829    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
830    /// async fn main() {
831    ///     let monitor = tokio_metrics::TaskMonitor::new();
832    ///     let mut interval = monitor.intervals();
833    ///     let mut next_interval = move || interval.next().unwrap();
834    ///     let one_sec = std::time::Duration::from_secs(1);
835    ///
836    ///     monitor.instrument(async {}).await;
837    ///
838    ///     assert_eq!(next_interval().total_idled_count, 0);
839    ///     assert_eq!(monitor.cumulative().total_idled_count, 0);
840    ///
841    ///     monitor.instrument(async move {
842    ///         tokio::time::sleep(one_sec).await;
843    ///     }).await;
844    ///
845    ///     assert_eq!(next_interval().total_idled_count, 1);
846    ///     assert_eq!(monitor.cumulative().total_idled_count, 1);
847    ///
848    ///     monitor.instrument(async {
849    ///         tokio::time::sleep(one_sec).await;
850    ///         tokio::time::sleep(one_sec).await;
851    ///     }).await;
852    ///
853    ///     assert_eq!(next_interval().total_idled_count, 2);
854    ///     assert_eq!(monitor.cumulative().total_idled_count, 3);
855    /// }
856    /// ```
857    pub total_idled_count: u64,
858
859    /// The total duration that tasks idled.
860    ///
861    /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
862    /// task completes a poll, and the instant that it is next awoken.
863    ///
864    /// ##### Derived metrics
865    /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
866    ///   The mean duration of idles.
867    ///
868    /// ##### Examples
869    /// ```
870    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
871    /// async fn main() {
872    ///     let monitor = tokio_metrics::TaskMonitor::new();
873    ///     let mut interval = monitor.intervals();
874    ///     let mut next_interval = move || interval.next().unwrap();
875    ///     let one_sec = std::time::Duration::from_secs(1);
876    ///     let two_sec = std::time::Duration::from_secs(2);
877    ///
878    ///     assert_eq!(next_interval().total_idle_duration.as_nanos(), 0);
879    ///     assert_eq!(monitor.cumulative().total_idle_duration.as_nanos(), 0);
880    ///
881    ///     monitor.instrument(async move {
882    ///         tokio::time::sleep(one_sec).await;
883    ///     }).await;
884    ///
885    ///     assert_eq!(next_interval().total_idle_duration, one_sec);
886    ///     assert_eq!(monitor.cumulative().total_idle_duration, one_sec);
887    ///
888    ///     monitor.instrument(async move {
889    ///         tokio::time::sleep(two_sec).await;
890    ///     }).await;
891    ///
892    ///     assert_eq!(next_interval().total_idle_duration, two_sec);
893    ///     assert_eq!(monitor.cumulative().total_idle_duration, one_sec + two_sec);
894    /// }
895    /// ```
896    pub total_idle_duration: Duration,
897
898    /// The maximum idle duration that a task took.
899    ///
900    /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
901    /// task completes a poll, and the instant that it is next awoken.
902    ///
903    /// ##### Examples
904    /// ```
905    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
906    /// async fn main() {
907    ///     let monitor = tokio_metrics::TaskMonitor::new();
908    ///     let mut interval = monitor.intervals();
909    ///     let mut next_interval = move || interval.next().unwrap();
910    ///     let one_sec = std::time::Duration::from_secs(1);
911    ///     let two_sec = std::time::Duration::from_secs(2);
912    ///
913    ///     assert_eq!(next_interval().max_idle_duration.as_nanos(), 0);
914    ///     assert_eq!(monitor.cumulative().max_idle_duration.as_nanos(), 0);
915    ///
916    ///     monitor.instrument(async move {
917    ///         tokio::time::sleep(one_sec).await;
918    ///     }).await;
919    ///
920    ///     assert_eq!(next_interval().max_idle_duration, one_sec);
921    ///     assert_eq!(monitor.cumulative().max_idle_duration, one_sec);
922    ///
923    ///     monitor.instrument(async move {
924    ///         tokio::time::sleep(two_sec).await;
925    ///     }).await;
926    ///
927    ///     assert_eq!(next_interval().max_idle_duration, two_sec);
928    ///     assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
929    ///
930    ///     monitor.instrument(async move {
931    ///         tokio::time::sleep(one_sec).await;
932    ///     }).await;
933    ///
934    ///     assert_eq!(next_interval().max_idle_duration, one_sec);
935    ///     assert_eq!(monitor.cumulative().max_idle_duration, two_sec);
936    /// }
937    /// ```
938    pub max_idle_duration: Duration,
939
940    /// The total number of times that tasks were awoken (and then, presumably, scheduled for
941    /// execution).
942    ///
943    /// ##### Definition
944    /// This metric is equal to [`total_short_delay_count`][TaskMetrics::total_short_delay_count]
945    /// \+ [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
946    ///
947    /// ##### Derived metrics
948    /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
949    ///   The mean duration that tasks spent waiting to be executed after awakening.
950    ///
951    /// ##### Examples
952    /// In the below example, a task yields to the scheduler a varying number of times between
953    /// sampling intervals; this metric is equal to the number of times the task yielded:
954    /// ```
955    /// #[tokio::main]
956    /// async fn main(){
957    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
958    ///
959    ///     // [A] no tasks have been created, instrumented, and polled more than once
960    ///     assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
961    ///
962    ///     // [B] a `task` is created and instrumented
963    ///     let task = {
964    ///         let monitor = metrics_monitor.clone();
965    ///         metrics_monitor.instrument(async move {
966    ///             let mut interval = monitor.intervals();
967    ///             let mut next_interval = move || interval.next().unwrap();
968    ///
969    ///             // [E] `task` has not yet yielded to the scheduler, and
970    ///             // thus has not yet been scheduled since its first `poll`
971    ///             assert_eq!(next_interval().total_scheduled_count, 0);
972    ///
973    ///             tokio::task::yield_now().await; // yield to the scheduler
974    ///
975    ///             // [F] `task` has yielded to the scheduler once (and thus been
976    ///             // scheduled once) since the last sampling interval
977    ///             assert_eq!(next_interval().total_scheduled_count, 1);
978    ///
979    ///             tokio::task::yield_now().await; // yield to the scheduler
980    ///             tokio::task::yield_now().await; // yield to the scheduler
981    ///             tokio::task::yield_now().await; // yield to the scheduler
982    ///
983    ///             // [G] `task` has yielded to the scheduler thrice (and thus been
984    ///             // scheduled thrice) since the last sampling interval
985    ///             assert_eq!(next_interval().total_scheduled_count, 3);
986    ///
987    ///             tokio::task::yield_now().await; // yield to the scheduler
988    ///
989    ///             next_interval
990    ///         })
991    ///     };
992    ///
993    ///     // [C] `task` has not yet been polled at all
994    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
995    ///     assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
996    ///
997    ///     // [D] poll `task` to completion
998    ///     let mut next_interval = task.await;
999    ///
1000    ///     // [H] `task` has been scheduled 1 time since the last sample
1001    ///     assert_eq!(next_interval().total_scheduled_count, 1);
1002    ///
1003    ///     // [I] `task` has been scheduled 0 times since the last sample
1004    ///     assert_eq!(next_interval().total_scheduled_count, 0);
1005    ///
1006    ///     // [J] `task` has yielded to the scheduler a total of five times
1007    ///     assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 5);
1008    /// }
1009    /// ```
1010    #[doc(alias = "total_delay_count")]
1011    pub total_scheduled_count: u64,
1012
1013    /// The total duration that tasks spent waiting to be polled after awakening.
1014    ///
1015    /// ##### Definition
1016    /// This metric is equal to [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration]
1017    /// \+ [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration].
1018    ///
1019    /// ##### Derived metrics
1020    /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
1021    ///   The mean duration that tasks spent waiting to be executed after awakening.
1022    ///
1023    /// ##### Examples
1024    /// ```
1025    /// use tokio::time::Duration;
1026    ///
1027    /// #[tokio::main(flavor = "current_thread")]
1028    /// async fn main() {
1029    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1030    ///     let mut interval = metrics_monitor.intervals();
1031    ///     let mut next_interval = || interval.next().unwrap();
1032    ///
1033    ///     // construct and instrument and spawn a task that yields endlessly
1034    ///     tokio::spawn(metrics_monitor.instrument(async {
1035    ///         loop { tokio::task::yield_now().await }
1036    ///     }));
1037    ///
1038    ///     tokio::task::yield_now().await;
1039    ///
1040    ///     // block the executor for 1 second
1041    ///     std::thread::sleep(Duration::from_millis(1000));
1042    ///
1043    ///     tokio::task::yield_now().await;
1044    ///
1045    ///     // the endlessly-yielding task will have spent approximately one second waiting
1046    ///     let total_scheduled_duration = next_interval().total_scheduled_duration;
1047    ///     assert!(total_scheduled_duration >= Duration::from_millis(1000));
1048    ///     assert!(total_scheduled_duration <= Duration::from_millis(1100));
1049    /// }
1050    /// ```
1051    #[doc(alias = "total_delay_duration")]
1052    pub total_scheduled_duration: Duration,
1053
1054    /// The total number of times that tasks were polled.
1055    ///
1056    /// ##### Definition
1057    /// This metric is equal to [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count]
1058    /// \+ [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
1059    ///
1060    /// ##### Derived metrics
1061    /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
1062    ///   The mean duration of polls.
1063    ///
1064    /// ##### Examples
1065    /// In the below example, a task with multiple yield points is await'ed to completion; this
1066    /// metric reflects the number of `await`s within each sampling interval:
1067    /// ```
1068    /// #[tokio::main]
1069    /// async fn main() {
1070    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1071    ///
1072    ///     // [A] no tasks have been created, instrumented, or polled
1073    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1074    ///
1075    ///     // [B] a `task` is created and instrumented
1076    ///     let task = {
1077    ///         let monitor = metrics_monitor.clone();
1078    ///         metrics_monitor.instrument(async move {
1079    ///             let mut interval = monitor.intervals();
1080    ///             let mut next_interval = move || interval.next().unwrap();
1081    ///
1082    ///             // [E] task is in the midst of its first poll
1083    ///             assert_eq!(next_interval().total_poll_count, 0);
1084    ///
1085    ///             tokio::task::yield_now().await; // poll 1
1086    ///
1087    ///             // [F] task has been polled 1 time
1088    ///             assert_eq!(next_interval().total_poll_count, 1);
1089    ///
1090    ///             tokio::task::yield_now().await; // poll 2
1091    ///             tokio::task::yield_now().await; // poll 3
1092    ///             tokio::task::yield_now().await; // poll 4
1093    ///
1094    ///             // [G] task has been polled 3 times
1095    ///             assert_eq!(next_interval().total_poll_count, 3);
1096    ///
1097    ///             tokio::task::yield_now().await; // poll 5
1098    ///
1099    ///             next_interval                   // poll 6
1100    ///         })
1101    ///     };
1102    ///
1103    ///     // [C] `task` has not yet been polled at all
1104    ///     assert_eq!(metrics_monitor.cumulative().total_poll_count, 0);
1105    ///
1106    ///     // [D] poll `task` to completion
1107    ///     let mut next_interval = task.await;
1108    ///
1109    ///     // [H] `task` has been polled 2 times since the last sample
1110    ///     assert_eq!(next_interval().total_poll_count, 2);
1111    ///
1112    ///     // [I] `task` has been polled 0 times since the last sample
1113    ///     assert_eq!(next_interval().total_poll_count, 0);
1114    ///
1115    ///     // [J] `task` has been polled 6 times
1116    ///     assert_eq!(metrics_monitor.cumulative().total_poll_count, 6);
1117    /// }
1118    /// ```
1119    pub total_poll_count: u64,
1120
1121    /// The total duration elapsed during polls.
1122    ///
1123    /// ##### Definition
1124    /// This metric is equal to [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration]
1125    /// \+ [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration].
1126    ///
1127    /// ##### Derived metrics
1128    /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
1129    ///   The mean duration of polls.
1130    ///
1131    /// ##### Examples
1132    /// ```
1133    /// use tokio::time::Duration;
1134    ///
1135    /// #[tokio::main(flavor = "current_thread", start_paused = true)]
1136    /// async fn main() {
1137    ///     let monitor = tokio_metrics::TaskMonitor::new();
1138    ///     let mut interval = monitor.intervals();
1139    ///     let mut next_interval = move || interval.next().unwrap();
1140    ///
1141    ///     assert_eq!(next_interval().total_poll_duration, Duration::ZERO);
1142    ///
1143    ///     monitor.instrument(async {
1144    ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
1145    ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
1146    ///         ()                                                  // poll 3 (0s)
1147    ///     }).await;
1148    ///
1149    ///     assert_eq!(next_interval().total_poll_duration, Duration::from_secs(2));
1150    /// }
1151    /// ```
1152    pub total_poll_duration: Duration,
1153
1154    /// The total number of times that polling tasks completed swiftly.
1155    ///
1156    /// Here, 'swiftly' is defined as completing in strictly less time than
1157    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1158    ///
1159    /// ##### Derived metrics
1160    /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**
1161    ///   The mean duration of fast polls.
1162    ///
1163    /// ##### Examples
1164    /// In the below example, 0 polls occur within the first sampling interval, 3 fast polls occur
1165    /// within the second sampling interval, and 2 fast polls occur within the third sampling
1166    /// interval:
1167    /// ```
1168    /// use std::future::Future;
1169    /// use std::time::Duration;
1170    ///
1171    /// #[tokio::main]
1172    /// async fn main() {
1173    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1174    ///     let mut interval = metrics_monitor.intervals();
1175    ///     let mut next_interval = || interval.next().unwrap();
1176    ///
1177    ///     // no tasks have been constructed, instrumented, or polled
1178    ///     assert_eq!(next_interval().total_fast_poll_count, 0);
1179    ///
1180    ///     let fast = Duration::ZERO;
1181    ///
1182    ///     // this task completes in three fast polls
1183    ///     let _ = metrics_monitor.instrument(async {
1184    ///         spin_for(fast).await; // fast poll 1
1185    ///         spin_for(fast).await; // fast poll 2
1186    ///         spin_for(fast)        // fast poll 3
1187    ///     }).await;
1188    ///
1189    ///     assert_eq!(next_interval().total_fast_poll_count, 3);
1190    ///
1191    ///     // this task completes in two fast polls
1192    ///     let _ = metrics_monitor.instrument(async {
1193    ///         spin_for(fast).await; // fast poll 1
1194    ///         spin_for(fast)        // fast poll 2
1195    ///     }).await;
1196    ///
1197    ///     assert_eq!(next_interval().total_fast_poll_count, 2);
1198    /// }
1199    ///
1200    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1201    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1202    ///     let start = tokio::time::Instant::now();
1203    ///     while start.elapsed() <= duration {}
1204    ///     tokio::task::yield_now()
1205    /// }
1206    /// ```
1207    pub total_fast_poll_count: u64,
1208
1209    /// The total duration of fast polls.
1210    ///
1211    /// Here, 'fast' is defined as completing in strictly less time than
1212    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1213    ///
1214    /// ##### Derived metrics
1215    /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**
1216    ///   The mean duration of fast polls.
1217    ///
1218    /// ##### Examples
1219    /// In the below example, no tasks are polled in the first sampling interval; three fast polls
1220    /// consume a total of 3μs time in the second sampling interval; and two fast polls consume a
1221    /// total of 2μs time in the third sampling interval:
1222    /// ```
1223    /// use std::future::Future;
1224    /// use std::time::Duration;
1225    ///
1226    /// #[tokio::main]
1227    /// async fn main() {
1228    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1229    ///     let mut interval = metrics_monitor.intervals();
1230    ///     let mut next_interval = || interval.next().unwrap();
1231    ///
1232    ///     // no tasks have been constructed, instrumented, or polled
1233    ///     let interval = next_interval();
1234    ///     assert_eq!(interval.total_fast_poll_duration, Duration::ZERO);
1235    ///
1236    ///     let fast = Duration::from_micros(1);
1237    ///
1238    ///     // this task completes in three fast polls
1239    ///     let task_a_time = time(metrics_monitor.instrument(async {
1240    ///         spin_for(fast).await; // fast poll 1
1241    ///         spin_for(fast).await; // fast poll 2
1242    ///         spin_for(fast)        // fast poll 3
1243    ///     })).await;
1244    ///
1245    ///     let interval = next_interval();
1246    ///     assert!(interval.total_fast_poll_duration >= fast * 3);
1247    ///     assert!(interval.total_fast_poll_duration <= task_a_time);
1248    ///
1249    ///     // this task completes in two fast polls
1250    ///     let task_b_time = time(metrics_monitor.instrument(async {
1251    ///         spin_for(fast).await; // fast poll 1
1252    ///         spin_for(fast)        // fast poll 2
1253    ///     })).await;
1254    ///
1255    ///     let interval = next_interval();
1256    ///     assert!(interval.total_fast_poll_duration >= fast * 2);
1257    ///     assert!(interval.total_fast_poll_duration <= task_b_time);
1258    /// }
1259    ///
1260    /// /// Produces the amount of time it took to await a given async task.
1261    /// async fn time(task: impl Future) -> Duration {
1262    ///     let start = tokio::time::Instant::now();
1263    ///     task.await;
1264    ///     start.elapsed()
1265    /// }
1266    ///
1267    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1268    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1269    ///     let start = tokio::time::Instant::now();
1270    ///     while start.elapsed() <= duration {}
1271    ///     tokio::task::yield_now()
1272    /// }
1273    /// ```
1274    pub total_fast_poll_duration: Duration,
1275
1276    /// The total number of times that polling tasks completed slowly.
1277    ///
1278    /// Here, 'slowly' is defined as completing in at least as much time as
1279    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1280    ///
1281    /// ##### Derived metrics
1282    /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**
1283    ///   The mean duration of slow polls.
1284    ///
1285    /// ##### Examples
1286    /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1287    /// within the second sampling interval, and 2 slow polls occur within the third sampling
1288    /// interval:
1289    /// ```
1290    /// use std::future::Future;
1291    /// use std::time::Duration;
1292    ///
1293    /// #[tokio::main]
1294    /// async fn main() {
1295    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1296    ///     let mut interval = metrics_monitor.intervals();
1297    ///     let mut next_interval = || interval.next().unwrap();
1298    ///
1299    ///     // no tasks have been constructed, instrumented, or polled
1300    ///     assert_eq!(next_interval().total_slow_poll_count, 0);
1301    ///
1302    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1303    ///
1304    ///     // this task completes in three slow polls
1305    ///     let _ = metrics_monitor.instrument(async {
1306    ///         spin_for(slow).await; // slow poll 1
1307    ///         spin_for(slow).await; // slow poll 2
1308    ///         spin_for(slow)        // slow poll 3
1309    ///     }).await;
1310    ///
1311    ///     assert_eq!(next_interval().total_slow_poll_count, 3);
1312    ///
1313    ///     // this task completes in two slow polls
1314    ///     let _ = metrics_monitor.instrument(async {
1315    ///         spin_for(slow).await; // slow poll 1
1316    ///         spin_for(slow)        // slow poll 2
1317    ///     }).await;
1318    ///
1319    ///     assert_eq!(next_interval().total_slow_poll_count, 2);
1320    /// }
1321    ///
1322    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1323    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1324    ///     let start = tokio::time::Instant::now();
1325    ///     while start.elapsed() <= duration {}
1326    ///     tokio::task::yield_now()
1327    /// }
1328    /// ```
1329    pub total_slow_poll_count: u64,
1330
1331    /// The total duration of slow polls.
1332    ///
1333    /// Here, 'slow' is defined as completing in at least as much time as
1334    /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1335    ///
1336    /// ##### Derived metrics
1337    /// - **[`mean_slow_poll_duration`][`TaskMetrics::mean_slow_poll_duration`]**
1338    ///   The mean duration of slow polls.
1339    ///
1340    /// ##### Examples
1341    /// In the below example, no tasks are polled in the first sampling interval; three slow polls
1342    /// consume a total of
1343    /// 30 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD]
1344    /// time in the second sampling interval; and two slow polls consume a total of
1345    /// 20 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
1346    /// third sampling interval:
1347    /// ```
1348    /// use std::future::Future;
1349    /// use std::time::Duration;
1350    ///
1351    /// #[tokio::main]
1352    /// async fn main() {
1353    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1354    ///     let mut interval = metrics_monitor.intervals();
1355    ///     let mut next_interval = || interval.next().unwrap();
1356    ///
1357    ///     // no tasks have been constructed, instrumented, or polled
1358    ///     let interval = next_interval();
1359    ///     assert_eq!(interval.total_slow_poll_duration, Duration::ZERO);
1360    ///
1361    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1362    ///
1363    ///     // this task completes in three slow polls
1364    ///     let task_a_time = time(metrics_monitor.instrument(async {
1365    ///         spin_for(slow).await; // slow poll 1
1366    ///         spin_for(slow).await; // slow poll 2
1367    ///         spin_for(slow)        // slow poll 3
1368    ///     })).await;
1369    ///
1370    ///     let interval = next_interval();
1371    ///     assert!(interval.total_slow_poll_duration >= slow * 3);
1372    ///     assert!(interval.total_slow_poll_duration <= task_a_time);
1373    ///
1374    ///     // this task completes in two slow polls
1375    ///     let task_b_time = time(metrics_monitor.instrument(async {
1376    ///         spin_for(slow).await; // slow poll 1
1377    ///         spin_for(slow)        // slow poll 2
1378    ///     })).await;
1379    ///
1380    ///     let interval = next_interval();
1381    ///     assert!(interval.total_slow_poll_duration >= slow * 2);
1382    ///     assert!(interval.total_slow_poll_duration <= task_b_time);
1383    /// }
1384    ///
1385    /// /// Produces the amount of time it took to await a given async task.
1386    /// async fn time(task: impl Future) -> Duration {
1387    ///     let start = tokio::time::Instant::now();
1388    ///     task.await;
1389    ///     start.elapsed()
1390    /// }
1391    ///
1392    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1393    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1394    ///     let start = tokio::time::Instant::now();
1395    ///     while start.elapsed() <= duration {}
1396    ///     tokio::task::yield_now()
1397    /// }
1398    /// ```
1399    pub total_slow_poll_duration: Duration,
1400
1401    /// The total count of tasks with short scheduling delays.
1402    ///
1403    /// This is defined as tasks taking strictly less than
1404    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1405    /// scheduled.
1406    ///
1407    /// ##### Derived metrics
1408    /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**
1409    ///   The mean duration of short scheduling delays.
1410    pub total_short_delay_count: u64,
1411
1412    /// The total duration of tasks with short scheduling delays.
1413    ///
1414    /// This is defined as tasks taking strictly less than
1415    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1416    /// scheduled.
1417    ///
1418    /// ##### Derived metrics
1419    /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**
1420    ///   The mean duration of short scheduling delays.
1421    pub total_short_delay_duration: Duration,
1422
1423    /// The total count of tasks with long scheduling delays.
1424    ///
1425    /// This is defined as tasks taking
1426    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
1427    /// after being scheduled.
1428    ///
1429    /// ##### Derived metrics
1430    /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
1431    ///   The mean duration of long scheduling delays.
1432    pub total_long_delay_count: u64,
1433
1434    /// The total duration of tasks with long scheduling delays.
1435    ///
1436    /// This is defined as tasks taking
1437    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
1438    /// after being scheduled.
1439    ///
1440    /// ##### Derived metrics
1441    /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
1442    ///   The mean duration of long scheduling delays.
1443    pub total_long_delay_duration: Duration,
1444}
1445
/// Tracks the metrics, shared across the various types.
///
/// The two thresholds are plain [`Duration`]s; every other field is an [`AtomicU64`] counter.
/// Fields with an `_ns` suffix hold nanoseconds: `RawMetrics::metrics` converts them with
/// `Duration::from_nanos`. `total_slow_poll_duration` lacks the suffix but is converted the
/// same way, so it is a nanosecond total as well.
#[derive(Debug)]
struct RawMetrics {
    /// Threshold used to categorize a poll as slow. The public documentation in this file
    /// describes a slow poll as one that takes at least this long.
    slow_poll_threshold: Duration,

    /// A scheduling delay of at least this long will be considered a long delay.
    long_delay_threshold: Duration,

    /// Total number of instrumented tasks.
    instrumented_count: AtomicU64,

    /// Total number of instrumented tasks polled at least once.
    first_poll_count: AtomicU64,

    /// Total number of times tasks entered the `idle` state.
    total_idled_count: AtomicU64,

    /// Total number of times tasks were scheduled.
    total_scheduled_count: AtomicU64,

    /// Total number of fast polls.
    total_fast_poll_count: AtomicU64,

    /// Total number of slow polls.
    total_slow_poll_count: AtomicU64,

    /// Total number of times tasks had a long scheduling delay.
    total_long_delay_count: AtomicU64,

    /// Total number of times tasks had a short scheduling delay.
    total_short_delay_count: AtomicU64,

    /// Total number of times tasks were dropped.
    dropped_count: AtomicU64,

    /// Total amount of time until the first poll.
    total_first_poll_delay_ns: AtomicU64,

    /// Total amount of time tasks spent in the `idle` state.
    total_idle_duration_ns: AtomicU64,

    /// The longest time tasks spent in the `idle` state locally.
    /// This will be used to track the local max between interval
    /// metric snapshots; `get_and_reset_local_max_idle_duration` reads it and resets it to zero.
    local_max_idle_duration_ns: AtomicU64,

    /// The longest time tasks spent in the `idle` state.
    global_max_idle_duration_ns: AtomicU64,

    /// Total amount of time tasks spent in the waking state.
    total_scheduled_duration_ns: AtomicU64,

    /// Total amount of time tasks spent being polled below the slow cut off.
    total_fast_poll_duration_ns: AtomicU64,

    /// Total amount of time tasks spent being polled above the slow cut off.
    /// Stored in nanoseconds despite the missing `_ns` suffix.
    total_slow_poll_duration: AtomicU64,

    /// Total duration of scheduling delays below the long delay cut off.
    total_short_delay_duration_ns: AtomicU64,

    /// Total duration of scheduling delays at or above the long delay cut off.
    total_long_delay_duration_ns: AtomicU64,
}
1511
/// Per-task bookkeeping. `TaskMonitor::instrument` creates one `State` (wrapped in an `Arc`)
/// for every task it instruments.
#[derive(Debug)]
struct State {
    /// Where metrics should be recorded. This is a clone of the monitor's shared `Arc`.
    metrics: Arc<RawMetrics>,

    /// Instant at which the task was instrumented. This is used to track the time to first poll.
    instrumented_at: Instant,

    /// The instant, tracked as nanoseconds since `instrumented_at`, at which the future
    /// was last woken. Initialized to `0` by `TaskMonitor::instrument`.
    woke_at: AtomicU64,

    /// Waker to forward notifications to.
    waker: AtomicWaker,
}
1527
1528impl TaskMonitor {
    /// The default duration at which polls cross the threshold into being categorized as 'slow' is
    /// 50μs.
    #[cfg(not(test))]
    pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_micros(50);
    // Test builds use 500ms instead of 50μs. The doc comment above attaches only to the
    // non-test item, which is why this variant carries `allow(missing_docs)`.
    // NOTE(review): presumably the larger value keeps timing-sensitive unit tests stable — confirm.
    #[cfg(test)]
    #[allow(missing_docs)]
    pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_millis(500);

    /// The default duration at which schedules cross the threshold into being categorized as 'long'
    /// is 50μs.
    #[cfg(not(test))]
    pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_micros(50);
    // Test builds use 500ms instead of 50μs; undocumented for the same reason as the test-only
    // slow-poll constant (the doc comment above belongs to the non-test item).
    #[cfg(test)]
    #[allow(missing_docs)]
    pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_millis(500);
1544
1545    /// Constructs a new task monitor.
1546    ///
1547    /// Uses [`Self::DEFAULT_SLOW_POLL_THRESHOLD`] as the threshold at which polls will be
1548    /// considered 'slow'.
1549    ///
1550    /// Uses [`Self::DEFAULT_LONG_DELAY_THRESHOLD`] as the threshold at which scheduling will be
1551    /// considered 'long'.
1552    pub fn new() -> TaskMonitor {
1553        TaskMonitor::with_slow_poll_threshold(Self::DEFAULT_SLOW_POLL_THRESHOLD)
1554    }
1555
    /// Constructs a builder for a task monitor.
    ///
    /// The returned value is a fresh [`TaskMonitorBuilder`] obtained from
    /// `TaskMonitorBuilder::new`.
    pub fn builder() -> TaskMonitorBuilder {
        TaskMonitorBuilder::new()
    }
1560
1561    /// Constructs a new task monitor with a given threshold at which polls are considered 'slow'.
1562    ///
1563    /// ##### Selecting an appropriate threshold
1564    /// TODO. What advice can we give here?
1565    ///
1566    /// ##### Examples
1567    /// In the below example, low-threshold and high-threshold monitors are constructed and
1568    /// instrument identical tasks; the low-threshold monitor reports 4 slow polls, and the
1569    /// high-threshold monitor reports only 2 slow polls:
1570    /// ```
1571    /// use std::future::Future;
1572    /// use std::time::Duration;
1573    /// use tokio_metrics::TaskMonitor;
1574    ///
1575    /// #[tokio::main]
1576    /// async fn main() {
1577    ///     let lo_threshold = Duration::from_micros(10);
1578    ///     let hi_threshold = Duration::from_millis(10);
1579    ///
1580    ///     let lo_monitor = TaskMonitor::with_slow_poll_threshold(lo_threshold);
1581    ///     let hi_monitor = TaskMonitor::with_slow_poll_threshold(hi_threshold);
1582    ///
1583    ///     let make_task = || async {
1584    ///         spin_for(lo_threshold).await; // faster poll 1
1585    ///         spin_for(lo_threshold).await; // faster poll 2
1586    ///         spin_for(hi_threshold).await; // slower poll 3
1587    ///         spin_for(hi_threshold).await  // slower poll 4
1588    ///     };
1589    ///
1590    ///     lo_monitor.instrument(make_task()).await;
1591    ///     hi_monitor.instrument(make_task()).await;
1592    ///
1593    ///     // the low-threshold monitor reported 4 slow polls:
1594    ///     assert_eq!(lo_monitor.cumulative().total_slow_poll_count, 4);
1595    ///     // the high-threshold monitor reported only 2 slow polls:
1596    ///     assert_eq!(hi_monitor.cumulative().total_slow_poll_count, 2);
1597    /// }
1598    ///
1599    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1600    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1601    ///     let start = tokio::time::Instant::now();
1602    ///     while start.elapsed() <= duration {}
1603    ///     tokio::task::yield_now()
1604    /// }
1605    /// ```
1606    pub fn with_slow_poll_threshold(slow_poll_cut_off: Duration) -> TaskMonitor {
1607        Self::create(slow_poll_cut_off, Self::DEFAULT_LONG_DELAY_THRESHOLD)
1608    }
1609
1610    fn create(slow_poll_cut_off: Duration, long_delay_cut_off: Duration) -> TaskMonitor {
1611        TaskMonitor {
1612            metrics: Arc::new(RawMetrics {
1613                slow_poll_threshold: slow_poll_cut_off,
1614                first_poll_count: AtomicU64::new(0),
1615                total_idled_count: AtomicU64::new(0),
1616                total_scheduled_count: AtomicU64::new(0),
1617                total_fast_poll_count: AtomicU64::new(0),
1618                total_slow_poll_count: AtomicU64::new(0),
1619                total_long_delay_count: AtomicU64::new(0),
1620                instrumented_count: AtomicU64::new(0),
1621                dropped_count: AtomicU64::new(0),
1622                total_first_poll_delay_ns: AtomicU64::new(0),
1623                total_scheduled_duration_ns: AtomicU64::new(0),
1624                local_max_idle_duration_ns: AtomicU64::new(0),
1625                global_max_idle_duration_ns: AtomicU64::new(0),
1626                total_idle_duration_ns: AtomicU64::new(0),
1627                total_fast_poll_duration_ns: AtomicU64::new(0),
1628                total_slow_poll_duration: AtomicU64::new(0),
1629                total_short_delay_duration_ns: AtomicU64::new(0),
1630                long_delay_threshold: long_delay_cut_off,
1631                total_short_delay_count: AtomicU64::new(0),
1632                total_long_delay_duration_ns: AtomicU64::new(0),
1633            }),
1634        }
1635    }
1636
1637    /// Produces the duration greater-than-or-equal-to at which polls are categorized as slow.
1638    ///
1639    /// ##### Examples
1640    /// In the below example, [`TaskMonitor`] is initialized with [`TaskMonitor::new`];
1641    /// consequently, its slow-poll threshold equals [`TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD`]:
1642    /// ```
1643    /// use tokio_metrics::TaskMonitor;
1644    ///
1645    /// #[tokio::main]
1646    /// async fn main() {
1647    ///     let metrics_monitor = TaskMonitor::new();
1648    ///
1649    ///     assert_eq!(
1650    ///         metrics_monitor.slow_poll_threshold(),
1651    ///         TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD
1652    ///     );
1653    /// }
1654    /// ```
1655    pub fn slow_poll_threshold(&self) -> Duration {
1656        self.metrics.slow_poll_threshold
1657    }
1658
1659    /// Produces the duration greater-than-or-equal-to at which scheduling delays are categorized
1660    /// as long.
1661    pub fn long_delay_threshold(&self) -> Duration {
1662        self.metrics.long_delay_threshold
1663    }
1664
1665    /// Produces an instrumented façade around a given async task.
1666    ///
1667    /// ##### Examples
1668    /// Instrument an async task by passing it to [`TaskMonitor::instrument`]:
1669    /// ```
1670    /// #[tokio::main]
1671    /// async fn main() {
1672    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1673    ///
1674    ///     // 0 tasks have been instrumented, much less polled
1675    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1676    ///
1677    ///     // instrument a task and poll it to completion
1678    ///     metrics_monitor.instrument(async {}).await;
1679    ///
1680    ///     // 1 task has been instrumented and polled
1681    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 1);
1682    ///
1683    ///     // instrument a task and poll it to completion
1684    ///     metrics_monitor.instrument(async {}).await;
1685    ///
1686    ///     // 2 tasks have been instrumented and polled
1687    ///     assert_eq!(metrics_monitor.cumulative().first_poll_count, 2);
1688    /// }
1689    /// ```
1690    /// An async task may be tracked by multiple [`TaskMonitor`]s; e.g.:
1691    /// ```
1692    /// #[tokio::main]
1693    /// async fn main() {
1694    ///     let monitor_a = tokio_metrics::TaskMonitor::new();
1695    ///     let monitor_b = tokio_metrics::TaskMonitor::new();
1696    ///
1697    ///     // 0 tasks have been instrumented, much less polled
1698    ///     assert_eq!(monitor_a.cumulative().first_poll_count, 0);
1699    ///     assert_eq!(monitor_b.cumulative().first_poll_count, 0);
1700    ///
1701    ///     // instrument a task and poll it to completion
1702    ///     monitor_a.instrument(monitor_b.instrument(async {})).await;
1703    ///
1704    ///     // 1 task has been instrumented and polled
1705    ///     assert_eq!(monitor_a.cumulative().first_poll_count, 1);
1706    ///     assert_eq!(monitor_b.cumulative().first_poll_count, 1);
1707    /// }
1708    /// ```
1709    /// It is also possible (but probably undesirable) to instrument an async task multiple times
1710    /// with the same [`TaskMonitor`]; e.g.:
1711    /// ```
1712    /// #[tokio::main]
1713    /// async fn main() {
1714    ///     let monitor = tokio_metrics::TaskMonitor::new();
1715    ///
1716    ///     // 0 tasks have been instrumented, much less polled
1717    ///     assert_eq!(monitor.cumulative().first_poll_count, 0);
1718    ///
1719    ///     // instrument a task and poll it to completion
1720    ///     monitor.instrument(monitor.instrument(async {})).await;
1721    ///
1722    ///     // 2 tasks have been instrumented and polled, supposedly
1723    ///     assert_eq!(monitor.cumulative().first_poll_count, 2);
1724    /// }
1725    /// ```
1726    pub fn instrument<F>(&self, task: F) -> Instrumented<F> {
1727        self.metrics.instrumented_count.fetch_add(1, SeqCst);
1728        Instrumented {
1729            task,
1730            did_poll_once: false,
1731            idled_at: 0,
1732            state: Arc::new(State {
1733                metrics: self.metrics.clone(),
1734                instrumented_at: Instant::now(),
1735                woke_at: AtomicU64::new(0),
1736                waker: AtomicWaker::new(),
1737            }),
1738        }
1739    }
1740
1741    /// Produces [`TaskMetrics`] for the tasks instrumented by this [`TaskMonitor`], collected since
1742    /// the construction of [`TaskMonitor`].
1743    ///
1744    /// ##### See also
1745    /// - [`TaskMonitor::intervals`]:
1746    ///   produces [`TaskMetrics`] for user-defined sampling intervals, instead of cumulatively
1747    ///
1748    /// ##### Examples
1749    /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1750    /// within the second sampling interval, and 2 slow polls occur within the third sampling
1751    /// interval; five slow polls occur across all sampling intervals:
1752    /// ```
1753    /// use std::future::Future;
1754    /// use std::time::Duration;
1755    ///
1756    /// #[tokio::main]
1757    /// async fn main() {
1758    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1759    ///
1760    ///     // initialize a stream of sampling intervals
1761    ///     let mut intervals = metrics_monitor.intervals();
1762    ///     // each call of `next_interval` will produce metrics for the last sampling interval
1763    ///     let mut next_interval = || intervals.next().unwrap();
1764    ///
1765    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1766    ///
1767    ///     // this task completes in three slow polls
1768    ///     let _ = metrics_monitor.instrument(async {
1769    ///         spin_for(slow).await; // slow poll 1
1770    ///         spin_for(slow).await; // slow poll 2
1771    ///         spin_for(slow)        // slow poll 3
1772    ///     }).await;
1773    ///
1774    ///     // in the previous sampling interval, there were 3 slow polls
1775    ///     assert_eq!(next_interval().total_slow_poll_count, 3);
1776    ///     assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 3);
1777    ///
1778    ///     // this task completes in two slow polls
1779    ///     let _ = metrics_monitor.instrument(async {
1780    ///         spin_for(slow).await; // slow poll 1
1781    ///         spin_for(slow)        // slow poll 2
1782    ///     }).await;
1783    ///
1784    ///     // in the previous sampling interval, there were 2 slow polls
1785    ///     assert_eq!(next_interval().total_slow_poll_count, 2);
1786    ///
1787    ///     // across all sampling intervals, there were a total of 5 slow polls
1788    ///     assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1789    /// }
1790    ///
1791    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1792    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1793    ///     let start = tokio::time::Instant::now();
1794    ///     while start.elapsed() <= duration {}
1795    ///     tokio::task::yield_now()
1796    /// }
1797    /// ```
1798    pub fn cumulative(&self) -> TaskMetrics {
1799        self.metrics.metrics()
1800    }
1801
1802    /// Produces an unending iterator of metric sampling intervals.
1803    ///
1804    /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1805    /// produced by [`TaskMonitor::intervals`]. The item type of this iterator is [`TaskMetrics`],
1806    /// which is a bundle of task metrics that describe *only* events occurring within that sampling
1807    /// interval.
1808    ///
1809    /// ##### Examples
1810    /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1811    /// within the second sampling interval, and 2 slow polls occur within the third sampling
1812    /// interval; five slow polls occur across all sampling intervals:
1813    /// ```
1814    /// use std::future::Future;
1815    /// use std::time::Duration;
1816    ///
1817    /// #[tokio::main]
1818    /// async fn main() {
1819    ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1820    ///
1821    ///     // initialize a stream of sampling intervals
1822    ///     let mut intervals = metrics_monitor.intervals();
1823    ///     // each call of `next_interval` will produce metrics for the last sampling interval
1824    ///     let mut next_interval = || intervals.next().unwrap();
1825    ///
1826    ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
1827    ///
1828    ///     // this task completes in three slow polls
1829    ///     let _ = metrics_monitor.instrument(async {
1830    ///         spin_for(slow).await; // slow poll 1
1831    ///         spin_for(slow).await; // slow poll 2
1832    ///         spin_for(slow)        // slow poll 3
1833    ///     }).await;
1834    ///
1835    ///     // in the previous sampling interval, there were 3 slow polls
1836    ///     assert_eq!(next_interval().total_slow_poll_count, 3);
1837    ///
1838    ///     // this task completes in two slow polls
1839    ///     let _ = metrics_monitor.instrument(async {
1840    ///         spin_for(slow).await; // slow poll 1
1841    ///         spin_for(slow)        // slow poll 2
1842    ///     }).await;
1843    ///
1844    ///     // in the previous sampling interval, there were 2 slow polls
1845    ///     assert_eq!(next_interval().total_slow_poll_count, 2);
1846    ///
1847    ///     // across all sampling intervals, there were a total of 5 slow polls
1848    ///     assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1849    /// }
1850    ///
1851    /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1852    /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1853    ///     let start = tokio::time::Instant::now();
1854    ///     while start.elapsed() <= duration {}
1855    ///     tokio::task::yield_now()
1856    /// }
1857    /// ```
1858    pub fn intervals(&self) -> TaskIntervals {
1859        TaskIntervals {
1860            metrics: self.metrics.clone(),
1861            previous: None,
1862        }
1863    }
1864}
1865
1866impl RawMetrics {
1867    fn get_and_reset_local_max_idle_duration(&self) -> Duration {
1868        Duration::from_nanos(self.local_max_idle_duration_ns.swap(0, SeqCst))
1869    }
1870
1871    fn metrics(&self) -> TaskMetrics {
1872        let total_fast_poll_count = self.total_fast_poll_count.load(SeqCst);
1873        let total_slow_poll_count = self.total_slow_poll_count.load(SeqCst);
1874
1875        let total_fast_poll_duration =
1876            Duration::from_nanos(self.total_fast_poll_duration_ns.load(SeqCst));
1877        let total_slow_poll_duration =
1878            Duration::from_nanos(self.total_slow_poll_duration.load(SeqCst));
1879
1880        let total_poll_count = total_fast_poll_count + total_slow_poll_count;
1881        let total_poll_duration = total_fast_poll_duration + total_slow_poll_duration;
1882
1883        TaskMetrics {
1884            instrumented_count: self.instrumented_count.load(SeqCst),
1885            dropped_count: self.dropped_count.load(SeqCst),
1886
1887            total_poll_count,
1888            total_poll_duration,
1889            first_poll_count: self.first_poll_count.load(SeqCst),
1890            total_idled_count: self.total_idled_count.load(SeqCst),
1891            total_scheduled_count: self.total_scheduled_count.load(SeqCst),
1892            total_fast_poll_count: self.total_fast_poll_count.load(SeqCst),
1893            total_slow_poll_count: self.total_slow_poll_count.load(SeqCst),
1894            total_short_delay_count: self.total_short_delay_count.load(SeqCst),
1895            total_long_delay_count: self.total_long_delay_count.load(SeqCst),
1896            total_first_poll_delay: Duration::from_nanos(
1897                self.total_first_poll_delay_ns.load(SeqCst),
1898            ),
1899            max_idle_duration: Duration::from_nanos(self.global_max_idle_duration_ns.load(SeqCst)),
1900            total_idle_duration: Duration::from_nanos(self.total_idle_duration_ns.load(SeqCst)),
1901            total_scheduled_duration: Duration::from_nanos(
1902                self.total_scheduled_duration_ns.load(SeqCst),
1903            ),
1904            total_fast_poll_duration: Duration::from_nanos(
1905                self.total_fast_poll_duration_ns.load(SeqCst),
1906            ),
1907            total_slow_poll_duration: Duration::from_nanos(
1908                self.total_slow_poll_duration.load(SeqCst),
1909            ),
1910            total_short_delay_duration: Duration::from_nanos(
1911                self.total_short_delay_duration_ns.load(SeqCst),
1912            ),
1913            total_long_delay_duration: Duration::from_nanos(
1914                self.total_long_delay_duration_ns.load(SeqCst),
1915            ),
1916        }
1917    }
1918}
1919
1920impl Default for TaskMonitor {
1921    fn default() -> TaskMonitor {
1922        TaskMonitor::new()
1923    }
1924}
1925
1926derived_metrics!(
1927    [TaskMetrics] {
1928        stable {
1929            /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
1930            /// are first polled.
1931            ///
1932            /// ##### Definition
1933            /// This metric is derived from [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay]
1934            /// ÷ [`first_poll_count`][TaskMetrics::first_poll_count].
1935            ///
1936            /// ##### Interpretation
1937            /// If this metric increases, it means that, on average, tasks spent longer waiting to be
1938            /// initially polled.
1939            ///
1940            /// ##### See also
1941            /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
1942            ///   The mean duration that tasks spent waiting to be executed after awakening.
1943            ///
1944            /// ##### Examples
1945            /// In the below example, no tasks are instrumented or polled within the first sampling
1946            /// interval; in the second sampling interval, 500ms elapse between the instrumentation of a
1947            /// task and its first poll; in the third sampling interval, a mean of 750ms elapse between the
1948            /// instrumentation and first poll of two tasks:
1949            /// ```
1950            /// use std::time::Duration;
1951            ///
1952            /// #[tokio::main]
1953            /// async fn main() {
1954            ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
1955            ///     let mut interval = metrics_monitor.intervals();
1956            ///     let mut next_interval = || interval.next().unwrap();
1957            ///
1958            ///     // no tasks have yet been created, instrumented, or polled
1959            ///     assert_eq!(next_interval().mean_first_poll_delay(), Duration::ZERO);
1960            ///
1961            ///     // constructs and instruments a task, pauses for `pause_time`, awaits the task, then
1962            ///     // produces the total time it took to do all of the aforementioned
1963            ///     async fn instrument_pause_await(
1964            ///         metrics_monitor: &tokio_metrics::TaskMonitor,
1965            ///         pause_time: Duration
1966            ///     ) -> Duration
1967            ///     {
1968            ///         let before_instrumentation = tokio::time::Instant::now();
1969            ///         let task = metrics_monitor.instrument(async move {});
1970            ///         tokio::time::sleep(pause_time).await;
1971            ///         task.await;
1972            ///         before_instrumentation.elapsed()
1973            ///     }
1974            ///
1975            ///     // construct and await a task that pauses for 500ms between instrumentation and first poll
1976            ///     let task_a_pause_time = Duration::from_millis(500);
1977            ///     let task_a_total_time = instrument_pause_await(&metrics_monitor, task_a_pause_time).await;
1978            ///
1979            ///     // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
1980            ///     // pause time of 500ms, and less-than-or-equal-to the total runtime of `task_a`
1981            ///     let mean_first_poll_delay = next_interval().mean_first_poll_delay();
1982            ///     assert!(mean_first_poll_delay >= task_a_pause_time);
1983            ///     assert!(mean_first_poll_delay <= task_a_total_time);
1984            ///
1985            ///     // construct and await a task that pauses for 500ms between instrumentation and first poll
1986            ///     let task_b_pause_time = Duration::from_millis(500);
1987            ///     let task_b_total_time = instrument_pause_await(&metrics_monitor, task_b_pause_time).await;
1988            ///
1989            ///     // construct and await a task that pauses for 1000ms between instrumentation and first poll
1990            ///     let task_c_pause_time = Duration::from_millis(1000);
1991            ///     let task_c_total_time = instrument_pause_await(&metrics_monitor, task_c_pause_time).await;
1992            ///
1993            ///     // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
1994            ///     // average pause time of 750ms, and less-than-or-equal-to the combined total runtime of
1995            ///     // `task_b` and `task_c`
1996            ///     let mean_first_poll_delay = next_interval().mean_first_poll_delay();
1997            ///     assert!(mean_first_poll_delay >= (task_b_pause_time + task_c_pause_time) / 2);
1998            ///     assert!(mean_first_poll_delay <= (task_b_total_time + task_c_total_time) / 2);
1999            /// }
2000            /// ```
2001            pub fn mean_first_poll_delay(&self) -> Duration {
2002                mean(self.total_first_poll_delay, self.first_poll_count)
2003            }
2004
2005            /// The mean duration of idles.
2006            ///
2007            /// ##### Definition
2008            /// This metric is derived from [`total_idle_duration`][TaskMetrics::total_idle_duration] ÷
2009            /// [`total_idled_count`][TaskMetrics::total_idled_count].
2010            ///
2011            /// ##### Interpretation
2012            /// The idle state is the duration spanning the instant a task completes a poll, and the instant
2013            /// that it is next awoken. Tasks inhabit this state when they are waiting for task-external
2014            /// events to complete (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this
2015            /// metric increases, it means that tasks, in aggregate, spent more time waiting for
2016            /// task-external events to complete.
2017            ///
2018            /// ##### Examples
2019            /// ```
2020            /// #[tokio::main]
2021            /// async fn main() {
2022            ///     let monitor = tokio_metrics::TaskMonitor::new();
2023            ///     let one_sec = std::time::Duration::from_secs(1);
2024            ///
2025            ///     monitor.instrument(async move {
2026            ///         tokio::time::sleep(one_sec).await;
2027            ///     }).await;
2028            ///
2029            ///     assert!(monitor.cumulative().mean_idle_duration() >= one_sec);
2030            /// }
2031            /// ```
2032            pub fn mean_idle_duration(&self) -> Duration {
2033                mean(self.total_idle_duration, self.total_idled_count)
2034            }
2035
2036            /// The mean duration that tasks spent waiting to be executed after awakening.
2037            ///
2038            /// ##### Definition
2039            /// This metric is derived from
2040            /// [`total_scheduled_duration`][TaskMetrics::total_scheduled_duration] ÷
2041            /// [`total_scheduled_count`][`TaskMetrics::total_scheduled_count`].
2042            ///
2043            /// ##### Interpretation
2044            /// If this metric increases, it means that, on average, tasks spent longer in the runtime's
2045            /// queues before being polled.
2046            ///
2047            /// ##### See also
2048            /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
2049            ///   The mean duration elapsed between the instant tasks are instrumented, and the instant they
2050            ///   are first polled.
2051            ///
2052            /// ##### Examples
2053            /// ```
2054            /// use tokio::time::Duration;
2055            ///
2056            /// #[tokio::main(flavor = "current_thread")]
2057            /// async fn main() {
2058            ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
2059            ///     let mut interval = metrics_monitor.intervals();
2060            ///     let mut next_interval = || interval.next().unwrap();
2061            ///
2062            ///     // construct and instrument and spawn a task that yields endlessly
2063            ///     tokio::spawn(metrics_monitor.instrument(async {
2064            ///         loop { tokio::task::yield_now().await }
2065            ///     }));
2066            ///
2067            ///     tokio::task::yield_now().await;
2068            ///
2069            ///     // block the executor for 1 second
2070            ///     std::thread::sleep(Duration::from_millis(1000));
2071            ///
2072            ///     // get the task to run twice
2073            ///     // the first will have a 1 sec scheduling delay, the second will have almost none
2074            ///     tokio::task::yield_now().await;
2075            ///     tokio::task::yield_now().await;
2076            ///
2077            ///     // `endless_task` will have spent approximately one second waiting
2078            ///     let mean_scheduled_duration = next_interval().mean_scheduled_duration();
2079            ///     assert!(mean_scheduled_duration >= Duration::from_millis(500), "{}", mean_scheduled_duration.as_secs_f64());
2080            ///     assert!(mean_scheduled_duration <= Duration::from_millis(600), "{}", mean_scheduled_duration.as_secs_f64());
2081            /// }
2082            /// ```
2083            pub fn mean_scheduled_duration(&self) -> Duration {
2084                mean(self.total_scheduled_duration, self.total_scheduled_count)
2085            }
2086
2087            /// The mean duration of polls.
2088            ///
2089            /// ##### Definition
2090            /// This metric is derived from [`total_poll_duration`][TaskMetrics::total_poll_duration] ÷
2091            /// [`total_poll_count`][TaskMetrics::total_poll_count].
2092            ///
2093            /// ##### Interpretation
2094            /// If this metric increases, it means that, on average, individual polls are tending to take
2095            /// longer. However, this does not necessarily imply increased task latency: An increase in poll
2096            /// durations could be offset by fewer polls.
2097            ///
2098            /// ##### See also
2099            /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**
            ///   The ratio between the number of polls categorized as slow and fast.
2101            /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
2102            ///   The mean duration of slow polls.
2103            ///
2104            /// ##### Examples
2105            /// ```
2106            /// use std::time::Duration;
2107            ///
2108            /// #[tokio::main(flavor = "current_thread", start_paused = true)]
2109            /// async fn main() {
2110            ///     let monitor = tokio_metrics::TaskMonitor::new();
2111            ///     let mut interval = monitor.intervals();
2112            ///     let mut next_interval = move || interval.next().unwrap();
2113            ///
2114            ///     assert_eq!(next_interval().mean_poll_duration(), Duration::ZERO);
2115            ///
2116            ///     monitor.instrument(async {
2117            ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
2118            ///         tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
2119            ///         ()                                                  // poll 3 (0s)
2120            ///     }).await;
2121            ///
2122            ///     assert_eq!(next_interval().mean_poll_duration(), Duration::from_secs(2) / 3);
2123            /// }
2124            /// ```
2125            pub fn mean_poll_duration(&self) -> Duration {
2126                mean(self.total_poll_duration, self.total_poll_count)
2127            }
2128
            /// The ratio between the number of polls categorized as slow and fast.
2130            ///
2131            /// ##### Definition
2132            /// This metric is derived from [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count] ÷
2133            /// [`total_poll_count`][TaskMetrics::total_poll_count].
2134            ///
2135            /// ##### Interpretation
2136            /// If this metric increases, it means that a greater proportion of polls took excessively long
2137            /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2138            /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
            /// However, as a rule, tasks *should* yield to the scheduler frequently.
2140            ///
2141            /// ##### See also
2142            /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
2143            ///   The mean duration of polls.
2144            /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
2145            ///   The mean duration of slow polls.
2146            ///
2147            /// ##### Examples
            /// Changes in this metric may be observed by varying the ratio of slow and fast polls
            /// within sampling intervals; for instance:
2150            /// ```
2151            /// use std::future::Future;
2152            /// use std::time::Duration;
2153            ///
2154            /// #[tokio::main]
2155            /// async fn main() {
2156            ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
2157            ///     let mut interval = metrics_monitor.intervals();
2158            ///     let mut next_interval = || interval.next().unwrap();
2159            ///
2160            ///     // no tasks have been constructed, instrumented, or polled
2161            ///     let interval = next_interval();
2162            ///     assert_eq!(interval.total_fast_poll_count, 0);
2163            ///     assert_eq!(interval.total_slow_poll_count, 0);
2164            ///     assert!(interval.slow_poll_ratio().is_nan());
2165            ///
2166            ///     let fast = Duration::ZERO;
2167            ///     let slow = 10 * metrics_monitor.slow_poll_threshold();
2168            ///
2169            ///     // this task completes in three fast polls
2170            ///     metrics_monitor.instrument(async {
2171            ///         spin_for(fast).await;   // fast poll 1
2172            ///         spin_for(fast).await;   // fast poll 2
2173            ///         spin_for(fast);         // fast poll 3
2174            ///     }).await;
2175            ///
2176            ///     // this task completes in two slow polls
2177            ///     metrics_monitor.instrument(async {
2178            ///         spin_for(slow).await;   // slow poll 1
2179            ///         spin_for(slow);         // slow poll 2
2180            ///     }).await;
2181            ///
2182            ///     let interval = next_interval();
2183            ///     assert_eq!(interval.total_fast_poll_count, 3);
2184            ///     assert_eq!(interval.total_slow_poll_count, 2);
2185            ///     assert_eq!(interval.slow_poll_ratio(), ratio(2., 3.));
2186            ///
2187            ///     // this task completes in three slow polls
2188            ///     metrics_monitor.instrument(async {
2189            ///         spin_for(slow).await;   // slow poll 1
2190            ///         spin_for(slow).await;   // slow poll 2
2191            ///         spin_for(slow);         // slow poll 3
2192            ///     }).await;
2193            ///
2194            ///     // this task completes in two fast polls
2195            ///     metrics_monitor.instrument(async {
2196            ///         spin_for(fast).await; // fast poll 1
2197            ///         spin_for(fast);       // fast poll 2
2198            ///     }).await;
2199            ///
2200            ///     let interval = next_interval();
2201            ///     assert_eq!(interval.total_fast_poll_count, 2);
2202            ///     assert_eq!(interval.total_slow_poll_count, 3);
2203            ///     assert_eq!(interval.slow_poll_ratio(), ratio(3., 2.));
2204            /// }
2205            ///
2206            /// fn ratio(a: f64, b: f64) -> f64 {
2207            ///     a / (a + b)
2208            /// }
2209            ///
2210            /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2211            /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2212            ///     let start = tokio::time::Instant::now();
2213            ///     while start.elapsed() <= duration {}
2214            ///     tokio::task::yield_now()
2215            /// }
2216            /// ```
2217            pub fn slow_poll_ratio(&self) -> f64 {
2218                self.total_slow_poll_count as f64 / self.total_poll_count as f64
2219            }
2220
2221            /// The ratio of tasks exceeding [`long_delay_threshold`][TaskMonitor::long_delay_threshold].
2222            ///
2223            /// ##### Definition
2224            /// This metric is derived from [`total_long_delay_count`][TaskMetrics::total_long_delay_count] ÷
2225            /// [`total_scheduled_count`][TaskMetrics::total_scheduled_count].
2226            pub fn long_delay_ratio(&self) -> f64 {
2227                self.total_long_delay_count as f64 / self.total_scheduled_count as f64
2228            }
2229
2230            /// The mean duration of fast polls.
2231            ///
2232            /// ##### Definition
2233            /// This metric is derived from
2234            /// [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration] ÷
2235            /// [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count].
2236            ///
2237            /// ##### Examples
            /// In the below example, no tasks are polled in the first sampling interval; two fast polls
            /// consume a mean of at least 1.5µs in the second sampling interval; and three fast polls
            /// consume a mean of at least 2µs in the third sampling interval:
2244            /// ```
2245            /// use std::future::Future;
2246            /// use std::time::Duration;
2247            ///
2248            /// #[tokio::main]
2249            /// async fn main() {
2250            ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
2251            ///     let mut interval = metrics_monitor.intervals();
2252            ///     let mut next_interval = || interval.next().unwrap();
2253            ///
2254            ///     // no tasks have been constructed, instrumented, or polled
2255            ///     assert_eq!(next_interval().mean_fast_poll_duration(), Duration::ZERO);
2256            ///
2257            ///     let threshold = metrics_monitor.slow_poll_threshold();
2258            ///     let fast_1 = 1 * Duration::from_micros(1);
2259            ///     let fast_2 = 2 * Duration::from_micros(1);
2260            ///     let fast_3 = 3 * Duration::from_micros(1);
2261            ///
2262            ///     // this task completes in two fast polls
2263            ///     let total_time = time(metrics_monitor.instrument(async {
2264            ///         spin_for(fast_1).await; // fast poll 1
2265            ///         spin_for(fast_2)        // fast poll 2
2266            ///     })).await;
2267            ///
2268            ///     // `mean_fast_poll_duration` ≈ the mean of `fast_1` and `fast_2`
2269            ///     let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2270            ///     assert!(mean_fast_poll_duration >= (fast_1 + fast_2) / 2);
2271            ///     assert!(mean_fast_poll_duration <= total_time / 2);
2272            ///
2273            ///     // this task completes in three fast polls
2274            ///     let total_time = time(metrics_monitor.instrument(async {
2275            ///         spin_for(fast_1).await; // fast poll 1
2276            ///         spin_for(fast_2).await; // fast poll 2
2277            ///         spin_for(fast_3)        // fast poll 3
2278            ///     })).await;
2279            ///
2280            ///     // `mean_fast_poll_duration` ≈ the mean of `fast_1`, `fast_2`, `fast_3`
2281            ///     let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2282            ///     assert!(mean_fast_poll_duration >= (fast_1 + fast_2 + fast_3) / 3);
2283            ///     assert!(mean_fast_poll_duration <= total_time / 3);
2284            /// }
2285            ///
2286            /// /// Produces the amount of time it took to await a given task.
2287            /// async fn time(task: impl Future) -> Duration {
2288            ///     let start = tokio::time::Instant::now();
2289            ///     task.await;
2290            ///     start.elapsed()
2291            /// }
2292            ///
2293            /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2294            /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2295            ///     let start = tokio::time::Instant::now();
2296            ///     while start.elapsed() <= duration {}
2297            ///     tokio::task::yield_now()
2298            /// }
2299            /// ```
2300            pub fn mean_fast_poll_duration(&self) -> Duration {
2301                mean(self.total_fast_poll_duration, self.total_fast_poll_count)
2302            }
2303
2304            /// The mean duration of slow polls.
2305            ///
2306            /// ##### Definition
2307            /// This metric is derived from
2308            /// [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration] ÷
2309            /// [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
2310            ///
2311            /// ##### Interpretation
2312            /// If this metric increases, it means that a greater proportion of polls took excessively long
2313            /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2314            /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
2315            ///
2316            /// ##### See also
2317            /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
2318            ///   The mean duration of polls.
2319            /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**
            ///   The ratio between the number of polls categorized as slow and fast.
2321            ///
2322            /// ##### Interpretation
            /// If this metric increases, it means that, on average, slow polls got even slower. This does
            /// not necessarily imply increased task latency: An increase in average slow poll duration
            /// could be offset by fewer or faster polls. However, as a rule, tasks *should* yield to the
            /// scheduler frequently.
2327            ///
2328            /// ##### Examples
            /// In the below example, no tasks are polled in the first sampling interval; two slow polls
            /// consume a mean of
            /// 1.5 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
            /// second sampling interval; and three slow polls consume a mean of
            /// 2 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
            /// third sampling interval:
2335            /// ```
2336            /// use std::future::Future;
2337            /// use std::time::Duration;
2338            ///
2339            /// #[tokio::main]
2340            /// async fn main() {
2341            ///     let metrics_monitor = tokio_metrics::TaskMonitor::new();
2342            ///     let mut interval = metrics_monitor.intervals();
2343            ///     let mut next_interval = || interval.next().unwrap();
2344            ///
2345            ///     // no tasks have been constructed, instrumented, or polled
2346            ///     assert_eq!(next_interval().mean_slow_poll_duration(), Duration::ZERO);
2347            ///
2348            ///     let threshold = metrics_monitor.slow_poll_threshold();
2349            ///     let slow_1 = 1 * threshold;
2350            ///     let slow_2 = 2 * threshold;
2351            ///     let slow_3 = 3 * threshold;
2352            ///
2353            ///     // this task completes in two slow polls
2354            ///     let total_time = time(metrics_monitor.instrument(async {
2355            ///         spin_for(slow_1).await; // slow poll 1
2356            ///         spin_for(slow_2)        // slow poll 2
2357            ///     })).await;
2358            ///
2359            ///     // `mean_slow_poll_duration` ≈ the mean of `slow_1` and `slow_2`
2360            ///     let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2361            ///     assert!(mean_slow_poll_duration >= (slow_1 + slow_2) / 2);
2362            ///     assert!(mean_slow_poll_duration <= total_time / 2);
2363            ///
2364            ///     // this task completes in three slow polls
2365            ///     let total_time = time(metrics_monitor.instrument(async {
2366            ///         spin_for(slow_1).await; // slow poll 1
2367            ///         spin_for(slow_2).await; // slow poll 2
2368            ///         spin_for(slow_3)        // slow poll 3
2369            ///     })).await;
2370            ///
2371            ///     // `mean_slow_poll_duration` ≈ the mean of `slow_1`, `slow_2`, `slow_3`
2372            ///     let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2373            ///     assert!(mean_slow_poll_duration >= (slow_1 + slow_2 + slow_3) / 3);
2374            ///     assert!(mean_slow_poll_duration <= total_time / 3);
2375            /// }
2376            ///
2377            /// /// Produces the amount of time it took to await a given task.
2378            /// async fn time(task: impl Future) -> Duration {
2379            ///     let start = tokio::time::Instant::now();
2380            ///     task.await;
2381            ///     start.elapsed()
2382            /// }
2383            ///
2384            /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2385            /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2386            ///     let start = tokio::time::Instant::now();
2387            ///     while start.elapsed() <= duration {}
2388            ///     tokio::task::yield_now()
2389            /// }
2390            /// ```
2391            pub fn mean_slow_poll_duration(&self) -> Duration {
2392                mean(self.total_slow_poll_duration, self.total_slow_poll_count)
2393            }
2394
2395            /// The average time taken for a task with a short scheduling delay to be executed after being
2396            /// scheduled.
2397            ///
2398            /// ##### Definition
2399            /// This metric is derived from
2400            /// [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration] ÷
2401            /// [`total_short_delay_count`][TaskMetrics::total_short_delay_count].
2402            pub fn mean_short_delay_duration(&self) -> Duration {
2403                mean(
2404                    self.total_short_delay_duration,
2405                    self.total_short_delay_count,
2406                )
2407            }
2408
2409            /// The average scheduling delay for a task which takes a long time to start executing after
2410            /// being scheduled.
2411            ///
2412            /// ##### Definition
2413            /// This metric is derived from
2414            /// [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration] ÷
2415            /// [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
2416            pub fn mean_long_delay_duration(&self) -> Duration {
2417                mean(self.total_long_delay_duration, self.total_long_delay_count)
2418            }
2419        }
2420        unstable {}
2421    }
2422);
2423
2424impl<T: Future> Future for Instrumented<T> {
2425    type Output = T::Output;
2426
2427    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2428        instrument_poll(cx, self, Future::poll)
2429    }
2430}
2431
2432impl<T: Stream> Stream for Instrumented<T> {
2433    type Item = T::Item;
2434
2435    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2436        instrument_poll(cx, self, Stream::poll_next)
2437    }
2438}
2439
2440fn instrument_poll<T, Out>(
2441    cx: &mut Context<'_>,
2442    instrumented: Pin<&mut Instrumented<T>>,
2443    poll_fn: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Out>,
2444) -> Poll<Out> {
2445    let poll_start = Instant::now();
2446    let this = instrumented.project();
2447    let idled_at = this.idled_at;
2448    let state = this.state;
2449    let instrumented_at = state.instrumented_at;
2450    let metrics = &state.metrics;
2451    /* accounting for time-to-first-poll and tasks-count */
2452    // is this the first time this task has been polled?
2453    if !*this.did_poll_once {
2454        // if so, we need to do three things:
2455        /* 1. note that this task *has* been polled */
2456        *this.did_poll_once = true;
2457
2458        /* 2. account for the time-to-first-poll of this task */
2459        // if the time-to-first-poll of this task exceeds `u64::MAX` ns,
2460        // round down to `u64::MAX` nanoseconds
2461        let elapsed = (poll_start - instrumented_at)
2462            .as_nanos()
2463            .try_into()
2464            .unwrap_or(u64::MAX);
2465        // add this duration to `time_to_first_poll_ns_total`
2466        metrics.total_first_poll_delay_ns.fetch_add(elapsed, SeqCst);
2467
2468        /* 3. increment the count of tasks that have been polled at least once */
2469        state.metrics.first_poll_count.fetch_add(1, SeqCst);
2470    }
2471    /* accounting for time-idled and time-scheduled */
2472    // 1. note (and reset) the instant this task was last awoke
2473    let woke_at = state.woke_at.swap(0, SeqCst);
2474    // The state of a future is *idling* in the interim between the instant
2475    // it completes a `poll`, and the instant it is next awoken.
2476    if *idled_at < woke_at {
2477        // increment the counter of how many idles occurred
2478        metrics.total_idled_count.fetch_add(1, SeqCst);
2479
2480        // compute the duration of the idle
2481        let idle_ns = woke_at - *idled_at;
2482
2483        // update the max time tasks spent idling, both locally and
2484        // globally.
2485        metrics
2486            .local_max_idle_duration_ns
2487            .fetch_max(idle_ns, SeqCst);
2488        metrics
2489            .global_max_idle_duration_ns
2490            .fetch_max(idle_ns, SeqCst);
2491        // adjust the total elapsed time monitored tasks spent idling
2492        metrics.total_idle_duration_ns.fetch_add(idle_ns, SeqCst);
2493    }
2494    // if this task spent any time in the scheduled state after instrumentation,
2495    // and after first poll, `woke_at` will be greater than 0.
2496    if woke_at > 0 {
2497        // increment the counter of how many schedules occurred
2498        metrics.total_scheduled_count.fetch_add(1, SeqCst);
2499
2500        // recall that the `woke_at` field is internally represented as
2501        // nanoseconds-since-instrumentation. here, for accounting purposes,
2502        // we need to instead represent it as a proper `Instant`.
2503        let woke_instant = instrumented_at + Duration::from_nanos(woke_at);
2504
2505        // the duration this task spent scheduled is time time elapsed between
2506        // when this task was awoke, and when it was polled.
2507        let scheduled_ns = (poll_start - woke_instant)
2508            .as_nanos()
2509            .try_into()
2510            .unwrap_or(u64::MAX);
2511
2512        let scheduled = Duration::from_nanos(scheduled_ns);
2513
2514        let (count_bucket, duration_bucket) = // was the scheduling delay long or short?
2515            if scheduled >= metrics.long_delay_threshold {
2516                (&metrics.total_long_delay_count, &metrics.total_long_delay_duration_ns)
2517            } else {
2518                (&metrics.total_short_delay_count, &metrics.total_short_delay_duration_ns)
2519            };
2520        // update the appropriate bucket
2521        count_bucket.fetch_add(1, SeqCst);
2522        duration_bucket.fetch_add(scheduled_ns, SeqCst);
2523
2524        // add `scheduled_ns` to the Monitor's total
2525        metrics
2526            .total_scheduled_duration_ns
2527            .fetch_add(scheduled_ns, SeqCst);
2528    }
2529    // Register the waker
2530    state.waker.register(cx.waker());
2531    // Get the instrumented waker
2532    let waker_ref = futures_util::task::waker_ref(state);
2533    let mut cx = Context::from_waker(&waker_ref);
2534    // Poll the task
2535    let inner_poll_start = Instant::now();
2536    let ret = poll_fn(this.task, &mut cx);
2537    let inner_poll_end = Instant::now();
2538    /* idle time starts now */
2539    *idled_at = (inner_poll_end - instrumented_at)
2540        .as_nanos()
2541        .try_into()
2542        .unwrap_or(u64::MAX);
2543    /* accounting for poll time */
2544    let inner_poll_duration = inner_poll_end - inner_poll_start;
2545    let inner_poll_ns: u64 = inner_poll_duration
2546        .as_nanos()
2547        .try_into()
2548        .unwrap_or(u64::MAX);
2549    let (count_bucket, duration_bucket) = // was this a slow or fast poll?
2550            if inner_poll_duration >= metrics.slow_poll_threshold {
2551                (&metrics.total_slow_poll_count, &metrics.total_slow_poll_duration)
2552            } else {
2553                (&metrics.total_fast_poll_count, &metrics.total_fast_poll_duration_ns)
2554            };
2555    // update the appropriate bucket
2556    count_bucket.fetch_add(1, SeqCst);
2557    duration_bucket.fetch_add(inner_poll_ns, SeqCst);
2558    ret
2559}
2560
2561impl State {
2562    fn on_wake(&self) {
2563        let woke_at: u64 = match self.instrumented_at.elapsed().as_nanos().try_into() {
2564            Ok(woke_at) => woke_at,
2565            // This is highly unlikely as it would mean the task ran for over
2566            // 500 years. If you ran your service for 500 years. If you are
2567            // reading this 500 years in the future, I'm sorry.
2568            Err(_) => return,
2569        };
2570
2571        // We don't actually care about the result
2572        let _ = self.woke_at.compare_exchange(0, woke_at, SeqCst, SeqCst);
2573    }
2574}
2575
2576impl ArcWake for State {
2577    fn wake_by_ref(arc_self: &Arc<State>) {
2578        arc_self.on_wake();
2579        arc_self.waker.wake();
2580    }
2581
2582    fn wake(self: Arc<State>) {
2583        self.on_wake();
2584        self.waker.wake();
2585    }
2586}
2587
/// Iterator returned by [`TaskMonitor::intervals`].
///
/// See that method's documentation for more details.
#[derive(Debug)]
pub struct TaskIntervals {
    // Shared raw metrics that every probe samples.
    metrics: Arc<RawMetrics>,
    // Cumulative snapshot stored by the preceding probe; `None` when no probe has stored one
    // yet, in which case the next probe yields the cumulative metrics unchanged.
    previous: Option<TaskMetrics>,
}
2596
2597impl TaskIntervals {
2598    fn probe(&mut self) -> TaskMetrics {
2599        let latest = self.metrics.metrics();
2600        let local_max_idle_duration = self.metrics.get_and_reset_local_max_idle_duration();
2601
2602        let next = if let Some(previous) = self.previous {
2603            TaskMetrics {
2604                instrumented_count: latest
2605                    .instrumented_count
2606                    .wrapping_sub(previous.instrumented_count),
2607                dropped_count: latest.dropped_count.wrapping_sub(previous.dropped_count),
2608                total_poll_count: latest
2609                    .total_poll_count
2610                    .wrapping_sub(previous.total_poll_count),
2611                total_poll_duration: sub(latest.total_poll_duration, previous.total_poll_duration),
2612                first_poll_count: latest
2613                    .first_poll_count
2614                    .wrapping_sub(previous.first_poll_count),
2615                total_idled_count: latest
2616                    .total_idled_count
2617                    .wrapping_sub(previous.total_idled_count),
2618                total_scheduled_count: latest
2619                    .total_scheduled_count
2620                    .wrapping_sub(previous.total_scheduled_count),
2621                total_fast_poll_count: latest
2622                    .total_fast_poll_count
2623                    .wrapping_sub(previous.total_fast_poll_count),
2624                total_short_delay_count: latest
2625                    .total_short_delay_count
2626                    .wrapping_sub(previous.total_short_delay_count),
2627                total_slow_poll_count: latest
2628                    .total_slow_poll_count
2629                    .wrapping_sub(previous.total_slow_poll_count),
2630                total_long_delay_count: latest
2631                    .total_long_delay_count
2632                    .wrapping_sub(previous.total_long_delay_count),
2633                total_first_poll_delay: sub(
2634                    latest.total_first_poll_delay,
2635                    previous.total_first_poll_delay,
2636                ),
2637                max_idle_duration: local_max_idle_duration,
2638                total_idle_duration: sub(latest.total_idle_duration, previous.total_idle_duration),
2639                total_scheduled_duration: sub(
2640                    latest.total_scheduled_duration,
2641                    previous.total_scheduled_duration,
2642                ),
2643                total_fast_poll_duration: sub(
2644                    latest.total_fast_poll_duration,
2645                    previous.total_fast_poll_duration,
2646                ),
2647                total_short_delay_duration: sub(
2648                    latest.total_short_delay_duration,
2649                    previous.total_short_delay_duration,
2650                ),
2651                total_slow_poll_duration: sub(
2652                    latest.total_slow_poll_duration,
2653                    previous.total_slow_poll_duration,
2654                ),
2655                total_long_delay_duration: sub(
2656                    latest.total_long_delay_duration,
2657                    previous.total_long_delay_duration,
2658                ),
2659            }
2660        } else {
2661            latest
2662        };
2663
2664        self.previous = Some(latest);
2665
2666        next
2667    }
2668}
2669
2670impl Iterator for TaskIntervals {
2671    type Item = TaskMetrics;
2672
2673    fn next(&mut self) -> Option<Self::Item> {
2674        Some(self.probe())
2675    }
2676}
2677
/// Converts `d` to a whole number of nanoseconds.
///
/// Debug builds assert that `d` fits in `u64` nanoseconds; otherwise the arithmetic wraps.
#[inline(always)]
fn to_nanos(d: Duration) -> u64 {
    debug_assert!(d <= Duration::from_nanos(u64::MAX));
    let whole_seconds_ns = d.as_secs().wrapping_mul(1_000_000_000);
    let fractional_ns = u64::from(d.subsec_nanos());
    whole_seconds_ns.wrapping_add(fractional_ns)
}
2685
2686#[inline(always)]
2687fn sub(a: Duration, b: Duration) -> Duration {
2688    let nanos = to_nanos(a).wrapping_sub(to_nanos(b));
2689    Duration::from_nanos(nanos)
2690}
2691
2692#[inline(always)]
2693fn mean(d: Duration, count: u64) -> Duration {
2694    if let Some(quotient) = to_nanos(d).checked_div(count) {
2695        Duration::from_nanos(quotient)
2696    } else {
2697        Duration::ZERO
2698    }
2699}