1use std::{
2 sync::{
3 atomic::{AtomicBool, Ordering::*},
4 Arc,
5 },
6 time::{Duration, Instant},
7};
8
9use console_api as proto;
10use prost::Message;
11use proto::resources::resource;
12use tokio::sync::{mpsc, Notify};
13use tracing_core::{span::Id, Metadata};
14
15use super::{Command, Event, Shared, Watch};
16use crate::{
17 stats::{self, Unsent},
18 ToProto, WatchRequest,
19};
20
21mod id_data;
22mod shrink;
23use self::id_data::{IdData, Include};
24use self::shrink::{ShrinkMap, ShrinkVec};
25
/// Upper bound, in bytes (4 MiB), that the encoded initial instrument update
/// must stay below; a larger snapshot causes the retention period to be
/// reduced before it is rebuilt.
const MAX_MESSAGE_SIZE: usize = 4 << 20;
28
/// Aggregates instrumentation events into per-entity state and publishes it
/// to subscribed watchers.
pub struct Aggregator {
    /// Instrumentation events; drained without awaiting in `run` and applied
    /// by `update_state`.
    events: mpsc::Receiver<Event>,

    /// Client commands: new subscriptions, pause, and resume.
    rpcs: mpsc::Receiver<Command>,

    /// Period of the publish timer in `run`; also the lower bound that
    /// `retention` may be reduced to.
    publish_interval: Duration,

    /// Retention period passed to `drop_closed` when pruning closed entries.
    /// Halved by `add_instrument_subscription` while the initial update is
    /// too large.
    retention: Duration,

    /// State shared with the rest of the crate: the flush trigger and the
    /// dropped-event counters.
    shared: Arc<Shared>,

    /// Instrument subscribers; each receives every published update.
    watchers: ShrinkVec<Watch<proto::instrument::Update>>,

    /// Task-details subscribers, keyed by task id.
    details_watchers: ShrinkMap<Id, Vec<Watch<proto::tasks::TaskDetails>>>,

    /// Every metadata registered so far; sent in full to new instrument
    /// subscribers.
    all_metadata: ShrinkVec<proto::register_metadata::NewMetadata>,

    /// Metadata registered since the last publish; taken by `publish`.
    new_metadata: Vec<proto::register_metadata::NewMetadata>,

    /// Spawned tasks, keyed by span id.
    tasks: IdData<Task>,

    /// Stats for each entry in `tasks`, inserted alongside it.
    task_stats: IdData<Arc<stats::TaskStats>>,

    /// Known resources, keyed by span id.
    resources: IdData<Resource>,

    /// Stats for each entry in `resources`, inserted alongside it.
    resource_stats: IdData<Arc<stats::ResourceStats>>,

    /// Known async ops, keyed by span id.
    async_ops: IdData<AsyncOp>,

    /// Stats for each entry in `async_ops`, inserted alongside it.
    async_op_stats: IdData<Arc<stats::AsyncOpStats>>,

    /// Poll ops not yet published. Only recorded while there are instrument
    /// watchers; cleared by `cleanup_closed` when there are none.
    poll_ops: Vec<proto::resources::PollOp>,

    /// Whether publish-timer ticks currently send updates (`Live`) or not
    /// (`Paused`).
    temporality: Temporality,

    /// Anchor used to convert `Instant`s into protobuf timestamps.
    base_time: stats::TimeAnchor,
}
98
/// Lets other parts of the crate ask the aggregator to drain its event buffer
/// early, e.g. when the buffer is approaching capacity.
#[derive(Debug, Default)]
pub(crate) struct Flush {
    /// Notified by `trigger`; `Aggregator::run` waits on it as one of its
    /// wake-up sources.
    pub(crate) should_flush: Notify,
    /// `true` while a flush has been requested but not yet acknowledged by
    /// `has_flushed`; suppresses duplicate notifications.
    triggered: AtomicBool,
}
104
/// Whether the aggregator currently publishes updates when its timer ticks.
#[derive(Debug)]
enum Temporality {
    /// Timer ticks do not publish (entered on `Command::Pause`).
    Paused,
    /// Every timer tick publishes an update (the initial state; restored on
    /// `Command::Resume`).
    Live,
}
/// Aggregator-side record of a resource, created from an `Event::Resource`
/// and converted to `proto::resources::Resource` when sent.
struct Resource {
    id: Id,
    /// Set to `true` on creation; read and cleared through the `Unsent` impl.
    is_dirty: AtomicBool,
    /// Optional parent resource, sent as `parent_resource_id`.
    parent_id: Option<Id>,
    metadata: &'static Metadata<'static>,
    concrete_type: String,
    kind: resource::Kind,
    location: Option<proto::Location>,
    is_internal: bool,
}
121
/// Aggregator-side record of a spawned task, created from an `Event::Spawn`
/// and converted to `proto::tasks::Task` when sent.
struct Task {
    id: Id,
    /// Set to `true` on creation; read and cleared through the `Unsent` impl.
    is_dirty: AtomicBool,
    metadata: &'static Metadata<'static>,
    /// Fields recorded on the spawn event, cloned into every proto message.
    fields: Vec<proto::Field>,
    location: Option<proto::Location>,
}
130
/// Aggregator-side record of an async op, created from an
/// `Event::AsyncResourceOp` and converted to `proto::async_ops::AsyncOp`.
struct AsyncOp {
    id: Id,
    /// Set to `true` on creation; read and cleared through the `Unsent` impl.
    is_dirty: AtomicBool,
    /// Optional parent op, sent as `parent_async_op_id`.
    parent_id: Option<Id>,
    /// Id of the associated resource, sent as `resource_id`.
    resource_id: Id,
    metadata: &'static Metadata<'static>,
    source: String,
}
139
140impl Aggregator {
141 pub(crate) fn new(
142 events: mpsc::Receiver<Event>,
143 rpcs: mpsc::Receiver<Command>,
144 builder: &crate::Builder,
145 shared: Arc<crate::Shared>,
146 base_time: stats::TimeAnchor,
147 ) -> Self {
148 Self {
149 shared,
150 rpcs,
151 publish_interval: builder.publish_interval,
152 retention: builder.retention,
153 events,
154 watchers: Default::default(),
155 details_watchers: Default::default(),
156 all_metadata: Default::default(),
157 new_metadata: Default::default(),
158 tasks: IdData::default(),
159 task_stats: IdData::default(),
160 resources: IdData::default(),
161 resource_stats: IdData::default(),
162 async_ops: IdData::default(),
163 async_op_stats: IdData::default(),
164 poll_ops: Default::default(),
165 temporality: Temporality::Live,
166 base_time,
167 }
168 }
169
170 pub async fn run(mut self) {
176 let mut publish = tokio::time::interval(self.publish_interval);
177 loop {
178 let should_send = tokio::select! {
179 _ = publish.tick() => {
181 match self.temporality {
182 Temporality::Live => true,
183 Temporality::Paused => false,
184 }
185 }
186
187 _ = self.shared.flush.should_flush.notified() => {
189 tracing::debug!("approaching capacity; draining buffer");
190 false
191 }
192
193 cmd = self.rpcs.recv() => {
195 match cmd {
196 Some(Command::Instrument(subscription)) => {
197 self.add_instrument_subscription(subscription);
198 },
199 Some(Command::WatchTaskDetail(watch_request)) => {
200 self.add_task_detail_subscription(watch_request);
201 },
202 Some(Command::Pause) => {
203 self.temporality = Temporality::Paused;
204 }
205 Some(Command::Resume) => {
206 self.temporality = Temporality::Live;
207 }
208 None => {
209 tracing::debug!("rpc channel closed, terminating");
210 return;
211 }
212 };
213
214 false
215 }
216
217 };
218
219 let mut drained = false;
228 let mut counts = EventCounts::new();
229 while let Some(event) = recv_now_or_never(&mut self.events) {
230 match event {
231 Some(event) => {
232 counts.update(&event);
233 self.update_state(event);
234 drained = true;
235 }
236 None => {
239 tracing::debug!("event channel closed; terminating");
240 return;
241 }
242 };
243 }
244 tracing::debug!(
245 async_resource_ops = counts.async_resource_op,
246 metadatas = counts.metadata,
247 poll_ops = counts.poll_op,
248 resources = counts.resource,
249 spawns = counts.spawn,
250 total = counts.total(),
251 "event channel drain loop",
252 );
253
254 if !self.watchers.is_empty() && should_send {
257 self.publish();
258 }
259 self.cleanup_closed();
260 if drained {
261 self.shared.flush.has_flushed();
262 }
263 }
264 }
265
266 fn cleanup_closed(&mut self) {
267 let now = Instant::now();
270 let has_watchers = !self.watchers.is_empty();
271 self.tasks
272 .drop_closed(&mut self.task_stats, now, self.retention, has_watchers);
273 self.resources
274 .drop_closed(&mut self.resource_stats, now, self.retention, has_watchers);
275 self.async_ops
276 .drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers);
277 if !has_watchers {
278 self.poll_ops.clear();
279 }
280 }
281
282 fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
284 tracing::debug!("new instrument subscription");
285 let now = Instant::now();
286
287 let update = loop {
288 let update = proto::instrument::Update {
289 task_update: Some(self.task_update(Include::All)),
290 resource_update: Some(self.resource_update(Include::All)),
291 async_op_update: Some(self.async_op_update(Include::All)),
292 now: Some(self.base_time.to_timestamp(now)),
293 new_metadata: Some(proto::RegisterMetadata {
294 metadata: (*self.all_metadata).clone(),
295 }),
296 };
297 let message_size = update.encoded_len();
298 if message_size < MAX_MESSAGE_SIZE {
299 break Some(update);
301 }
302 self.retention /= 2;
305 self.cleanup_closed();
306 tracing::debug!(
307 retention = ?self.retention,
308 message_size,
309 max_message_size = MAX_MESSAGE_SIZE,
310 "Message too big, reduced retention",
311 );
312
313 if self.retention <= self.publish_interval {
314 self.retention = self.publish_interval;
315 break None;
316 }
317 };
318
319 match update {
320 Some(update) => {
322 if !subscription.update(&update) {
323 return;
326 }
327 }
328 None => tracing::error!(
330 min_retention = ?self.publish_interval,
331 "Message too big. Start with smaller retention.",
332 ),
333 }
334
335 self.watchers.push(subscription);
336 }
337
338 fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate {
339 proto::tasks::TaskUpdate {
340 new_tasks: self.tasks.as_proto_list(include, &self.base_time),
341 stats_update: self.task_stats.as_proto(include, &self.base_time),
342 dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
343 }
344 }
345
346 fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
347 proto::resources::ResourceUpdate {
348 new_resources: self.resources.as_proto_list(include, &self.base_time),
349 stats_update: self.resource_stats.as_proto(include, &self.base_time),
350 new_poll_ops: std::mem::take(&mut self.poll_ops),
351 dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
352 }
353 }
354
355 fn async_op_update(&mut self, include: Include) -> proto::async_ops::AsyncOpUpdate {
356 proto::async_ops::AsyncOpUpdate {
357 new_async_ops: self.async_ops.as_proto_list(include, &self.base_time),
358 stats_update: self.async_op_stats.as_proto(include, &self.base_time),
359 dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
360 }
361 }
362
363 fn add_task_detail_subscription(
366 &mut self,
367 watch_request: WatchRequest<proto::tasks::TaskDetails>,
368 ) {
369 let WatchRequest {
370 id,
371 stream_sender,
372 buffer,
373 } = watch_request;
374 tracing::debug!(id = ?id, "new task details subscription");
375 if let Some(stats) = self.task_stats.get(&id) {
376 let (tx, rx) = mpsc::channel(buffer);
377 let subscription = Watch(tx);
378 let now = Some(self.base_time.to_timestamp(Instant::now()));
379 if stream_sender.send(rx).is_ok()
382 && subscription.update(&proto::tasks::TaskDetails {
383 task_id: Some(id.clone().into()),
384 now,
385 poll_times_histogram: Some(stats.poll_duration_histogram()),
386 scheduled_times_histogram: Some(stats.scheduled_duration_histogram()),
387 })
388 {
389 self.details_watchers
390 .entry(id.clone())
391 .or_default()
392 .push(subscription);
393 }
394 }
395 }
397
398 fn publish(&mut self) {
403 let new_metadata = if !self.new_metadata.is_empty() {
404 Some(proto::RegisterMetadata {
405 metadata: std::mem::take(&mut self.new_metadata),
406 })
407 } else {
408 None
409 };
410 let task_update = Some(self.task_update(Include::UpdatedOnly));
411 let resource_update = Some(self.resource_update(Include::UpdatedOnly));
412 let async_op_update = Some(self.async_op_update(Include::UpdatedOnly));
413
414 let update = proto::instrument::Update {
415 now: Some(self.base_time.to_timestamp(Instant::now())),
416 new_metadata,
417 task_update,
418 resource_update,
419 async_op_update,
420 };
421
422 self.watchers
423 .retain_and_shrink(|watch: &Watch<proto::instrument::Update>| watch.update(&update));
424
425 let stats = &self.task_stats;
426 self.details_watchers.retain_and_shrink(|id, watchers| {
429 if let Some(task_stats) = stats.get(id) {
430 let details = proto::tasks::TaskDetails {
431 task_id: Some(id.clone().into()),
432 now: Some(self.base_time.to_timestamp(Instant::now())),
433 poll_times_histogram: Some(task_stats.poll_duration_histogram()),
434 scheduled_times_histogram: Some(task_stats.scheduled_duration_histogram()),
435 };
436 watchers.retain(|watch| watch.update(&details));
437 !watchers.is_empty()
438 } else {
439 false
440 }
441 });
442 }
443
444 fn update_state(&mut self, event: Event) {
446 match event {
448 Event::Metadata(meta) => {
449 self.all_metadata.push(meta.into());
450 self.new_metadata.push(meta.into());
451 }
452
453 Event::Spawn {
454 id,
455 metadata,
456 stats,
457 fields,
458 location,
459 } => {
460 self.tasks.insert(
461 id.clone(),
462 Task {
463 id: id.clone(),
464 is_dirty: AtomicBool::new(true),
465 metadata,
466 fields,
467 location,
468 },
470 );
471
472 self.task_stats.insert(id, stats);
473 }
474
475 Event::Resource {
476 id,
477 parent_id,
478 metadata,
479 kind,
480 concrete_type,
481 location,
482 is_internal,
483 stats,
484 } => {
485 self.resources.insert(
486 id.clone(),
487 Resource {
488 id: id.clone(),
489 is_dirty: AtomicBool::new(true),
490 parent_id,
491 kind,
492 metadata,
493 concrete_type,
494 location,
495 is_internal,
496 },
497 );
498
499 self.resource_stats.insert(id, stats);
500 }
501
502 Event::PollOp {
503 metadata,
504 resource_id,
505 op_name,
506 async_op_id,
507 task_id,
508 is_ready,
509 } => {
510 if self.watchers.is_empty() {
512 return;
513 }
514 let poll_op = proto::resources::PollOp {
515 metadata: Some(metadata.into()),
516 resource_id: Some(resource_id.into()),
517 name: op_name,
518 task_id: Some(task_id.into()),
519 async_op_id: Some(async_op_id.into()),
520 is_ready,
521 };
522
523 self.poll_ops.push(poll_op);
524 }
525
526 Event::AsyncResourceOp {
527 id,
528 source,
529 resource_id,
530 metadata,
531 parent_id,
532 stats,
533 } => {
534 self.async_ops.insert(
535 id.clone(),
536 AsyncOp {
537 id: id.clone(),
538 is_dirty: AtomicBool::new(true),
539 resource_id,
540 metadata,
541 source,
542 parent_id,
543 },
544 );
545
546 self.async_op_stats.insert(id, stats);
547 }
548 }
549 }
550}
551
552fn recv_now_or_never<T>(receiver: &mut mpsc::Receiver<T>) -> Option<Option<T>> {
553 let waker = futures_task::noop_waker();
554 let mut cx = std::task::Context::from_waker(&waker);
555
556 match receiver.poll_recv(&mut cx) {
557 std::task::Poll::Ready(opt) => Some(opt),
558 std::task::Poll::Pending => None,
559 }
560}
561
/// Per-kind tally of the events drained in one aggregator loop iteration,
/// reported in a debug log.
#[derive(Debug, Default)]
struct EventCounts {
    async_resource_op: usize,
    metadata: usize,
    poll_op: usize,
    resource: usize,
    spawn: usize,
}
570
571impl EventCounts {
572 fn new() -> Self {
573 Self {
574 async_resource_op: 0,
575 metadata: 0,
576 poll_op: 0,
577 resource: 0,
578 spawn: 0,
579 }
580 }
581
582 fn update(&mut self, event: &Event) {
584 match event {
585 Event::AsyncResourceOp { .. } => self.async_resource_op += 1,
586 Event::Metadata(_) => self.metadata += 1,
587 Event::PollOp { .. } => self.poll_op += 1,
588 Event::Resource { .. } => self.resource += 1,
589 Event::Spawn { .. } => self.spawn += 1,
590 }
591 }
592
593 fn total(&self) -> usize {
595 self.async_resource_op + self.metadata + self.poll_op + self.resource + self.spawn
596 }
597}
598
599impl Flush {
602 pub(crate) fn trigger(&self) {
603 if self
604 .triggered
605 .compare_exchange(false, true, AcqRel, Acquire)
606 .is_ok()
607 {
608 self.should_flush.notify_one();
609 } else {
610 }
612 }
613
614 fn has_flushed(&self) {
616 let _ = self
617 .triggered
618 .compare_exchange(true, false, AcqRel, Acquire);
619 }
620}
621
622impl<T: Clone> Watch<T> {
623 fn update(&self, update: &T) -> bool {
624 if let Ok(reserve) = self.0.try_reserve() {
625 reserve.send(Ok(update.clone()));
626 true
627 } else {
628 false
629 }
630 }
631}
632
633impl ToProto for Task {
634 type Output = proto::tasks::Task;
635
636 fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
637 proto::tasks::Task {
638 id: Some(self.id.clone().into()),
639 kind: proto::tasks::task::Kind::Spawn as i32,
641 metadata: Some(self.metadata.into()),
642 parents: Vec::new(), fields: self.fields.clone(),
644 location: self.location.clone(),
645 }
646 }
647}
648
649impl Unsent for Task {
650 fn take_unsent(&self) -> bool {
651 self.is_dirty.swap(false, AcqRel)
652 }
653
654 fn is_unsent(&self) -> bool {
655 self.is_dirty.load(Acquire)
656 }
657}
658
659impl ToProto for Resource {
660 type Output = proto::resources::Resource;
661
662 fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
663 proto::resources::Resource {
664 id: Some(self.id.clone().into()),
665 parent_resource_id: self.parent_id.clone().map(Into::into),
666 kind: Some(self.kind.clone()),
667 metadata: Some(self.metadata.into()),
668 concrete_type: self.concrete_type.clone(),
669 location: self.location.clone(),
670 is_internal: self.is_internal,
671 }
672 }
673}
674
675impl Unsent for Resource {
676 fn take_unsent(&self) -> bool {
677 self.is_dirty.swap(false, AcqRel)
678 }
679
680 fn is_unsent(&self) -> bool {
681 self.is_dirty.load(Acquire)
682 }
683}
684
685impl ToProto for AsyncOp {
686 type Output = proto::async_ops::AsyncOp;
687
688 fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
689 proto::async_ops::AsyncOp {
690 id: Some(self.id.clone().into()),
691 metadata: Some(self.metadata.into()),
692 resource_id: Some(self.resource_id.clone().into()),
693 source: self.source.clone(),
694 parent_async_op_id: self.parent_id.clone().map(Into::into),
695 }
696 }
697}
698
699impl Unsent for AsyncOp {
700 fn take_unsent(&self) -> bool {
701 self.is_dirty.swap(false, AcqRel)
702 }
703
704 fn is_unsent(&self) -> bool {
705 self.is_dirty.load(Acquire)
706 }
707}