console_subscriber/aggregator/
id_data.rs
1use super::{shrink::ShrinkMap, Id, ToProto};
2use crate::stats::{DroppedAt, TimeAnchor, Unsent};
3use std::collections::HashMap;
4use std::time::{Duration, Instant};
5
6pub(crate) struct IdData<T> {
7 data: ShrinkMap<Id, T>,
8}
9
10#[derive(Copy, Clone, Eq, PartialEq)]
11pub(crate) enum Include {
12 All,
13 UpdatedOnly,
14}
15
16impl<T> Default for IdData<T> {
19 fn default() -> Self {
20 IdData {
21 data: ShrinkMap::<Id, T>::new(),
22 }
23 }
24}
25
26impl<T: Unsent> IdData<T> {
27 pub(crate) fn insert(&mut self, id: Id, data: T) {
28 self.data.insert(id, data);
29 }
30
31 pub(crate) fn since_last_update(&mut self) -> impl Iterator<Item = (&Id, &mut T)> {
32 self.data.iter_mut().filter_map(|(id, data)| {
33 if data.take_unsent() {
34 Some((id, data))
35 } else {
36 None
37 }
38 })
39 }
40
41 pub(crate) fn all(&self) -> impl Iterator<Item = (&Id, &T)> {
42 self.data.iter()
43 }
44
45 pub(crate) fn get(&self, id: &Id) -> Option<&T> {
46 self.data.get(id)
47 }
48
49 pub(crate) fn as_proto_list(
50 &mut self,
51 include: Include,
52 base_time: &TimeAnchor,
53 ) -> Vec<T::Output>
54 where
55 T: ToProto,
56 {
57 match include {
58 Include::UpdatedOnly => self
59 .since_last_update()
60 .map(|(_, d)| d.to_proto(base_time))
61 .collect(),
62 Include::All => self.all().map(|(_, d)| d.to_proto(base_time)).collect(),
63 }
64 }
65
66 pub(crate) fn as_proto(
67 &mut self,
68 include: Include,
69 base_time: &TimeAnchor,
70 ) -> HashMap<u64, T::Output>
71 where
72 T: ToProto,
73 {
74 match include {
75 Include::UpdatedOnly => self
76 .since_last_update()
77 .map(|(id, d)| (id.into_u64(), d.to_proto(base_time)))
78 .collect(),
79 Include::All => self
80 .all()
81 .map(|(id, d)| (id.into_u64(), d.to_proto(base_time)))
82 .collect(),
83 }
84 }
85
86 pub(crate) fn drop_closed<R: DroppedAt + Unsent>(
87 &mut self,
88 stats: &mut IdData<R>,
89 now: Instant,
90 retention: Duration,
91 has_watchers: bool,
92 ) {
93 let _span = tracing::debug_span!(
94 "drop_closed",
95 entity = %std::any::type_name::<T>(),
96 stats = %std::any::type_name::<R>(),
97 )
98 .entered();
99
100 tracing::trace!(?retention, has_watchers, "dropping closed");
102
103 stats.data.retain_and_shrink(|id, stats| {
104 if let Some(dropped_at) = stats.dropped_at() {
105 let dropped_for = now.checked_duration_since(dropped_at).unwrap_or_default();
106 let dirty = stats.is_unsent();
107 let should_retain =
108 (dirty && has_watchers)
110 || dropped_for <= retention;
111 tracing::trace!(
112 stats.id = ?id,
113 stats.dropped_at = ?dropped_at,
114 stats.dropped_for = ?dropped_for,
115 stats.dirty = dirty,
116 should_retain,
117 );
118 return should_retain;
119 }
120
121 true
122 });
123
124 self.data
126 .retain_and_shrink(|id, _| stats.data.contains_key(id));
127 }
128}