console_subscriber/
stats.rs

1use crate::{attribute, sync::Mutex, ToProto};
2use crossbeam_utils::atomic::AtomicCell;
3use hdrhistogram::{
4    self,
5    serialization::{Serializer, V2Serializer},
6};
7use std::cmp;
8use std::sync::{
9    atomic::{AtomicBool, AtomicUsize, Ordering::*},
10    Arc,
11};
12use std::time::{Duration, Instant, SystemTime};
13use tracing::span::Id;
14
15use console_api as proto;
16
/// Tracks whether a value has changes that have not yet been sent to a client.
///
/// A value that changed since the last update was sent reports itself as
/// "dirty"; a value that is not dirty can be left out of the current update.
pub(crate) trait Unsent {
    /// Checks for unsent changes and clears the "dirty" flag in the same step.
    ///
    /// Used while selecting which stats belong in the current update: a value
    /// for which this returns `true` is included in that update, and is
    /// therefore no longer considered dirty afterwards.
    fn take_unsent(&self) -> bool;

    /// Checks for unsent changes without clearing the "dirty" flag.
    fn is_unsent(&self) -> bool;
}
35
/// An entity (e.g. a task, async op, or resource) that may be dropped at some
/// point in time.
///
/// This generally corresponds to a span that has been closed, indicating that
/// the task, async op, or resource is no longer in use.
pub(crate) trait DroppedAt {
    /// Returns the time at which the entity was dropped, or `None` if it has
    /// not been dropped yet.
    fn dropped_at(&self) -> Option<Instant>;
}
43
/// Pairs a monotonic `Instant` with a wall-clock `SystemTime` captured at
/// (approximately) the same moment, so that later `Instant`s can be turned
/// into timestamps suitable for sending over the wire.
#[derive(Debug, Clone)]
pub(crate) struct TimeAnchor {
    /// Monotonic reference point.
    mono: Instant,
    /// Wall-clock time corresponding to `mono`.
    sys: SystemTime,
}
51
52/// Stats associated with a task.
53#[derive(Debug)]
54pub(crate) struct TaskStats {
55    is_dirty: AtomicBool,
56    is_dropped: AtomicBool,
57    // task stats
58    pub(crate) created_at: Instant,
59    dropped_at: Mutex<Option<Instant>>,
60
61    // waker stats
62    wakes: AtomicUsize,
63    waker_clones: AtomicUsize,
64    waker_drops: AtomicUsize,
65    self_wakes: AtomicUsize,
66
67    /// Poll durations and other stats.
68    poll_stats: PollStats<Histogram>,
69}
70
71/// Stats associated with an async operation.
72///
73/// This shares all of the same fields as [`ResourceStats]`, with the addition
74/// of [`PollStats`] tracking when the async operation is polled, and the task
75/// ID of the last task to poll the async op.
76#[derive(Debug)]
77pub(crate) struct AsyncOpStats {
78    /// The task ID of the last task to poll this async op.
79    ///
80    /// This is set every time the async op is polled, in case a future is
81    /// passed between tasks.
82    task_id: AtomicCell<u64>,
83
84    /// Fields shared with `ResourceStats`.
85    pub(crate) stats: ResourceStats,
86
87    /// Poll durations and other stats.
88    poll_stats: PollStats<()>,
89}
90
91/// Stats associated with a resource.
92#[derive(Debug)]
93pub(crate) struct ResourceStats {
94    is_dirty: AtomicBool,
95    is_dropped: AtomicBool,
96    created_at: Instant,
97    dropped_at: Mutex<Option<Instant>>,
98    attributes: Mutex<attribute::Attributes>,
99    pub(crate) inherit_child_attributes: bool,
100    pub(crate) parent_id: Option<Id>,
101}
102
103#[derive(Debug, Default)]
104struct PollStats<H> {
105    /// The number of polls in progress
106    current_polls: AtomicUsize,
107    /// The total number of polls
108    polls: AtomicUsize,
109    timestamps: Mutex<PollTimestamps<H>>,
110}
111
/// Timing data for polls, kept behind the lock in [`PollStats`].
#[derive(Debug, Default)]
struct PollTimestamps<H> {
    /// When the first poll started.
    first_poll: Option<Instant>,
    /// Latest wake time observed so far.
    last_wake: Option<Instant>,
    /// When the most recent poll started.
    last_poll_started: Option<Instant>,
    /// When the most recent poll ended.
    last_poll_ended: Option<Instant>,
    /// Total time spent inside polls.
    busy_time: Duration,
    /// Total time spent waiting to be polled (measured from the later of the
    /// last wake and the end of the previous poll).
    scheduled_time: Duration,
    /// Histogram of poll durations.
    poll_histogram: H,
    /// Histogram of scheduled (waiting-to-be-polled) durations.
    scheduled_histogram: H,
}
123
124#[derive(Debug)]
125struct Histogram {
126    histogram: hdrhistogram::Histogram<u64>,
127    max: u64,
128    outliers: u64,
129    max_outlier: Option<u64>,
130}
131
/// Something that can accumulate observed durations.
trait RecordDuration {
    /// Records a single observed `duration`.
    fn record_duration(&mut self, duration: Duration);
}
135
136impl TimeAnchor {
137    pub(crate) fn new() -> Self {
138        Self {
139            mono: Instant::now(),
140            sys: SystemTime::now(),
141        }
142    }
143
144    pub(crate) fn to_system_time(&self, t: Instant) -> SystemTime {
145        let dur = t
146            .checked_duration_since(self.mono)
147            .unwrap_or_else(|| Duration::from_secs(0));
148        self.sys + dur
149    }
150
151    pub(crate) fn to_timestamp(&self, t: Instant) -> prost_types::Timestamp {
152        self.to_system_time(t).into()
153    }
154}
155
156impl TaskStats {
157    pub(crate) fn new(
158        poll_duration_max: u64,
159        scheduled_duration_max: u64,
160        created_at: Instant,
161    ) -> Self {
162        Self {
163            is_dirty: AtomicBool::new(true),
164            is_dropped: AtomicBool::new(false),
165            created_at,
166            dropped_at: Mutex::new(None),
167            poll_stats: PollStats {
168                timestamps: Mutex::new(PollTimestamps {
169                    poll_histogram: Histogram::new(poll_duration_max),
170                    scheduled_histogram: Histogram::new(scheduled_duration_max),
171                    first_poll: None,
172                    last_wake: None,
173                    last_poll_started: None,
174                    last_poll_ended: None,
175                    busy_time: Duration::new(0, 0),
176                    scheduled_time: Duration::new(0, 0),
177                }),
178                current_polls: AtomicUsize::new(0),
179                polls: AtomicUsize::new(0),
180            },
181            wakes: AtomicUsize::new(0),
182            waker_clones: AtomicUsize::new(0),
183            waker_drops: AtomicUsize::new(0),
184            self_wakes: AtomicUsize::new(0),
185        }
186    }
187
188    pub(crate) fn record_wake_op(&self, op: crate::WakeOp, at: Instant) {
189        use crate::WakeOp;
190        match op {
191            WakeOp::Clone => {
192                self.waker_clones.fetch_add(1, Release);
193            }
194            WakeOp::Drop => {
195                self.waker_drops.fetch_add(1, Release);
196            }
197            WakeOp::WakeByRef { self_wake } => self.wake(at, self_wake),
198            WakeOp::Wake { self_wake } => {
199                // Note: `Waker::wake` does *not* call the `drop`
200                // implementation, so waking by value doesn't
201                // trigger a drop event. so, count this as a `drop`
202                // to ensure the task's number of wakers can be
203                // calculated as `clones` - `drops`.
204                //
205                // see
206                // https://github.com/rust-lang/rust/blob/673d0db5e393e9c64897005b470bfeb6d5aec61b/library/core/src/task/wake.rs#L211-L212
207                self.waker_drops.fetch_add(1, Release);
208
209                self.wake(at, self_wake)
210            }
211        }
212        self.make_dirty();
213    }
214
215    fn wake(&self, at: Instant, self_wake: bool) {
216        self.poll_stats.wake(at);
217
218        self.wakes.fetch_add(1, Release);
219        if self_wake {
220            self.self_wakes.fetch_add(1, Release);
221        }
222
223        self.make_dirty();
224    }
225
226    pub(crate) fn start_poll(&self, at: Instant) {
227        self.poll_stats.start_poll(at);
228        self.make_dirty();
229    }
230
231    pub(crate) fn end_poll(&self, at: Instant) {
232        self.poll_stats.end_poll(at);
233        self.make_dirty();
234    }
235
236    pub(crate) fn drop_task(&self, dropped_at: Instant) {
237        if self.is_dropped.swap(true, AcqRel) {
238            // The task was already dropped.
239            // TODO(eliza): this could maybe panic in debug mode...
240            return;
241        }
242
243        let _prev = self.dropped_at.lock().replace(dropped_at);
244        debug_assert_eq!(_prev, None, "tried to drop a task twice; this is a bug!");
245        self.make_dirty();
246    }
247
248    pub(crate) fn poll_duration_histogram(&self) -> proto::tasks::task_details::PollTimesHistogram {
249        let hist = self.poll_stats.timestamps.lock().poll_histogram.to_proto();
250        proto::tasks::task_details::PollTimesHistogram::Histogram(hist)
251    }
252
253    pub(crate) fn scheduled_duration_histogram(&self) -> proto::tasks::DurationHistogram {
254        self.poll_stats
255            .timestamps
256            .lock()
257            .scheduled_histogram
258            .to_proto()
259    }
260
261    #[inline]
262    fn make_dirty(&self) {
263        self.is_dirty.swap(true, AcqRel);
264    }
265}
266
267impl ToProto for TaskStats {
268    type Output = proto::tasks::Stats;
269
270    fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
271        let poll_stats = Some(self.poll_stats.to_proto(base_time));
272        let timestamps = self.poll_stats.timestamps.lock();
273        proto::tasks::Stats {
274            poll_stats,
275            created_at: Some(base_time.to_timestamp(self.created_at)),
276            dropped_at: self.dropped_at.lock().map(|at| base_time.to_timestamp(at)),
277            wakes: self.wakes.load(Acquire) as u64,
278            waker_clones: self.waker_clones.load(Acquire) as u64,
279            self_wakes: self.self_wakes.load(Acquire) as u64,
280            waker_drops: self.waker_drops.load(Acquire) as u64,
281            last_wake: timestamps.last_wake.map(|at| base_time.to_timestamp(at)),
282            scheduled_time: Some(
283                timestamps
284                    .scheduled_time
285                    .try_into()
286                    .unwrap_or_else(|error| {
287                        eprintln!(
288                            "failed to convert `scheduled_time` to protobuf duration: {}",
289                            error
290                        );
291                        Default::default()
292                    }),
293            ),
294        }
295    }
296}
297
298impl Unsent for TaskStats {
299    #[inline]
300    fn take_unsent(&self) -> bool {
301        self.is_dirty.swap(false, AcqRel)
302    }
303
304    fn is_unsent(&self) -> bool {
305        self.is_dirty.load(Acquire)
306    }
307}
308
309impl DroppedAt for TaskStats {
310    fn dropped_at(&self) -> Option<Instant> {
311        // avoid acquiring the lock if we know we haven't tried to drop this
312        // thing yet
313        if self.is_dropped.load(Acquire) {
314            return *self.dropped_at.lock();
315        }
316
317        None
318    }
319}
320
321// === impl AsyncOpStats ===
322
323impl AsyncOpStats {
324    pub(crate) fn new(
325        created_at: Instant,
326        inherit_child_attributes: bool,
327        parent_id: Option<Id>,
328    ) -> Self {
329        Self {
330            task_id: AtomicCell::new(0),
331            stats: ResourceStats::new(created_at, inherit_child_attributes, parent_id),
332            poll_stats: PollStats::default(),
333        }
334    }
335
336    pub(crate) fn task_id(&self) -> Option<u64> {
337        let id = self.task_id.load();
338        if id > 0 {
339            Some(id)
340        } else {
341            None
342        }
343    }
344
345    pub(crate) fn set_task_id(&self, id: &tracing::span::Id) {
346        self.task_id.store(id.into_u64());
347        self.make_dirty();
348    }
349
350    pub(crate) fn drop_async_op(&self, dropped_at: Instant) {
351        self.stats.drop_resource(dropped_at)
352    }
353
354    pub(crate) fn start_poll(&self, at: Instant) {
355        self.poll_stats.start_poll(at);
356        self.make_dirty();
357    }
358
359    pub(crate) fn end_poll(&self, at: Instant) {
360        self.poll_stats.end_poll(at);
361        self.make_dirty();
362    }
363
364    #[inline]
365    fn make_dirty(&self) {
366        self.stats.make_dirty()
367    }
368}
369
370impl Unsent for AsyncOpStats {
371    #[inline]
372    fn take_unsent(&self) -> bool {
373        self.stats.take_unsent()
374    }
375
376    #[inline]
377    fn is_unsent(&self) -> bool {
378        self.stats.is_unsent()
379    }
380}
381
382impl DroppedAt for AsyncOpStats {
383    fn dropped_at(&self) -> Option<Instant> {
384        self.stats.dropped_at()
385    }
386}
387
388impl ToProto for AsyncOpStats {
389    type Output = proto::async_ops::Stats;
390
391    fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
392        let attributes = self.stats.attributes.lock().values().cloned().collect();
393        proto::async_ops::Stats {
394            poll_stats: Some(self.poll_stats.to_proto(base_time)),
395            created_at: Some(base_time.to_timestamp(self.stats.created_at)),
396            dropped_at: self
397                .stats
398                .dropped_at
399                .lock()
400                .map(|at| base_time.to_timestamp(at)),
401            task_id: self.task_id().map(Into::into),
402            attributes,
403        }
404    }
405}
406
407// === impl ResourceStats ===
408
409impl ResourceStats {
410    pub(crate) fn new(
411        created_at: Instant,
412        inherit_child_attributes: bool,
413        parent_id: Option<Id>,
414    ) -> Self {
415        Self {
416            is_dirty: AtomicBool::new(true),
417            is_dropped: AtomicBool::new(false),
418            created_at,
419            dropped_at: Mutex::new(None),
420            attributes: Default::default(),
421            inherit_child_attributes,
422            parent_id,
423        }
424    }
425
426    pub(crate) fn update_attribute(&self, id: &Id, update: &attribute::Update) {
427        self.attributes.lock().update(id, update);
428        self.make_dirty();
429    }
430
431    #[inline]
432    pub(crate) fn drop_resource(&self, dropped_at: Instant) {
433        if self.is_dropped.swap(true, AcqRel) {
434            // The task was already dropped.
435            // TODO(eliza): this could maybe panic in debug mode...
436            return;
437        }
438
439        let mut timestamp = self.dropped_at.lock();
440        let _prev = timestamp.replace(dropped_at);
441        debug_assert_eq!(
442            _prev, None,
443            "tried to drop a resource/async op twice; this is a bug!"
444        );
445        self.make_dirty();
446    }
447
448    #[inline]
449    fn make_dirty(&self) {
450        self.is_dirty.swap(true, AcqRel);
451    }
452}
453
454impl Unsent for ResourceStats {
455    #[inline]
456    fn take_unsent(&self) -> bool {
457        self.is_dirty.swap(false, AcqRel)
458    }
459
460    fn is_unsent(&self) -> bool {
461        self.is_dirty.load(Acquire)
462    }
463}
464
465impl DroppedAt for ResourceStats {
466    fn dropped_at(&self) -> Option<Instant> {
467        // avoid acquiring the lock if we know we haven't tried to drop this
468        // thing yet
469        if self.is_dropped.load(Acquire) {
470            return *self.dropped_at.lock();
471        }
472
473        None
474    }
475}
476
477impl ToProto for ResourceStats {
478    type Output = proto::resources::Stats;
479
480    fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
481        let attributes = self.attributes.lock().values().cloned().collect();
482        proto::resources::Stats {
483            created_at: Some(base_time.to_timestamp(self.created_at)),
484            dropped_at: self.dropped_at.lock().map(|at| base_time.to_timestamp(at)),
485            attributes,
486        }
487    }
488}
489
490// === impl PollStats ===
491
492impl<H: RecordDuration> PollStats<H> {
493    fn wake(&self, at: Instant) {
494        let mut timestamps = self.timestamps.lock();
495        timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at));
496    }
497
498    fn start_poll(&self, at: Instant) {
499        if self.current_polls.fetch_add(1, AcqRel) > 0 {
500            return;
501        }
502
503        // We are starting the first poll
504        let mut timestamps = self.timestamps.lock();
505        if timestamps.first_poll.is_none() {
506            timestamps.first_poll = Some(at);
507        }
508
509        timestamps.last_poll_started = Some(at);
510
511        self.polls.fetch_add(1, Release);
512
513        // If the last poll ended after the last wake then it was likely
514        // a self-wake, so we measure from the end of the last poll instead.
515        // This also ensures that `busy_time` and `scheduled_time` don't overlap.
516        let scheduled = match std::cmp::max(timestamps.last_wake, timestamps.last_poll_ended) {
517            Some(scheduled) => scheduled,
518            None => return, // Async operations record polls, but not wakes
519        };
520
521        // `at < scheduled` is possible when a task switches threads between polls.
522        let elapsed = at.saturating_duration_since(scheduled);
523
524        // if we have a scheduled time histogram, add the timestamp
525        timestamps.scheduled_histogram.record_duration(elapsed);
526
527        timestamps.scheduled_time += elapsed;
528    }
529
530    fn end_poll(&self, at: Instant) {
531        // Are we ending the last current poll?
532        if self.current_polls.fetch_sub(1, AcqRel) > 1 {
533            return;
534        }
535
536        let mut timestamps = self.timestamps.lock();
537        let started = match timestamps.last_poll_started {
538            Some(last_poll) => last_poll,
539            None => {
540                eprintln!(
541                    "a poll ended, but start timestamp was recorded. \
542                     this is probably a `console-subscriber` bug"
543                );
544                return;
545            }
546        };
547
548        timestamps.last_poll_ended = Some(at);
549        let elapsed = match at.checked_duration_since(started) {
550            Some(elapsed) => elapsed,
551            None => {
552                eprintln!(
553                    "possible Instant clock skew detected: a poll's end timestamp \
554                    was before its start timestamp\nstart = {:?}\n  end = {:?}",
555                    started, at
556                );
557                return;
558            }
559        };
560
561        // if we have a poll time histogram, add the timestamp
562        timestamps.poll_histogram.record_duration(elapsed);
563
564        timestamps.busy_time += elapsed;
565    }
566}
567
568impl<H> ToProto for PollStats<H> {
569    type Output = proto::PollStats;
570
571    fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
572        let timestamps = self.timestamps.lock();
573        proto::PollStats {
574            polls: self.polls.load(Acquire) as u64,
575            first_poll: timestamps.first_poll.map(|at| base_time.to_timestamp(at)),
576            last_poll_started: timestamps
577                .last_poll_started
578                .map(|at| base_time.to_timestamp(at)),
579            last_poll_ended: timestamps
580                .last_poll_ended
581                .map(|at| base_time.to_timestamp(at)),
582            busy_time: Some(timestamps.busy_time.try_into().unwrap_or_else(|error| {
583                eprintln!(
584                    "failed to convert `busy_time` to protobuf duration: {}",
585                    error
586                );
587                Default::default()
588            })),
589        }
590    }
591}
592
593// === impl Arc ===
594
595impl<T: DroppedAt> DroppedAt for Arc<T> {
596    fn dropped_at(&self) -> Option<Instant> {
597        T::dropped_at(self)
598    }
599}
600
601impl<T: Unsent> Unsent for Arc<T> {
602    fn take_unsent(&self) -> bool {
603        T::take_unsent(self)
604    }
605
606    fn is_unsent(&self) -> bool {
607        T::is_unsent(self)
608    }
609}
610
611impl<T: ToProto> ToProto for Arc<T> {
612    type Output = T::Output;
613    fn to_proto(&self, base_time: &TimeAnchor) -> T::Output {
614        T::to_proto(self, base_time)
615    }
616}
617
618// === impl Histogram ===
619
620impl Histogram {
621    fn new(max: u64) -> Self {
622        // significant figures should be in the [0-5] range and memory usage
623        // grows exponentially with higher a sigfig
624        let histogram = hdrhistogram::Histogram::new_with_max(max, 2).unwrap();
625        Self {
626            histogram,
627            max,
628            max_outlier: None,
629            outliers: 0,
630        }
631    }
632
633    fn to_proto(&self) -> proto::tasks::DurationHistogram {
634        let mut serializer = V2Serializer::new();
635        let mut raw_histogram = Vec::new();
636        serializer
637            .serialize(&self.histogram, &mut raw_histogram)
638            .expect("histogram failed to serialize");
639        proto::tasks::DurationHistogram {
640            raw_histogram,
641            max_value: self.max,
642            high_outliers: self.outliers,
643            highest_outlier: self.max_outlier,
644        }
645    }
646}
647
648impl RecordDuration for Histogram {
649    fn record_duration(&mut self, duration: Duration) {
650        let mut duration_ns = duration.as_nanos() as u64;
651
652        // clamp the duration to the histogram's max value
653        if duration_ns > self.max {
654            self.outliers += 1;
655            self.max_outlier = cmp::max(self.max_outlier, Some(duration_ns));
656            duration_ns = self.max;
657        }
658
659        self.histogram
660            .record(duration_ns)
661            .expect("duration has already been clamped to histogram max value")
662    }
663}
664
665impl RecordDuration for () {
666    fn record_duration(&mut self, _: Duration) {
667        // do nothing
668    }
669}