1use crate::{attribute, sync::Mutex, ToProto};
2use crossbeam_utils::atomic::AtomicCell;
3use hdrhistogram::{
4 self,
5 serialization::{Serializer, V2Serializer},
6};
7use std::cmp;
8use std::sync::{
9 atomic::{AtomicBool, AtomicUsize, Ordering::*},
10 Arc,
11};
12use std::time::{Duration, Instant, SystemTime};
13use tracing::span::Id;
14
15use console_api as proto;
16
/// Tracks whether a stats value has changes that still need to be sent.
///
/// The implementations in this file back this with an atomic "dirty" flag.
pub(crate) trait Unsent {
    /// Returns whether the value was flagged as unsent and, in the same step,
    /// clears that flag.
    fn take_unsent(&self) -> bool;

    /// Returns whether the value is currently flagged as unsent, leaving the
    /// flag untouched.
    fn is_unsent(&self) -> bool;
}
35
/// Exposes the instant at which a tracked entity was dropped.
pub(crate) trait DroppedAt {
    /// Returns the drop instant, or `None` if no drop has been recorded.
    fn dropped_at(&self) -> Option<Instant>;
}
43
/// A paired reading of the monotonic clock and the system clock.
///
/// `TimeAnchor::new` captures both readings together; `to_system_time` then
/// uses the pair to translate an `Instant` into a `SystemTime`.
#[derive(Clone, Debug)]
pub(crate) struct TimeAnchor {
    /// Monotonic clock reading taken when the anchor was created.
    mono: Instant,
    /// System clock reading taken when the anchor was created.
    sys: SystemTime,
}
51
52#[derive(Debug)]
54pub(crate) struct TaskStats {
55 is_dirty: AtomicBool,
56 is_dropped: AtomicBool,
57 pub(crate) created_at: Instant,
59 dropped_at: Mutex<Option<Instant>>,
60
61 wakes: AtomicUsize,
63 waker_clones: AtomicUsize,
64 waker_drops: AtomicUsize,
65 self_wakes: AtomicUsize,
66
67 poll_stats: PollStats<Histogram>,
69}
70
71#[derive(Debug)]
77pub(crate) struct AsyncOpStats {
78 task_id: AtomicCell<u64>,
83
84 pub(crate) stats: ResourceStats,
86
87 poll_stats: PollStats<()>,
89}
90
91#[derive(Debug)]
93pub(crate) struct ResourceStats {
94 is_dirty: AtomicBool,
95 is_dropped: AtomicBool,
96 created_at: Instant,
97 dropped_at: Mutex<Option<Instant>>,
98 attributes: Mutex<attribute::Attributes>,
99 pub(crate) inherit_child_attributes: bool,
100 pub(crate) parent_id: Option<Id>,
101}
102
103#[derive(Debug, Default)]
104struct PollStats<H> {
105 current_polls: AtomicUsize,
107 polls: AtomicUsize,
109 timestamps: Mutex<PollTimestamps<H>>,
110}
111
/// Timestamps and accumulated durations guarded by the `PollStats` lock.
#[derive(Debug, Default)]
struct PollTimestamps<H> {
    /// Start of the first recorded poll, if any.
    first_poll: Option<Instant>,
    /// Latest wake instant recorded, if any.
    last_wake: Option<Instant>,
    /// Start of the most recently recorded poll, if any.
    last_poll_started: Option<Instant>,
    /// End of the most recently recorded poll, if any.
    last_poll_ended: Option<Instant>,
    /// Sum of the durations of all recorded polls.
    busy_time: Duration,
    /// Sum of the delays between being scheduled (last wake or last poll end,
    /// whichever is later) and the start of the following poll.
    scheduled_time: Duration,
    /// Receives the duration of every recorded poll.
    poll_histogram: H,
    /// Receives every scheduled delay added to `scheduled_time`.
    scheduled_histogram: H,
}
123
124#[derive(Debug)]
125struct Histogram {
126 histogram: hdrhistogram::Histogram<u64>,
127 max: u64,
128 outliers: u64,
129 max_outlier: Option<u64>,
130}
131
/// A sink that can record a measured duration.
///
/// Implemented in this file by `Histogram` and, as a no-op, by `()`.
trait RecordDuration {
    /// Records one `duration` sample.
    fn record_duration(&mut self, duration: Duration);
}
135
136impl TimeAnchor {
137 pub(crate) fn new() -> Self {
138 Self {
139 mono: Instant::now(),
140 sys: SystemTime::now(),
141 }
142 }
143
144 pub(crate) fn to_system_time(&self, t: Instant) -> SystemTime {
145 let dur = t
146 .checked_duration_since(self.mono)
147 .unwrap_or_else(|| Duration::from_secs(0));
148 self.sys + dur
149 }
150
151 pub(crate) fn to_timestamp(&self, t: Instant) -> prost_types::Timestamp {
152 self.to_system_time(t).into()
153 }
154}
155
156impl TaskStats {
157 pub(crate) fn new(
158 poll_duration_max: u64,
159 scheduled_duration_max: u64,
160 created_at: Instant,
161 ) -> Self {
162 Self {
163 is_dirty: AtomicBool::new(true),
164 is_dropped: AtomicBool::new(false),
165 created_at,
166 dropped_at: Mutex::new(None),
167 poll_stats: PollStats {
168 timestamps: Mutex::new(PollTimestamps {
169 poll_histogram: Histogram::new(poll_duration_max),
170 scheduled_histogram: Histogram::new(scheduled_duration_max),
171 first_poll: None,
172 last_wake: None,
173 last_poll_started: None,
174 last_poll_ended: None,
175 busy_time: Duration::new(0, 0),
176 scheduled_time: Duration::new(0, 0),
177 }),
178 current_polls: AtomicUsize::new(0),
179 polls: AtomicUsize::new(0),
180 },
181 wakes: AtomicUsize::new(0),
182 waker_clones: AtomicUsize::new(0),
183 waker_drops: AtomicUsize::new(0),
184 self_wakes: AtomicUsize::new(0),
185 }
186 }
187
188 pub(crate) fn record_wake_op(&self, op: crate::WakeOp, at: Instant) {
189 use crate::WakeOp;
190 match op {
191 WakeOp::Clone => {
192 self.waker_clones.fetch_add(1, Release);
193 }
194 WakeOp::Drop => {
195 self.waker_drops.fetch_add(1, Release);
196 }
197 WakeOp::WakeByRef { self_wake } => self.wake(at, self_wake),
198 WakeOp::Wake { self_wake } => {
199 self.waker_drops.fetch_add(1, Release);
208
209 self.wake(at, self_wake)
210 }
211 }
212 self.make_dirty();
213 }
214
215 fn wake(&self, at: Instant, self_wake: bool) {
216 self.poll_stats.wake(at);
217
218 self.wakes.fetch_add(1, Release);
219 if self_wake {
220 self.self_wakes.fetch_add(1, Release);
221 }
222
223 self.make_dirty();
224 }
225
226 pub(crate) fn start_poll(&self, at: Instant) {
227 self.poll_stats.start_poll(at);
228 self.make_dirty();
229 }
230
231 pub(crate) fn end_poll(&self, at: Instant) {
232 self.poll_stats.end_poll(at);
233 self.make_dirty();
234 }
235
236 pub(crate) fn drop_task(&self, dropped_at: Instant) {
237 if self.is_dropped.swap(true, AcqRel) {
238 return;
241 }
242
243 let _prev = self.dropped_at.lock().replace(dropped_at);
244 debug_assert_eq!(_prev, None, "tried to drop a task twice; this is a bug!");
245 self.make_dirty();
246 }
247
248 pub(crate) fn poll_duration_histogram(&self) -> proto::tasks::task_details::PollTimesHistogram {
249 let hist = self.poll_stats.timestamps.lock().poll_histogram.to_proto();
250 proto::tasks::task_details::PollTimesHistogram::Histogram(hist)
251 }
252
253 pub(crate) fn scheduled_duration_histogram(&self) -> proto::tasks::DurationHistogram {
254 self.poll_stats
255 .timestamps
256 .lock()
257 .scheduled_histogram
258 .to_proto()
259 }
260
261 #[inline]
262 fn make_dirty(&self) {
263 self.is_dirty.swap(true, AcqRel);
264 }
265}
266
267impl ToProto for TaskStats {
268 type Output = proto::tasks::Stats;
269
270 fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
271 let poll_stats = Some(self.poll_stats.to_proto(base_time));
272 let timestamps = self.poll_stats.timestamps.lock();
273 proto::tasks::Stats {
274 poll_stats,
275 created_at: Some(base_time.to_timestamp(self.created_at)),
276 dropped_at: self.dropped_at.lock().map(|at| base_time.to_timestamp(at)),
277 wakes: self.wakes.load(Acquire) as u64,
278 waker_clones: self.waker_clones.load(Acquire) as u64,
279 self_wakes: self.self_wakes.load(Acquire) as u64,
280 waker_drops: self.waker_drops.load(Acquire) as u64,
281 last_wake: timestamps.last_wake.map(|at| base_time.to_timestamp(at)),
282 scheduled_time: Some(
283 timestamps
284 .scheduled_time
285 .try_into()
286 .unwrap_or_else(|error| {
287 eprintln!(
288 "failed to convert `scheduled_time` to protobuf duration: {}",
289 error
290 );
291 Default::default()
292 }),
293 ),
294 }
295 }
296}
297
298impl Unsent for TaskStats {
299 #[inline]
300 fn take_unsent(&self) -> bool {
301 self.is_dirty.swap(false, AcqRel)
302 }
303
304 fn is_unsent(&self) -> bool {
305 self.is_dirty.load(Acquire)
306 }
307}
308
309impl DroppedAt for TaskStats {
310 fn dropped_at(&self) -> Option<Instant> {
311 if self.is_dropped.load(Acquire) {
314 return *self.dropped_at.lock();
315 }
316
317 None
318 }
319}
320
321impl AsyncOpStats {
324 pub(crate) fn new(
325 created_at: Instant,
326 inherit_child_attributes: bool,
327 parent_id: Option<Id>,
328 ) -> Self {
329 Self {
330 task_id: AtomicCell::new(0),
331 stats: ResourceStats::new(created_at, inherit_child_attributes, parent_id),
332 poll_stats: PollStats::default(),
333 }
334 }
335
336 pub(crate) fn task_id(&self) -> Option<u64> {
337 let id = self.task_id.load();
338 if id > 0 {
339 Some(id)
340 } else {
341 None
342 }
343 }
344
345 pub(crate) fn set_task_id(&self, id: &tracing::span::Id) {
346 self.task_id.store(id.into_u64());
347 self.make_dirty();
348 }
349
350 pub(crate) fn drop_async_op(&self, dropped_at: Instant) {
351 self.stats.drop_resource(dropped_at)
352 }
353
354 pub(crate) fn start_poll(&self, at: Instant) {
355 self.poll_stats.start_poll(at);
356 self.make_dirty();
357 }
358
359 pub(crate) fn end_poll(&self, at: Instant) {
360 self.poll_stats.end_poll(at);
361 self.make_dirty();
362 }
363
364 #[inline]
365 fn make_dirty(&self) {
366 self.stats.make_dirty()
367 }
368}
369
370impl Unsent for AsyncOpStats {
371 #[inline]
372 fn take_unsent(&self) -> bool {
373 self.stats.take_unsent()
374 }
375
376 #[inline]
377 fn is_unsent(&self) -> bool {
378 self.stats.is_unsent()
379 }
380}
381
382impl DroppedAt for AsyncOpStats {
383 fn dropped_at(&self) -> Option<Instant> {
384 self.stats.dropped_at()
385 }
386}
387
388impl ToProto for AsyncOpStats {
389 type Output = proto::async_ops::Stats;
390
391 fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
392 let attributes = self.stats.attributes.lock().values().cloned().collect();
393 proto::async_ops::Stats {
394 poll_stats: Some(self.poll_stats.to_proto(base_time)),
395 created_at: Some(base_time.to_timestamp(self.stats.created_at)),
396 dropped_at: self
397 .stats
398 .dropped_at
399 .lock()
400 .map(|at| base_time.to_timestamp(at)),
401 task_id: self.task_id().map(Into::into),
402 attributes,
403 }
404 }
405}
406
407impl ResourceStats {
410 pub(crate) fn new(
411 created_at: Instant,
412 inherit_child_attributes: bool,
413 parent_id: Option<Id>,
414 ) -> Self {
415 Self {
416 is_dirty: AtomicBool::new(true),
417 is_dropped: AtomicBool::new(false),
418 created_at,
419 dropped_at: Mutex::new(None),
420 attributes: Default::default(),
421 inherit_child_attributes,
422 parent_id,
423 }
424 }
425
426 pub(crate) fn update_attribute(&self, id: &Id, update: &attribute::Update) {
427 self.attributes.lock().update(id, update);
428 self.make_dirty();
429 }
430
431 #[inline]
432 pub(crate) fn drop_resource(&self, dropped_at: Instant) {
433 if self.is_dropped.swap(true, AcqRel) {
434 return;
437 }
438
439 let mut timestamp = self.dropped_at.lock();
440 let _prev = timestamp.replace(dropped_at);
441 debug_assert_eq!(
442 _prev, None,
443 "tried to drop a resource/async op twice; this is a bug!"
444 );
445 self.make_dirty();
446 }
447
448 #[inline]
449 fn make_dirty(&self) {
450 self.is_dirty.swap(true, AcqRel);
451 }
452}
453
454impl Unsent for ResourceStats {
455 #[inline]
456 fn take_unsent(&self) -> bool {
457 self.is_dirty.swap(false, AcqRel)
458 }
459
460 fn is_unsent(&self) -> bool {
461 self.is_dirty.load(Acquire)
462 }
463}
464
465impl DroppedAt for ResourceStats {
466 fn dropped_at(&self) -> Option<Instant> {
467 if self.is_dropped.load(Acquire) {
470 return *self.dropped_at.lock();
471 }
472
473 None
474 }
475}
476
477impl ToProto for ResourceStats {
478 type Output = proto::resources::Stats;
479
480 fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
481 let attributes = self.attributes.lock().values().cloned().collect();
482 proto::resources::Stats {
483 created_at: Some(base_time.to_timestamp(self.created_at)),
484 dropped_at: self.dropped_at.lock().map(|at| base_time.to_timestamp(at)),
485 attributes,
486 }
487 }
488}
489
490impl<H: RecordDuration> PollStats<H> {
493 fn wake(&self, at: Instant) {
494 let mut timestamps = self.timestamps.lock();
495 timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at));
496 }
497
498 fn start_poll(&self, at: Instant) {
499 if self.current_polls.fetch_add(1, AcqRel) > 0 {
500 return;
501 }
502
503 let mut timestamps = self.timestamps.lock();
505 if timestamps.first_poll.is_none() {
506 timestamps.first_poll = Some(at);
507 }
508
509 timestamps.last_poll_started = Some(at);
510
511 self.polls.fetch_add(1, Release);
512
513 let scheduled = match std::cmp::max(timestamps.last_wake, timestamps.last_poll_ended) {
517 Some(scheduled) => scheduled,
518 None => return, };
520
521 let elapsed = at.saturating_duration_since(scheduled);
523
524 timestamps.scheduled_histogram.record_duration(elapsed);
526
527 timestamps.scheduled_time += elapsed;
528 }
529
530 fn end_poll(&self, at: Instant) {
531 if self.current_polls.fetch_sub(1, AcqRel) > 1 {
533 return;
534 }
535
536 let mut timestamps = self.timestamps.lock();
537 let started = match timestamps.last_poll_started {
538 Some(last_poll) => last_poll,
539 None => {
540 eprintln!(
541 "a poll ended, but start timestamp was recorded. \
542 this is probably a `console-subscriber` bug"
543 );
544 return;
545 }
546 };
547
548 timestamps.last_poll_ended = Some(at);
549 let elapsed = match at.checked_duration_since(started) {
550 Some(elapsed) => elapsed,
551 None => {
552 eprintln!(
553 "possible Instant clock skew detected: a poll's end timestamp \
554 was before its start timestamp\nstart = {:?}\n end = {:?}",
555 started, at
556 );
557 return;
558 }
559 };
560
561 timestamps.poll_histogram.record_duration(elapsed);
563
564 timestamps.busy_time += elapsed;
565 }
566}
567
568impl<H> ToProto for PollStats<H> {
569 type Output = proto::PollStats;
570
571 fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
572 let timestamps = self.timestamps.lock();
573 proto::PollStats {
574 polls: self.polls.load(Acquire) as u64,
575 first_poll: timestamps.first_poll.map(|at| base_time.to_timestamp(at)),
576 last_poll_started: timestamps
577 .last_poll_started
578 .map(|at| base_time.to_timestamp(at)),
579 last_poll_ended: timestamps
580 .last_poll_ended
581 .map(|at| base_time.to_timestamp(at)),
582 busy_time: Some(timestamps.busy_time.try_into().unwrap_or_else(|error| {
583 eprintln!(
584 "failed to convert `busy_time` to protobuf duration: {}",
585 error
586 );
587 Default::default()
588 })),
589 }
590 }
591}
592
593impl<T: DroppedAt> DroppedAt for Arc<T> {
596 fn dropped_at(&self) -> Option<Instant> {
597 T::dropped_at(self)
598 }
599}
600
601impl<T: Unsent> Unsent for Arc<T> {
602 fn take_unsent(&self) -> bool {
603 T::take_unsent(self)
604 }
605
606 fn is_unsent(&self) -> bool {
607 T::is_unsent(self)
608 }
609}
610
611impl<T: ToProto> ToProto for Arc<T> {
612 type Output = T::Output;
613 fn to_proto(&self, base_time: &TimeAnchor) -> T::Output {
614 T::to_proto(self, base_time)
615 }
616}
617
618impl Histogram {
621 fn new(max: u64) -> Self {
622 let histogram = hdrhistogram::Histogram::new_with_max(max, 2).unwrap();
625 Self {
626 histogram,
627 max,
628 max_outlier: None,
629 outliers: 0,
630 }
631 }
632
633 fn to_proto(&self) -> proto::tasks::DurationHistogram {
634 let mut serializer = V2Serializer::new();
635 let mut raw_histogram = Vec::new();
636 serializer
637 .serialize(&self.histogram, &mut raw_histogram)
638 .expect("histogram failed to serialize");
639 proto::tasks::DurationHistogram {
640 raw_histogram,
641 max_value: self.max,
642 high_outliers: self.outliers,
643 highest_outlier: self.max_outlier,
644 }
645 }
646}
647
648impl RecordDuration for Histogram {
649 fn record_duration(&mut self, duration: Duration) {
650 let mut duration_ns = duration.as_nanos() as u64;
651
652 if duration_ns > self.max {
654 self.outliers += 1;
655 self.max_outlier = cmp::max(self.max_outlier, Some(duration_ns));
656 duration_ns = self.max;
657 }
658
659 self.histogram
660 .record(duration_ns)
661 .expect("duration has already been clamped to histogram max value")
662 }
663}
664
665impl RecordDuration for () {
666 fn record_duration(&mut self, _: Duration) {
667 }
669}