console_subscriber/aggregator/
id_data.rs

1use super::{shrink::ShrinkMap, Id, ToProto};
2use crate::stats::{DroppedAt, TimeAnchor, Unsent};
3use std::collections::HashMap;
4use std::time::{Duration, Instant};
5
6pub(crate) struct IdData<T> {
7    data: ShrinkMap<Id, T>,
8}
9
10#[derive(Copy, Clone, Eq, PartialEq)]
11pub(crate) enum Include {
12    All,
13    UpdatedOnly,
14}
15
16// === impl IdData ===
17
18impl<T> Default for IdData<T> {
19    fn default() -> Self {
20        IdData {
21            data: ShrinkMap::<Id, T>::new(),
22        }
23    }
24}
25
26impl<T: Unsent> IdData<T> {
27    pub(crate) fn insert(&mut self, id: Id, data: T) {
28        self.data.insert(id, data);
29    }
30
31    pub(crate) fn since_last_update(&mut self) -> impl Iterator<Item = (&Id, &mut T)> {
32        self.data.iter_mut().filter_map(|(id, data)| {
33            if data.take_unsent() {
34                Some((id, data))
35            } else {
36                None
37            }
38        })
39    }
40
41    pub(crate) fn all(&self) -> impl Iterator<Item = (&Id, &T)> {
42        self.data.iter()
43    }
44
45    pub(crate) fn get(&self, id: &Id) -> Option<&T> {
46        self.data.get(id)
47    }
48
49    pub(crate) fn as_proto_list(
50        &mut self,
51        include: Include,
52        base_time: &TimeAnchor,
53    ) -> Vec<T::Output>
54    where
55        T: ToProto,
56    {
57        match include {
58            Include::UpdatedOnly => self
59                .since_last_update()
60                .map(|(_, d)| d.to_proto(base_time))
61                .collect(),
62            Include::All => self.all().map(|(_, d)| d.to_proto(base_time)).collect(),
63        }
64    }
65
66    pub(crate) fn as_proto(
67        &mut self,
68        include: Include,
69        base_time: &TimeAnchor,
70    ) -> HashMap<u64, T::Output>
71    where
72        T: ToProto,
73    {
74        match include {
75            Include::UpdatedOnly => self
76                .since_last_update()
77                .map(|(id, d)| (id.into_u64(), d.to_proto(base_time)))
78                .collect(),
79            Include::All => self
80                .all()
81                .map(|(id, d)| (id.into_u64(), d.to_proto(base_time)))
82                .collect(),
83        }
84    }
85
86    pub(crate) fn drop_closed<R: DroppedAt + Unsent>(
87        &mut self,
88        stats: &mut IdData<R>,
89        now: Instant,
90        retention: Duration,
91        has_watchers: bool,
92    ) {
93        let _span = tracing::debug_span!(
94            "drop_closed",
95            entity = %std::any::type_name::<T>(),
96            stats = %std::any::type_name::<R>(),
97        )
98        .entered();
99
100        // drop closed entities
101        tracing::trace!(?retention, has_watchers, "dropping closed");
102
103        stats.data.retain_and_shrink(|id, stats| {
104            if let Some(dropped_at) = stats.dropped_at() {
105                let dropped_for = now.checked_duration_since(dropped_at).unwrap_or_default();
106                let dirty = stats.is_unsent();
107                let should_retain =
108                        // if there are any clients watching, retain all dirty tasks regardless of age
109                        (dirty && has_watchers)
110                        || dropped_for <= retention;
111                tracing::trace!(
112                    stats.id = ?id,
113                    stats.dropped_at = ?dropped_at,
114                    stats.dropped_for = ?dropped_for,
115                    stats.dirty = dirty,
116                    should_retain,
117                );
118                return should_retain;
119            }
120
121            true
122        });
123
124        // drop closed entities which no longer have stats.
125        self.data
126            .retain_and_shrink(|id, _| stats.data.contains_key(id));
127    }
128}