tokio_metrics

Struct TaskMonitor

pub struct TaskMonitor { /* private fields */ }

Monitors key metrics of instrumented tasks.

§Basic Usage

A TaskMonitor tracks key metrics of async tasks that have been instrumented with the monitor.

In the below example, a TaskMonitor is constructed and used to instrument three worker tasks; meanwhile, a fourth task prints metrics in 500ms intervals.

use std::time::Duration;

#[tokio::main]
async fn main() {
    // construct a metrics monitor
    let metrics_monitor = tokio_metrics::TaskMonitor::new();

    // print task metrics every 500ms
    {
        let metrics_monitor = metrics_monitor.clone();
        tokio::spawn(async move {
            for interval in metrics_monitor.intervals() {
                // debug-print the metrics for this interval
                println!("{:?}", interval);
                // wait 500ms
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
        });
    }

    // instrument some tasks and await them
    // note that the same TaskMonitor can be used for multiple tasks
    tokio::join![
        metrics_monitor.instrument(do_work()),
        metrics_monitor.instrument(do_work()),
        metrics_monitor.instrument(do_work())
    ];
}

async fn do_work() {
    for _ in 0..25 {
        tokio::task::yield_now().await;
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

§What should I instrument?

In most cases, you should construct a distinct TaskMonitor for each kind of key task.

§Instrumenting a web application

For instance, a web service should have a distinct TaskMonitor for each endpoint. Within each endpoint, it’s prudent to additionally instrument major sub-tasks, each with its own distinct TaskMonitor. The section “Why are my tasks slow?” explores a debugging scenario for a web service that takes this approach to instrumentation. The example below illustrates the approach:

// The unabridged version of this snippet is in the examples directory of this crate.

#[tokio::main]
async fn main() {
    // construct a TaskMonitor for the root endpoint
    let monitor_root = tokio_metrics::TaskMonitor::new();

    // construct TaskMonitors for the create_user endpoint
    let monitor_create_user = CreateUserMonitors {
        // monitor for the entire endpoint
        route: tokio_metrics::TaskMonitor::new(),
        // monitor for database insertion subtask
        insert: tokio_metrics::TaskMonitor::new(),
    };

    // build our application with two instrumented endpoints
    let app = axum::Router::new()
        // `GET /` goes to `root`
        .route("/", axum::routing::get({
            let monitor = monitor_root.clone();
            move || monitor.instrument(async { "Hello, World!" })
        }))
        // `POST /users` goes to `create_user`
        .route("/users", axum::routing::post({
            let monitors = monitor_create_user.clone();
            let route = monitors.route.clone();
            move |payload| {
                route.instrument(create_user(payload, monitors))
            }
        }));

    // print task metrics for each endpoint every 1s
    let metrics_frequency = std::time::Duration::from_secs(1);
    tokio::spawn(async move {
        let root_intervals = monitor_root.intervals();
        let create_user_route_intervals =
            monitor_create_user.route.intervals();
        let create_user_insert_intervals =
            monitor_create_user.insert.intervals();
        let create_user_intervals =
            create_user_route_intervals.zip(create_user_insert_intervals);

        let intervals = root_intervals.zip(create_user_intervals);
        for (root_route, (create_user_route, create_user_insert)) in intervals {
            println!("root_route = {:#?}", root_route);
            println!("create_user_route = {:#?}", create_user_route);
            println!("create_user_insert = {:#?}", create_user_insert);
            tokio::time::sleep(metrics_frequency).await;
        }
    });

    // run the server
    let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 3000));
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn create_user(
    axum::Json(payload): axum::Json<CreateUser>,
    monitors: CreateUserMonitors,
) -> impl axum::response::IntoResponse {
    let user = User { id: 1337, username: payload.username, };
    // instrument inserting the user into the db:
    let _ = monitors.insert.instrument(insert_user(user.clone())).await;
    (axum::http::StatusCode::CREATED, axum::Json(user))
}

/* definitions of CreateUserMonitors, CreateUser and User omitted for brevity */

// insert the user into the database
async fn insert_user(_: User) {
    /* implementation details elided */
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

§Why are my tasks slow?

Scenario: You track key, high-level metrics about the customer response time. An alarm warns you that P90 latency for an endpoint exceeds your targets. What is causing the increase?

§Identifying the high-level culprits

A set of tasks will appear to execute more slowly if:

  • they are taking longer to poll (i.e., they consume too much CPU time)
  • they are waiting longer to be polled (e.g., they’re waiting longer in tokio’s scheduling queues)
  • they are waiting longer on external events to complete (e.g., asynchronous network requests)

The culprits, at a high level, may be some combination of these sources of latency. Fortunately, you have instrumented the key tasks of each of your endpoints with distinct TaskMonitors. Using the monitors on the endpoint experiencing elevated latency, you begin by answering:

§Are my tasks taking longer to poll?
  • Did mean_poll_duration increase?
    This metric reflects the mean poll duration. If it increased, it means that, on average, individual polls tended to take longer. However, this does not necessarily imply increased task latency: An increase in poll durations could be offset by fewer polls.
  • Did slow_poll_ratio increase?
    This metric reflects the proportion of polls that were ‘slow’. If it increased, it means that a greater proportion of polls performed excessive computation before yielding. This does not necessarily imply increased task latency: An increase in the proportion of slow polls could be offset by fewer or faster polls.
  • Did mean_slow_poll_duration increase?
    This metric reflects the mean duration of slow polls. If it increased, it means that, on average, slow polls got slower. This does not necessarily imply increased task latency: An increase in average slow poll duration could be offset by fewer or faster polls.

If so, why are my tasks taking longer to poll?
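
The three questions above each compare a ratio of totals. The following is a minimal, std-only sketch of how such ratios could be derived. The `PollTotals` struct and its field names are hypothetical stand-ins chosen for illustration (they are not the crate's `TaskMetrics` type), and the formulas assume each metric is the plain quotient its name suggests.

```rust
use std::time::Duration;

/// Hypothetical per-interval totals (illustration only; not the crate's type).
#[derive(Debug, Clone, Copy)]
pub struct PollTotals {
    pub total_poll_count: u64,
    pub total_poll_duration: Duration,
    pub total_slow_poll_count: u64,
    pub total_slow_poll_duration: Duration,
}

/// Divide a total duration by an event count, treating "no events" as zero.
fn mean(total: Duration, count: u64) -> Duration {
    if count == 0 {
        return Duration::ZERO;
    }
    let nanos = total.as_nanos() / u128::from(count);
    // Clamp rather than truncate if the quotient does not fit in a u64.
    Duration::from_nanos(nanos.min(u128::from(u64::MAX)) as u64)
}

impl PollTotals {
    /// Assumed definition: total poll time divided by the number of polls.
    pub fn mean_poll_duration(&self) -> Duration {
        mean(self.total_poll_duration, self.total_poll_count)
    }

    /// Assumed definition: the fraction of polls that were slow.
    pub fn slow_poll_ratio(&self) -> f64 {
        if self.total_poll_count == 0 {
            0.0
        } else {
            self.total_slow_poll_count as f64 / self.total_poll_count as f64
        }
    }

    /// Assumed definition: total slow-poll time divided by the number of slow polls.
    pub fn mean_slow_poll_duration(&self) -> Duration {
        mean(self.total_slow_poll_duration, self.total_slow_poll_count)
    }
}
```

Under these assumptions, an interval with 4 polls totalling 400μs and an interval with 2 polls totalling 400μs have the same total poll time, yet the second has double the mean poll duration. This is the sense in which longer polls can be offset by fewer polls.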

§Are my tasks spending more time waiting to be polled?
  • Did mean_first_poll_delay increase?
    This metric reflects the mean delay between the instant a task is first instrumented and the instant it is first polled. If it increases, it means that, on average, tasks spent longer waiting to be initially run.
  • Did mean_scheduled_duration increase?
    This metric reflects the mean duration that tasks spent in the scheduled state. The ‘scheduled’ state of a task is the duration between the instant a task is awoken and the instant it is subsequently polled. If this metric increases, it means that, on average, tasks spent longer in tokio’s queues before being polled.
  • Did long_delay_ratio increase?
    This metric reflects the proportion of scheduling delays which were ‘long’. If it increased, it means that a greater proportion of tasks experienced excessive delays before they could execute after being woken. This does not necessarily indicate an increase in latency, as this could be offset by fewer or faster task polls.
  • Did mean_long_delay_duration increase?
    This metric reflects the mean duration of long delays. If it increased, it means that, on average, long delays got even longer. This does not necessarily imply increased task latency: an increase in average long delay duration could be offset by fewer or faster polls or more short schedules.

If so, why are my tasks spending more time waiting to be polled?

§Are my tasks spending more time waiting on external events to complete?
  • Did mean_idle_duration increase?
    This metric reflects the mean duration that tasks spent in the idle state. The idle state spans from the instant a task completes a poll to the instant it is next awoken. Tasks inhabit this state when they are waiting for task-external events to complete (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this metric increases, tasks, in aggregate, spent more time waiting for task-external events to complete.

If so, why are my tasks spending more time waiting on external events to complete?
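
The definitions above (first-poll delay, polling, idle, scheduled) partition a task's lifetime into states. The sketch below attributes elapsed time to each state from a chronological event trace. It is a std-only illustration: the `Event` and `StateTotals` types are hypothetical, and it assumes for simplicity that a task is only woken while it is idle.

```rust
use std::time::Duration;

/// Events in the life of one task, stamped with the time elapsed since an
/// arbitrary origin. Hypothetical type, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Event {
    Instrumented(Duration),
    PollStart(Duration),
    PollEnd(Duration),
    Woken(Duration),
}

/// Time attributed to each state described in the text above.
#[derive(Debug, Default, PartialEq)]
pub struct StateTotals {
    pub first_poll_delay: Duration, // instrumentation -> first poll
    pub poll: Duration,             // poll start -> poll end
    pub idle: Duration,             // poll end -> next wake
    pub scheduled: Duration,        // wake -> next poll start
}

/// Walk a chronological trace and sum the time spent in each state.
pub fn attribute(events: &[Event]) -> StateTotals {
    let mut totals = StateTotals::default();
    let mut instrumented_at: Option<Duration> = None;
    let mut poll_started_at: Option<Duration> = None;
    let mut poll_ended_at: Option<Duration> = None;
    let mut woken_at: Option<Duration> = None;

    for &event in events {
        match event {
            Event::Instrumented(t) => instrumented_at = Some(t),
            Event::PollStart(t) => {
                if let Some(woken) = woken_at.take() {
                    totals.scheduled += t - woken;
                } else if let Some(instrumented) = instrumented_at.take() {
                    totals.first_poll_delay += t - instrumented;
                }
                poll_started_at = Some(t);
            }
            Event::PollEnd(t) => {
                if let Some(started) = poll_started_at.take() {
                    totals.poll += t - started;
                }
                poll_ended_at = Some(t);
            }
            Event::Woken(t) => {
                if let Some(ended) = poll_ended_at.take() {
                    totals.idle += t - ended;
                }
                woken_at = Some(t);
            }
        }
    }
    totals
}
```

For a trace that is instrumented at 0ms, polled from 2ms to 3ms, woken at 10ms, and polled again from 14ms to 15ms, this attributes 2ms to first-poll delay, 2ms to polling, 7ms to idling, and 4ms to waiting in the scheduled state.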

§Digging deeper

Having established the high-level culprits, you now search for further explanation…

§Why are my tasks taking longer to poll?

You observed that your tasks are taking longer to poll. The culprit is likely some combination of:

  • Your tasks are accidentally blocking. Common culprits include:
    1. Using the Rust standard library’s filesystem or networking APIs.
      These APIs are synchronous; use tokio’s filesystem and networking APIs, instead.
    2. Calling block_on.
    3. Invoking println! or other synchronous logging routines.
      Invocations of println! involve acquiring an exclusive lock on stdout, followed by a synchronous write to stdout.
  • Your tasks are computationally expensive. Common culprits include:
    1. TLS/cryptographic routines
    2. doing a lot of processing on bytes
    3. calling non-Tokio resources

§Why are my tasks spending more time waiting to be polled?

You observed that your tasks are spending more time waiting to be polled, suggesting some combination of:

  • Your application is inflating the time elapsed between instrumentation and first poll.
  • Your tasks are being scheduled into tokio’s injection queue.
  • Other tasks are spending too long without yielding, thus backing up tokio’s queues.

Start by asking: Is time-to-first-poll unusually high?

§Why are my tasks spending more time waiting on external events to complete?

You observed that your tasks are spending more time waiting on external events to complete. But what event? Fortunately, within the task experiencing increased idle times, you monitored several sub-tasks with distinct TaskMonitors. For each of these sub-tasks, you try to identify the performance culprits…

§Digging even deeper
§Is time-to-first-poll unusually high?

Contrast these two metrics:

  • mean_first_poll_delay
    This metric reflects the mean delay between the instant a task is first instrumented and the instant it is first polled.
  • mean_scheduled_duration
    This metric reflects the mean delay between the instant when tasks were awoken and the instant they were subsequently polled.

If the former metric exceeds the latter (or increased unexpectedly more than the latter), then start by investigating if your application is artificially delaying the time-to-first-poll.

Otherwise, investigate if other tasks are polling too long without yielding.
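
The comparison above can be written as a small decision function. This is only a sketch under stated assumptions: the `Sample` and `NextStep` types are hypothetical, and "increased more than" is interpreted here as a larger absolute increase relative to a baseline sample, which is one reading among several.

```rust
use std::time::Duration;

/// The two means being contrasted, taken from one sampling interval.
/// Hypothetical type, for illustration only.
#[derive(Debug, Clone, Copy)]
pub struct Sample {
    pub mean_first_poll_delay: Duration,
    pub mean_scheduled_duration: Duration,
}

/// Which line of investigation the comparison points to.
#[derive(Debug, PartialEq, Eq)]
pub enum NextStep {
    CheckTimeToFirstPoll,
    CheckOtherTasksPolling,
}

/// Compare a current sample against a baseline from a healthy period.
pub fn next_step(baseline: Sample, current: Sample) -> NextStep {
    // Growth of each mean since the baseline (zero if it shrank).
    let first_poll_growth = current
        .mean_first_poll_delay
        .saturating_sub(baseline.mean_first_poll_delay);
    let scheduled_growth = current
        .mean_scheduled_duration
        .saturating_sub(baseline.mean_scheduled_duration);

    let exceeds = current.mean_first_poll_delay > current.mean_scheduled_duration;
    let grew_more = first_poll_growth > scheduled_growth;

    if exceeds || grew_more {
        NextStep::CheckTimeToFirstPoll
    } else {
        NextStep::CheckOtherTasksPolling
    }
}
```

With a baseline of 20μs first-poll delay and 30μs scheduled duration, a current sample of 90μs and 40μs points to the time-to-first-poll, while a current sample of 25μs and 60μs points to other tasks polling for too long.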

§Is my application delaying the time-to-first-poll?

You observed that mean_first_poll_delay increased more than mean_scheduled_duration. Your application may be needlessly inflating the time elapsed between instrumentation and first poll. Are you constructing (and instrumenting) tasks separately from awaiting or spawning them?

For instance, in the below example, the application induces a 1-second delay between when the task is instrumented and when it is awaited:

#[tokio::main]
async fn main() {
    use tokio::time::Duration;
    let monitor = tokio_metrics::TaskMonitor::new();

    let task = monitor.instrument(async move {});

    let one_sec = Duration::from_secs(1);
    tokio::time::sleep(one_sec).await;

    let _ = tokio::spawn(task).await;

    assert!(monitor.cumulative().total_first_poll_delay >= one_sec);
}

Otherwise, mean_first_poll_delay might be unusually high because your application is spawning key tasks into tokio’s injection queue…

§Is my application spawning more tasks into tokio’s injection queue?

Tasks awoken from threads not managed by the tokio runtime are scheduled with a slower, global “injection” queue.

You may be notifying runtime tasks from off-runtime. For instance, given the following:

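use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    for _ in 0..100 {
        let (tx, rx) = oneshot::channel();
        tokio::spawn(async move {
            // signal the receiver from the spawned task
            let _ = tx.send(());
        });

        // wait for the signal in the main task
        let _ = rx.await;
    }
}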

One would expect this to run efficiently. However, the main task runs off-runtime while the spawned tasks run on-runtime, so the snippet runs much slower than:

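use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // move the loop itself onto the runtime
    let _ = tokio::spawn(async {
        for _ in 0..100 {
            let (tx, rx) = oneshot::channel();
            tokio::spawn(async move {
                // signal the receiver from the spawned task
                let _ = tx.send(());
            });

            // wait for the signal in the looping task
            let _ = rx.await;
        }
    }).await;
}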

The slowdown is caused by a longer delay between the rx task being notified (in tx.send()) and that task being polled.

§Are other tasks polling too long without yielding?

You suspect that your tasks are slow because they’re backed up in tokio’s scheduling queues. For each of your application’s TaskMonitors you check to see if their associated tasks are taking longer to poll…

§Limitations

The TaskMetrics type uses u64 to represent both event counters and durations (measured in nanoseconds). Consequently, event counters are accurate for ≤ u64::MAX events, and durations are accurate for ≤ u64::MAX nanoseconds.

The counters and durations of TaskMetrics produced by TaskMonitor::cumulative increase monotonically with each successive invocation of TaskMonitor::cumulative. Upon overflow, counters and durations wrap.

The counters and durations of TaskMetrics produced by TaskMonitor::intervals are calculated by computing the difference of metrics in successive invocations of TaskMonitor::cumulative. If, within a monitoring interval, an event occurs more than u64::MAX times, or a monitored duration exceeds u64::MAX nanoseconds, the metrics for that interval will overflow and not be accurate.
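
The wrapping behaviour described above can be illustrated with plain `u64` arithmetic. The helpers below are hypothetical and std-only; that the crate computes interval values with a wrapping subtraction is an assumption inferred from the text, not something this sketch verifies.

```rust
/// Advance a cumulative counter, wrapping around on overflow.
pub fn accumulate(cumulative: u64, increment: u64) -> u64 {
    cumulative.wrapping_add(increment)
}

/// Difference between two successive readings of a wrapping counter.
/// Correct as long as the counter wrapped at most once between readings.
pub fn interval_delta(previous: u64, current: u64) -> u64 {
    current.wrapping_sub(previous)
}
```

For example, starting from `u64::MAX - 4` and adding 10 leaves the cumulative counter at 5, yet `interval_delta(u64::MAX - 4, 5)` still yields 10. If the counter instead advances by a full `u64::MAX + 1` plus 10 between readings, the delta is again 10, so the extra wrap goes unnoticed.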

§Examples at the limits

Consider the TaskMetrics::total_first_poll_delay metric. This metric accurately reflects delays between instrumentation and first-poll ≤ u64::MAX nanoseconds:

use tokio::time::Duration;

#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
    let monitor = tokio_metrics::TaskMonitor::new();
    let mut interval = monitor.intervals();
    let mut next_interval = || interval.next().unwrap();

    // construct and instrument a task, but do not `await` it
    let task = monitor.instrument(async {});

    // this is the maximum duration representable by tokio_metrics
    let max_duration = Duration::from_nanos(u64::MAX);

    // let's advance the clock by this amount and poll `task`
    let _ = tokio::time::advance(max_duration).await;
    task.await;

    // durations ≤ `max_duration` are accurately reflected in this metric
    assert_eq!(next_interval().total_first_poll_delay, max_duration);
    assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
}

If the total delay between instrumentation and first poll exceeds u64::MAX nanoseconds, total_first_poll_delay will overflow:

 // construct and instrument two tasks, but do not `await` them yet
 let task_a = monitor.instrument(async {});
 let task_b = monitor.instrument(async {});

 // this is the maximum duration representable by tokio_metrics
 let max_duration = Duration::from_nanos(u64::MAX);

 // let's advance the clock by 1.5x this amount and await both tasks
 let _ = tokio::time::advance(3 * (max_duration / 2)).await;
 task_a.await;
 task_b.await;

 // the `total_first_poll_delay` has overflowed
 assert!(monitor.cumulative().total_first_poll_delay < max_duration);

If many tasks are spawned, it will take far less than a u64::MAX-nanosecond delay to bring this metric to the precipice of overflow:

// construct and instrument u16::MAX tasks, but do not `await` them
let first_poll_count = u16::MAX as u64;
let mut tasks = Vec::with_capacity(first_poll_count as usize);
for _ in 0..first_poll_count { tasks.push(monitor.instrument(async {})); }

// this is the maximum duration representable by tokio_metrics
let max_duration = u64::MAX;

// let's advance the clock just enough such that all of the time-to-first-poll
// delays summed nearly equals `max_duration`, less some remainder...
let iffy_delay = max_duration / (first_poll_count as u64);
let small_remainder = max_duration % first_poll_count;
let _ = tokio::time::advance(Duration::from_nanos(iffy_delay)).await;

// ...then poll all of the instrumented tasks:
for task in tasks { task.await; }

// `total_first_poll_delay` is at the precipice of overflowing!
assert_eq!(
    next_interval().total_first_poll_delay.as_nanos(),
    (max_duration - small_remainder) as u128
);
assert_eq!(
    monitor.cumulative().total_first_poll_delay.as_nanos(),
    (max_duration - small_remainder) as u128
);
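
To put the numbers in the example above on a human scale, here is a std-only sketch of the same division. The `max_uniform_delay` helper is hypothetical and exists only for this illustration.

```rust
use std::time::Duration;

/// Largest uniform first-poll delay that `task_count` tasks can each incur
/// before their summed delay would exceed `u64::MAX` nanoseconds.
/// Returns `None` when there are no tasks to divide the budget among.
pub fn max_uniform_delay(task_count: u64) -> Option<Duration> {
    if task_count == 0 {
        None
    } else {
        Some(Duration::from_nanos(u64::MAX / task_count))
    }
}
```

A single task would need to wait roughly 584 years before its first poll to exhaust the budget. With `u16::MAX` tasks, a delay of 281,479,271,743,489 nanoseconds each (a little over 3 days) is enough. Because `u64::MAX` happens to be exactly divisible by `u16::MAX`, the remainder in the example above is zero.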

Frequent, interval-sampled metrics retain their accuracy as long as the cumulative metrics counter overflows at most once in the midst of an interval:

 let first_poll_count = u16::MAX as u64;
 let batch_size = first_poll_count / 3;

 let max_duration_ns = u64::MAX;
 let iffy_delay_ns = max_duration_ns / first_poll_count;

 // Instrument `batch_size` number of tasks, wait for `delay` nanoseconds,
 // then await the instrumented tasks.
 async fn run_batch(monitor: &TaskMonitor, batch_size: usize, delay: u64) {
     let mut tasks = Vec::with_capacity(batch_size);
     for _ in 0..batch_size { tasks.push(monitor.instrument(async {})); }
     let _ = tokio::time::advance(Duration::from_nanos(delay)).await;
     for task in tasks { task.await; }
 }

 // this is how much `total_first_poll_delay` will
 // increase with each batch we run
 let batch_delay = iffy_delay_ns * batch_size;

 // run batches 1, 2, and 3
 for i in 1..=3 {
     run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
     assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
     assert_eq!(i * batch_delay as u128, monitor.cumulative().total_first_poll_delay.as_nanos());
 }

 /* now, the `total_first_poll_delay` counter is at the precipice of overflow */
 assert_eq!(monitor.cumulative().total_first_poll_delay.as_nanos(), max_duration_ns as u128);

 // run batch 4
 run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
 // the interval counter remains accurate
 assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
 // but the cumulative counter has overflowed
 assert_eq!(batch_delay as u128 - 1, monitor.cumulative().total_first_poll_delay.as_nanos());

If a cumulative metric overflows more than once in the midst of an interval, its interval-sampled counterpart will also overflow.

Implementations§

impl TaskMonitor

pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = _

The default duration at which polls cross the threshold into being categorized as ‘slow’ is 50μs.

pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = _

The default duration at which schedules cross the threshold into being categorized as ‘long’ is 50μs.

pub fn new() -> TaskMonitor

Constructs a new task monitor.

Uses Self::DEFAULT_SLOW_POLL_THRESHOLD as the threshold at which polls will be considered ‘slow’.

Uses Self::DEFAULT_LONG_DELAY_THRESHOLD as the threshold at which scheduling will be considered ‘long’.

pub fn builder() -> TaskMonitorBuilder

Constructs a builder for a task monitor.

pub fn with_slow_poll_threshold(slow_poll_cut_off: Duration) -> TaskMonitor

Constructs a new task monitor with a given threshold at which polls are considered ‘slow’.

§Selecting an appropriate threshold

TODO. What advice can we give here?

§Examples

In the below example, low-threshold and high-threshold monitors are constructed and instrument identical tasks; the low-threshold monitor reports 4 slow polls, and the high-threshold monitor reports only 2 slow polls:

use std::future::Future;
use std::time::Duration;
use tokio_metrics::TaskMonitor;

#[tokio::main]
async fn main() {
    let lo_threshold = Duration::from_micros(10);
    let hi_threshold = Duration::from_millis(10);

    let lo_monitor = TaskMonitor::with_slow_poll_threshold(lo_threshold);
    let hi_monitor = TaskMonitor::with_slow_poll_threshold(hi_threshold);

    let make_task = || async {
        spin_for(lo_threshold).await; // faster poll 1
        spin_for(lo_threshold).await; // faster poll 2
        spin_for(hi_threshold).await; // slower poll 3
        spin_for(hi_threshold).await  // slower poll 4
    };

    lo_monitor.instrument(make_task()).await;
    hi_monitor.instrument(make_task()).await;

    // the low-threshold monitor reported 4 slow polls:
    assert_eq!(lo_monitor.cumulative().total_slow_poll_count, 4);
    // the high-threshold monitor reported only 2 slow polls:
    assert_eq!(hi_monitor.cumulative().total_slow_poll_count, 2);
}

/// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
fn spin_for(duration: Duration) -> impl Future<Output=()> {
    let start = tokio::time::Instant::now();
    while start.elapsed() <= duration {}
    tokio::task::yield_now()
}

pub fn slow_poll_threshold(&self) -> Duration

Produces the slow-poll threshold: polls that take at least this long are categorized as slow.

§Examples

In the below example, TaskMonitor is initialized with TaskMonitor::new; consequently, its slow-poll threshold equals TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD:

use tokio_metrics::TaskMonitor;

#[tokio::main]
async fn main() {
    let metrics_monitor = TaskMonitor::new();

    assert_eq!(
        metrics_monitor.slow_poll_threshold(),
        TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD
    );
}

pub fn long_delay_threshold(&self) -> Duration

Produces the long-delay threshold: scheduling delays that last at least this long are categorized as long.
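
Both thresholds are inclusive: an observation that equals the threshold already counts. The std-only sketch below shows that comparison. The constants restate the 50μs defaults quoted in this document as local values, and the helper names are hypothetical rather than part of the crate.

```rust
use std::time::Duration;

/// Local copies of the defaults quoted in this document (50μs each), so the
/// sketch does not depend on the crate.
pub const SLOW_POLL_THRESHOLD: Duration = Duration::from_micros(50);
pub const LONG_DELAY_THRESHOLD: Duration = Duration::from_micros(50);

/// An observation crosses a threshold when it is at least as long as it.
pub fn crosses(observed: Duration, threshold: Duration) -> bool {
    observed >= threshold
}

/// Count how many observations cross `threshold`.
pub fn count_crossing(observed: &[Duration], threshold: Duration) -> usize {
    observed.iter().filter(|&&d| crosses(d, threshold)).count()
}
```

Under this reading, a scheduling delay of exactly 50μs is long, while one of 49,999ns is not. Of the delays 10μs, 50μs, and 75μs, two cross the default threshold.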

pub fn instrument<F>(&self, task: F) -> Instrumented<F>

Produces an instrumented façade around a given async task.

§Examples

Instrument an async task by passing it to TaskMonitor::instrument:

#[tokio::main]
async fn main() {
    let metrics_monitor = tokio_metrics::TaskMonitor::new();

    // 0 tasks have been instrumented, much less polled
    assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);

    // instrument a task and poll it to completion
    metrics_monitor.instrument(async {}).await;

    // 1 task has been instrumented and polled
    assert_eq!(metrics_monitor.cumulative().first_poll_count, 1);

    // instrument a task and poll it to completion
    metrics_monitor.instrument(async {}).await;

    // 2 tasks have been instrumented and polled
    assert_eq!(metrics_monitor.cumulative().first_poll_count, 2);
}

An async task may be tracked by multiple TaskMonitors; e.g.:

#[tokio::main]
async fn main() {
    let monitor_a = tokio_metrics::TaskMonitor::new();
    let monitor_b = tokio_metrics::TaskMonitor::new();

    // 0 tasks have been instrumented, much less polled
    assert_eq!(monitor_a.cumulative().first_poll_count, 0);
    assert_eq!(monitor_b.cumulative().first_poll_count, 0);

    // instrument a task and poll it to completion
    monitor_a.instrument(monitor_b.instrument(async {})).await;

    // 1 task has been instrumented and polled
    assert_eq!(monitor_a.cumulative().first_poll_count, 1);
    assert_eq!(monitor_b.cumulative().first_poll_count, 1);
}

It is also possible (but probably undesirable) to instrument an async task multiple times with the same TaskMonitor; e.g.:

#[tokio::main]
async fn main() {
    let monitor = tokio_metrics::TaskMonitor::new();

    // 0 tasks have been instrumented, much less polled
    assert_eq!(monitor.cumulative().first_poll_count, 0);

    // instrument a task and poll it to completion
    monitor.instrument(monitor.instrument(async {})).await;

    // 2 tasks have been instrumented and polled, supposedly
    assert_eq!(monitor.cumulative().first_poll_count, 2);
}

pub fn cumulative(&self) -> TaskMetrics

Produces TaskMetrics for the tasks instrumented by this TaskMonitor, collected since the construction of TaskMonitor.

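§See also

  • TaskMonitor::intervals: produces TaskMetrics for user-defined sampling intervals, instead of cumulatively

§Examples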

In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur within the second sampling interval, and 2 slow polls occur within the third sampling interval; five slow polls occur across all sampling intervals:

use std::future::Future;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let metrics_monitor = tokio_metrics::TaskMonitor::new();

    // initialize a stream of sampling intervals
    let mut intervals = metrics_monitor.intervals();
    // each call of `next_interval` will produce metrics for the last sampling interval
    let mut next_interval = || intervals.next().unwrap();

    let slow = 10 * metrics_monitor.slow_poll_threshold();

    // this task completes in three slow polls
    let _ = metrics_monitor.instrument(async {
        spin_for(slow).await; // slow poll 1
        spin_for(slow).await; // slow poll 2
        spin_for(slow)        // slow poll 3
    }).await;

    // in the previous sampling interval, there were 3 slow polls
    assert_eq!(next_interval().total_slow_poll_count, 3);
    assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 3);

    // this task completes in two slow polls
    let _ = metrics_monitor.instrument(async {
        spin_for(slow).await; // slow poll 1
        spin_for(slow)        // slow poll 2
    }).await;

    // in the previous sampling interval, there were 2 slow polls
    assert_eq!(next_interval().total_slow_poll_count, 2);

    // across all sampling intervals, there were a total of 5 slow polls
    assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
}

/// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
fn spin_for(duration: Duration) -> impl Future<Output=()> {
    let start = tokio::time::Instant::now();
    while start.elapsed() <= duration {}
    tokio::task::yield_now()
}

pub fn intervals(&self) -> impl Iterator<Item = TaskMetrics>

Produces an unending iterator of metric sampling intervals.

Each sampling interval is defined by the time elapsed between advancements of the iterator produced by TaskMonitor::intervals. The item type of this iterator is TaskMetrics, which is a bundle of task metrics that describe only events occurring within that sampling interval.

§Examples

In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur within the second sampling interval, and 2 slow polls occur within the third sampling interval; five slow polls occur across all sampling intervals:

use std::future::Future;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let metrics_monitor = tokio_metrics::TaskMonitor::new();

    // initialize a stream of sampling intervals
    let mut intervals = metrics_monitor.intervals();
    // each call of `next_interval` will produce metrics for the last sampling interval
    let mut next_interval = || intervals.next().unwrap();

    let slow = 10 * metrics_monitor.slow_poll_threshold();

    // this task completes in three slow polls
    let _ = metrics_monitor.instrument(async {
        spin_for(slow).await; // slow poll 1
        spin_for(slow).await; // slow poll 2
        spin_for(slow)        // slow poll 3
    }).await;

    // in the previous sampling interval, there were 3 slow polls
    assert_eq!(next_interval().total_slow_poll_count, 3);

    // this task completes in two slow polls
    let _ = metrics_monitor.instrument(async {
        spin_for(slow).await; // slow poll 1
        spin_for(slow)        // slow poll 2
    }).await;

    // in the previous sampling interval, there were 2 slow polls
    assert_eq!(next_interval().total_slow_poll_count, 2);

    // across all sampling intervals, there were a total of 5 slow polls
    assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
}

/// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
fn spin_for(duration: Duration) -> impl Future<Output=()> {
    let start = tokio::time::Instant::now();
    while start.elapsed() <= duration {}
    tokio::task::yield_now()
}
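
The pull-based sampling described for TaskMonitor::intervals can be modelled without the crate. The sketch below is a std-only illustration over a single shared counter: the free function `intervals` here is hypothetical, and the use of an atomic counter with a wrapping subtraction is an assumption about one reasonable design, not a description of the crate's internals.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Unending iterator of per-interval deltas over a shared cumulative counter.
/// Each call to `next` reports what accumulated since the previous call.
pub fn intervals(counter: Arc<AtomicU64>) -> impl Iterator<Item = u64> {
    // Remember the reading taken when the iterator was created.
    let mut previous = counter.load(Ordering::Relaxed);
    std::iter::from_fn(move || {
        let current = counter.load(Ordering::Relaxed);
        let delta = current.wrapping_sub(previous);
        previous = current;
        Some(delta)
    })
}
```

Mirroring the example above: if nothing happens before the first `next`, it yields 0; after the counter is bumped by 3 and then by 2, successive calls yield 3 and 2, while the shared counter itself reads 5. The sampling interval is whatever time the caller lets pass between calls, since the iterator never waits on its own.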

Trait Implementations§

impl Clone for TaskMonitor

fn clone(&self) -> TaskMonitor

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for TaskMonitor

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Default for TaskMonitor

fn default() -> TaskMonitor

Returns the “default value” for a type.
