mz_storage_controller/
statistics.rs1use std::any::Any;
13use std::collections::{BTreeMap, btree_map};
14use std::fmt::Debug;
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, Instant};
17
18use differential_dataflow::consolidation;
19use differential_dataflow::lattice::Lattice;
20use itertools::Itertools;
21use mz_cluster_client::ReplicaId;
22use mz_ore::now::EpochMillis;
23use mz_persist_types::Codec64;
24use mz_repr::{Diff, TimestampManipulation};
25use mz_repr::{GlobalId, Row};
26use mz_storage_client::statistics::{
27 ControllerSourceStatistics, ExpirableStats, ZeroInitializedStats,
28};
29use mz_storage_client::statistics::{PackableStats, WebhookStatistics};
30use timely::progress::ChangeBatch;
31use timely::progress::Timestamp;
32use tokio::sync::oneshot;
33use tokio::sync::watch::Receiver;
34
35use crate::collection_mgmt::CollectionManager;
36
37pub(super) trait AsStats<Stats> {
39 fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), Stats>;
40 fn as_mut_stats(&mut self) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), Stats>;
41}
42
43impl<Stats> AsStats<Stats> for BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
44 fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
45 self
46 }
47
48 fn as_mut_stats(&mut self) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
49 self
50 }
51}
52
53pub(super) fn spawn_statistics_scraper<StatsWrapper, Stats, T>(
56 statistics_collection_id: GlobalId,
57 collection_mgmt: CollectionManager<T>,
58 shared_stats: Arc<Mutex<StatsWrapper>>,
59 previous_values: Vec<Row>,
60 initial_interval: Duration,
61 mut interval_updated: Receiver<Duration>,
62 statistics_retention_duration: Duration,
63 metrics: mz_storage_client::metrics::StorageControllerMetrics,
64) -> Box<dyn Any + Send + Sync>
65where
66 StatsWrapper: AsStats<Stats> + Debug + Send + 'static,
67 Stats: PackableStats + ExpirableStats + ZeroInitializedStats + Clone + Debug + Send + 'static,
68 T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
69{
70 let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
71
72 mz_ore::task::spawn(|| "statistics_scraper", async move {
73 let mut current_metrics = <ChangeBatch<_>>::new();
80
81 let mut correction = Vec::new();
82 {
83 let mut shared_stats = shared_stats.lock().expect("poisoned");
84 for row in previous_values {
85 let (collection_id, replica_id, current_stats) = Stats::unpack(row, &metrics);
86
87 shared_stats
88 .as_mut_stats()
89 .insert((collection_id, replica_id), current_stats);
90 }
91
92 let mut row_buf = Row::default();
93 for (_, stats) in shared_stats.as_stats().iter() {
94 stats.pack(row_buf.packer());
95 correction.push((row_buf.clone(), Diff::ONE));
96 }
97 }
98
99 tracing::debug!(%statistics_collection_id, ?correction, "seeding stats collection");
100 if !correction.is_empty() {
103 current_metrics.extend(correction.iter().map(|(r, d)| (r.clone(), d.into_inner())));
104
105 collection_mgmt.differential_append(statistics_collection_id, correction);
106 }
107
108 let mut interval = tokio::time::interval(initial_interval);
109 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
110
111 loop {
112 tokio::select! {
113 _msg = &mut shutdown_rx => {
114 break;
115 }
116
117 _ = interval_updated.changed() => {
118 let new_interval = *interval_updated.borrow_and_update();
119 if new_interval != interval.period() {
120 interval = tokio::time::interval(new_interval);
121 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
122 }
124 }
125
126 _ = interval.tick() => {
127 let mut row_buf = Row::default();
128 let mut correction = current_metrics
129 .iter()
130 .cloned()
131 .map(|(row, diff)| (row, -diff))
132 .collect_vec();
133
134 {
138 let mut shared_stats = shared_stats.lock().expect("poisoned");
139
140 let now = Instant::now();
141 shared_stats.as_mut_stats().retain(|_key, stat| {
142 let inactive_time = now - stat.last_updated();
143 inactive_time < statistics_retention_duration
144 });
145
146 for (_, stats) in shared_stats.as_mut_stats().iter_mut() {
147 if stats.needs_zero_initialization() {
148 stats.zero_stat().pack(row_buf.packer());
149 stats.mark_zero_initialized();
150 } else {
151 stats.pack(row_buf.packer());
152 }
153 correction.push((row_buf.clone(), 1));
154 }
155 }
156
157 consolidation::consolidate(&mut correction);
158
159 tracing::trace!(%statistics_collection_id, ?correction, "updating stats collection");
160
161 if !correction.is_empty() {
164 current_metrics.extend(correction.iter().cloned());
165 collection_mgmt
166 .differential_append(statistics_collection_id, correction.into_iter().map(|(r, d)| (r, d.into())).collect());
167 }
168 }
169 }
170 }
171
172 tracing::info!("shutting down statistics sender task");
173 });
174
175 Box::new(shutdown_tx)
176}
177
178#[derive(Debug)]
180pub(super) struct SourceStatistics {
181 pub source_statistics: BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics>,
184 pub webhook_statistics: BTreeMap<GlobalId, Arc<WebhookStatistics>>,
189}
190
191impl AsStats<ControllerSourceStatistics> for SourceStatistics {
192 fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics> {
193 &self.source_statistics
194 }
195
196 fn as_mut_stats(
197 &mut self,
198 ) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics> {
199 &mut self.source_statistics
200 }
201}
202
203pub(super) fn spawn_webhook_statistics_scraper(
205 shared_stats: Arc<Mutex<SourceStatistics>>,
206 initial_interval: Duration,
207 mut interval_updated: Receiver<Duration>,
208) -> Box<dyn Any + Send + Sync> {
209 let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
210
211 mz_ore::task::spawn(|| "webhook_statistics_scraper", async move {
212 let mut interval = tokio::time::interval(initial_interval);
213 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
214
215 loop {
216 tokio::select! {
217 _msg = &mut shutdown_rx => {
218 break;
219 }
220
221 _ = interval_updated.changed() => {
222 let new_interval = *interval_updated.borrow_and_update();
223 if new_interval != interval.period() {
224 interval = tokio::time::interval(new_interval);
225 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
226 }
228 },
229
230 _ = interval.tick() => {
231 let mut shared_stats = shared_stats.lock().expect("poisoned");
232 let shared_stats = &mut *shared_stats;
233
234 for (id, ws) in shared_stats.webhook_statistics.iter() {
235 let entry = shared_stats
236 .source_statistics
237 .entry((*id, None));
238
239 let update = ws.drain_into_update(*id);
240
241 match entry {
242 btree_map::Entry::Vacant(vacant_entry) => {
243 let mut stats = ControllerSourceStatistics::new(*id, None);
244 stats.incorporate(update);
245 vacant_entry.insert(stats);
246 }
247 btree_map::Entry::Occupied(mut occupied_entry) => {
248 occupied_entry.get_mut().incorporate(update);
249 }
250 }
251 }
252 }
253 }
254 }
255
256 tracing::info!("shutting down statistics sender task");
257 });
258
259 Box::new(shutdown_tx)
260}