mz_storage_controller/
statistics.rs1use std::any::Any;
13use std::collections::{BTreeMap, btree_map};
14use std::fmt::Debug;
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, Instant};
17
18use differential_dataflow::consolidation;
19use differential_dataflow::lattice::Lattice;
20use itertools::Itertools;
21use mz_cluster_client::ReplicaId;
22use mz_ore::now::EpochMillis;
23use mz_persist_types::Codec64;
24use mz_repr::{Diff, TimestampManipulation};
25use mz_repr::{GlobalId, Row};
26use mz_storage_client::statistics::{
27 ControllerSourceStatistics, ExpirableStats, ZeroInitializedStats,
28};
29use mz_storage_client::statistics::{PackableStats, WebhookStatistics};
30use timely::progress::ChangeBatch;
31use timely::progress::Timestamp;
32use tokio::sync::oneshot;
33use tokio::sync::watch::Receiver;
34
35use crate::collection_mgmt::CollectionManager;
36
37pub(super) trait AsStats<Stats> {
39 fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), Stats>;
40 fn as_mut_stats(&mut self) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), Stats>;
41}
42
43impl<Stats> AsStats<Stats> for BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
44 fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
45 self
46 }
47
48 fn as_mut_stats(&mut self) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
49 self
50 }
51}
52
53pub(super) fn spawn_statistics_scraper<StatsWrapper, Stats, T>(
56 statistics_collection_id: GlobalId,
57 collection_mgmt: CollectionManager<T>,
58 shared_stats: Arc<Mutex<StatsWrapper>>,
59 previous_values: Vec<Row>,
60 initial_interval: Duration,
61 mut interval_updated: Receiver<Duration>,
62 statistics_retention_duration: Duration,
63 metrics: mz_storage_client::metrics::StorageControllerMetrics,
64) -> Box<dyn Any + Send + Sync>
65where
66 StatsWrapper: AsStats<Stats> + Debug + Send + 'static,
67 Stats: PackableStats + ExpirableStats + ZeroInitializedStats + Clone + Debug + Send + 'static,
68 T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
69{
70 let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
71
72 mz_ore::task::spawn(|| "statistics_scraper", async move {
73 let mut current_metrics = <ChangeBatch<_>>::new();
80
81 let mut correction = Vec::new();
82 {
83 let mut shared_stats = shared_stats.lock().expect("poisoned");
84 for row in previous_values {
85 let (collection_id, replica_id, current_stats) = Stats::unpack(row, &metrics);
86
87 shared_stats
88 .as_mut_stats()
89 .insert((collection_id, replica_id), current_stats);
90 }
91
92 let mut row_buf = Row::default();
93 for (_, stats) in shared_stats.as_stats().iter() {
94 stats.pack(row_buf.packer());
95 correction.push((row_buf.clone(), Diff::ONE));
96 }
97 }
98
99 tracing::debug!(%statistics_collection_id, ?correction, "seeding stats collection");
100 if !correction.is_empty() {
103 current_metrics.extend(correction.iter().map(|(r, d)| (r.clone(), d.into_inner())));
104
105 collection_mgmt.differential_append(statistics_collection_id, correction);
106 }
107
108 let mut interval = tokio::time::interval(initial_interval);
109 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
110
111 loop {
112 tokio::select! {
113 _msg = &mut shutdown_rx => {
114 break;
115 }
116
117 _ = interval_updated.changed() => {
118 let new_interval = *interval_updated.borrow_and_update();
119 if new_interval != interval.period() {
120 interval = tokio::time::interval(new_interval);
121 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
122 }
124 }
125
126 _ = interval.tick() => {
127 let mut row_buf = Row::default();
128 let mut correction = current_metrics
129 .iter()
130 .cloned()
131 .map(|(row, diff)| (row, -diff))
132 .collect_vec();
133
134 {
138 let mut shared_stats = shared_stats.lock().expect("poisoned");
139
140 let now = Instant::now();
141 shared_stats.as_mut_stats().retain(|_key, stat| {
142 let inactive_time = now - stat.last_updated();
143 inactive_time < statistics_retention_duration
144 });
145
146 for (_, stats) in shared_stats.as_mut_stats().iter_mut() {
147 if stats.needs_zero_initialization() {
148 stats.zero_stat().pack(row_buf.packer());
149 stats.mark_zero_initialized();
150 } else {
151 stats.pack(row_buf.packer());
152 }
153 correction.push((row_buf.clone(), 1));
154 }
155 }
156
157 consolidation::consolidate(&mut correction);
158
159 tracing::trace!(%statistics_collection_id, ?correction, "updating stats collection");
160
161 if !correction.is_empty() {
164 current_metrics.extend(correction.iter().cloned());
165 let updates = correction
166 .into_iter()
167 .map(|(r, d)| (r, d.into()))
168 .collect();
169 collection_mgmt.differential_append(
170 statistics_collection_id,
171 updates,
172 );
173 }
174 }
175 }
176 }
177
178 tracing::info!("shutting down statistics sender task");
179 });
180
181 Box::new(shutdown_tx)
182}
183
184#[derive(Debug)]
186pub(super) struct SourceStatistics {
187 pub source_statistics: BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics>,
190 pub webhook_statistics: BTreeMap<GlobalId, Arc<WebhookStatistics>>,
195}
196
197impl AsStats<ControllerSourceStatistics> for SourceStatistics {
198 fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics> {
199 &self.source_statistics
200 }
201
202 fn as_mut_stats(
203 &mut self,
204 ) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics> {
205 &mut self.source_statistics
206 }
207}
208
209pub(super) fn spawn_webhook_statistics_scraper(
211 shared_stats: Arc<Mutex<SourceStatistics>>,
212 initial_interval: Duration,
213 mut interval_updated: Receiver<Duration>,
214) -> Box<dyn Any + Send + Sync> {
215 let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
216
217 mz_ore::task::spawn(|| "webhook_statistics_scraper", async move {
218 let mut interval = tokio::time::interval(initial_interval);
219 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
220
221 loop {
222 tokio::select! {
223 _msg = &mut shutdown_rx => {
224 break;
225 }
226
227 _ = interval_updated.changed() => {
228 let new_interval = *interval_updated.borrow_and_update();
229 if new_interval != interval.period() {
230 interval = tokio::time::interval(new_interval);
231 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
232 }
234 },
235
236 _ = interval.tick() => {
237 let mut shared_stats = shared_stats.lock().expect("poisoned");
238 let shared_stats = &mut *shared_stats;
239
240 for (id, ws) in shared_stats.webhook_statistics.iter() {
241 let entry = shared_stats
242 .source_statistics
243 .entry((*id, None));
244
245 let update = ws.drain_into_update(*id);
246
247 match entry {
248 btree_map::Entry::Vacant(vacant_entry) => {
249 let mut stats = ControllerSourceStatistics::new(*id, None);
250 stats.incorporate(update);
251 vacant_entry.insert(stats);
252 }
253 btree_map::Entry::Occupied(mut occupied_entry) => {
254 occupied_entry.get_mut().incorporate(update);
255 }
256 }
257 }
258 }
259 }
260 }
261
262 tracing::info!("shutting down statistics sender task");
263 });
264
265 Box::new(shutdown_tx)
266}