tokio_metrics/task.rs
1use futures_util::task::{ArcWake, AtomicWaker};
2use pin_project_lite::pin_project;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
6use std::sync::Arc;
7use std::task::{Context, Poll};
8use tokio_stream::Stream;
9
10#[cfg(feature = "rt")]
11use tokio::time::{Duration, Instant};
12
13#[cfg(not(feature = "rt"))]
14use std::time::{Duration, Instant};
15
16/// Monitors key metrics of instrumented tasks.
17///
18/// ### Basic Usage
19/// A [`TaskMonitor`] tracks key [metrics][TaskMetrics] of async tasks that have been
20/// [instrumented][`TaskMonitor::instrument`] with the monitor.
21///
22/// In the below example, a [`TaskMonitor`] is [constructed][TaskMonitor::new] and used to
23/// [instrument][TaskMonitor::instrument] three worker tasks; meanwhile, a fourth task
24/// prints [metrics][TaskMetrics] in 500ms [intervals][TaskMonitor::intervals].
25/// ```
26/// use std::time::Duration;
27///
28/// #[tokio::main]
29/// async fn main() {
30/// // construct a metrics monitor
31/// let metrics_monitor = tokio_metrics::TaskMonitor::new();
32///
33/// // print task metrics every 500ms
34/// {
35/// let metrics_monitor = metrics_monitor.clone();
36/// tokio::spawn(async move {
37/// for interval in metrics_monitor.intervals() {
38/// // pretty-print the metric interval
39/// println!("{:?}", interval);
40/// // wait 500ms
41/// tokio::time::sleep(Duration::from_millis(500)).await;
42/// }
43/// });
44/// }
45///
46/// // instrument some tasks and await them
47/// // note that the same TaskMonitor can be used for multiple tasks
48/// tokio::join![
49/// metrics_monitor.instrument(do_work()),
50/// metrics_monitor.instrument(do_work()),
51/// metrics_monitor.instrument(do_work())
52/// ];
53/// }
54///
55/// async fn do_work() {
56/// for _ in 0..25 {
57/// tokio::task::yield_now().await;
58/// tokio::time::sleep(Duration::from_millis(100)).await;
59/// }
60/// }
61/// ```
62///
63/// ### What should I instrument?
64/// In most cases, you should construct a *distinct* [`TaskMonitor`] for each kind of key task.
65///
66/// #### Instrumenting a web application
67/// For instance, a web service should have a distinct [`TaskMonitor`] for each endpoint. Within
68/// each endpoint, it's prudent to additionally instrument major sub-tasks, each with their own
69/// distinct [`TaskMonitor`]s. [*Why are my tasks slow?*](#why-are-my-tasks-slow) explores a
70/// debugging scenario for a web service that takes this approach to instrumentation. This
71/// approach is exemplified in the below example:
72/// ```no_run
73/// // The unabridged version of this snippet is in the examples directory of this crate.
74///
75/// #[tokio::main]
76/// async fn main() {
77/// // construct a TaskMonitor for root endpoint
78/// let monitor_root = tokio_metrics::TaskMonitor::new();
79///
80/// // construct TaskMonitors for create_users endpoint
81/// let monitor_create_user = CreateUserMonitors {
82/// // monitor for the entire endpoint
83/// route: tokio_metrics::TaskMonitor::new(),
84/// // monitor for database insertion subtask
85/// insert: tokio_metrics::TaskMonitor::new(),
86/// };
87///
88/// // build our application with two instrumented endpoints
89/// let app = axum::Router::new()
90/// // `GET /` goes to `root`
91/// .route("/", axum::routing::get({
92/// let monitor = monitor_root.clone();
93/// move || monitor.instrument(async { "Hello, World!" })
94/// }))
95/// // `POST /users` goes to `create_user`
96/// .route("/users", axum::routing::post({
97/// let monitors = monitor_create_user.clone();
98/// let route = monitors.route.clone();
99/// move |payload| {
100/// route.instrument(create_user(payload, monitors))
101/// }
102/// }));
103///
104/// // print task metrics for each endpoint every 1s
105/// let metrics_frequency = std::time::Duration::from_secs(1);
106/// tokio::spawn(async move {
107/// let root_intervals = monitor_root.intervals();
108/// let create_user_route_intervals =
109/// monitor_create_user.route.intervals();
110/// let create_user_insert_intervals =
111/// monitor_create_user.insert.intervals();
112/// let create_user_intervals =
113/// create_user_route_intervals.zip(create_user_insert_intervals);
114///
115/// let intervals = root_intervals.zip(create_user_intervals);
116/// for (root_route, (create_user_route, create_user_insert)) in intervals {
117/// println!("root_route = {:#?}", root_route);
118/// println!("create_user_route = {:#?}", create_user_route);
119/// println!("create_user_insert = {:#?}", create_user_insert);
120/// tokio::time::sleep(metrics_frequency).await;
121/// }
122/// });
123///
124/// // run the server
125/// let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 3000));
126/// axum::Server::bind(&addr)
127/// .serve(app.into_make_service())
128/// .await
129/// .unwrap();
130/// }
131///
132/// async fn create_user(
133/// axum::Json(payload): axum::Json<CreateUser>,
134/// monitors: CreateUserMonitors,
135/// ) -> impl axum::response::IntoResponse {
136/// let user = User { id: 1337, username: payload.username, };
137/// // instrument inserting the user into the db:
138/// let _ = monitors.insert.instrument(insert_user(user.clone())).await;
139/// (axum::http::StatusCode::CREATED, axum::Json(user))
140/// }
141///
142/// /* definitions of CreateUserMonitors, CreateUser and User omitted for brevity */
143///
144/// #
145/// # #[derive(Clone)]
146/// # struct CreateUserMonitors {
147/// # // monitor for the entire endpoint
148/// # route: tokio_metrics::TaskMonitor,
149/// # // monitor for database insertion subtask
150/// # insert: tokio_metrics::TaskMonitor,
151/// # }
152/// #
153/// # #[derive(serde::Deserialize)] struct CreateUser { username: String, }
154/// # #[derive(Clone, serde::Serialize)] struct User { id: u64, username: String, }
155/// #
156/// // insert the user into the database
157/// async fn insert_user(_: User) {
158/// /* implementation details elided */
159/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
160/// }
161/// ```
162///
163/// ### Why are my tasks slow?
164/// **Scenario:** You track key, high-level metrics about the customer response time. An alarm warns
165/// you that P90 latency for an endpoint exceeds your targets. What is causing the increase?
166///
167/// #### Identifying the high-level culprits
168/// A set of tasks will appear to execute more slowly if:
169/// - they are taking longer to poll (i.e., they consume too much CPU time)
170/// - they are waiting longer to be polled (e.g., they're waiting longer in tokio's scheduling
171/// queues)
172/// - they are waiting longer on external events to complete (e.g., asynchronous network requests)
173///
174/// The culprits, at a high level, may be some combination of these sources of latency. Fortunately,
175/// you have instrumented the key tasks of each of your endpoints with distinct [`TaskMonitor`]s.
176/// Using the monitors on the endpoint experiencing elevated latency, you begin by answering:
177/// - [*Are my tasks taking longer to poll?*](#are-my-tasks-taking-longer-to-poll)
178/// - [*Are my tasks spending more time waiting to be polled?*](#are-my-tasks-spending-more-time-waiting-to-be-polled)
179/// - [*Are my tasks spending more time waiting on external events to complete?*](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
180///
181/// ##### Are my tasks taking longer to poll?
182/// - **Did [`mean_poll_duration`][TaskMetrics::mean_poll_duration] increase?**
183/// This metric reflects the mean poll duration. If it increased, it means that, on average,
184/// individual polls tended to take longer. However, this does not necessarily imply increased
185/// task latency: An increase in poll durations could be offset by fewer polls.
186/// - **Did [`slow_poll_ratio`][TaskMetrics::slow_poll_ratio] increase?**
187/// This metric reflects the proportion of polls that were 'slow'. If it increased, it means that
188/// a greater proportion of polls performed excessive computation before yielding. This does not
189/// necessarily imply increased task latency: An increase in the proportion of slow polls could be
190/// offset by fewer or faster polls.
191/// - **Did [`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration] increase?**
192/// This metric reflects the mean duration of slow polls. If it increased, it means that, on
193/// average, slow polls got slower. This does not necessarily imply increased task latency: An
194/// increase in average slow poll duration could be offset by fewer or faster polls.
195///
196/// If so, [*why are my tasks taking longer to poll?*](#why-are-my-tasks-taking-longer-to-poll)
197///
198/// ##### Are my tasks spending more time waiting to be polled?
199/// - **Did [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increase?**
200/// This metric reflects the mean delay between the instant a task is first instrumented and the
201/// instant it is first polled. If it increases, it means that, on average, tasks spent longer
202/// waiting to be initially run.
203/// - **Did [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration] increase?**
204/// This metric reflects the mean duration that tasks spent in the scheduled state. The
205/// 'scheduled' state of a task is the duration between the instant a task is awoken and the
206/// instant it is subsequently polled. If this metric increases, it means that, on average, tasks
207/// spent longer in tokio's queues before being polled.
208/// - **Did [`long_delay_ratio`][TaskMetrics::long_delay_ratio] increase?**
209/// This metric reflects the proportion of scheduling delays which were 'long'. If it increased,
210/// it means that a greater proportion of tasks experienced excessive delays before they could
211/// execute after being woken. This does not necessarily indicate an increase in latency, as this
212/// could be offset by fewer or faster task polls.
213/// - **Did [`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration] increase?**
214/// This metric reflects the mean duration of long delays. If it increased, it means that, on
215/// average, long delays got even longer. This does not necessarily imply increased task latency:
216/// an increase in average long delay duration could be offset by fewer or faster polls or more
217/// short schedules.
218///
219/// If so, [*why are my tasks spending more time waiting to be polled?*](#why-are-my-tasks-spending-more-time-waiting-to-be-polled)
220///
221/// ##### Are my tasks spending more time waiting on external events to complete?
222/// - **Did [`mean_idle_duration`][TaskMetrics::mean_idle_duration] increase?**
223/// This metric reflects the mean duration that tasks spent in the idle state. The idle state is
224/// the duration spanning the instant a task completes a poll, and the instant that it is next
225/// awoken. Tasks inhabit this state when they are waiting for task-external events to complete
226/// (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this metric increases,
227/// tasks, in aggregate, spent more time waiting for task-external events to complete.
228///
229/// If so, [*why are my tasks spending more time waiting on external events to complete?*](#why-are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
230///
231/// #### Digging deeper
232/// Having [established the high-level culprits](#identifying-the-high-level-culprits), you now
233/// search for further explanation...
234///
235/// ##### Why are my tasks taking longer to poll?
236/// You observed that [your tasks are taking longer to poll](#are-my-tasks-taking-longer-to-poll).
237/// The culprit is likely some combination of:
/// - **Your tasks are accidentally blocking.** Common culprits include:
///     1. Using the Rust standard library's [filesystem](https://doc.rust-lang.org/std/fs/) or
///        [networking](https://doc.rust-lang.org/std/net/) APIs.
///        These APIs are synchronous; use tokio's [filesystem](https://docs.rs/tokio/latest/tokio/fs/)
///        and [networking](https://docs.rs/tokio/latest/tokio/net/) APIs instead.
///     2. Calling [`block_on`](https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.block_on).
///     3. Invoking `println!` or other synchronous logging routines.
///        Invocations of `println!` involve acquiring an exclusive lock on stdout, followed by a
///        synchronous write to stdout.
/// - **Your tasks are computationally expensive.** Common culprits include:
///     1. TLS/cryptographic routines
///     2. doing a lot of processing on bytes
///     3. calling non-Tokio resources
251///
252/// ##### Why are my tasks spending more time waiting to be polled?
/// You observed that [your tasks are spending more time waiting to be polled](#are-my-tasks-spending-more-time-waiting-to-be-polled),
/// suggesting some combination of:
255/// - Your application is inflating the time elapsed between instrumentation and first poll.
256/// - Your tasks are being scheduled into tokio's global queue.
257/// - Other tasks are spending too long without yielding, thus backing up tokio's queues.
258///
259/// Start by asking: [*Is time-to-first-poll unusually high?*](#is-time-to-first-poll-unusually-high)
260///
261/// ##### Why are my tasks spending more time waiting on external events to complete?
262/// You observed that [your tasks are spending more time waiting waiting on external events to
263/// complete](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete). But what
264/// event? Fortunately, within the task experiencing increased idle times, you monitored several
265/// sub-tasks with distinct [`TaskMonitor`]s. For each of these sub-tasks, you [*you try to identify
266/// the performance culprits...*](#identifying-the-high-level-culprits)
267///
268/// #### Digging even deeper
269///
270/// ##### Is time-to-first-poll unusually high?
271/// Contrast these two metrics:
272/// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
273/// This metric reflects the mean delay between the instant a task is first instrumented and the
274/// instant it is *first* polled.
275/// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
276/// This metric reflects the mean delay between the instant when tasks were awoken and the
277/// instant they were subsequently polled.
278///
279/// If the former metric exceeds the latter (or increased unexpectedly more than the latter), then
280/// start by investigating [*if your application is artificially delaying the time-to-first-poll*](#is-my-application-delaying-the-time-to-first-poll).
281///
282/// Otherwise, investigate [*if other tasks are polling too long without yielding*](#are-other-tasks-polling-too-long-without-yielding).
283///
284/// ##### Is my application delaying the time-to-first-poll?
285/// You observed that [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increased, more
286/// than [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]. Your application may be
287/// needlessly inflating the time elapsed between instrumentation and first poll. Are you
288/// constructing (and instrumenting) tasks separately from awaiting or spawning them?
289///
/// For instance, in the below example, the application induces a 1-second delay between when
/// `task` is instrumented and when it is awaited:
292/// ```rust
293/// #[tokio::main]
294/// async fn main() {
295/// use tokio::time::Duration;
296/// let monitor = tokio_metrics::TaskMonitor::new();
297///
298/// let task = monitor.instrument(async move {});
299///
300/// let one_sec = Duration::from_secs(1);
301/// tokio::time::sleep(one_sec).await;
302///
303/// let _ = tokio::spawn(task).await;
304///
305/// assert!(monitor.cumulative().total_first_poll_delay >= one_sec);
306/// }
307/// ```
308///
309/// Otherwise, [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] might be unusually high
/// because [*your application is spawning key tasks into tokio's global queue...*](#is-my-application-spawning-more-tasks-into-tokios-global-queue)
311///
312/// ##### Is my application spawning more tasks into tokio's global queue?
313/// Tasks awoken from threads *not* managed by the tokio runtime are scheduled with a slower,
314/// global "injection" queue.
315///
/// You may be notifying runtime tasks from off-runtime. For instance, given the following:
/// ```ignore
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
///     for _ in 0..100 {
///         let (tx, rx) = oneshot::channel();
///         tokio::spawn(async move {
///             tx.send(());
///         });
///
///         rx.await;
///     }
/// }
/// ```
/// One would expect this to run efficiently; however, the main task is run *off* the main runtime
/// while the spawned tasks are *on* the runtime, which means the snippet will run much slower than:
/// ```ignore
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
///     tokio::spawn(async {
///         for _ in 0..100 {
///             let (tx, rx) = oneshot::channel();
///             tokio::spawn(async move {
///                 tx.send(());
///             });
///
///             rx.await;
///         }
///     }).await;
/// }
/// ```
/// The slowdown is caused by a longer delay between the task awaiting `rx` being notified (in
/// `tx.send()`) and that task being polled.
349///
350/// ##### Are other tasks polling too long without yielding?
351/// You suspect that your tasks are slow because they're backed up in tokio's scheduling queues. For
352/// *each* of your application's [`TaskMonitor`]s you check to see [*if their associated tasks are
353/// taking longer to poll...*](#are-my-tasks-taking-longer-to-poll)
354///
355/// ### Limitations
356/// The [`TaskMetrics`] type uses [`u64`] to represent both event counters and durations (measured
357/// in nanoseconds). Consequently, event counters are accurate for ≤ [`u64::MAX`] events, and
358/// durations are accurate for ≤ [`u64::MAX`] nanoseconds.
359///
360/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::cumulative`] increase
361/// monotonically with each successive invocation of [`TaskMonitor::cumulative`]. Upon overflow,
362/// counters and durations wrap.
363///
364/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::intervals`] are
365/// calculated by computing the difference of metrics in successive invocations of
366/// [`TaskMonitor::cumulative`]. If, within a monitoring interval, an event occurs more than
367/// [`u64::MAX`] times, or a monitored duration exceeds [`u64::MAX`] nanoseconds, the metrics for
368/// that interval will overflow and not be accurate.
369///
370/// ##### Examples at the limits
371/// Consider the [`TaskMetrics::total_first_poll_delay`] metric. This metric accurately reflects
372/// delays between instrumentation and first-poll ≤ [`u64::MAX`] nanoseconds:
373/// ```
374/// use tokio::time::Duration;
375///
376/// #[tokio::main(flavor = "current_thread", start_paused = true)]
377/// async fn main() {
378/// let monitor = tokio_metrics::TaskMonitor::new();
379/// let mut interval = monitor.intervals();
380/// let mut next_interval = || interval.next().unwrap();
381///
382/// // construct and instrument a task, but do not `await` it
383/// let task = monitor.instrument(async {});
384///
385/// // this is the maximum duration representable by tokio_metrics
386/// let max_duration = Duration::from_nanos(u64::MAX);
387///
388/// // let's advance the clock by this amount and poll `task`
389/// let _ = tokio::time::advance(max_duration).await;
390/// task.await;
391///
392/// // durations ≤ `max_duration` are accurately reflected in this metric
393/// assert_eq!(next_interval().total_first_poll_delay, max_duration);
394/// assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
395/// }
396/// ```
397/// If the total delay between instrumentation and first poll exceeds [`u64::MAX`] nanoseconds,
398/// [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay] will overflow:
399/// ```
400/// # use tokio::time::Duration;
401/// #
402/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
403/// # async fn main() {
404/// # let monitor = tokio_metrics::TaskMonitor::new();
405/// #
/// // construct and instrument two tasks, but do not `await` them
407/// let task_a = monitor.instrument(async {});
408/// let task_b = monitor.instrument(async {});
409///
410/// // this is the maximum duration representable by tokio_metrics
411/// let max_duration = Duration::from_nanos(u64::MAX);
412///
/// // let's advance the clock by 1.5x this amount and await both tasks
414/// let _ = tokio::time::advance(3 * (max_duration / 2)).await;
415/// task_a.await;
416/// task_b.await;
417///
418/// // the `total_first_poll_delay` has overflowed
419/// assert!(monitor.cumulative().total_first_poll_delay < max_duration);
420/// # }
421/// ```
422/// If *many* tasks are spawned, it will take far less than a [`u64::MAX`]-nanosecond delay to bring
423/// this metric to the precipice of overflow:
424/// ```
425/// # use tokio::time::Duration;
426/// #
427/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
428/// # async fn main() {
429/// # let monitor = tokio_metrics::TaskMonitor::new();
430/// # let mut interval = monitor.intervals();
431/// # let mut next_interval = || interval.next().unwrap();
432/// #
433/// // construct and instrument u16::MAX tasks, but do not `await` them
434/// let first_poll_count = u16::MAX as u64;
435/// let mut tasks = Vec::with_capacity(first_poll_count as usize);
436/// for _ in 0..first_poll_count { tasks.push(monitor.instrument(async {})); }
437///
438/// // this is the maximum duration representable by tokio_metrics
439/// let max_duration = u64::MAX;
440///
/// // let's advance the clock just enough such that all of the time-to-first-poll
/// // delays, summed, nearly equal `max_duration`, less some remainder...
443/// let iffy_delay = max_duration / (first_poll_count as u64);
444/// let small_remainder = max_duration % first_poll_count;
445/// let _ = tokio::time::advance(Duration::from_nanos(iffy_delay)).await;
446///
447/// // ...then poll all of the instrumented tasks:
448/// for task in tasks { task.await; }
449///
450/// // `total_first_poll_delay` is at the precipice of overflowing!
451/// assert_eq!(
452/// next_interval().total_first_poll_delay.as_nanos(),
453/// (max_duration - small_remainder) as u128
454/// );
455/// assert_eq!(
456/// monitor.cumulative().total_first_poll_delay.as_nanos(),
457/// (max_duration - small_remainder) as u128
458/// );
459/// # }
460/// ```
461/// Frequent, interval-sampled metrics will retain their accuracy, even if the cumulative
462/// metrics counter overflows at most once in the midst of an interval:
463/// ```
464/// # use tokio::time::Duration;
465/// # use tokio_metrics::TaskMonitor;
466/// #
467/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
468/// # async fn main() {
469/// # let monitor = TaskMonitor::new();
470/// # let mut interval = monitor.intervals();
471/// # let mut next_interval = || interval.next().unwrap();
472/// #
473/// let first_poll_count = u16::MAX as u64;
474/// let batch_size = first_poll_count / 3;
475///
476/// let max_duration_ns = u64::MAX;
477/// let iffy_delay_ns = max_duration_ns / first_poll_count;
478///
479/// // Instrument `batch_size` number of tasks, wait for `delay` nanoseconds,
480/// // then await the instrumented tasks.
481/// async fn run_batch(monitor: &TaskMonitor, batch_size: usize, delay: u64) {
482/// let mut tasks = Vec::with_capacity(batch_size);
483/// for _ in 0..batch_size { tasks.push(monitor.instrument(async {})); }
484/// let _ = tokio::time::advance(Duration::from_nanos(delay)).await;
485/// for task in tasks { task.await; }
486/// }
487///
/// // this is how much `total_first_poll_delay` will
/// // increase with each batch we run
490/// let batch_delay = iffy_delay_ns * batch_size;
491///
492/// // run batches 1, 2, and 3
493/// for i in 1..=3 {
494/// run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
495/// assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
496/// assert_eq!(i * batch_delay as u128, monitor.cumulative().total_first_poll_delay.as_nanos());
497/// }
498///
/// /* now, the `total_first_poll_delay` counter is at the precipice of overflow */
500/// assert_eq!(monitor.cumulative().total_first_poll_delay.as_nanos(), max_duration_ns as u128);
501///
502/// // run batch 4
503/// run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
504/// // the interval counter remains accurate
505/// assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
506/// // but the cumulative counter has overflowed
507/// assert_eq!(batch_delay as u128 - 1, monitor.cumulative().total_first_poll_delay.as_nanos());
508/// # }
509/// ```
510/// If a cumulative metric overflows *more than once* in the midst of an interval,
511/// its interval-sampled counterpart will also overflow.
512#[derive(Clone, Debug)]
513pub struct TaskMonitor {
514 metrics: Arc<RawMetrics>,
515}
516
/// Builder for a [`TaskMonitor`] whose configuration differs from the defaults, e.g. custom
/// thresholds for 'slow' polls and 'long' scheduling delays.
#[derive(Debug, Clone, Default)]
pub struct TaskMonitorBuilder {
    // Threshold at which a poll counts as 'slow'; `None` means "use the monitor's default".
    slow_poll_threshold: Option<Duration>,
    // Threshold at which a scheduling delay counts as 'long'; `None` means "use the default".
    long_delay_threshold: Option<Duration>,
}
524
525impl TaskMonitorBuilder {
526 /// Creates a new [`TaskMonitorBuilder`].
527 pub fn new() -> Self {
528 Self {
529 slow_poll_threshold: None,
530 long_delay_threshold: None,
531 }
532 }
533
534 /// Specifies the threshold at which polls are considered 'slow'.
535 pub fn with_slow_poll_threshold(&mut self, threshold: Duration) -> &mut Self {
536 self.slow_poll_threshold = Some(threshold);
537
538 self
539 }
540
541 /// Specifies the threshold at which schedules are considered 'long'.
542 pub fn with_long_delay_threshold(&mut self, threshold: Duration) -> &mut Self {
543 self.long_delay_threshold = Some(threshold);
544
545 self
546 }
547
548 /// Consume the builder, producing a [`TaskMonitor`].
549 pub fn build(self) -> TaskMonitor {
550 TaskMonitor::create(
551 self.slow_poll_threshold
552 .unwrap_or(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD),
553 self.long_delay_threshold
554 .unwrap_or(TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD),
555 )
556 }
557}
558
559pin_project! {
560 /// An async task that has been instrumented with [`TaskMonitor::instrument`].
561 #[derive(Debug)]
562 pub struct Instrumented<T> {
563 // The task being instrumented
564 #[pin]
565 task: T,
566
567 // True when the task is polled for the first time
568 did_poll_once: bool,
569
570 // The instant, tracked as nanoseconds since `instrumented_at`, at which the future finished
571 // its last poll.
572 idled_at: u64,
573
574 // State shared between the task and its instrumented waker.
575 state: Arc<State>,
576 }
577
578 impl<T> PinnedDrop for Instrumented<T> {
579 fn drop(this: Pin<&mut Self>) {
580 this.state.metrics.dropped_count.fetch_add(1, SeqCst);
581 }
582 }
583}
584
585/// Key metrics of [instrumented][`TaskMonitor::instrument`] tasks.
586#[non_exhaustive]
587#[derive(Debug, Clone, Copy, Default)]
588pub struct TaskMetrics {
589 /// The number of tasks instrumented.
590 ///
591 /// ##### Examples
592 /// ```
593 /// #[tokio::main]
594 /// async fn main() {
595 /// let monitor = tokio_metrics::TaskMonitor::new();
596 /// let mut interval = monitor.intervals();
597 /// let mut next_interval = || interval.next().unwrap();
598 ///
599 /// // 0 tasks have been instrumented
600 /// assert_eq!(next_interval().instrumented_count, 0);
601 ///
602 /// monitor.instrument(async {});
603 ///
604 /// // 1 task has been instrumented
605 /// assert_eq!(next_interval().instrumented_count, 1);
606 ///
607 /// monitor.instrument(async {});
608 /// monitor.instrument(async {});
609 ///
610 /// // 2 tasks have been instrumented
611 /// assert_eq!(next_interval().instrumented_count, 2);
612 ///
613 /// // since the last interval was produced, 0 tasks have been instrumented
614 /// assert_eq!(next_interval().instrumented_count, 0);
615 /// }
616 /// ```
617 pub instrumented_count: u64,
618
619 /// The number of tasks dropped.
620 ///
621 /// ##### Examples
622 /// ```
623 /// #[tokio::main]
624 /// async fn main() {
625 /// let monitor = tokio_metrics::TaskMonitor::new();
626 /// let mut interval = monitor.intervals();
627 /// let mut next_interval = || interval.next().unwrap();
628 ///
629 /// // 0 tasks have been dropped
630 /// assert_eq!(next_interval().dropped_count, 0);
631 ///
632 /// let _task = monitor.instrument(async {});
633 ///
634 /// // 0 tasks have been dropped
635 /// assert_eq!(next_interval().dropped_count, 0);
636 ///
637 /// monitor.instrument(async {}).await;
638 /// drop(monitor.instrument(async {}));
639 ///
640 /// // 2 tasks have been dropped
641 /// assert_eq!(next_interval().dropped_count, 2);
642 ///
643 /// // since the last interval was produced, 0 tasks have been dropped
644 /// assert_eq!(next_interval().dropped_count, 0);
645 /// }
646 /// ```
647 pub dropped_count: u64,
648
649 /// The number of tasks polled for the first time.
650 ///
651 /// ##### Derived metrics
652 /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
653 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
654 /// are first polled.
655 ///
656 /// ##### Examples
657 /// In the below example, no tasks are instrumented or polled in the first sampling interval;
658 /// one task is instrumented (but not polled) in the second sampling interval; that task is
659 /// awaited to completion (and, thus, polled at least once) in the third sampling interval; no
660 /// additional tasks are polled for the first time within the fourth sampling interval:
661 /// ```
662 /// #[tokio::main]
663 /// async fn main() {
664 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
665 /// let mut interval = metrics_monitor.intervals();
666 /// let mut next_interval = || interval.next().unwrap();
667 ///
668 /// // no tasks have been constructed, instrumented, and polled at least once
669 /// assert_eq!(next_interval().first_poll_count, 0);
670 ///
671 /// let task = metrics_monitor.instrument(async {});
672 ///
673 /// // `task` has been constructed and instrumented, but has not yet been polled
674 /// assert_eq!(next_interval().first_poll_count, 0);
675 ///
676 /// // poll `task` to completion
677 /// task.await;
678 ///
679 /// // `task` has been constructed, instrumented, and polled at least once
680 /// assert_eq!(next_interval().first_poll_count, 1);
681 ///
682 /// // since the last interval was produced, 0 tasks have been constructed, instrumented and polled
683 /// assert_eq!(next_interval().first_poll_count, 0);
684 ///
685 /// }
686 /// ```
687 pub first_poll_count: u64,
688
689 /// The total duration elapsed between the instant tasks are instrumented, and the instant they
690 /// are first polled.
691 ///
692 /// ##### Derived metrics
693 /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
694 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
695 /// are first polled.
696 ///
697 /// ##### Examples
698 /// In the below example, 0 tasks have been instrumented or polled within the first sampling
699 /// interval, a total of 500ms elapse between the instrumentation and polling of tasks within
700 /// the second sampling interval, and a total of 350ms elapse between the instrumentation and
701 /// polling of tasks within the third sampling interval:
702 /// ```
703 /// use tokio::time::Duration;
704 ///
705 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
706 /// async fn main() {
707 /// let monitor = tokio_metrics::TaskMonitor::new();
708 /// let mut interval = monitor.intervals();
709 /// let mut next_interval = || interval.next().unwrap();
710 ///
711 /// // no tasks have yet been created, instrumented, or polled
712 /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
713 /// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
714 ///
715 /// // constructs and instruments a task, pauses a given duration, then awaits the task
716 /// async fn instrument_pause_await(monitor: &tokio_metrics::TaskMonitor, pause: Duration) {
717 /// let task = monitor.instrument(async move {});
718 /// tokio::time::sleep(pause).await;
719 /// task.await;
720 /// }
721 ///
722 /// // construct and await a task that pauses for 500ms between instrumentation and first poll
723 /// let task_a_pause_time = Duration::from_millis(500);
724 /// instrument_pause_await(&monitor, task_a_pause_time).await;
725 ///
726 /// assert_eq!(next_interval().total_first_poll_delay, task_a_pause_time);
727 /// assert_eq!(monitor.cumulative().total_first_poll_delay, task_a_pause_time);
728 ///
729 /// // construct and await a task that pauses for 250ms between instrumentation and first poll
730 /// let task_b_pause_time = Duration::from_millis(250);
731 /// instrument_pause_await(&monitor, task_b_pause_time).await;
732 ///
733 /// // construct and await a task that pauses for 100ms between instrumentation and first poll
734 /// let task_c_pause_time = Duration::from_millis(100);
735 /// instrument_pause_await(&monitor, task_c_pause_time).await;
736 ///
737 /// assert_eq!(
738 /// next_interval().total_first_poll_delay,
739 /// task_b_pause_time + task_c_pause_time
740 /// );
741 /// assert_eq!(
742 /// monitor.cumulative().total_first_poll_delay,
743 /// task_a_pause_time + task_b_pause_time + task_c_pause_time
744 /// );
745 /// }
746 /// ```
747 ///
748 /// ##### When is this metric recorded?
749 /// The delay between instrumentation and first poll is not recorded until the first poll
750 /// actually occurs:
751 /// ```
752 /// # use tokio::time::Duration;
753 /// #
754 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
755 /// # async fn main() {
756 /// # let monitor = tokio_metrics::TaskMonitor::new();
757 /// # let mut interval = monitor.intervals();
758 /// # let mut next_interval = || interval.next().unwrap();
759 /// #
760 /// // we construct and instrument a task, but do not `await` it
761 /// let task = monitor.instrument(async {});
762 ///
763 /// // let's sleep for 1s before we poll `task`
764 /// let one_sec = Duration::from_secs(1);
765 /// let _ = tokio::time::sleep(one_sec).await;
766 ///
767 /// // although 1s has now elapsed since the instrumentation of `task`,
768 /// // this is not reflected in `total_first_poll_delay`...
769 /// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
770 /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
771 ///
772 /// // ...and won't be until `task` is actually polled
773 /// task.await;
774 ///
775 /// // now, the 1s delay is reflected in `total_first_poll_delay`:
776 /// assert_eq!(next_interval().total_first_poll_delay, one_sec);
777 /// assert_eq!(monitor.cumulative().total_first_poll_delay, one_sec);
778 /// # }
779 /// ```
780 ///
781 /// ##### What if first-poll-delay is very large?
782 /// The first-poll-delay of *individual* tasks saturates at `u64::MAX` nanoseconds. However, if
783 /// the *total* first-poll-delay *across* monitored tasks exceeds `u64::MAX` nanoseconds, this
784 /// metric will wrap around:
785 /// ```
786 /// use tokio::time::Duration;
787 ///
788 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
789 /// async fn main() {
790 /// let monitor = tokio_metrics::TaskMonitor::new();
791 ///
792 /// // construct and instrument a task, but do not `await` it
793 /// let task = monitor.instrument(async {});
794 ///
795 /// // this is the maximum duration representable by tokio_metrics
796 /// let max_duration = Duration::from_nanos(u64::MAX);
797 ///
798 /// // let's advance the clock by double this amount and await `task`
799 /// let _ = tokio::time::advance(max_duration * 2).await;
800 /// task.await;
801 ///
802 /// // the time-to-first-poll of `task` saturates at `max_duration`
803 /// assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
804 ///
805 /// // ...but note that the metric *will* wrap around if more tasks are involved
806 /// let task = monitor.instrument(async {});
807 /// let _ = tokio::time::advance(Duration::from_nanos(1)).await;
808 /// task.await;
809 /// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
810 /// }
811 /// ```
812 pub total_first_poll_delay: Duration,
813
814 /// The total number of times that tasks idled, waiting to be awoken.
815 ///
816 /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
817 /// task completes a poll, and the instant that it is next awoken.
818 ///
819 /// ##### Derived metrics
820 /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
821 /// The mean duration of idles.
822 ///
823 /// ##### Examples
824 /// ```
825 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
826 /// async fn main() {
827 /// let monitor = tokio_metrics::TaskMonitor::new();
828 /// let mut interval = monitor.intervals();
829 /// let mut next_interval = move || interval.next().unwrap();
830 /// let one_sec = std::time::Duration::from_secs(1);
831 ///
832 /// monitor.instrument(async {}).await;
833 ///
834 /// assert_eq!(next_interval().total_idled_count, 0);
835 /// assert_eq!(monitor.cumulative().total_idled_count, 0);
836 ///
837 /// monitor.instrument(async move {
838 /// tokio::time::sleep(one_sec).await;
839 /// }).await;
840 ///
841 /// assert_eq!(next_interval().total_idled_count, 1);
842 /// assert_eq!(monitor.cumulative().total_idled_count, 1);
843 ///
844 /// monitor.instrument(async {
845 /// tokio::time::sleep(one_sec).await;
846 /// tokio::time::sleep(one_sec).await;
847 /// }).await;
848 ///
849 /// assert_eq!(next_interval().total_idled_count, 2);
850 /// assert_eq!(monitor.cumulative().total_idled_count, 3);
851 /// }
852 /// ```
853 pub total_idled_count: u64,
854
855 /// The total duration that tasks idled.
856 ///
857 /// An idle is recorded as occurring if a non-zero duration elapses between the instant a
858 /// task completes a poll, and the instant that it is next awoken.
859 ///
860 /// ##### Derived metrics
861 /// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
862 /// The mean duration of idles.
863 ///
864 /// ##### Examples
865 /// ```
866 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
867 /// async fn main() {
868 /// let monitor = tokio_metrics::TaskMonitor::new();
869 /// let mut interval = monitor.intervals();
870 /// let mut next_interval = move || interval.next().unwrap();
871 /// let one_sec = std::time::Duration::from_secs(1);
872 /// let two_sec = std::time::Duration::from_secs(2);
873 ///
874 /// assert_eq!(next_interval().total_idle_duration.as_nanos(), 0);
875 /// assert_eq!(monitor.cumulative().total_idle_duration.as_nanos(), 0);
876 ///
877 /// monitor.instrument(async move {
878 /// tokio::time::sleep(one_sec).await;
879 /// }).await;
880 ///
881 /// assert_eq!(next_interval().total_idle_duration, one_sec);
882 /// assert_eq!(monitor.cumulative().total_idle_duration, one_sec);
883 ///
884 /// monitor.instrument(async move {
885 /// tokio::time::sleep(two_sec).await;
886 /// }).await;
887 ///
888 /// assert_eq!(next_interval().total_idle_duration, two_sec);
889 /// assert_eq!(monitor.cumulative().total_idle_duration, one_sec + two_sec);
890 /// }
891 /// ```
892 pub total_idle_duration: Duration,
893
894 /// The total number of times that tasks were awoken (and then, presumably, scheduled for
895 /// execution).
896 ///
897 /// ##### Definition
    /// This metric is equal to [`total_short_delay_count`][TaskMetrics::total_short_delay_count]
    /// \+ [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
900 ///
901 /// ##### Derived metrics
902 /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
903 /// The mean duration that tasks spent waiting to be executed after awakening.
904 ///
905 /// ##### Examples
906 /// In the below example, a task yields to the scheduler a varying number of times between
907 /// sampling intervals; this metric is equal to the number of times the task yielded:
908 /// ```
909 /// #[tokio::main]
910 /// async fn main(){
911 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
912 ///
913 /// // [A] no tasks have been created, instrumented, and polled more than once
914 /// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
915 ///
916 /// // [B] a `task` is created and instrumented
917 /// let task = {
918 /// let monitor = metrics_monitor.clone();
919 /// metrics_monitor.instrument(async move {
920 /// let mut interval = monitor.intervals();
921 /// let mut next_interval = move || interval.next().unwrap();
922 ///
923 /// // [E] `task` has not yet yielded to the scheduler, and
924 /// // thus has not yet been scheduled since its first `poll`
925 /// assert_eq!(next_interval().total_scheduled_count, 0);
926 ///
927 /// tokio::task::yield_now().await; // yield to the scheduler
928 ///
929 /// // [F] `task` has yielded to the scheduler once (and thus been
930 /// // scheduled once) since the last sampling interval
931 /// assert_eq!(next_interval().total_scheduled_count, 1);
932 ///
933 /// tokio::task::yield_now().await; // yield to the scheduler
934 /// tokio::task::yield_now().await; // yield to the scheduler
935 /// tokio::task::yield_now().await; // yield to the scheduler
936 ///
937 /// // [G] `task` has yielded to the scheduler thrice (and thus been
938 /// // scheduled thrice) since the last sampling interval
939 /// assert_eq!(next_interval().total_scheduled_count, 3);
940 ///
941 /// tokio::task::yield_now().await; // yield to the scheduler
942 ///
943 /// next_interval
944 /// })
945 /// };
946 ///
947 /// // [C] `task` has not yet been polled at all
948 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
949 /// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
950 ///
951 /// // [D] poll `task` to completion
952 /// let mut next_interval = task.await;
953 ///
    ///     // [H] `task` has been scheduled once since the last sample
955 /// assert_eq!(next_interval().total_scheduled_count, 1);
956 ///
    ///     // [I] `task` has been scheduled 0 times since the last sample
958 /// assert_eq!(next_interval().total_scheduled_count, 0);
959 ///
960 /// // [J] `task` has yielded to the scheduler a total of five times
961 /// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 5);
962 /// }
963 /// ```
964 #[doc(alias = "total_delay_count")]
965 pub total_scheduled_count: u64,
966
967 /// The total duration that tasks spent waiting to be polled after awakening.
968 ///
969 /// ##### Definition
    /// This metric is equal to [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration]
    /// \+ [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration].
972 ///
973 /// ##### Derived metrics
974 /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
975 /// The mean duration that tasks spent waiting to be executed after awakening.
976 ///
977 /// ##### Examples
978 /// ```
979 /// use tokio::time::Duration;
980 ///
981 /// #[tokio::main(flavor = "current_thread")]
982 /// async fn main() {
983 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
984 /// let mut interval = metrics_monitor.intervals();
985 /// let mut next_interval = || interval.next().unwrap();
986 ///
987 /// // construct and instrument and spawn a task that yields endlessly
988 /// tokio::spawn(metrics_monitor.instrument(async {
989 /// loop { tokio::task::yield_now().await }
990 /// }));
991 ///
992 /// tokio::task::yield_now().await;
993 ///
994 /// // block the executor for 1 second
995 /// std::thread::sleep(Duration::from_millis(1000));
996 ///
997 /// tokio::task::yield_now().await;
998 ///
    ///     // the spawned, endlessly-yielding task will have spent approximately one second waiting
1000 /// let total_scheduled_duration = next_interval().total_scheduled_duration;
1001 /// assert!(total_scheduled_duration >= Duration::from_millis(1000));
1002 /// assert!(total_scheduled_duration <= Duration::from_millis(1100));
1003 /// }
1004 /// ```
1005 #[doc(alias = "total_delay_duration")]
1006 pub total_scheduled_duration: Duration,
1007
1008 /// The total number of times that tasks were polled.
1009 ///
1010 /// ##### Definition
1011 /// This metric is equal to [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count]
1012 /// \+ [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
1013 ///
1014 /// ##### Derived metrics
1015 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
1016 /// The mean duration of polls.
1017 ///
1018 /// ##### Examples
    /// In the below example, a task with multiple yield points is awaited to completion; this
1020 /// metric reflects the number of `await`s within each sampling interval:
1021 /// ```
1022 /// #[tokio::main]
1023 /// async fn main() {
1024 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1025 ///
1026 /// // [A] no tasks have been created, instrumented, and polled more than once
1027 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1028 ///
1029 /// // [B] a `task` is created and instrumented
1030 /// let task = {
1031 /// let monitor = metrics_monitor.clone();
1032 /// metrics_monitor.instrument(async move {
1033 /// let mut interval = monitor.intervals();
1034 /// let mut next_interval = move || interval.next().unwrap();
1035 ///
1036 /// // [E] task is in the midst of its first poll
1037 /// assert_eq!(next_interval().total_poll_count, 0);
1038 ///
1039 /// tokio::task::yield_now().await; // poll 1
1040 ///
1041 /// // [F] task has been polled 1 time
1042 /// assert_eq!(next_interval().total_poll_count, 1);
1043 ///
1044 /// tokio::task::yield_now().await; // poll 2
1045 /// tokio::task::yield_now().await; // poll 3
1046 /// tokio::task::yield_now().await; // poll 4
1047 ///
1048 /// // [G] task has been polled 3 times
1049 /// assert_eq!(next_interval().total_poll_count, 3);
1050 ///
1051 /// tokio::task::yield_now().await; // poll 5
1052 ///
1053 /// next_interval // poll 6
1054 /// })
1055 /// };
1056 ///
1057 /// // [C] `task` has not yet been polled at all
1058 /// assert_eq!(metrics_monitor.cumulative().total_poll_count, 0);
1059 ///
1060 /// // [D] poll `task` to completion
1061 /// let mut next_interval = task.await;
1062 ///
1063 /// // [H] `task` has been polled 2 times since the last sample
1064 /// assert_eq!(next_interval().total_poll_count, 2);
1065 ///
1066 /// // [I] `task` has been polled 0 times since the last sample
1067 /// assert_eq!(next_interval().total_poll_count, 0);
1068 ///
1069 /// // [J] `task` has been polled 6 times
1070 /// assert_eq!(metrics_monitor.cumulative().total_poll_count, 6);
1071 /// }
1072 /// ```
1073 pub total_poll_count: u64,
1074
1075 /// The total duration elapsed during polls.
1076 ///
1077 /// ##### Definition
1078 /// This metric is equal to [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration]
1079 /// \+ [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration].
1080 ///
1081 /// ##### Derived metrics
1082 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
1083 /// The mean duration of polls.
1084 ///
    /// ##### Examples
1086 /// ```
1087 /// use tokio::time::Duration;
1088 ///
1089 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
1090 /// async fn main() {
1091 /// let monitor = tokio_metrics::TaskMonitor::new();
1092 /// let mut interval = monitor.intervals();
1093 /// let mut next_interval = move || interval.next().unwrap();
1094 ///
1095 /// assert_eq!(next_interval().total_poll_duration, Duration::ZERO);
1096 ///
1097 /// monitor.instrument(async {
1098 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
1099 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
1100 /// () // poll 3 (0s)
1101 /// }).await;
1102 ///
1103 /// assert_eq!(next_interval().total_poll_duration, Duration::from_secs(2));
1104 /// }
1105 /// ```
1106 pub total_poll_duration: Duration,
1107
1108 /// The total number of times that polling tasks completed swiftly.
1109 ///
1110 /// Here, 'swiftly' is defined as completing in strictly less time than
1111 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1112 ///
1113 /// ##### Derived metrics
1114 /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**
1115 /// The mean duration of fast polls.
1116 ///
1117 /// ##### Examples
1118 /// In the below example, 0 polls occur within the first sampling interval, 3 fast polls occur
1119 /// within the second sampling interval, and 2 fast polls occur within the third sampling
1120 /// interval:
1121 /// ```
1122 /// use std::future::Future;
1123 /// use std::time::Duration;
1124 ///
1125 /// #[tokio::main]
1126 /// async fn main() {
1127 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1128 /// let mut interval = metrics_monitor.intervals();
1129 /// let mut next_interval = || interval.next().unwrap();
1130 ///
1131 /// // no tasks have been constructed, instrumented, or polled
1132 /// assert_eq!(next_interval().total_fast_poll_count, 0);
1133 ///
1134 /// let fast = Duration::ZERO;
1135 ///
1136 /// // this task completes in three fast polls
1137 /// let _ = metrics_monitor.instrument(async {
1138 /// spin_for(fast).await; // fast poll 1
1139 /// spin_for(fast).await; // fast poll 2
1140 /// spin_for(fast) // fast poll 3
1141 /// }).await;
1142 ///
1143 /// assert_eq!(next_interval().total_fast_poll_count, 3);
1144 ///
1145 /// // this task completes in two fast polls
1146 /// let _ = metrics_monitor.instrument(async {
1147 /// spin_for(fast).await; // fast poll 1
1148 /// spin_for(fast) // fast poll 2
1149 /// }).await;
1150 ///
1151 /// assert_eq!(next_interval().total_fast_poll_count, 2);
1152 /// }
1153 ///
1154 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1155 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1156 /// let start = tokio::time::Instant::now();
1157 /// while start.elapsed() <= duration {}
1158 /// tokio::task::yield_now()
1159 /// }
1160 /// ```
1161 pub total_fast_poll_count: u64,
1162
1163 /// The total duration of fast polls.
1164 ///
1165 /// Here, 'fast' is defined as completing in strictly less time than
1166 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1167 ///
1168 /// ##### Derived metrics
1169 /// - **[`mean_fast_poll_duration`][TaskMetrics::mean_fast_poll_duration]**
1170 /// The mean duration of fast polls.
1171 ///
1172 /// ##### Examples
1173 /// In the below example, no tasks are polled in the first sampling interval; three fast polls
1174 /// consume a total of 3μs time in the second sampling interval; and two fast polls consume a
1175 /// total of 2μs time in the third sampling interval:
1176 /// ```
1177 /// use std::future::Future;
1178 /// use std::time::Duration;
1179 ///
1180 /// #[tokio::main]
1181 /// async fn main() {
1182 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1183 /// let mut interval = metrics_monitor.intervals();
1184 /// let mut next_interval = || interval.next().unwrap();
1185 ///
1186 /// // no tasks have been constructed, instrumented, or polled
1187 /// let interval = next_interval();
1188 /// assert_eq!(interval.total_fast_poll_duration, Duration::ZERO);
1189 ///
1190 /// let fast = Duration::from_micros(1);
1191 ///
1192 /// // this task completes in three fast polls
1193 /// let task_a_time = time(metrics_monitor.instrument(async {
1194 /// spin_for(fast).await; // fast poll 1
1195 /// spin_for(fast).await; // fast poll 2
1196 /// spin_for(fast) // fast poll 3
1197 /// })).await;
1198 ///
1199 /// let interval = next_interval();
1200 /// assert!(interval.total_fast_poll_duration >= fast * 3);
1201 /// assert!(interval.total_fast_poll_duration <= task_a_time);
1202 ///
1203 /// // this task completes in two fast polls
1204 /// let task_b_time = time(metrics_monitor.instrument(async {
1205 /// spin_for(fast).await; // fast poll 1
1206 /// spin_for(fast) // fast poll 2
1207 /// })).await;
1208 ///
1209 /// let interval = next_interval();
1210 /// assert!(interval.total_fast_poll_duration >= fast * 2);
1211 /// assert!(interval.total_fast_poll_duration <= task_b_time);
1212 /// }
1213 ///
1214 /// /// Produces the amount of time it took to await a given async task.
1215 /// async fn time(task: impl Future) -> Duration {
1216 /// let start = tokio::time::Instant::now();
1217 /// task.await;
1218 /// start.elapsed()
1219 /// }
1220 ///
1221 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1222 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1223 /// let start = tokio::time::Instant::now();
1224 /// while start.elapsed() <= duration {}
1225 /// tokio::task::yield_now()
1226 /// }
1227 /// ```
1228 pub total_fast_poll_duration: Duration,
1229
1230 /// The total number of times that polling tasks completed slowly.
1231 ///
1232 /// Here, 'slowly' is defined as completing in at least as much time as
1233 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1234 ///
1235 /// ##### Derived metrics
    /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
1237 /// The mean duration of slow polls.
1238 ///
1239 /// ##### Examples
1240 /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1241 /// within the second sampling interval, and 2 slow polls occur within the third sampling
1242 /// interval:
1243 /// ```
1244 /// use std::future::Future;
1245 /// use std::time::Duration;
1246 ///
1247 /// #[tokio::main]
1248 /// async fn main() {
1249 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1250 /// let mut interval = metrics_monitor.intervals();
1251 /// let mut next_interval = || interval.next().unwrap();
1252 ///
1253 /// // no tasks have been constructed, instrumented, or polled
1254 /// assert_eq!(next_interval().total_slow_poll_count, 0);
1255 ///
1256 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1257 ///
1258 /// // this task completes in three slow polls
1259 /// let _ = metrics_monitor.instrument(async {
1260 /// spin_for(slow).await; // slow poll 1
1261 /// spin_for(slow).await; // slow poll 2
1262 /// spin_for(slow) // slow poll 3
1263 /// }).await;
1264 ///
1265 /// assert_eq!(next_interval().total_slow_poll_count, 3);
1266 ///
1267 /// // this task completes in two slow polls
1268 /// let _ = metrics_monitor.instrument(async {
1269 /// spin_for(slow).await; // slow poll 1
1270 /// spin_for(slow) // slow poll 2
1271 /// }).await;
1272 ///
1273 /// assert_eq!(next_interval().total_slow_poll_count, 2);
1274 /// }
1275 ///
1276 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1277 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1278 /// let start = tokio::time::Instant::now();
1279 /// while start.elapsed() <= duration {}
1280 /// tokio::task::yield_now()
1281 /// }
1282 /// ```
1283 pub total_slow_poll_count: u64,
1284
1285 /// The total duration of slow polls.
1286 ///
    /// Here, 'slow' is defined as completing in at least as much time as
1288 /// [`slow_poll_threshold`][TaskMonitor::slow_poll_threshold].
1289 ///
1290 /// ##### Derived metrics
    /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
1292 /// The mean duration of slow polls.
1293 ///
1294 /// ##### Examples
1295 /// In the below example, no tasks are polled in the first sampling interval; three slow polls
1296 /// consume a total of
1297 /// 30 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD]
1298 /// time in the second sampling interval; and two slow polls consume a total of
1299 /// 20 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
1300 /// third sampling interval:
1301 /// ```
1302 /// use std::future::Future;
1303 /// use std::time::Duration;
1304 ///
1305 /// #[tokio::main]
1306 /// async fn main() {
1307 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1308 /// let mut interval = metrics_monitor.intervals();
1309 /// let mut next_interval = || interval.next().unwrap();
1310 ///
1311 /// // no tasks have been constructed, instrumented, or polled
1312 /// let interval = next_interval();
1313 /// assert_eq!(interval.total_slow_poll_duration, Duration::ZERO);
1314 ///
1315 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1316 ///
1317 /// // this task completes in three slow polls
1318 /// let task_a_time = time(metrics_monitor.instrument(async {
1319 /// spin_for(slow).await; // slow poll 1
1320 /// spin_for(slow).await; // slow poll 2
1321 /// spin_for(slow) // slow poll 3
1322 /// })).await;
1323 ///
1324 /// let interval = next_interval();
1325 /// assert!(interval.total_slow_poll_duration >= slow * 3);
1326 /// assert!(interval.total_slow_poll_duration <= task_a_time);
1327 ///
1328 /// // this task completes in two slow polls
1329 /// let task_b_time = time(metrics_monitor.instrument(async {
1330 /// spin_for(slow).await; // slow poll 1
1331 /// spin_for(slow) // slow poll 2
1332 /// })).await;
1333 ///
1334 /// let interval = next_interval();
1335 /// assert!(interval.total_slow_poll_duration >= slow * 2);
1336 /// assert!(interval.total_slow_poll_duration <= task_b_time);
1337 /// }
1338 ///
1339 /// /// Produces the amount of time it took to await a given async task.
1340 /// async fn time(task: impl Future) -> Duration {
1341 /// let start = tokio::time::Instant::now();
1342 /// task.await;
1343 /// start.elapsed()
1344 /// }
1345 ///
1346 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1347 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1348 /// let start = tokio::time::Instant::now();
1349 /// while start.elapsed() <= duration {}
1350 /// tokio::task::yield_now()
1351 /// }
1352 /// ```
1353 pub total_slow_poll_duration: Duration,
1354
    /// The total number of times that tasks experienced short scheduling delays.
1356 ///
1357 /// This is defined as tasks taking strictly less than
1358 /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1359 /// scheduled.
1360 ///
1361 /// ##### Derived metrics
1362 /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**
1363 /// The mean duration of short scheduling delays.
1364 pub total_short_delay_count: u64,
1365
    /// The total number of times that tasks experienced long scheduling delays.
1367 ///
1368 /// This is defined as tasks taking
1369 /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
1370 /// after being scheduled.
1371 ///
1372 /// ##### Derived metrics
1373 /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
    ///   The mean duration of long scheduling delays.
1375 pub total_long_delay_count: u64,
1376
    /// The total duration of short scheduling delays.
1378 ///
1379 /// This is defined as tasks taking strictly less than
1380 /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] to be executed after being
1381 /// scheduled.
1382 ///
1383 /// ##### Derived metrics
1384 /// - **[`mean_short_delay_duration`][TaskMetrics::mean_short_delay_duration]**
1385 /// The mean duration of short scheduling delays.
1386 pub total_short_delay_duration: Duration,
1387
    /// The total duration of long scheduling delays.
    ///
    /// This is defined as tasks taking
    /// [`long_delay_threshold`][TaskMonitor::long_delay_threshold] or longer to be executed
    /// after being scheduled.
    ///
    /// ##### Derived metrics
    /// - **[`mean_long_delay_duration`][TaskMetrics::mean_long_delay_duration]**
    ///   The mean duration of long scheduling delays.
1396 pub total_long_delay_duration: Duration,
1397}
1398
/// Tracks the metrics, shared across the various types.
///
/// Every counter and accumulated duration is an `AtomicU64`, so it can be updated through a
/// shared reference (each task's `State` holds an `Arc<RawMetrics>`). Accumulated durations are
/// kept as plain integers rather than `Duration`s; the `_ns` suffix marks fields that hold
/// nanosecond counts.
///
/// NOTE(review): per the `TaskMetrics::total_first_poll_delay` documentation, totals summed
/// across tasks can wrap around on overflow; the update code is not visible here — confirm.
#[derive(Debug)]
struct RawMetrics {
    /// If a task poll takes at least this long, it is considered a slow poll.
    slow_poll_threshold: Duration,

    /// A scheduling delay of at least this long will be considered a long delay.
    long_delay_threshold: Duration,

    /// Total number of instrumented tasks.
    instrumented_count: AtomicU64,

    /// Total number of instrumented tasks polled at least once.
    first_poll_count: AtomicU64,

    /// Total number of times tasks entered the `idle` state.
    total_idled_count: AtomicU64,

    /// Total number of times tasks were scheduled.
    total_scheduled_count: AtomicU64,

    /// Total number of polls that completed in strictly less than `slow_poll_threshold`.
    total_fast_poll_count: AtomicU64,

    /// Total number of polls that took at least `slow_poll_threshold`.
    total_slow_poll_count: AtomicU64,

    /// Total number of times tasks waited at least `long_delay_threshold` to be polled after
    /// being woken.
    total_long_delay_count: AtomicU64,

    /// Total number of times tasks waited strictly less than `long_delay_threshold` to be polled
    /// after being woken.
    total_short_delay_count: AtomicU64,

    /// Total number of times tasks were dropped.
    dropped_count: AtomicU64,

    /// Total amount of time (nanoseconds) between tasks being instrumented and first polled.
    total_first_poll_delay_ns: AtomicU64,

    /// Total amount of time (nanoseconds) tasks spent in the `idle` state.
    total_idle_duration_ns: AtomicU64,

    /// Total amount of time (nanoseconds) tasks spent waiting to be polled after being woken.
    total_scheduled_duration_ns: AtomicU64,

    /// Total amount of time (nanoseconds) tasks spent in polls strictly below the slow cut off.
    total_fast_poll_duration_ns: AtomicU64,

    /// Total amount of time tasks spent in polls at or above the slow cut off.
    ///
    /// NOTE(review): unlike its siblings this field lacks the `_ns` suffix; presumably it also
    /// holds nanoseconds — confirm at the read/write sites before relying on the unit.
    total_slow_poll_duration: AtomicU64,

    /// Total amount of time (nanoseconds) tasks spent waiting to be polled after being woken,
    /// counting only delays strictly below the long delay cut off.
    total_short_delay_duration_ns: AtomicU64,

    /// Total amount of time (nanoseconds) tasks spent waiting to be polled after being woken,
    /// counting only delays at or above the long delay cut off.
    total_long_delay_duration_ns: AtomicU64,
}
1456
/// Per-task instrumentation state.
///
/// NOTE(review): the atomic `woke_at` and the `AtomicWaker` suggest this value is shared between
/// the instrumented future and the waker it hands out (presumably via `Arc<State>` and the
/// `ArcWake` trait this file imports) — confirm against the waker implementation.
#[derive(Debug)]
struct State {
    /// Where metrics should be recorded.
    metrics: Arc<RawMetrics>,

    /// Instant at which the task was instrumented. This is used to track the time to first poll.
    ///
    /// It is also the reference point that `woke_at` is measured from.
    instrumented_at: Instant,

    /// The instant, tracked as nanoseconds since `instrumented_at`, at which the future
    /// was last woken.
    ///
    /// Presumably stored as a `u64` offset (rather than an `Instant`) so that it fits in an
    /// atomic and can be updated from the waker without a lock.
    woke_at: AtomicU64,

    /// Waker to forward notifications to.
    waker: AtomicWaker,
}
1472
1473impl TaskMonitor {
1474 /// The default duration at which polls cross the threshold into being categorized as 'slow' is
1475 /// 50μs.
1476 #[cfg(not(test))]
1477 pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_micros(50);
1478 #[cfg(test)]
1479 #[allow(missing_docs)]
1480 pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = Duration::from_millis(500);
1481
1482 /// The default duration at which schedules cross the threshold into being categorized as 'long'
1483 /// is 50μs.
1484 #[cfg(not(test))]
1485 pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_micros(50);
1486 #[cfg(test)]
1487 #[allow(missing_docs)]
1488 pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = Duration::from_millis(500);
1489
1490 /// Constructs a new task monitor.
1491 ///
1492 /// Uses [`Self::DEFAULT_SLOW_POLL_THRESHOLD`] as the threshold at which polls will be
1493 /// considered 'slow'.
1494 ///
1495 /// Uses [`Self::DEFAULT_LONG_DELAY_THRESHOLD`] as the threshold at which scheduling will be
1496 /// considered 'long'.
1497 pub fn new() -> TaskMonitor {
1498 TaskMonitor::with_slow_poll_threshold(Self::DEFAULT_SLOW_POLL_THRESHOLD)
1499 }
1500
1501 /// Constructs a builder for a task monitor.
1502 pub fn builder() -> TaskMonitorBuilder {
1503 TaskMonitorBuilder::new()
1504 }
1505
1506 /// Constructs a new task monitor with a given threshold at which polls are considered 'slow'.
1507 ///
1508 /// ##### Selecting an appropriate threshold
1509 /// TODO. What advice can we give here?
1510 ///
1511 /// ##### Examples
1512 /// In the below example, low-threshold and high-threshold monitors are constructed and
1513 /// instrument identical tasks; the low-threshold monitor reports4 slow polls, and the
1514 /// high-threshold monitor reports only 2 slow polls:
1515 /// ```
1516 /// use std::future::Future;
1517 /// use std::time::Duration;
1518 /// use tokio_metrics::TaskMonitor;
1519 ///
1520 /// #[tokio::main]
1521 /// async fn main() {
1522 /// let lo_threshold = Duration::from_micros(10);
1523 /// let hi_threshold = Duration::from_millis(10);
1524 ///
1525 /// let lo_monitor = TaskMonitor::with_slow_poll_threshold(lo_threshold);
1526 /// let hi_monitor = TaskMonitor::with_slow_poll_threshold(hi_threshold);
1527 ///
1528 /// let make_task = || async {
1529 /// spin_for(lo_threshold).await; // faster poll 1
1530 /// spin_for(lo_threshold).await; // faster poll 2
1531 /// spin_for(hi_threshold).await; // slower poll 3
1532 /// spin_for(hi_threshold).await // slower poll 4
1533 /// };
1534 ///
1535 /// lo_monitor.instrument(make_task()).await;
1536 /// hi_monitor.instrument(make_task()).await;
1537 ///
1538 /// // the low-threshold monitor reported 4 slow polls:
1539 /// assert_eq!(lo_monitor.cumulative().total_slow_poll_count, 4);
1540 /// // the high-threshold monitor reported only 2 slow polls:
1541 /// assert_eq!(hi_monitor.cumulative().total_slow_poll_count, 2);
1542 /// }
1543 ///
1544 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1545 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1546 /// let start = tokio::time::Instant::now();
1547 /// while start.elapsed() <= duration {}
1548 /// tokio::task::yield_now()
1549 /// }
1550 /// ```
1551 pub fn with_slow_poll_threshold(slow_poll_cut_off: Duration) -> TaskMonitor {
1552 Self::create(slow_poll_cut_off, Self::DEFAULT_LONG_DELAY_THRESHOLD)
1553 }
1554
1555 fn create(slow_poll_cut_off: Duration, long_delay_cut_off: Duration) -> TaskMonitor {
1556 TaskMonitor {
1557 metrics: Arc::new(RawMetrics {
1558 slow_poll_threshold: slow_poll_cut_off,
1559 first_poll_count: AtomicU64::new(0),
1560 total_idled_count: AtomicU64::new(0),
1561 total_scheduled_count: AtomicU64::new(0),
1562 total_fast_poll_count: AtomicU64::new(0),
1563 total_slow_poll_count: AtomicU64::new(0),
1564 total_long_delay_count: AtomicU64::new(0),
1565 instrumented_count: AtomicU64::new(0),
1566 dropped_count: AtomicU64::new(0),
1567 total_first_poll_delay_ns: AtomicU64::new(0),
1568 total_scheduled_duration_ns: AtomicU64::new(0),
1569 total_idle_duration_ns: AtomicU64::new(0),
1570 total_fast_poll_duration_ns: AtomicU64::new(0),
1571 total_slow_poll_duration: AtomicU64::new(0),
1572 total_short_delay_duration_ns: AtomicU64::new(0),
1573 long_delay_threshold: long_delay_cut_off,
1574 total_short_delay_count: AtomicU64::new(0),
1575 total_long_delay_duration_ns: AtomicU64::new(0),
1576 }),
1577 }
1578 }
1579
1580 /// Produces the duration greater-than-or-equal-to at which polls are categorized as slow.
1581 ///
1582 /// ##### Examples
1583 /// In the below example, [`TaskMonitor`] is initialized with [`TaskMonitor::new`];
1584 /// consequently, its slow-poll threshold equals [`TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD`]:
1585 /// ```
1586 /// use tokio_metrics::TaskMonitor;
1587 ///
1588 /// #[tokio::main]
1589 /// async fn main() {
1590 /// let metrics_monitor = TaskMonitor::new();
1591 ///
1592 /// assert_eq!(
1593 /// metrics_monitor.slow_poll_threshold(),
1594 /// TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD
1595 /// );
1596 /// }
1597 /// ```
1598 pub fn slow_poll_threshold(&self) -> Duration {
1599 self.metrics.slow_poll_threshold
1600 }
1601
1602 /// Produces the duration greater-than-or-equal-to at which scheduling delays are categorized
1603 /// as long.
1604 pub fn long_delay_threshold(&self) -> Duration {
1605 self.metrics.long_delay_threshold
1606 }
1607
1608 /// Produces an instrumented façade around a given async task.
1609 ///
1610 /// ##### Examples
1611 /// Instrument an async task by passing it to [`TaskMonitor::instrument`]:
1612 /// ```
1613 /// #[tokio::main]
1614 /// async fn main() {
1615 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1616 ///
1617 /// // 0 tasks have been instrumented, much less polled
1618 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
1619 ///
1620 /// // instrument a task and poll it to completion
1621 /// metrics_monitor.instrument(async {}).await;
1622 ///
1623 /// // 1 task has been instrumented and polled
1624 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 1);
1625 ///
1626 /// // instrument a task and poll it to completion
1627 /// metrics_monitor.instrument(async {}).await;
1628 ///
1629 /// // 2 tasks have been instrumented and polled
1630 /// assert_eq!(metrics_monitor.cumulative().first_poll_count, 2);
1631 /// }
1632 /// ```
1633 /// An aync task may be tracked by multiple [`TaskMonitor`]s; e.g.:
1634 /// ```
1635 /// #[tokio::main]
1636 /// async fn main() {
1637 /// let monitor_a = tokio_metrics::TaskMonitor::new();
1638 /// let monitor_b = tokio_metrics::TaskMonitor::new();
1639 ///
1640 /// // 0 tasks have been instrumented, much less polled
1641 /// assert_eq!(monitor_a.cumulative().first_poll_count, 0);
1642 /// assert_eq!(monitor_b.cumulative().first_poll_count, 0);
1643 ///
1644 /// // instrument a task and poll it to completion
1645 /// monitor_a.instrument(monitor_b.instrument(async {})).await;
1646 ///
1647 /// // 1 task has been instrumented and polled
1648 /// assert_eq!(monitor_a.cumulative().first_poll_count, 1);
1649 /// assert_eq!(monitor_b.cumulative().first_poll_count, 1);
1650 /// }
1651 /// ```
1652 /// It is also possible (but probably undesirable) to instrument an async task multiple times
1653 /// with the same [`TaskMonitor`]; e.g.:
1654 /// ```
1655 /// #[tokio::main]
1656 /// async fn main() {
1657 /// let monitor = tokio_metrics::TaskMonitor::new();
1658 ///
1659 /// // 0 tasks have been instrumented, much less polled
1660 /// assert_eq!(monitor.cumulative().first_poll_count, 0);
1661 ///
1662 /// // instrument a task and poll it to completion
1663 /// monitor.instrument(monitor.instrument(async {})).await;
1664 ///
1665 /// // 2 tasks have been instrumented and polled, supposedly
1666 /// assert_eq!(monitor.cumulative().first_poll_count, 2);
1667 /// }
1668 /// ```
1669 pub fn instrument<F>(&self, task: F) -> Instrumented<F> {
1670 self.metrics.instrumented_count.fetch_add(1, SeqCst);
1671 Instrumented {
1672 task,
1673 did_poll_once: false,
1674 idled_at: 0,
1675 state: Arc::new(State {
1676 metrics: self.metrics.clone(),
1677 instrumented_at: Instant::now(),
1678 woke_at: AtomicU64::new(0),
1679 waker: AtomicWaker::new(),
1680 }),
1681 }
1682 }
1683
1684 /// Produces [`TaskMetrics`] for the tasks instrumented by this [`TaskMonitor`], collected since
1685 /// the construction of [`TaskMonitor`].
1686 ///
1687 /// ##### See also
1688 /// - [`TaskMonitor::intervals`]:
1689 /// produces [`TaskMetrics`] for user-defined sampling intervals, instead of cumulatively
1690 ///
1691 /// ##### Examples
1692 /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1693 /// within the second sampling interval, and 2 slow polls occur within the third sampling
1694 /// interval; five slow polls occur across all sampling intervals:
1695 /// ```
1696 /// use std::future::Future;
1697 /// use std::time::Duration;
1698 ///
1699 /// #[tokio::main]
1700 /// async fn main() {
1701 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1702 ///
1703 /// // initialize a stream of sampling intervals
1704 /// let mut intervals = metrics_monitor.intervals();
1705 /// // each call of `next_interval` will produce metrics for the last sampling interval
1706 /// let mut next_interval = || intervals.next().unwrap();
1707 ///
1708 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1709 ///
1710 /// // this task completes in three slow polls
1711 /// let _ = metrics_monitor.instrument(async {
1712 /// spin_for(slow).await; // slow poll 1
1713 /// spin_for(slow).await; // slow poll 2
1714 /// spin_for(slow) // slow poll 3
1715 /// }).await;
1716 ///
1717 /// // in the previous sampling interval, there were 3 slow polls
1718 /// assert_eq!(next_interval().total_slow_poll_count, 3);
1719 /// assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 3);
1720 ///
1721 /// // this task completes in two slow polls
1722 /// let _ = metrics_monitor.instrument(async {
1723 /// spin_for(slow).await; // slow poll 1
1724 /// spin_for(slow) // slow poll 2
1725 /// }).await;
1726 ///
1727 /// // in the previous sampling interval, there were 2 slow polls
1728 /// assert_eq!(next_interval().total_slow_poll_count, 2);
1729 ///
1730 /// // across all sampling interval, there were a total of 5 slow polls
1731 /// assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1732 /// }
1733 ///
1734 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1735 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1736 /// let start = tokio::time::Instant::now();
1737 /// while start.elapsed() <= duration {}
1738 /// tokio::task::yield_now()
1739 /// }
1740 /// ```
1741 pub fn cumulative(&self) -> TaskMetrics {
1742 self.metrics.metrics()
1743 }
1744
1745 /// Produces an unending iterator of metric sampling intervals.
1746 ///
1747 /// Each sampling interval is defined by the time elapsed between advancements of the iterator
1748 /// produced by [`TaskMonitor::intervals`]. The item type of this iterator is [`TaskMetrics`],
1749 /// which is a bundle of task metrics that describe *only* events occurring within that sampling
1750 /// interval.
1751 ///
1752 /// ##### Examples
1753 /// In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur
1754 /// within the second sampling interval, and 2 slow polls occur within the third sampling
1755 /// interval; five slow polls occur across all sampling intervals:
1756 /// ```
1757 /// use std::future::Future;
1758 /// use std::time::Duration;
1759 ///
1760 /// #[tokio::main]
1761 /// async fn main() {
1762 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1763 ///
1764 /// // initialize a stream of sampling intervals
1765 /// let mut intervals = metrics_monitor.intervals();
1766 /// // each call of `next_interval` will produce metrics for the last sampling interval
1767 /// let mut next_interval = || intervals.next().unwrap();
1768 ///
1769 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
1770 ///
1771 /// // this task completes in three slow polls
1772 /// let _ = metrics_monitor.instrument(async {
1773 /// spin_for(slow).await; // slow poll 1
1774 /// spin_for(slow).await; // slow poll 2
1775 /// spin_for(slow) // slow poll 3
1776 /// }).await;
1777 ///
1778 /// // in the previous sampling interval, there were 3 slow polls
1779 /// assert_eq!(next_interval().total_slow_poll_count, 3);
1780 ///
1781 /// // this task completes in two slow polls
1782 /// let _ = metrics_monitor.instrument(async {
1783 /// spin_for(slow).await; // slow poll 1
1784 /// spin_for(slow) // slow poll 2
1785 /// }).await;
1786 ///
1787 /// // in the previous sampling interval, there were 2 slow polls
1788 /// assert_eq!(next_interval().total_slow_poll_count, 2);
1789 ///
1790 /// // across all sampling intervals, there were a total of 5 slow polls
1791 /// assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
1792 /// }
1793 ///
1794 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
1795 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
1796 /// let start = tokio::time::Instant::now();
1797 /// while start.elapsed() <= duration {}
1798 /// tokio::task::yield_now()
1799 /// }
1800 /// ```
1801 pub fn intervals(&self) -> impl Iterator<Item = TaskMetrics> {
1802 let latest = self.metrics.clone();
1803 let mut previous: Option<TaskMetrics> = None;
1804
1805 std::iter::from_fn(move || {
1806 let latest: TaskMetrics = latest.metrics();
1807 let next = if let Some(previous) = previous {
1808 TaskMetrics {
1809 instrumented_count: latest
1810 .instrumented_count
1811 .wrapping_sub(previous.instrumented_count),
1812 dropped_count: latest.dropped_count.wrapping_sub(previous.dropped_count),
1813 total_poll_count: latest
1814 .total_poll_count
1815 .wrapping_sub(previous.total_poll_count),
1816 total_poll_duration: sub(
1817 latest.total_poll_duration,
1818 previous.total_poll_duration,
1819 ),
1820 first_poll_count: latest
1821 .first_poll_count
1822 .wrapping_sub(previous.first_poll_count),
1823 total_idled_count: latest
1824 .total_idled_count
1825 .wrapping_sub(previous.total_idled_count),
1826 total_scheduled_count: latest
1827 .total_scheduled_count
1828 .wrapping_sub(previous.total_scheduled_count),
1829 total_fast_poll_count: latest
1830 .total_fast_poll_count
1831 .wrapping_sub(previous.total_fast_poll_count),
1832 total_short_delay_count: latest
1833 .total_short_delay_count
1834 .wrapping_sub(previous.total_short_delay_count),
1835 total_slow_poll_count: latest
1836 .total_slow_poll_count
1837 .wrapping_sub(previous.total_slow_poll_count),
1838 total_long_delay_count: latest
1839 .total_long_delay_count
1840 .wrapping_sub(previous.total_long_delay_count),
1841 total_first_poll_delay: sub(
1842 latest.total_first_poll_delay,
1843 previous.total_first_poll_delay,
1844 ),
1845 total_idle_duration: sub(
1846 latest.total_idle_duration,
1847 previous.total_idle_duration,
1848 ),
1849 total_scheduled_duration: sub(
1850 latest.total_scheduled_duration,
1851 previous.total_scheduled_duration,
1852 ),
1853 total_fast_poll_duration: sub(
1854 latest.total_fast_poll_duration,
1855 previous.total_fast_poll_duration,
1856 ),
1857 total_short_delay_duration: sub(
1858 latest.total_short_delay_duration,
1859 previous.total_short_delay_duration,
1860 ),
1861 total_slow_poll_duration: sub(
1862 latest.total_slow_poll_duration,
1863 previous.total_slow_poll_duration,
1864 ),
1865 total_long_delay_duration: sub(
1866 latest.total_long_delay_duration,
1867 previous.total_long_delay_duration,
1868 ),
1869 }
1870 } else {
1871 latest
1872 };
1873
1874 previous = Some(latest);
1875
1876 Some(next)
1877 })
1878 }
1879}
1880
1881impl RawMetrics {
1882 fn metrics(&self) -> TaskMetrics {
1883 let total_fast_poll_count = self.total_fast_poll_count.load(SeqCst);
1884 let total_slow_poll_count = self.total_slow_poll_count.load(SeqCst);
1885
1886 let total_fast_poll_duration =
1887 Duration::from_nanos(self.total_fast_poll_duration_ns.load(SeqCst));
1888 let total_slow_poll_duration =
1889 Duration::from_nanos(self.total_slow_poll_duration.load(SeqCst));
1890
1891 let total_poll_count = total_fast_poll_count + total_slow_poll_count;
1892 let total_poll_duration = total_fast_poll_duration + total_slow_poll_duration;
1893
1894 TaskMetrics {
1895 instrumented_count: self.instrumented_count.load(SeqCst),
1896 dropped_count: self.dropped_count.load(SeqCst),
1897
1898 total_poll_count,
1899 total_poll_duration,
1900 first_poll_count: self.first_poll_count.load(SeqCst),
1901 total_idled_count: self.total_idled_count.load(SeqCst),
1902 total_scheduled_count: self.total_scheduled_count.load(SeqCst),
1903 total_fast_poll_count: self.total_fast_poll_count.load(SeqCst),
1904 total_slow_poll_count: self.total_slow_poll_count.load(SeqCst),
1905 total_short_delay_count: self.total_short_delay_count.load(SeqCst),
1906 total_long_delay_count: self.total_long_delay_count.load(SeqCst),
1907 total_first_poll_delay: Duration::from_nanos(
1908 self.total_first_poll_delay_ns.load(SeqCst),
1909 ),
1910 total_idle_duration: Duration::from_nanos(self.total_idle_duration_ns.load(SeqCst)),
1911 total_scheduled_duration: Duration::from_nanos(
1912 self.total_scheduled_duration_ns.load(SeqCst),
1913 ),
1914 total_fast_poll_duration: Duration::from_nanos(
1915 self.total_fast_poll_duration_ns.load(SeqCst),
1916 ),
1917 total_slow_poll_duration: Duration::from_nanos(
1918 self.total_slow_poll_duration.load(SeqCst),
1919 ),
1920 total_short_delay_duration: Duration::from_nanos(
1921 self.total_short_delay_duration_ns.load(SeqCst),
1922 ),
1923 total_long_delay_duration: Duration::from_nanos(
1924 self.total_long_delay_duration_ns.load(SeqCst),
1925 ),
1926 }
1927 }
1928}
1929
1930impl Default for TaskMonitor {
1931 fn default() -> TaskMonitor {
1932 TaskMonitor::new()
1933 }
1934}
1935
1936impl TaskMetrics {
1937 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
1938 /// are first polled.
1939 ///
1940 /// ##### Definition
1941 /// This metric is derived from [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay]
1942 /// ÷ [`first_poll_count`][TaskMetrics::first_poll_count].
1943 ///
1944 /// ##### Interpretation
1945 /// If this metric increases, it means that, on average, tasks spent longer waiting to be
1946 /// initially polled.
1947 ///
1948 /// ##### See also
1949 /// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
1950 /// The mean duration that tasks spent waiting to be executed after awakening.
1951 ///
1952 /// ##### Examples
1953 /// In the below example, no tasks are instrumented or polled within the first sampling
1954 /// interval; in the second sampling interval, 500ms elapse between the instrumentation of a
1955 /// task and its first poll; in the third sampling interval, a mean of 750ms elapse between the
1956 /// instrumentation and first poll of two tasks:
1957 /// ```
1958 /// use std::time::Duration;
1959 ///
1960 /// #[tokio::main]
1961 /// async fn main() {
1962 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
1963 /// let mut interval = metrics_monitor.intervals();
1964 /// let mut next_interval = || interval.next().unwrap();
1965 ///
1966 /// // no tasks have yet been created, instrumented, or polled
1967 /// assert_eq!(next_interval().mean_first_poll_delay(), Duration::ZERO);
1968 ///
1969 /// // constructs and instruments a task, pauses for `pause_time`, awaits the task, then
1970 /// // produces the total time it took to do all of the aforementioned
1971 /// async fn instrument_pause_await(
1972 /// metrics_monitor: &tokio_metrics::TaskMonitor,
1973 /// pause_time: Duration
1974 /// ) -> Duration
1975 /// {
1976 /// let before_instrumentation = tokio::time::Instant::now();
1977 /// let task = metrics_monitor.instrument(async move {});
1978 /// tokio::time::sleep(pause_time).await;
1979 /// task.await;
1980 /// before_instrumentation.elapsed()
1981 /// }
1982 ///
1983 /// // construct and await a task that pauses for 500ms between instrumentation and first poll
1984 /// let task_a_pause_time = Duration::from_millis(500);
1985 /// let task_a_total_time = instrument_pause_await(&metrics_monitor, task_a_pause_time).await;
1986 ///
1987 /// // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
1988 /// // pause time of 500ms, and less-than-or-equal-to the total runtime of `task_a`
1989 /// let mean_first_poll_delay = next_interval().mean_first_poll_delay();
1990 /// assert!(mean_first_poll_delay >= task_a_pause_time);
1991 /// assert!(mean_first_poll_delay <= task_a_total_time);
1992 ///
1993 /// // construct and await a task that pauses for 500ms between instrumentation and first poll
1994 /// let task_b_pause_time = Duration::from_millis(500);
1995 /// let task_b_total_time = instrument_pause_await(&metrics_monitor, task_b_pause_time).await;
1996 ///
1997 /// // construct and await a task that pauses for 1000ms between instrumentation and first poll
1998 /// let task_c_pause_time = Duration::from_millis(1000);
1999 /// let task_c_total_time = instrument_pause_await(&metrics_monitor, task_c_pause_time).await;
2000 ///
2001 /// // the `mean_first_poll_delay` will be some duration greater-than-or-equal-to the
2002 /// // average pause time of 500ms, and less-than-or-equal-to the combined total runtime of
2003 /// // `task_b` and `task_c`
2004 /// let mean_first_poll_delay = next_interval().mean_first_poll_delay();
2005 /// assert!(mean_first_poll_delay >= (task_b_pause_time + task_c_pause_time) / 2);
2006 /// assert!(mean_first_poll_delay <= (task_b_total_time + task_c_total_time) / 2);
2007 /// }
2008 /// ```
2009 pub fn mean_first_poll_delay(&self) -> Duration {
2010 mean(self.total_first_poll_delay, self.first_poll_count)
2011 }
2012
2013 /// The mean duration of idles.
2014 ///
2015 /// ##### Definition
2016 /// This metric is derived from [`total_idle_duration`][TaskMetrics::total_idle_duration] ÷
2017 /// [`total_idled_count`][TaskMetrics::total_idled_count].
2018 ///
2019 /// ##### Interpretation
2020 /// The idle state is the duration spanning the instant a task completes a poll, and the instant
2021 /// that it is next awoken. Tasks inhabit this state when they are waiting for task-external
2022 /// events to complete (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this
2023 /// metric increases, it means that tasks, in aggregate, spent more time waiting for
2024 /// task-external events to complete.
2025 ///
2026 /// ##### Examples
2027 /// ```
2028 /// #[tokio::main]
2029 /// async fn main() {
2030 /// let monitor = tokio_metrics::TaskMonitor::new();
2031 /// let one_sec = std::time::Duration::from_secs(1);
2032 ///
2033 /// monitor.instrument(async move {
2034 /// tokio::time::sleep(one_sec).await;
2035 /// }).await;
2036 ///
2037 /// assert!(monitor.cumulative().mean_idle_duration() >= one_sec);
2038 /// }
2039 /// ```
2040 pub fn mean_idle_duration(&self) -> Duration {
2041 mean(self.total_idle_duration, self.total_idled_count)
2042 }
2043
2044 /// The mean duration that tasks spent waiting to be executed after awakening.
2045 ///
2046 /// ##### Definition
2047 /// This metric is derived from
2048 /// [`total_scheduled_duration`][TaskMetrics::total_scheduled_duration] ÷
2049 /// [`total_scheduled_count`][`TaskMetrics::total_scheduled_count`].
2050 ///
2051 /// ##### Interpretation
2052 /// If this metric increases, it means that, on average, tasks spent longer in the runtime's
2053 /// queues before being polled.
2054 ///
2055 /// ##### See also
2056 /// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
2057 /// The mean duration elapsed between the instant tasks are instrumented, and the instant they
2058 /// are first polled.
2059 ///
2060 /// ##### Examples
2061 /// ```
2062 /// use tokio::time::Duration;
2063 ///
2064 /// #[tokio::main(flavor = "current_thread")]
2065 /// async fn main() {
2066 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2067 /// let mut interval = metrics_monitor.intervals();
2068 /// let mut next_interval = || interval.next().unwrap();
2069 ///
2070 /// // construct and instrument and spawn a task that yields endlessly
2071 /// tokio::spawn(metrics_monitor.instrument(async {
2072 /// loop { tokio::task::yield_now().await }
2073 /// }));
2074 ///
2075 /// tokio::task::yield_now().await;
2076 ///
2077 /// // block the executor for 1 second
2078 /// std::thread::sleep(Duration::from_millis(1000));
2079 ///
2080 /// // get the task to run twice
2081 /// // the first will have a 1 sec scheduling delay, the second will have almost none
2082 /// tokio::task::yield_now().await;
2083 /// tokio::task::yield_now().await;
2084 ///
2085 /// // `endless_task` will have spent approximately one second waiting
2086 /// let mean_scheduled_duration = next_interval().mean_scheduled_duration();
2087 /// assert!(mean_scheduled_duration >= Duration::from_millis(500), "{}", mean_scheduled_duration.as_secs_f64());
2088 /// assert!(mean_scheduled_duration <= Duration::from_millis(600), "{}", mean_scheduled_duration.as_secs_f64());
2089 /// }
2090 /// ```
2091 pub fn mean_scheduled_duration(&self) -> Duration {
2092 mean(self.total_scheduled_duration, self.total_scheduled_count)
2093 }
2094
2095 /// The mean duration of polls.
2096 ///
2097 /// ##### Definition
2098 /// This metric is derived from [`total_poll_duration`][TaskMetrics::total_poll_duration] ÷
2099 /// [`total_poll_count`][TaskMetrics::total_poll_count].
2100 ///
2101 /// ##### Interpretation
2102 /// If this metric increases, it means that, on average, individual polls are tending to take
2103 /// longer. However, this does not necessarily imply increased task latency: An increase in poll
2104 /// durations could be offset by fewer polls.
2105 ///
2106 /// ##### See also
2107 /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**
2108 /// The ratio between the number polls categorized as slow and fast.
2109 /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
2110 /// The mean duration of slow polls.
2111 ///
2112 /// ##### Examples
2113 /// ```
2114 /// use std::time::Duration;
2115 ///
2116 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
2117 /// async fn main() {
2118 /// let monitor = tokio_metrics::TaskMonitor::new();
2119 /// let mut interval = monitor.intervals();
2120 /// let mut next_interval = move || interval.next().unwrap();
2121 ///
2122 /// assert_eq!(next_interval().mean_poll_duration(), Duration::ZERO);
2123 ///
2124 /// monitor.instrument(async {
2125 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 1 (1s)
2126 /// tokio::time::advance(Duration::from_secs(1)).await; // poll 2 (1s)
2127 /// () // poll 3 (0s)
2128 /// }).await;
2129 ///
2130 /// assert_eq!(next_interval().mean_poll_duration(), Duration::from_secs(2) / 3);
2131 /// }
2132 /// ```
2133 pub fn mean_poll_duration(&self) -> Duration {
2134 mean(self.total_poll_duration, self.total_poll_count)
2135 }
2136
2137 /// The ratio between the number polls categorized as slow and fast.
2138 ///
2139 /// ##### Definition
2140 /// This metric is derived from [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count] ÷
2141 /// [`total_poll_count`][TaskMetrics::total_poll_count].
2142 ///
2143 /// ##### Interpretation
2144 /// If this metric increases, it means that a greater proportion of polls took excessively long
2145 /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2146 /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
2147 /// However, as a rule, *should* yield to the scheduler frequently.
2148 ///
2149 /// ##### See also
2150 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
2151 /// The mean duration of polls.
2152 /// - **[`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration]**
2153 /// The mean duration of slow polls.
2154 ///
2155 /// ##### Examples
2156 /// Changes in this metric may be observed by varying the ratio of slow and slow fast within
2157 /// sampling intervals; for instance:
2158 /// ```
2159 /// use std::future::Future;
2160 /// use std::time::Duration;
2161 ///
2162 /// #[tokio::main]
2163 /// async fn main() {
2164 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2165 /// let mut interval = metrics_monitor.intervals();
2166 /// let mut next_interval = || interval.next().unwrap();
2167 ///
2168 /// // no tasks have been constructed, instrumented, or polled
2169 /// let interval = next_interval();
2170 /// assert_eq!(interval.total_fast_poll_count, 0);
2171 /// assert_eq!(interval.total_slow_poll_count, 0);
2172 /// assert!(interval.slow_poll_ratio().is_nan());
2173 ///
2174 /// let fast = Duration::ZERO;
2175 /// let slow = 10 * metrics_monitor.slow_poll_threshold();
2176 ///
2177 /// // this task completes in three fast polls
2178 /// metrics_monitor.instrument(async {
2179 /// spin_for(fast).await; // fast poll 1
2180 /// spin_for(fast).await; // fast poll 2
2181 /// spin_for(fast); // fast poll 3
2182 /// }).await;
2183 ///
2184 /// // this task completes in two slow polls
2185 /// metrics_monitor.instrument(async {
2186 /// spin_for(slow).await; // slow poll 1
2187 /// spin_for(slow); // slow poll 2
2188 /// }).await;
2189 ///
2190 /// let interval = next_interval();
2191 /// assert_eq!(interval.total_fast_poll_count, 3);
2192 /// assert_eq!(interval.total_slow_poll_count, 2);
2193 /// assert_eq!(interval.slow_poll_ratio(), ratio(2., 3.));
2194 ///
2195 /// // this task completes in three slow polls
2196 /// metrics_monitor.instrument(async {
2197 /// spin_for(slow).await; // slow poll 1
2198 /// spin_for(slow).await; // slow poll 2
2199 /// spin_for(slow); // slow poll 3
2200 /// }).await;
2201 ///
2202 /// // this task completes in two fast polls
2203 /// metrics_monitor.instrument(async {
2204 /// spin_for(fast).await; // fast poll 1
2205 /// spin_for(fast); // fast poll 2
2206 /// }).await;
2207 ///
2208 /// let interval = next_interval();
2209 /// assert_eq!(interval.total_fast_poll_count, 2);
2210 /// assert_eq!(interval.total_slow_poll_count, 3);
2211 /// assert_eq!(interval.slow_poll_ratio(), ratio(3., 2.));
2212 /// }
2213 ///
2214 /// fn ratio(a: f64, b: f64) -> f64 {
2215 /// a / (a + b)
2216 /// }
2217 ///
2218 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2219 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2220 /// let start = tokio::time::Instant::now();
2221 /// while start.elapsed() <= duration {}
2222 /// tokio::task::yield_now()
2223 /// }
2224 /// ```
2225 pub fn slow_poll_ratio(&self) -> f64 {
2226 self.total_slow_poll_count as f64 / self.total_poll_count as f64
2227 }
2228
2229 /// The ratio of tasks exceeding [`long_delay_threshold`][TaskMonitor::long_delay_threshold].
2230 ///
2231 /// ##### Definition
2232 /// This metric is derived from [`total_long_delay_count`][TaskMetrics::total_long_delay_count] ÷
2233 /// [`total_scheduled_count`][TaskMetrics::total_scheduled_count].
2234 pub fn long_delay_ratio(&self) -> f64 {
2235 self.total_long_delay_count as f64 / self.total_scheduled_count as f64
2236 }
2237
2238 /// The mean duration of fast polls.
2239 ///
2240 /// ##### Definition
2241 /// This metric is derived from
2242 /// [`total_fast_poll_duration`][TaskMetrics::total_fast_poll_duration] ÷
2243 /// [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count].
2244 ///
2245 /// ##### Examples
2246 /// In the below example, no tasks are polled in the first sampling interval; three fast polls
2247 /// consume a mean of
2248 /// ⅜ × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
2249 /// second sampling interval; and two fast polls consume a total of
2250 /// ½ × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
2251 /// third sampling interval:
2252 /// ```
2253 /// use std::future::Future;
2254 /// use std::time::Duration;
2255 ///
2256 /// #[tokio::main]
2257 /// async fn main() {
2258 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2259 /// let mut interval = metrics_monitor.intervals();
2260 /// let mut next_interval = || interval.next().unwrap();
2261 ///
2262 /// // no tasks have been constructed, instrumented, or polled
2263 /// assert_eq!(next_interval().mean_fast_poll_duration(), Duration::ZERO);
2264 ///
2265 /// let threshold = metrics_monitor.slow_poll_threshold();
2266 /// let fast_1 = 1 * Duration::from_micros(1);
2267 /// let fast_2 = 2 * Duration::from_micros(1);
2268 /// let fast_3 = 3 * Duration::from_micros(1);
2269 ///
2270 /// // this task completes in two fast polls
2271 /// let total_time = time(metrics_monitor.instrument(async {
2272 /// spin_for(fast_1).await; // fast poll 1
2273 /// spin_for(fast_2) // fast poll 2
2274 /// })).await;
2275 ///
2276 /// // `mean_fast_poll_duration` ≈ the mean of `fast_1` and `fast_2`
2277 /// let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2278 /// assert!(mean_fast_poll_duration >= (fast_1 + fast_2) / 2);
2279 /// assert!(mean_fast_poll_duration <= total_time / 2);
2280 ///
2281 /// // this task completes in three fast polls
2282 /// let total_time = time(metrics_monitor.instrument(async {
2283 /// spin_for(fast_1).await; // fast poll 1
2284 /// spin_for(fast_2).await; // fast poll 2
2285 /// spin_for(fast_3) // fast poll 3
2286 /// })).await;
2287 ///
2288 /// // `mean_fast_poll_duration` ≈ the mean of `fast_1`, `fast_2`, `fast_3`
2289 /// let mean_fast_poll_duration = next_interval().mean_fast_poll_duration();
2290 /// assert!(mean_fast_poll_duration >= (fast_1 + fast_2 + fast_3) / 3);
2291 /// assert!(mean_fast_poll_duration <= total_time / 3);
2292 /// }
2293 ///
2294 /// /// Produces the amount of time it took to await a given task.
2295 /// async fn time(task: impl Future) -> Duration {
2296 /// let start = tokio::time::Instant::now();
2297 /// task.await;
2298 /// start.elapsed()
2299 /// }
2300 ///
2301 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2302 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2303 /// let start = tokio::time::Instant::now();
2304 /// while start.elapsed() <= duration {}
2305 /// tokio::task::yield_now()
2306 /// }
2307 /// ```
2308 pub fn mean_fast_poll_duration(&self) -> Duration {
2309 mean(self.total_fast_poll_duration, self.total_fast_poll_count)
2310 }
2311
2312 /// The average time taken for a task with a short scheduling delay to be executed after being
2313 /// scheduled.
2314 ///
2315 /// ##### Definition
2316 /// This metric is derived from
2317 /// [`total_short_delay_duration`][TaskMetrics::total_short_delay_duration] ÷
2318 /// [`total_short_delay_count`][TaskMetrics::total_short_delay_count].
2319 pub fn mean_short_delay_duration(&self) -> Duration {
2320 mean(
2321 self.total_short_delay_duration,
2322 self.total_short_delay_count,
2323 )
2324 }
2325
2326 /// The mean duration of slow polls.
2327 ///
2328 /// ##### Definition
2329 /// This metric is derived from
2330 /// [`total_slow_poll_duration`][TaskMetrics::total_slow_poll_duration] ÷
2331 /// [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
2332 ///
2333 /// ##### Interpretation
2334 /// If this metric increases, it means that a greater proportion of polls took excessively long
2335 /// before yielding to the scheduler. This does not necessarily imply increased task latency:
2336 /// An increase in the proportion of slow polls could be offset by fewer or faster polls.
2337 ///
2338 /// ##### See also
2339 /// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
2340 /// The mean duration of polls.
2341 /// - **[`slow_poll_ratio`][TaskMetrics::slow_poll_ratio]**
2342 /// The ratio between the number polls categorized as slow and fast.
2343 ///
2344 /// ##### Interpretation
2345 /// If this metric increases, it means that, on average, slow polls got even slower. This does
2346 /// necessarily imply increased task latency: An increase in average slow poll duration could be
2347 /// offset by fewer or faster polls. However, as a rule, *should* yield to the scheduler
2348 /// frequently.
2349 ///
2350 /// ##### Examples
2351 /// In the below example, no tasks are polled in the first sampling interval; three slow polls
2352 /// consume a mean of
2353 /// 1.5 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
2354 /// second sampling interval; and two slow polls consume a total of
2355 /// 2 × [`DEFAULT_SLOW_POLL_THRESHOLD`][TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD] time in the
2356 /// third sampling interval:
2357 /// ```
2358 /// use std::future::Future;
2359 /// use std::time::Duration;
2360 ///
2361 /// #[tokio::main]
2362 /// async fn main() {
2363 /// let metrics_monitor = tokio_metrics::TaskMonitor::new();
2364 /// let mut interval = metrics_monitor.intervals();
2365 /// let mut next_interval = || interval.next().unwrap();
2366 ///
2367 /// // no tasks have been constructed, instrumented, or polled
2368 /// assert_eq!(next_interval().mean_slow_poll_duration(), Duration::ZERO);
2369 ///
2370 /// let threshold = metrics_monitor.slow_poll_threshold();
2371 /// let slow_1 = 1 * threshold;
2372 /// let slow_2 = 2 * threshold;
2373 /// let slow_3 = 3 * threshold;
2374 ///
2375 /// // this task completes in two slow polls
2376 /// let total_time = time(metrics_monitor.instrument(async {
2377 /// spin_for(slow_1).await; // slow poll 1
2378 /// spin_for(slow_2) // slow poll 2
2379 /// })).await;
2380 ///
2381 /// // `mean_slow_poll_duration` ≈ the mean of `slow_1` and `slow_2`
2382 /// let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2383 /// assert!(mean_slow_poll_duration >= (slow_1 + slow_2) / 2);
2384 /// assert!(mean_slow_poll_duration <= total_time / 2);
2385 ///
2386 /// // this task completes in three slow polls
2387 /// let total_time = time(metrics_monitor.instrument(async {
2388 /// spin_for(slow_1).await; // slow poll 1
2389 /// spin_for(slow_2).await; // slow poll 2
2390 /// spin_for(slow_3) // slow poll 3
2391 /// })).await;
2392 ///
2393 /// // `mean_slow_poll_duration` ≈ the mean of `slow_1`, `slow_2`, `slow_3`
2394 /// let mean_slow_poll_duration = next_interval().mean_slow_poll_duration();
2395 /// assert!(mean_slow_poll_duration >= (slow_1 + slow_2 + slow_3) / 3);
2396 /// assert!(mean_slow_poll_duration <= total_time / 3);
2397 /// }
2398 ///
2399 /// /// Produces the amount of time it took to await a given task.
2400 /// async fn time(task: impl Future) -> Duration {
2401 /// let start = tokio::time::Instant::now();
2402 /// task.await;
2403 /// start.elapsed()
2404 /// }
2405 ///
2406 /// /// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
2407 /// fn spin_for(duration: Duration) -> impl Future<Output=()> {
2408 /// let start = tokio::time::Instant::now();
2409 /// while start.elapsed() <= duration {}
2410 /// tokio::task::yield_now()
2411 /// }
2412 /// ```
2413 pub fn mean_slow_poll_duration(&self) -> Duration {
2414 mean(self.total_slow_poll_duration, self.total_slow_poll_count)
2415 }
2416
2417 /// The average scheduling delay for a task which takes a long time to start executing after
2418 /// being scheduled.
2419 ///
2420 /// ##### Definition
2421 /// This metric is derived from
2422 /// [`total_long_delay_duration`][TaskMetrics::total_long_delay_duration] ÷
2423 /// [`total_long_delay_count`][TaskMetrics::total_long_delay_count].
2424 pub fn mean_long_delay_duration(&self) -> Duration {
2425 mean(self.total_long_delay_duration, self.total_long_delay_count)
2426 }
2427}
2428
2429impl<T: Future> Future for Instrumented<T> {
2430 type Output = T::Output;
2431
2432 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2433 instrument_poll(cx, self, Future::poll)
2434 }
2435}
2436
2437impl<T: Stream> Stream for Instrumented<T> {
2438 type Item = T::Item;
2439
2440 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2441 instrument_poll(cx, self, Stream::poll_next)
2442 }
2443}
2444
2445fn instrument_poll<T, Out>(
2446 cx: &mut Context<'_>,
2447 instrumented: Pin<&mut Instrumented<T>>,
2448 poll_fn: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Out>,
2449) -> Poll<Out> {
2450 let poll_start = Instant::now();
2451 let this = instrumented.project();
2452 let idled_at = this.idled_at;
2453 let state = this.state;
2454 let instrumented_at = state.instrumented_at;
2455 let metrics = &state.metrics;
2456 /* accounting for time-to-first-poll and tasks-count */
2457 // is this the first time this task has been polled?
2458 if !*this.did_poll_once {
2459 // if so, we need to do three things:
2460 /* 1. note that this task *has* been polled */
2461 *this.did_poll_once = true;
2462
2463 /* 2. account for the time-to-first-poll of this task */
2464 // if the time-to-first-poll of this task exceeds `u64::MAX` ns,
2465 // round down to `u64::MAX` nanoseconds
2466 let elapsed = (poll_start - instrumented_at)
2467 .as_nanos()
2468 .try_into()
2469 .unwrap_or(u64::MAX);
2470 // add this duration to `time_to_first_poll_ns_total`
2471 metrics.total_first_poll_delay_ns.fetch_add(elapsed, SeqCst);
2472
2473 /* 3. increment the count of tasks that have been polled at least once */
2474 state.metrics.first_poll_count.fetch_add(1, SeqCst);
2475 }
2476 /* accounting for time-idled and time-scheduled */
2477 // 1. note (and reset) the instant this task was last awoke
2478 let woke_at = state.woke_at.swap(0, SeqCst);
2479 // The state of a future is *idling* in the interim between the instant
2480 // it completes a `poll`, and the instant it is next awoken.
2481 if *idled_at < woke_at {
2482 // increment the counter of how many idles occurred
2483 metrics.total_idled_count.fetch_add(1, SeqCst);
2484
2485 // compute the duration of the idle
2486 let idle_ns = woke_at - *idled_at;
2487
2488 // adjust the total elapsed time monitored tasks spent idling
2489 metrics.total_idle_duration_ns.fetch_add(idle_ns, SeqCst);
2490 }
2491 // if this task spent any time in the scheduled state after instrumentation,
2492 // and after first poll, `woke_at` will be greater than 0.
2493 if woke_at > 0 {
2494 // increment the counter of how many schedules occurred
2495 metrics.total_scheduled_count.fetch_add(1, SeqCst);
2496
2497 // recall that the `woke_at` field is internally represented as
2498 // nanoseconds-since-instrumentation. here, for accounting purposes,
2499 // we need to instead represent it as a proper `Instant`.
2500 let woke_instant = instrumented_at + Duration::from_nanos(woke_at);
2501
2502 // the duration this task spent scheduled is time time elapsed between
2503 // when this task was awoke, and when it was polled.
2504 let scheduled_ns = (poll_start - woke_instant)
2505 .as_nanos()
2506 .try_into()
2507 .unwrap_or(u64::MAX);
2508
2509 let scheduled = Duration::from_nanos(scheduled_ns);
2510
2511 let (count_bucket, duration_bucket) = // was the scheduling delay long or short?
2512 if scheduled >= metrics.long_delay_threshold {
2513 (&metrics.total_long_delay_count, &metrics.total_long_delay_duration_ns)
2514 } else {
2515 (&metrics.total_short_delay_count, &metrics.total_short_delay_duration_ns)
2516 };
2517 // update the appropriate bucket
2518 count_bucket.fetch_add(1, SeqCst);
2519 duration_bucket.fetch_add(scheduled_ns, SeqCst);
2520
2521 // add `scheduled_ns` to the Monitor's total
2522 metrics
2523 .total_scheduled_duration_ns
2524 .fetch_add(scheduled_ns, SeqCst);
2525 }
2526 // Register the waker
2527 state.waker.register(cx.waker());
2528 // Get the instrumented waker
2529 let waker_ref = futures_util::task::waker_ref(state);
2530 let mut cx = Context::from_waker(&waker_ref);
2531 // Poll the task
2532 let inner_poll_start = Instant::now();
2533 let ret = poll_fn(this.task, &mut cx);
2534 let inner_poll_end = Instant::now();
2535 /* idle time starts now */
2536 *idled_at = (inner_poll_end - instrumented_at)
2537 .as_nanos()
2538 .try_into()
2539 .unwrap_or(u64::MAX);
2540 /* accounting for poll time */
2541 let inner_poll_duration = inner_poll_end - inner_poll_start;
2542 let inner_poll_ns: u64 = inner_poll_duration
2543 .as_nanos()
2544 .try_into()
2545 .unwrap_or(u64::MAX);
2546 let (count_bucket, duration_bucket) = // was this a slow or fast poll?
2547 if inner_poll_duration >= metrics.slow_poll_threshold {
2548 (&metrics.total_slow_poll_count, &metrics.total_slow_poll_duration)
2549 } else {
2550 (&metrics.total_fast_poll_count, &metrics.total_fast_poll_duration_ns)
2551 };
2552 // update the appropriate bucket
2553 count_bucket.fetch_add(1, SeqCst);
2554 duration_bucket.fetch_add(inner_poll_ns, SeqCst);
2555 ret
2556}
2557
2558impl State {
2559 fn on_wake(&self) {
2560 let woke_at: u64 = match self.instrumented_at.elapsed().as_nanos().try_into() {
2561 Ok(woke_at) => woke_at,
2562 // This is highly unlikely as it would mean the task ran for over
2563 // 500 years. If you ran your service for 500 years. If you are
2564 // reading this 500 years in the future, I'm sorry.
2565 Err(_) => return,
2566 };
2567
2568 // We don't actually care about the result
2569 let _ = self.woke_at.compare_exchange(0, woke_at, SeqCst, SeqCst);
2570 }
2571}
2572
2573impl ArcWake for State {
2574 fn wake_by_ref(arc_self: &Arc<State>) {
2575 arc_self.on_wake();
2576 arc_self.waker.wake();
2577 }
2578
2579 fn wake(self: Arc<State>) {
2580 self.on_wake();
2581 self.waker.wake();
2582 }
2583}
2584
2585#[inline(always)]
2586fn to_nanos(d: Duration) -> u64 {
2587 debug_assert!(d <= Duration::from_nanos(u64::MAX));
2588 d.as_secs()
2589 .wrapping_mul(1_000_000_000)
2590 .wrapping_add(d.subsec_nanos() as u64)
2591}
2592
2593#[inline(always)]
2594fn sub(a: Duration, b: Duration) -> Duration {
2595 let nanos = to_nanos(a).wrapping_sub(to_nanos(b));
2596 Duration::from_nanos(nanos)
2597}
2598
2599#[inline(always)]
2600fn mean(d: Duration, count: u64) -> Duration {
2601 if let Some(quotient) = to_nanos(d).checked_div(count) {
2602 Duration::from_nanos(quotient)
2603 } else {
2604 Duration::ZERO
2605 }
2606}