Struct tokio_metrics::TaskMonitor
pub struct TaskMonitor { /* private fields */ }
Monitors key metrics of instrumented tasks.
§Basic Usage
A TaskMonitor tracks key metrics of async tasks that have been instrumented with the monitor.
In the below example, a TaskMonitor is constructed and used to instrument three worker tasks; meanwhile, a fourth task prints metrics at 500ms intervals.
use std::time::Duration;

#[tokio::main]
async fn main() {
    // construct a metrics monitor
    let metrics_monitor = tokio_metrics::TaskMonitor::new();

    // print task metrics every 500ms
    {
        let metrics_monitor = metrics_monitor.clone();
        tokio::spawn(async move {
            for interval in metrics_monitor.intervals() {
                // pretty-print the metric interval
                println!("{:?}", interval);
                // wait 500ms
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
        });
    }

    // instrument some tasks and await them
    // note that the same TaskMonitor can be used for multiple tasks
    tokio::join![
        metrics_monitor.instrument(do_work()),
        metrics_monitor.instrument(do_work()),
        metrics_monitor.instrument(do_work())
    ];
}

async fn do_work() {
    for _ in 0..25 {
        tokio::task::yield_now().await;
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
§What should I instrument?
In most cases, you should construct a distinct TaskMonitor for each kind of key task.
§Instrumenting a web application
For instance, a web service should have a distinct TaskMonitor for each endpoint. Within each endpoint, it’s prudent to additionally instrument major sub-tasks, each with their own distinct TaskMonitors. Why are my tasks slow? explores a debugging scenario for a web service that takes this approach to instrumentation. This approach is exemplified in the below example:
// The unabridged version of this snippet is in the examples directory of this crate.

#[tokio::main]
async fn main() {
    // construct a TaskMonitor for the root endpoint
    let monitor_root = tokio_metrics::TaskMonitor::new();

    // construct TaskMonitors for the create_user endpoint
    let monitor_create_user = CreateUserMonitors {
        // monitor for the entire endpoint
        route: tokio_metrics::TaskMonitor::new(),
        // monitor for the database insertion subtask
        insert: tokio_metrics::TaskMonitor::new(),
    };

    // build our application with two instrumented endpoints
    let app = axum::Router::new()
        // `GET /` goes to `root`
        .route("/", axum::routing::get({
            let monitor = monitor_root.clone();
            move || monitor.instrument(async { "Hello, World!" })
        }))
        // `POST /users` goes to `create_user`
        .route("/users", axum::routing::post({
            let monitors = monitor_create_user.clone();
            let route = monitors.route.clone();
            move |payload| {
                route.instrument(create_user(payload, monitors))
            }
        }));

    // print task metrics for each endpoint every 1s
    let metrics_frequency = std::time::Duration::from_secs(1);
    tokio::spawn(async move {
        let root_intervals = monitor_root.intervals();
        let create_user_route_intervals =
            monitor_create_user.route.intervals();
        let create_user_insert_intervals =
            monitor_create_user.insert.intervals();
        let create_user_intervals =
            create_user_route_intervals.zip(create_user_insert_intervals);

        let intervals = root_intervals.zip(create_user_intervals);
        for (root_route, (create_user_route, create_user_insert)) in intervals {
            println!("root_route = {:#?}", root_route);
            println!("create_user_route = {:#?}", create_user_route);
            println!("create_user_insert = {:#?}", create_user_insert);
            tokio::time::sleep(metrics_frequency).await;
        }
    });

    // run the server
    let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 3000));
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn create_user(
    axum::Json(payload): axum::Json<CreateUser>,
    monitors: CreateUserMonitors,
) -> impl axum::response::IntoResponse {
    let user = User { id: 1337, username: payload.username };
    // instrument inserting the user into the db:
    let _ = monitors.insert.instrument(insert_user(user.clone())).await;
    (axum::http::StatusCode::CREATED, axum::Json(user))
}

/* definitions of CreateUserMonitors, CreateUser and User omitted for brevity */

// insert the user into the database
async fn insert_user(_: User) {
    /* implementation details elided */
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
§Why are my tasks slow?
Scenario: You track key, high-level metrics about customer response time. An alarm warns you that P90 latency for an endpoint exceeds your targets. What is causing the increase?
§Identifying the high-level culprits
A set of tasks will appear to execute more slowly if:
- they are taking longer to poll (i.e., they consume too much CPU time)
- they are waiting longer to be polled (e.g., they’re waiting longer in tokio’s scheduling queues)
- they are waiting longer on external events to complete (e.g., asynchronous network requests)
The culprits, at a high level, may be some combination of these sources of latency. Fortunately, you have instrumented the key tasks of each of your endpoints with distinct TaskMonitors.
Using the monitors on the endpoint experiencing elevated latency, you begin by answering the following (a triage sketch follows the list):
- Are my tasks taking longer to poll?
- Are my tasks spending more time waiting to be polled?
- Are my tasks spending more time waiting on external events to complete?
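For a first pass, each of these questions maps onto one of the interval metrics discussed below. A minimal triage sketch, not part of this crate: the function name and `endpoint_monitor` are illustrative, and the monitor is assumed to already instrument the slow endpoint’s tasks.
use std::time::Duration;

// `endpoint_monitor` is assumed to already instrument the slow endpoint's tasks
async fn triage(endpoint_monitor: tokio_metrics::TaskMonitor) {
    for interval in endpoint_monitor.intervals() {
        // 1. are tasks taking longer to poll?
        println!("mean_poll_duration      = {:?}", interval.mean_poll_duration());
        // 2. are tasks waiting longer to be polled?
        println!("mean_scheduled_duration = {:?}", interval.mean_scheduled_duration());
        // 3. are tasks waiting longer on external events to complete?
        println!("mean_idle_duration      = {:?}", interval.mean_idle_duration());

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}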
§Are my tasks taking longer to poll?
- Did mean_poll_duration increase? This metric reflects the mean poll duration. If it increased, it means that, on average, individual polls tended to take longer. However, this does not necessarily imply increased task latency: an increase in poll durations could be offset by fewer polls.
- Did slow_poll_ratio increase? This metric reflects the proportion of polls that were ‘slow’. If it increased, it means that a greater proportion of polls performed excessive computation before yielding. This does not necessarily imply increased task latency: an increase in the proportion of slow polls could be offset by fewer or faster polls.
- Did mean_slow_poll_duration increase? This metric reflects the mean duration of slow polls. If it increased, it means that, on average, slow polls got slower. This does not necessarily imply increased task latency: an increase in average slow poll duration could be offset by fewer or faster polls.
If so, why are my tasks taking longer to poll?
§Are my tasks spending more time waiting to be polled?
- Did mean_first_poll_delay increase? This metric reflects the mean delay between the instant a task is first instrumented and the instant it is first polled. If it increased, it means that, on average, tasks spent longer waiting to be initially run.
- Did mean_scheduled_duration increase? This metric reflects the mean duration that tasks spent in the scheduled state. The ‘scheduled’ state of a task is the duration between the instant a task is awoken and the instant it is subsequently polled. If this metric increased, it means that, on average, tasks spent longer in tokio’s queues before being polled.
- Did long_delay_ratio increase? This metric reflects the proportion of scheduling delays which were ‘long’. If it increased, it means that a greater proportion of tasks experienced excessive delays before they could execute after being woken. This does not necessarily indicate an increase in latency, as this could be offset by fewer or faster task polls.
- Did mean_long_delay_duration increase? This metric reflects the mean duration of long delays. If it increased, it means that, on average, long delays got even longer. This does not necessarily imply increased task latency: an increase in average long delay duration could be offset by fewer or faster polls or more short schedules.
If so, why are my tasks spending more time waiting to be polled?
§Are my tasks spending more time waiting on external events to complete?
- Did mean_idle_duration increase? This metric reflects the mean duration that tasks spent in the idle state. The idle state is the duration spanning the instant a task completes a poll and the instant it is next awoken. Tasks inhabit this state when they are waiting for task-external events to complete (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this metric increased, tasks, in aggregate, spent more time waiting for task-external events to complete.
If so, why are my tasks spending more time waiting on external events to complete?
§Digging deeper
Having established the high-level culprits, you now search for further explanation…
§Why are my tasks taking longer to poll?
You observed that your tasks are taking longer to poll. The culprit is likely some combination of:
- Your tasks are accidentally blocking. Common culprits include:
  - Using the Rust standard library’s filesystem or networking APIs. These APIs are synchronous; use tokio’s filesystem and networking APIs, instead.
  - Calling block_on.
  - Invoking println! or other synchronous logging routines. Invocations of println! involve acquiring an exclusive lock on stdout, followed by a synchronous write to stdout.
- Your tasks are computationally expensive. Common culprits include:
  - TLS/cryptographic routines
  - doing a lot of processing on bytes
  - calling non-Tokio resources
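A hedged sketch of the first two remedies (the handler and its workload are illustrative, not part of this crate): swap the standard library’s blocking filesystem call for tokio’s asynchronous one, and push CPU-heavy work onto the blocking pool with tokio::task::spawn_blocking.
// hypothetical handler; `path` and the checksum loop are stand-ins for real work
async fn handle_request(path: &str, payload: Vec<u8>) -> std::io::Result<u64> {
    // instead of std::fs::read, which would block the worker thread:
    let config = tokio::fs::read(path).await?;

    // instead of running CPU-heavy work directly inside the poll:
    let digest = tokio::task::spawn_blocking(move || {
        payload
            .iter()
            .chain(config.iter())
            .fold(0u64, |acc, b| acc.wrapping_mul(31).wrapping_add(*b as u64))
    })
    .await
    .expect("blocking task panicked");

    Ok(digest)
}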
§Why are my tasks spending more time waiting to be polled?
You observed that your tasks are spending more time waiting to be polled, suggesting some combination of:
- Your application is inflating the time elapsed between instrumentation and first poll.
- Your tasks are being scheduled into tokio’s injection queue.
- Other tasks are spending too long without yielding, thus backing up tokio’s queues.
Start by asking: Is time-to-first-poll unusually high?
§Why are my tasks spending more time waiting on external events to complete?
You observed that your tasks are spending more time waiting on external events to complete. But what event? Fortunately, within the task experiencing increased idle times, you monitored several sub-tasks with distinct TaskMonitors. For each of these sub-tasks, you try to identify the performance culprits…
§Digging even deeper
§Is time-to-first-poll unusually high?
Contrast these two metrics:
- mean_first_poll_delay: This metric reflects the mean delay between the instant a task is first instrumented and the instant it is first polled.
- mean_scheduled_duration: This metric reflects the mean delay between the instant when tasks were awoken and the instant they were subsequently polled.
If the former metric exceeds the latter (or increased unexpectedly more than the latter), then start by investigating if your application is artificially delaying the time-to-first-poll.
Otherwise, investigate if other tasks are polling too long without yielding.
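For instance, a minimal sketch of that comparison over a single sampled interval (the setup here is illustrative; it assumes the mean_* accessors on TaskMetrics referenced throughout this page):
#[tokio::main]
async fn main() {
    let monitor = tokio_metrics::TaskMonitor::new();
    let mut intervals = monitor.intervals();

    monitor.instrument(async { /* some endpoint work */ }).await;

    let interval = intervals.next().unwrap();
    if interval.mean_first_poll_delay() > interval.mean_scheduled_duration() {
        // investigate whether the application is delaying time-to-first-poll
        println!("first-poll delay dominates");
    } else {
        // investigate whether other tasks are polling too long without yielding
        println!("scheduling delay dominates");
    }
}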
§Is my application delaying the time-to-first-poll?
You observed that mean_first_poll_delay increased more than mean_scheduled_duration. Your application may be needlessly inflating the time elapsed between instrumentation and first poll. Are you constructing (and instrumenting) tasks separately from awaiting or spawning them?
For instance, in the below example, the application induces a 1-second delay between when a task is instrumented and when it is awaited:
#[tokio::main]
async fn main() {
    use tokio::time::Duration;
    let monitor = tokio_metrics::TaskMonitor::new();

    let task = monitor.instrument(async move {});

    let one_sec = Duration::from_secs(1);
    tokio::time::sleep(one_sec).await;

    let _ = tokio::spawn(task).await;

    assert!(monitor.cumulative().total_first_poll_delay >= one_sec);
}
Otherwise, mean_first_poll_delay might be unusually high because your application is spawning key tasks into tokio’s injection queue…
§Is my application spawning more tasks into tokio’s injection queue?
Tasks awoken from threads not managed by the tokio runtime are scheduled with a slower, global “injection” queue.
You may be notifying runtime tasks from off-runtime. For instance, given the following:
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    for _ in 0..100 {
        let (tx, rx) = oneshot::channel();
        tokio::spawn(async move {
            let _ = tx.send(());
        });
        let _ = rx.await;
    }
}
One would expect this to run efficiently; however, the main task runs off the runtime while the spawned tasks run on it, which means the snippet will run much slower than:
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        for _ in 0..100 {
            let (tx, rx) = oneshot::channel();
            tokio::spawn(async move {
                let _ = tx.send(());
            });
            let _ = rx.await;
        }
    }).await.unwrap();
}
The slowdown is caused by a higher delay between the rx task being notified (in tx.send()) and the task being polled.
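One way to observe this from the metrics (a sketch, not taken from the snippets above) is to instrument the receiving futures and compare their mean scheduled duration across the two variants:
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let monitor = tokio_metrics::TaskMonitor::new();

    for _ in 0..100 {
        let (tx, rx) = oneshot::channel();
        tokio::spawn(async move {
            let _ = tx.send(());
        });
        // the scheduled duration of this future spans the gap between
        // `tx.send(())` waking it and tokio polling it again
        let _ = monitor.instrument(async move { rx.await }).await;
    }

    println!(
        "mean_scheduled_duration = {:?}",
        monitor.cumulative().mean_scheduled_duration()
    );
}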
§Are other tasks polling too long without yielding?
You suspect that your tasks are slow because they’re backed up in tokio’s scheduling queues. For each of your application’s TaskMonitors, you check to see if their associated tasks are taking longer to poll…
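A small sketch of that check (the monitor list and its names are hypothetical): walk each monitor and report how its polls are trending.
fn report_pollers(monitors: &[(&str, &tokio_metrics::TaskMonitor)]) {
    for (name, monitor) in monitors {
        let metrics = monitor.cumulative();
        println!(
            "{name}: mean_poll_duration = {:?}, slow_poll_ratio = {:.2}",
            metrics.mean_poll_duration(),
            metrics.slow_poll_ratio(),
        );
    }
}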
§Limitations
The TaskMetrics type uses u64 to represent both event counters and durations (measured in nanoseconds). Consequently, event counters are accurate for ≤ u64::MAX events, and durations are accurate for ≤ u64::MAX nanoseconds.
The counters and durations of TaskMetrics produced by TaskMonitor::cumulative increase monotonically with each successive invocation of TaskMonitor::cumulative. Upon overflow, counters and durations wrap.
The counters and durations of TaskMetrics produced by TaskMonitor::intervals are calculated by computing the difference of metrics in successive invocations of TaskMonitor::cumulative. If, within a monitoring interval, an event occurs more than u64::MAX times, or a monitored duration exceeds u64::MAX nanoseconds, the metrics for that interval will overflow and not be accurate.
§Examples at the limits
Consider the TaskMetrics::total_first_poll_delay metric. This metric accurately reflects delays between instrumentation and first poll of ≤ u64::MAX nanoseconds:
use tokio::time::Duration;

#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
    let monitor = tokio_metrics::TaskMonitor::new();
    let mut interval = monitor.intervals();
    let mut next_interval = || interval.next().unwrap();

    // construct and instrument a task, but do not `await` it
    let task = monitor.instrument(async {});

    // this is the maximum duration representable by tokio_metrics
    let max_duration = Duration::from_nanos(u64::MAX);

    // let's advance the clock by this amount and poll `task`
    let _ = tokio::time::advance(max_duration).await;
    task.await;

    // durations ≤ `max_duration` are accurately reflected in this metric
    assert_eq!(next_interval().total_first_poll_delay, max_duration);
    assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
}
If the total delay between instrumentation and first poll exceeds u64::MAX nanoseconds, total_first_poll_delay will overflow:
// construct and instrument a task, but do not `await` it
let task_a = monitor.instrument(async {});
let task_b = monitor.instrument(async {});
// this is the maximum duration representable by tokio_metrics
let max_duration = Duration::from_nanos(u64::MAX);
// let's advance the clock by 1.5x this amount and await both tasks
let _ = tokio::time::advance(3 * (max_duration / 2)).await;
task_a.await;
task_b.await;
// the `total_first_poll_delay` has overflowed
assert!(monitor.cumulative().total_first_poll_delay < max_duration);
If many tasks are spawned, it will take far less than a u64::MAX-nanosecond delay to bring this metric to the precipice of overflow:
// construct and instrument u16::MAX tasks, but do not `await` them
let first_poll_count = u16::MAX as u64;
let mut tasks = Vec::with_capacity(first_poll_count as usize);
for _ in 0..first_poll_count { tasks.push(monitor.instrument(async {})); }

// this is the maximum duration representable by tokio_metrics
let max_duration = u64::MAX;

// let's advance the clock just enough such that all of the time-to-first-poll
// delays summed nearly equals `max_duration`, less some remainder...
let iffy_delay = max_duration / first_poll_count;
let small_remainder = max_duration % first_poll_count;
let _ = tokio::time::advance(Duration::from_nanos(iffy_delay)).await;

// ...then poll all of the instrumented tasks:
for task in tasks { task.await; }

// `total_first_poll_delay` is at the precipice of overflowing!
assert_eq!(
    next_interval().total_first_poll_delay.as_nanos(),
    (max_duration - small_remainder) as u128
);
assert_eq!(
    monitor.cumulative().total_first_poll_delay.as_nanos(),
    (max_duration - small_remainder) as u128
);
Frequent, interval-sampled metrics will retain their accuracy, even if the cumulative metrics counter overflows at most once in the midst of an interval:
let first_poll_count = u16::MAX as u64;
let batch_size = first_poll_count / 3;

let max_duration_ns = u64::MAX;
let iffy_delay_ns = max_duration_ns / first_poll_count;

// Instrument `batch_size` number of tasks, wait for `delay` nanoseconds,
// then await the instrumented tasks.
async fn run_batch(monitor: &TaskMonitor, batch_size: usize, delay: u64) {
    let mut tasks = Vec::with_capacity(batch_size);
    for _ in 0..batch_size { tasks.push(monitor.instrument(async {})); }
    let _ = tokio::time::advance(Duration::from_nanos(delay)).await;
    for task in tasks { task.await; }
}

// this is how much `total_first_poll_delay` will
// increase with each batch we run
let batch_delay = iffy_delay_ns * batch_size;

// run batches 1, 2, and 3
for i in 1..=3 {
    run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
    assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
    assert_eq!(i * batch_delay as u128, monitor.cumulative().total_first_poll_delay.as_nanos());
}

/* now, the `total_first_poll_delay` counter is at the precipice of overflow */
assert_eq!(monitor.cumulative().total_first_poll_delay.as_nanos(), max_duration_ns as u128);

// run batch 4
run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;

// the interval counter remains accurate
assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());

// but the cumulative counter has overflowed
assert_eq!(batch_delay as u128 - 1, monitor.cumulative().total_first_poll_delay.as_nanos());
If a cumulative metric overflows more than once in the midst of an interval, its interval-sampled counterpart will also overflow.
Implementations§
impl TaskMonitor

pub const DEFAULT_SLOW_POLL_THRESHOLD: Duration = _
The default duration at which polls cross the threshold into being categorized as ‘slow’ is 50μs.

pub const DEFAULT_LONG_DELAY_THRESHOLD: Duration = _
The default duration at which schedules cross the threshold into being categorized as ‘long’ is 50μs.

pub fn new() -> TaskMonitor
Constructs a new task monitor.
Uses Self::DEFAULT_SLOW_POLL_THRESHOLD as the threshold at which polls will be considered ‘slow’.
Uses Self::DEFAULT_LONG_DELAY_THRESHOLD as the threshold at which scheduling will be considered ‘long’.
pub fn with_slow_poll_threshold(slow_poll_cut_off: Duration) -> TaskMonitor
Constructs a new task monitor with a given threshold at which polls are considered ‘slow’.
§Selecting an appropriate threshold
TODO. What advice can we give here?
§Examples
In the below example, low-threshold and high-threshold monitors are constructed and instrument identical tasks; the low-threshold monitor reports 4 slow polls, and the high-threshold monitor reports only 2 slow polls:
use std::future::Future;
use std::time::Duration;
use tokio_metrics::TaskMonitor;

#[tokio::main]
async fn main() {
    let lo_threshold = Duration::from_micros(10);
    let hi_threshold = Duration::from_millis(10);

    let lo_monitor = TaskMonitor::with_slow_poll_threshold(lo_threshold);
    let hi_monitor = TaskMonitor::with_slow_poll_threshold(hi_threshold);

    let make_task = || async {
        spin_for(lo_threshold).await; // faster poll 1
        spin_for(lo_threshold).await; // faster poll 2
        spin_for(hi_threshold).await; // slower poll 3
        spin_for(hi_threshold).await  // slower poll 4
    };

    lo_monitor.instrument(make_task()).await;
    hi_monitor.instrument(make_task()).await;

    // the low-threshold monitor reported 4 slow polls:
    assert_eq!(lo_monitor.cumulative().total_slow_poll_count, 4);
    // the high-threshold monitor reported only 2 slow polls:
    assert_eq!(hi_monitor.cumulative().total_slow_poll_count, 2);
}

/// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
fn spin_for(duration: Duration) -> impl Future<Output = ()> {
    let start = tokio::time::Instant::now();
    while start.elapsed() <= duration {}
    tokio::task::yield_now()
}
pub fn slow_poll_threshold(&self) -> Duration
Produces the threshold duration at or above which polls are categorized as slow.
§Examples
In the below example, a TaskMonitor is initialized with TaskMonitor::new; consequently, its slow-poll threshold equals TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD:
use tokio_metrics::TaskMonitor;

#[tokio::main]
async fn main() {
    let metrics_monitor = TaskMonitor::new();

    assert_eq!(
        metrics_monitor.slow_poll_threshold(),
        TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD
    );
}
pub fn long_delay_threshold(&self) -> Duration
Produces the threshold duration at or above which scheduling delays are categorized as long.
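§Examples
A short example, analogous to the one for slow_poll_threshold above: a monitor constructed with TaskMonitor::new uses the default long-delay threshold:
use tokio_metrics::TaskMonitor;

#[tokio::main]
async fn main() {
    let metrics_monitor = TaskMonitor::new();

    assert_eq!(
        metrics_monitor.long_delay_threshold(),
        TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD
    );
}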
pub fn instrument<F>(&self, task: F) -> Instrumented<F>
Produces an instrumented façade around a given async task.
§Examples
Instrument an async task by passing it to TaskMonitor::instrument:
#[tokio::main]
async fn main() {
    let metrics_monitor = tokio_metrics::TaskMonitor::new();

    // 0 tasks have been instrumented, much less polled
    assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);

    // instrument a task and poll it to completion
    metrics_monitor.instrument(async {}).await;

    // 1 task has been instrumented and polled
    assert_eq!(metrics_monitor.cumulative().first_poll_count, 1);

    // instrument a task and poll it to completion
    metrics_monitor.instrument(async {}).await;

    // 2 tasks have been instrumented and polled
    assert_eq!(metrics_monitor.cumulative().first_poll_count, 2);
}
An async task may be tracked by multiple TaskMonitors; e.g.:
#[tokio::main]
async fn main() {
    let monitor_a = tokio_metrics::TaskMonitor::new();
    let monitor_b = tokio_metrics::TaskMonitor::new();

    // 0 tasks have been instrumented, much less polled
    assert_eq!(monitor_a.cumulative().first_poll_count, 0);
    assert_eq!(monitor_b.cumulative().first_poll_count, 0);

    // instrument a task and poll it to completion
    monitor_a.instrument(monitor_b.instrument(async {})).await;

    // 1 task has been instrumented and polled
    assert_eq!(monitor_a.cumulative().first_poll_count, 1);
    assert_eq!(monitor_b.cumulative().first_poll_count, 1);
}
It is also possible (but probably undesirable) to instrument an async task multiple times with the same TaskMonitor; e.g.:
#[tokio::main]
async fn main() {
    let monitor = tokio_metrics::TaskMonitor::new();

    // 0 tasks have been instrumented, much less polled
    assert_eq!(monitor.cumulative().first_poll_count, 0);

    // instrument a task and poll it to completion
    monitor.instrument(monitor.instrument(async {})).await;

    // 2 tasks have been instrumented and polled, supposedly
    assert_eq!(monitor.cumulative().first_poll_count, 2);
}
pub fn cumulative(&self) -> TaskMetrics
Produces TaskMetrics for the tasks instrumented by this TaskMonitor, collected since the construction of this TaskMonitor.
§See also
- TaskMonitor::intervals: produces TaskMetrics for user-defined sampling intervals, instead of cumulatively
§Examples
In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur within the second sampling interval, and 2 slow polls occur within the third sampling interval; five slow polls occur across all sampling intervals:
use std::future::Future;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let metrics_monitor = tokio_metrics::TaskMonitor::new();

    // initialize a stream of sampling intervals
    let mut intervals = metrics_monitor.intervals();
    // each call of `next_interval` will produce metrics for the last sampling interval
    let mut next_interval = || intervals.next().unwrap();

    let slow = 10 * metrics_monitor.slow_poll_threshold();

    // this task completes in three slow polls
    let _ = metrics_monitor.instrument(async {
        spin_for(slow).await; // slow poll 1
        spin_for(slow).await; // slow poll 2
        spin_for(slow)        // slow poll 3
    }).await;

    // in the previous sampling interval, there were 3 slow polls
    assert_eq!(next_interval().total_slow_poll_count, 3);
    assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 3);

    // this task completes in two slow polls
    let _ = metrics_monitor.instrument(async {
        spin_for(slow).await; // slow poll 1
        spin_for(slow)        // slow poll 2
    }).await;

    // in the previous sampling interval, there were 2 slow polls
    assert_eq!(next_interval().total_slow_poll_count, 2);

    // across all sampling intervals, there were a total of 5 slow polls
    assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
}

/// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
fn spin_for(duration: Duration) -> impl Future<Output = ()> {
    let start = tokio::time::Instant::now();
    while start.elapsed() <= duration {}
    tokio::task::yield_now()
}
pub fn intervals(&self) -> impl Iterator<Item = TaskMetrics>
Produces an unending iterator of metric sampling intervals.
Each sampling interval is defined by the time elapsed between advancements of the iterator produced by TaskMonitor::intervals. The item type of this iterator is TaskMetrics, which is a bundle of task metrics that describes only events occurring within that sampling interval.
§Examples
In the below example, 0 polls occur within the first sampling interval, 3 slow polls occur within the second sampling interval, and 2 slow polls occur within the third sampling interval; five slow polls occur across all sampling intervals:
use std::future::Future;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let metrics_monitor = tokio_metrics::TaskMonitor::new();

    // initialize a stream of sampling intervals
    let mut intervals = metrics_monitor.intervals();
    // each call of `next_interval` will produce metrics for the last sampling interval
    let mut next_interval = || intervals.next().unwrap();

    let slow = 10 * metrics_monitor.slow_poll_threshold();

    // this task completes in three slow polls
    let _ = metrics_monitor.instrument(async {
        spin_for(slow).await; // slow poll 1
        spin_for(slow).await; // slow poll 2
        spin_for(slow)        // slow poll 3
    }).await;

    // in the previous sampling interval, there were 3 slow polls
    assert_eq!(next_interval().total_slow_poll_count, 3);

    // this task completes in two slow polls
    let _ = metrics_monitor.instrument(async {
        spin_for(slow).await; // slow poll 1
        spin_for(slow)        // slow poll 2
    }).await;

    // in the previous sampling interval, there were 2 slow polls
    assert_eq!(next_interval().total_slow_poll_count, 2);

    // across all sampling intervals, there were a total of 5 slow polls
    assert_eq!(metrics_monitor.cumulative().total_slow_poll_count, 5);
}

/// Block the current thread for a given `duration`, then (optionally) yield to the scheduler.
fn spin_for(duration: Duration) -> impl Future<Output = ()> {
    let start = tokio::time::Instant::now();
    while start.elapsed() <= duration {}
    tokio::task::yield_now()
}
Trait Implementations§
impl Clone for TaskMonitor
    fn clone(&self) -> TaskMonitor
    fn clone_from(&mut self, source: &Self)

impl Debug for TaskMonitor

impl Default for TaskMonitor
    fn default() -> TaskMonitor
Auto Trait Implementations§
impl Freeze for TaskMonitor
impl RefUnwindSafe for TaskMonitor
impl Send for TaskMonitor
impl Sync for TaskMonitor
impl Unpin for TaskMonitor
impl UnwindSafe for TaskMonitor
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
    fn borrow_mut(&mut self) -> &mut T

impl<T> CloneToUninit for T where T: Clone
    default unsafe fn clone_to_uninit(&self, dst: *mut T)