console_subscriber/aggregator/mod.rs

use std::{
    sync::{
        atomic::{AtomicBool, Ordering::*},
        Arc,
    },
    time::{Duration, Instant},
};

use console_api as proto;
use prost::Message;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};
use tracing_core::{span::Id, Metadata};

use super::{Command, Event, Shared, Watch};
use crate::{
    stats::{self, Unsent},
    ToProto, WatchRequest,
};

mod id_data;
mod shrink;
use self::id_data::{IdData, Include};
use self::shrink::{ShrinkMap, ShrinkVec};

/// Should match tonic's (private) codec::DEFAULT_MAX_RECV_MESSAGE_SIZE
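///
/// If the initial update for a new subscription would exceed this size, the
/// aggregator halves its retention period and drops old, closed data until the
/// encoded update fits (see `add_instrument_subscription`).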
const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;

/// Aggregates instrumentation traces and prepares state for the instrument
/// server.
///
/// The `Aggregator` is responsible for receiving and organizing the
/// instrumented events and preparing the data to be served to an instrument
/// client.
pub struct Aggregator {
    /// Channel of incoming events emitted by `TaskLayer`s.
    events: mpsc::Receiver<Event>,

    /// New incoming RPCs.
    rpcs: mpsc::Receiver<Command>,

    /// The interval at which new data updates are pushed to clients.
    publish_interval: Duration,

    /// How long to keep task data after a task has completed.
    retention: Duration,

    /// Shared state, including a `Notify` that triggers a flush when the event
    /// buffer is approaching capacity.
    shared: Arc<Shared>,

    /// Currently active RPCs streaming task events.
    watchers: ShrinkVec<Watch<proto::instrument::Update>>,

    /// Currently active RPCs streaming task details events, by task ID.
    details_watchers: ShrinkMap<Id, Vec<Watch<proto::tasks::TaskDetails>>>,

    /// *All* metadata for task spans and user-defined spans that we care about.
    ///
    /// This is sent to new clients as part of the initial state.
    all_metadata: ShrinkVec<proto::register_metadata::NewMetadata>,

    /// *New* metadata that was registered since the last state update.
    ///
    /// This is emptied on every state update.
    new_metadata: Vec<proto::register_metadata::NewMetadata>,

    /// Map of task IDs to task static data.
    tasks: IdData<Task>,

    /// Map of task IDs to task stats.
    task_stats: IdData<Arc<stats::TaskStats>>,

    /// Map of resource IDs to resource static data.
    resources: IdData<Resource>,

    /// Map of resource IDs to resource stats.
    resource_stats: IdData<Arc<stats::ResourceStats>>,

    /// Map of AsyncOp IDs to AsyncOp static data.
    async_ops: IdData<AsyncOp>,

    /// Map of AsyncOp IDs to AsyncOp stats.
    async_op_stats: IdData<Arc<stats::AsyncOpStats>>,

    /// `PollOp` events that have occurred since the last update.
    ///
    /// This is emptied on every state update.
    poll_ops: Vec<proto::resources::PollOp>,

    /// The time "state" of the aggregator, such as paused or live.
    temporality: Temporality,

    /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
    /// timestamp that can be sent over the wire.
    base_time: stats::TimeAnchor,
}

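/// A handle used to coordinate flushing of the event buffer.
///
/// The instrumentation side calls `Flush::trigger` when the buffer is
/// approaching capacity; the aggregator waits on `should_flush` and, once it
/// has drained the buffer, acknowledges the flush via `has_flushed`.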
#[derive(Debug, Default)]
pub(crate) struct Flush {
    pub(crate) should_flush: Notify,
    triggered: AtomicBool,
}

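/// Whether the aggregator is currently publishing updates ("live") or has been
/// paused by a client.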
#[derive(Debug)]
enum Temporality {
    Live,
    Paused,
}

/// Represents static data for resources
struct Resource {
    id: Id,
    is_dirty: AtomicBool,
    parent_id: Option<Id>,
    metadata: &'static Metadata<'static>,
    concrete_type: String,
    kind: resource::Kind,
    location: Option<proto::Location>,
    is_internal: bool,
}

/// Represents static data for tasks
struct Task {
    id: Id,
    is_dirty: AtomicBool,
    metadata: &'static Metadata<'static>,
    fields: Vec<proto::Field>,
    location: Option<proto::Location>,
}

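/// Represents static data for async operations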
struct AsyncOp {
    id: Id,
    is_dirty: AtomicBool,
    parent_id: Option<Id>,
    resource_id: Id,
    metadata: &'static Metadata<'static>,
    source: String,
}

impl Aggregator {
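    /// Creates a new `Aggregator` from the channels used to receive events and
    /// client commands, using the publish interval and retention period
    /// configured on `builder`.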
    pub(crate) fn new(
        events: mpsc::Receiver<Event>,
        rpcs: mpsc::Receiver<Command>,
        builder: &crate::Builder,
        shared: Arc<crate::Shared>,
        base_time: stats::TimeAnchor,
    ) -> Self {
        Self {
            shared,
            rpcs,
            publish_interval: builder.publish_interval,
            retention: builder.retention,
            events,
            watchers: Default::default(),
            details_watchers: Default::default(),
            all_metadata: Default::default(),
            new_metadata: Default::default(),
            tasks: IdData::default(),
            task_stats: IdData::default(),
            resources: IdData::default(),
            resource_stats: IdData::default(),
            async_ops: IdData::default(),
            async_op_stats: IdData::default(),
            poll_ops: Default::default(),
            temporality: Temporality::Live,
            base_time,
        }
    }

    /// Runs the aggregator.
    ///
    /// This method will start the aggregator loop and should run as long as
    /// the instrument server is running. If the instrument server stops,
    /// this future can be aborted.
    pub async fn run(mut self) {
        let mut publish = tokio::time::interval(self.publish_interval);
        loop {
            let should_send = tokio::select! {
                // if the publish interval elapses, publish an update to clients
                _ = publish.tick() => {
                    match self.temporality {
                        Temporality::Live => true,
                        Temporality::Paused => false,
                    }
                }

                // triggered when the event buffer is approaching capacity
                _ = self.shared.flush.should_flush.notified() => {
                    tracing::debug!("approaching capacity; draining buffer");
                    false
                }

                // a new command from a client
                cmd = self.rpcs.recv() => {
                    match cmd {
                        Some(Command::Instrument(subscription)) => {
                            self.add_instrument_subscription(subscription);
                        },
                        Some(Command::WatchTaskDetail(watch_request)) => {
                            self.add_task_detail_subscription(watch_request);
                        },
                        Some(Command::Pause) => {
                            self.temporality = Temporality::Paused;
                        }
                        Some(Command::Resume) => {
                            self.temporality = Temporality::Live;
                        }
                        None => {
                            tracing::debug!("rpc channel closed, terminating");
                            return;
                        }
                    };

                    false
                }

            };

            // Drain and aggregate buffered events.
            //
            // Note: we *don't* want to actually await the call to `recv` --- we
            // don't want the aggregator task to be woken on every event, because
            // it would then be woken whenever its own `poll` calls exit, which
            // would result in a busy-loop. Instead, we only want to be woken when
            // the publish interval has elapsed, or when the channel is almost
            // full.
            let mut drained = false;
            let mut counts = EventCounts::new();
            while let Some(event) = recv_now_or_never(&mut self.events) {
                match event {
                    Some(event) => {
                        counts.update(&event);
                        self.update_state(event);
                        drained = true;
                    }
                    // The channel closed; no more events will be emitted. Time
                    // to stop aggregating.
                    None => {
                        tracing::debug!("event channel closed; terminating");
                        return;
                    }
                };
            }
            tracing::debug!(
                async_resource_ops = counts.async_resource_op,
                metadatas = counts.metadata,
                poll_ops = counts.poll_op,
                resources = counts.resource,
                spawns = counts.spawn,
                total = counts.total(),
                "event channel drain loop",
            );

            // flush data to clients, if there are any currently subscribed
            // watchers and we should send a new update.
            if !self.watchers.is_empty() && should_send {
                self.publish();
            }
            self.cleanup_closed();
            if drained {
                self.shared.flush.has_flushed();
            }
        }
    }

    fn cleanup_closed(&mut self) {
        // Drop all closed tasks, resources, and async ops that have completed
        // *and* whose final data has already been sent off.
        let now = Instant::now();
        let has_watchers = !self.watchers.is_empty();
        self.tasks
            .drop_closed(&mut self.task_stats, now, self.retention, has_watchers);
        self.resources
            .drop_closed(&mut self.resource_stats, now, self.retention, has_watchers);
        self.async_ops
            .drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers);
        if !has_watchers {
            self.poll_ops.clear();
        }
    }

    /// Add the instrument subscription to the watchers after sending the first update
    fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
        tracing::debug!("new instrument subscription");
        let now = Instant::now();

        let update = loop {
            let update = proto::instrument::Update {
                task_update: Some(self.task_update(Include::All)),
                resource_update: Some(self.resource_update(Include::All)),
                async_op_update: Some(self.async_op_update(Include::All)),
                now: Some(self.base_time.to_timestamp(now)),
                new_metadata: Some(proto::RegisterMetadata {
                    metadata: (*self.all_metadata).clone(),
                }),
            };
            let message_size = update.encoded_len();
            if message_size < MAX_MESSAGE_SIZE {
                // normal case
                break Some(update);
            }
            // If the gRPC message is bigger than tokio-console will accept, throw
            // away the oldest inactive data and try again.
            self.retention /= 2;
            self.cleanup_closed();
            tracing::debug!(
                retention = ?self.retention,
                message_size,
                max_message_size = MAX_MESSAGE_SIZE,
                "Message too big, reduced retention",
            );

            if self.retention <= self.publish_interval {
                self.retention = self.publish_interval;
                break None;
            }
        };

        match update {
            // Send the initial state
            Some(update) => {
                if !subscription.update(&update) {
                    // If sending the initial update fails, the subscription is already dead,
                    // so don't add it to `watchers`.
                    return;
                }
            }
            // The user will only receive subsequent updates.
            None => tracing::error!(
                min_retention = ?self.publish_interval,
                "Message too big. Start with smaller retention.",
            ),
        }

        self.watchers.push(subscription);
    }

    fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate {
        proto::tasks::TaskUpdate {
            new_tasks: self.tasks.as_proto_list(include, &self.base_time),
            stats_update: self.task_stats.as_proto(include, &self.base_time),
            dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
        }
    }

    fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
        proto::resources::ResourceUpdate {
            new_resources: self.resources.as_proto_list(include, &self.base_time),
            stats_update: self.resource_stats.as_proto(include, &self.base_time),
            new_poll_ops: std::mem::take(&mut self.poll_ops),
            dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
        }
    }

    fn async_op_update(&mut self, include: Include) -> proto::async_ops::AsyncOpUpdate {
        proto::async_ops::AsyncOpUpdate {
            new_async_ops: self.async_ops.as_proto_list(include, &self.base_time),
            stats_update: self.async_op_stats.as_proto(include, &self.base_time),
            dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
        }
    }

    /// Add the task details subscription to the watchers after sending the first update,
    /// if the task is found.
    fn add_task_detail_subscription(
        &mut self,
        watch_request: WatchRequest<proto::tasks::TaskDetails>,
    ) {
        let WatchRequest {
            id,
            stream_sender,
            buffer,
        } = watch_request;
        tracing::debug!(id = ?id, "new task details subscription");
        if let Some(stats) = self.task_stats.get(&id) {
            let (tx, rx) = mpsc::channel(buffer);
            let subscription = Watch(tx);
            let now = Some(self.base_time.to_timestamp(Instant::now()));
            // Send back the stream receiver.
            // Then send the initial state --- if this fails, the subscription is already dead.
            if stream_sender.send(rx).is_ok()
                && subscription.update(&proto::tasks::TaskDetails {
                    task_id: Some(id.clone().into()),
                    now,
                    poll_times_histogram: Some(stats.poll_duration_histogram()),
                    scheduled_times_histogram: Some(stats.scheduled_duration_histogram()),
                })
            {
                self.details_watchers
                    .entry(id.clone())
                    .or_default()
                    .push(subscription);
            }
        }
        // If the task is not found, drop `stream_sender`, which will result in
        // a "not found" error.
    }

    /// Publish the current state to all active watchers.
    ///
    /// This drops any watchers which have closed the RPC, or whose update
    /// channel has filled up.
    fn publish(&mut self) {
        let new_metadata = if !self.new_metadata.is_empty() {
            Some(proto::RegisterMetadata {
                metadata: std::mem::take(&mut self.new_metadata),
            })
        } else {
            None
        };
        let task_update = Some(self.task_update(Include::UpdatedOnly));
        let resource_update = Some(self.resource_update(Include::UpdatedOnly));
        let async_op_update = Some(self.async_op_update(Include::UpdatedOnly));

        let update = proto::instrument::Update {
            now: Some(self.base_time.to_timestamp(Instant::now())),
            new_metadata,
            task_update,
            resource_update,
            async_op_update,
        };

        self.watchers
            .retain_and_shrink(|watch: &Watch<proto::instrument::Update>| watch.update(&update));

        let stats = &self.task_stats;
        // Assuming there are far fewer task details subscribers than there are
        // stats updates, iterate over `details_watchers` and compact the map.
        self.details_watchers.retain_and_shrink(|id, watchers| {
            if let Some(task_stats) = stats.get(id) {
                let details = proto::tasks::TaskDetails {
                    task_id: Some(id.clone().into()),
                    now: Some(self.base_time.to_timestamp(Instant::now())),
                    poll_times_histogram: Some(task_stats.poll_duration_histogram()),
                    scheduled_times_histogram: Some(task_stats.scheduled_duration_histogram()),
                };
                watchers.retain(|watch| watch.update(&details));
                !watchers.is_empty()
            } else {
                false
            }
        });
    }

    /// Update the current state with data from a single event.
    fn update_state(&mut self, event: Event) {
        // do state update
        match event {
            Event::Metadata(meta) => {
                self.all_metadata.push(meta.into());
                self.new_metadata.push(meta.into());
            }

            Event::Spawn {
                id,
                metadata,
                stats,
                fields,
                location,
            } => {
                self.tasks.insert(
                    id.clone(),
                    Task {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        metadata,
                        fields,
                        location,
                        // TODO: parents
                    },
                );

                self.task_stats.insert(id, stats);
            }

            Event::Resource {
                id,
                parent_id,
                metadata,
                kind,
                concrete_type,
                location,
                is_internal,
                stats,
            } => {
                self.resources.insert(
                    id.clone(),
                    Resource {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        parent_id,
                        kind,
                        metadata,
                        concrete_type,
                        location,
                        is_internal,
                    },
                );

                self.resource_stats.insert(id, stats);
            }

            Event::PollOp {
                metadata,
                resource_id,
                op_name,
                async_op_id,
                task_id,
                is_ready,
            } => {
                // The CLI doesn't show historical poll ops, so don't save them
                // if no one is watching.
                if self.watchers.is_empty() {
                    return;
                }
                let poll_op = proto::resources::PollOp {
                    metadata: Some(metadata.into()),
                    resource_id: Some(resource_id.into()),
                    name: op_name,
                    task_id: Some(task_id.into()),
                    async_op_id: Some(async_op_id.into()),
                    is_ready,
                };

                self.poll_ops.push(poll_op);
            }

            Event::AsyncResourceOp {
                id,
                source,
                resource_id,
                metadata,
                parent_id,
                stats,
            } => {
                self.async_ops.insert(
                    id.clone(),
                    AsyncOp {
                        id: id.clone(),
                        is_dirty: AtomicBool::new(true),
                        resource_id,
                        metadata,
                        source,
                        parent_id,
                    },
                );

                self.async_op_stats.insert(id, stats);
            }
        }
    }
}

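/// Polls the receiver exactly once, using a no-op waker so that the aggregator
/// task is *not* scheduled for a wakeup when the channel is empty.
///
/// Returns `Some(Some(event))` if an event was ready, `Some(None)` if the
/// channel is closed, and `None` if the channel is empty but still open.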
fn recv_now_or_never<T>(receiver: &mut mpsc::Receiver<T>) -> Option<Option<T>> {
    let waker = futures_task::noop_waker();
    let mut cx = std::task::Context::from_waker(&waker);

    match receiver.poll_recv(&mut cx) {
        std::task::Poll::Ready(opt) => Some(opt),
        std::task::Poll::Pending => None,
    }
}

/// Count of events received in each aggregator drain cycle.
struct EventCounts {
    async_resource_op: usize,
    metadata: usize,
    poll_op: usize,
    resource: usize,
    spawn: usize,
}

impl EventCounts {
    fn new() -> Self {
        Self {
            async_resource_op: 0,
            metadata: 0,
            poll_op: 0,
            resource: 0,
            spawn: 0,
        }
    }

    /// Count the event based on its variant.
    fn update(&mut self, event: &Event) {
        match event {
            Event::AsyncResourceOp { .. } => self.async_resource_op += 1,
            Event::Metadata(_) => self.metadata += 1,
            Event::PollOp { .. } => self.poll_op += 1,
            Event::Resource { .. } => self.resource += 1,
            Event::Spawn { .. } => self.spawn += 1,
        }
    }

    /// Total number of events recorded.
    fn total(&self) -> usize {
        self.async_resource_op + self.metadata + self.poll_op + self.resource + self.spawn
    }
}

// === impl Flush ===

impl Flush {
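    /// Triggers a flush notification, unless one has already been triggered
    /// and not yet acknowledged.
    ///
    /// The `triggered` flag ensures the aggregator is notified only once per
    /// flush; it is reset by `has_flushed` after the buffer has been drained.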
    pub(crate) fn trigger(&self) {
        if self
            .triggered
            .compare_exchange(false, true, AcqRel, Acquire)
            .is_ok()
        {
            self.should_flush.notify_one();
        } else {
            // someone else already did it, that's fine...
        }
    }

    /// Indicates that the buffer has been successfully flushed.
    fn has_flushed(&self) {
        let _ = self
            .triggered
            .compare_exchange(true, false, AcqRel, Acquire);
    }
}

impl<T: Clone> Watch<T> {
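    /// Attempts to send an update to this watcher, returning `true` if the
    /// watcher should be retained.
    ///
    /// `try_reserve` is used so that a slow or disconnected subscriber never
    /// blocks the aggregator: if no capacity can be reserved, the update is
    /// dropped and the watcher is reported as dead.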
    fn update(&self, update: &T) -> bool {
        if let Ok(reserve) = self.0.try_reserve() {
            reserve.send(Ok(update.clone()));
            true
        } else {
            false
        }
    }
}

impl ToProto for Task {
    type Output = proto::tasks::Task;

    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        proto::tasks::Task {
            id: Some(self.id.clone().into()),
            // TODO: more kinds of tasks...
            kind: proto::tasks::task::Kind::Spawn as i32,
            metadata: Some(self.metadata.into()),
            parents: Vec::new(), // TODO: implement parents nicely
            fields: self.fields.clone(),
            location: self.location.clone(),
        }
    }
}

impl Unsent for Task {
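    // `is_dirty` is set to `true` when the `Task` is created (see
    // `update_state`); `take_unsent` atomically reads and clears it.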
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }
}

impl ToProto for Resource {
    type Output = proto::resources::Resource;

    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        proto::resources::Resource {
            id: Some(self.id.clone().into()),
            parent_resource_id: self.parent_id.clone().map(Into::into),
            kind: Some(self.kind.clone()),
            metadata: Some(self.metadata.into()),
            concrete_type: self.concrete_type.clone(),
            location: self.location.clone(),
            is_internal: self.is_internal,
        }
    }
}

impl Unsent for Resource {
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }
}

impl ToProto for AsyncOp {
    type Output = proto::async_ops::AsyncOp;

    fn to_proto(&self, _: &stats::TimeAnchor) -> Self::Output {
        proto::async_ops::AsyncOp {
            id: Some(self.id.clone().into()),
            metadata: Some(self.metadata.into()),
            resource_id: Some(self.resource_id.clone().into()),
            source: self.source.clone(),
            parent_async_op_id: self.parent_id.clone().map(Into::into),
        }
    }
}

impl Unsent for AsyncOp {
    fn take_unsent(&self) -> bool {
        self.is_dirty.swap(false, AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.is_dirty.load(Acquire)
    }
}
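
// A minimal sketch of the `recv_now_or_never` contract, written as a plain
// (non-async) test since `tokio::sync::mpsc` channels can be created, sent to
// with `try_send`, and polled without a runtime. The module and test name are
// illustrative additions, not part of the original file.
#[cfg(test)]
mod tests {
    use super::recv_now_or_never;
    use tokio::sync::mpsc;

    #[test]
    fn recv_now_or_never_reports_empty_ready_and_closed() {
        let (tx, mut rx) = mpsc::channel(1);
        // Empty but still open: nothing to receive right now.
        assert_eq!(recv_now_or_never(&mut rx), None);
        // A buffered value is returned immediately.
        tx.try_send(1u8).unwrap();
        assert_eq!(recv_now_or_never(&mut rx), Some(Some(1u8)));
        // Once all senders are dropped, the closed channel is reported.
        drop(tx);
        assert_eq!(recv_now_or_never(&mut rx), Some(None));
    }
}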
707}