mz_storage_controller/
statistics.rs1use std::any::Any;
13use std::collections::{BTreeMap, btree_map};
14use std::fmt::Debug;
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, Instant};
17
18use differential_dataflow::consolidation;
19use itertools::Itertools;
20use mz_cluster_client::ReplicaId;
21use mz_repr::Diff;
22use mz_repr::{GlobalId, Row};
23use mz_storage_client::statistics::{
24 ControllerSourceStatistics, ExpirableStats, ZeroInitializedStats,
25};
26use mz_storage_client::statistics::{PackableStats, WebhookStatistics};
27use timely::progress::ChangeBatch;
28use tokio::sync::oneshot;
29use tokio::sync::watch::Receiver;
30
31use crate::collection_mgmt::CollectionManager;
32
33pub(super) trait AsStats<Stats> {
35 fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), Stats>;
36 fn as_mut_stats(&mut self) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), Stats>;
37}
38
39impl<Stats> AsStats<Stats> for BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
40 fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
41 self
42 }
43
44 fn as_mut_stats(&mut self) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
45 self
46 }
47}
48
49pub(super) fn spawn_statistics_scraper<StatsWrapper, Stats>(
52 statistics_collection_id: GlobalId,
53 collection_mgmt: CollectionManager,
54 shared_stats: Arc<Mutex<StatsWrapper>>,
55 previous_values: Vec<Row>,
56 initial_interval: Duration,
57 mut interval_updated: Receiver<Duration>,
58 statistics_retention_duration: Duration,
59 metrics: mz_storage_client::metrics::StorageControllerMetrics,
60) -> Box<dyn Any + Send + Sync>
61where
62 StatsWrapper: AsStats<Stats> + Debug + Send + 'static,
63 Stats: PackableStats + ExpirableStats + ZeroInitializedStats + Clone + Debug + Send + 'static,
64{
65 let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
66
67 mz_ore::task::spawn(|| "statistics_scraper", async move {
68 let mut current_metrics = <ChangeBatch<_>>::new();
75
76 let mut correction = Vec::new();
77 {
78 let mut shared_stats = shared_stats.lock().expect("poisoned");
79 for row in previous_values {
80 let (collection_id, replica_id, current_stats) = Stats::unpack(row, &metrics);
81
82 shared_stats
83 .as_mut_stats()
84 .insert((collection_id, replica_id), current_stats);
85 }
86
87 let mut row_buf = Row::default();
88 for (_, stats) in shared_stats.as_stats().iter() {
89 stats.pack(row_buf.packer());
90 correction.push((row_buf.clone(), Diff::ONE));
91 }
92 }
93
94 tracing::debug!(%statistics_collection_id, ?correction, "seeding stats collection");
95 if !correction.is_empty() {
98 current_metrics.extend(correction.iter().map(|(r, d)| (r.clone(), d.into_inner())));
99
100 collection_mgmt.differential_append(statistics_collection_id, correction);
101 }
102
103 let mut interval = tokio::time::interval(initial_interval);
104 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
105
106 loop {
107 tokio::select! {
108 _msg = &mut shutdown_rx => {
109 break;
110 }
111
112 _ = interval_updated.changed() => {
113 let new_interval = *interval_updated.borrow_and_update();
114 if new_interval != interval.period() {
115 interval = tokio::time::interval(new_interval);
116 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
117 }
119 }
120
121 _ = interval.tick() => {
122 let mut row_buf = Row::default();
123 let mut correction = current_metrics
124 .iter()
125 .cloned()
126 .map(|(row, diff)| (row, -diff))
127 .collect_vec();
128
129 {
133 let mut shared_stats = shared_stats.lock().expect("poisoned");
134
135 let now = Instant::now();
136 shared_stats.as_mut_stats().retain(|_key, stat| {
137 let inactive_time = now - stat.last_updated();
138 inactive_time < statistics_retention_duration
139 });
140
141 for (_, stats) in shared_stats.as_mut_stats().iter_mut() {
142 if stats.needs_zero_initialization() {
143 stats.zero_stat().pack(row_buf.packer());
144 stats.mark_zero_initialized();
145 } else {
146 stats.pack(row_buf.packer());
147 }
148 correction.push((row_buf.clone(), 1));
149 }
150 }
151
152 consolidation::consolidate(&mut correction);
153
154 tracing::trace!(%statistics_collection_id, ?correction, "updating stats collection");
155
156 if !correction.is_empty() {
159 current_metrics.extend(correction.iter().cloned());
160 let updates = correction
161 .into_iter()
162 .map(|(r, d)| (r, d.into()))
163 .collect();
164 collection_mgmt.differential_append(
165 statistics_collection_id,
166 updates,
167 );
168 }
169 }
170 }
171 }
172
173 tracing::info!("shutting down statistics sender task");
174 });
175
176 Box::new(shutdown_tx)
177}
178
179#[derive(Debug)]
181pub(super) struct SourceStatistics {
182 pub source_statistics: BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics>,
185 pub webhook_statistics: BTreeMap<GlobalId, Arc<WebhookStatistics>>,
190}
191
192impl AsStats<ControllerSourceStatistics> for SourceStatistics {
193 fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics> {
194 &self.source_statistics
195 }
196
197 fn as_mut_stats(
198 &mut self,
199 ) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics> {
200 &mut self.source_statistics
201 }
202}
203
204pub(super) fn spawn_webhook_statistics_scraper(
206 shared_stats: Arc<Mutex<SourceStatistics>>,
207 initial_interval: Duration,
208 mut interval_updated: Receiver<Duration>,
209) -> Box<dyn Any + Send + Sync> {
210 let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
211
212 mz_ore::task::spawn(|| "webhook_statistics_scraper", async move {
213 let mut interval = tokio::time::interval(initial_interval);
214 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
215
216 loop {
217 tokio::select! {
218 _msg = &mut shutdown_rx => {
219 break;
220 }
221
222 _ = interval_updated.changed() => {
223 let new_interval = *interval_updated.borrow_and_update();
224 if new_interval != interval.period() {
225 interval = tokio::time::interval(new_interval);
226 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
227 }
229 },
230
231 _ = interval.tick() => {
232 let mut shared_stats = shared_stats.lock().expect("poisoned");
233 let shared_stats = &mut *shared_stats;
234
235 for (id, ws) in shared_stats.webhook_statistics.iter() {
236 let entry = shared_stats
237 .source_statistics
238 .entry((*id, None));
239
240 let update = ws.drain_into_update(*id);
241
242 match entry {
243 btree_map::Entry::Vacant(vacant_entry) => {
244 let mut stats = ControllerSourceStatistics::new(*id, None);
245 stats.incorporate(update);
246 vacant_entry.insert(stats);
247 }
248 btree_map::Entry::Occupied(mut occupied_entry) => {
249 occupied_entry.get_mut().incorporate(update);
250 }
251 }
252 }
253 }
254 }
255 }
256
257 tracing::info!("shutting down statistics sender task");
258 });
259
260 Box::new(shutdown_tx)
261}