mz_storage_controller/
statistics.rs1use std::any::Any;
13use std::collections::{BTreeMap, btree_map};
14use std::fmt::Debug;
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, Instant};
17
18use differential_dataflow::consolidation;
19use differential_dataflow::lattice::Lattice;
20use itertools::Itertools;
21use mz_cluster_client::ReplicaId;
22use mz_ore::now::EpochMillis;
23use mz_persist_types::Codec64;
24use mz_repr::{Diff, TimestampManipulation};
25use mz_repr::{GlobalId, Row};
26use mz_storage_client::statistics::{
27    ControllerSourceStatistics, ExpirableStats, ZeroInitializedStats,
28};
29use mz_storage_client::statistics::{PackableStats, WebhookStatistics};
30use timely::progress::ChangeBatch;
31use timely::progress::Timestamp;
32use tokio::sync::oneshot;
33use tokio::sync::watch::Receiver;
34
35use crate::collection_mgmt::CollectionManager;
36
37pub(super) trait AsStats<Stats> {
39    fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), Stats>;
40    fn as_mut_stats(&mut self) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), Stats>;
41}
42
43impl<Stats> AsStats<Stats> for BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
44    fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
45        self
46    }
47
48    fn as_mut_stats(&mut self) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
49        self
50    }
51}
52
53pub(super) fn spawn_statistics_scraper<StatsWrapper, Stats, T>(
56    statistics_collection_id: GlobalId,
57    collection_mgmt: CollectionManager<T>,
58    shared_stats: Arc<Mutex<StatsWrapper>>,
59    previous_values: Vec<Row>,
60    initial_interval: Duration,
61    mut interval_updated: Receiver<Duration>,
62    statistics_retention_duration: Duration,
63    metrics: mz_storage_client::metrics::StorageControllerMetrics,
64) -> Box<dyn Any + Send + Sync>
65where
66    StatsWrapper: AsStats<Stats> + Debug + Send + 'static,
67    Stats: PackableStats + ExpirableStats + ZeroInitializedStats + Clone + Debug + Send + 'static,
68    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
69{
70    let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
71
72    mz_ore::task::spawn(|| "statistics_scraper", async move {
73        let mut current_metrics = <ChangeBatch<_>>::new();
80
81        let mut correction = Vec::new();
82        {
83            let mut shared_stats = shared_stats.lock().expect("poisoned");
84            for row in previous_values {
85                let (collection_id, replica_id, current_stats) = Stats::unpack(row, &metrics);
86
87                shared_stats
88                    .as_mut_stats()
89                    .insert((collection_id, replica_id), current_stats);
90            }
91
92            let mut row_buf = Row::default();
93            for (_, stats) in shared_stats.as_stats().iter() {
94                stats.pack(row_buf.packer());
95                correction.push((row_buf.clone(), Diff::ONE));
96            }
97        }
98
99        tracing::debug!(%statistics_collection_id, ?correction, "seeding stats collection");
100        if !correction.is_empty() {
103            current_metrics.extend(correction.iter().map(|(r, d)| (r.clone(), d.into_inner())));
104
105            collection_mgmt.differential_append(statistics_collection_id, correction);
106        }
107
108        let mut interval = tokio::time::interval(initial_interval);
109        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
110
111        loop {
112            tokio::select! {
113                _msg = &mut shutdown_rx => {
114                    break;
115                }
116
117               _ = interval_updated.changed() => {
118                    let new_interval = *interval_updated.borrow_and_update();
119                    if new_interval != interval.period() {
120                        interval = tokio::time::interval(new_interval);
121                        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
122                        }
124                }
125
126                _ = interval.tick() => {
127                    let mut row_buf = Row::default();
128                    let mut correction = current_metrics
129                        .iter()
130                        .cloned()
131                        .map(|(row, diff)| (row, -diff))
132                        .collect_vec();
133
134                    {
138                        let mut shared_stats = shared_stats.lock().expect("poisoned");
139
140                        let now = Instant::now();
141                        shared_stats.as_mut_stats().retain(|_key, stat| {
142                            let inactive_time = now - stat.last_updated();
143                            inactive_time < statistics_retention_duration
144                        });
145
146                        for (_, stats) in shared_stats.as_mut_stats().iter_mut() {
147                            if stats.needs_zero_initialization() {
148                                stats.zero_stat().pack(row_buf.packer());
149                                stats.mark_zero_initialized();
150                            } else {
151                                stats.pack(row_buf.packer());
152                            }
153                            correction.push((row_buf.clone(), 1));
154                        }
155                    }
156
157                    consolidation::consolidate(&mut correction);
158
159                    tracing::trace!(%statistics_collection_id, ?correction, "updating stats collection");
160
161                    if !correction.is_empty() {
164                        current_metrics.extend(correction.iter().cloned());
165                        collection_mgmt
166                            .differential_append(statistics_collection_id, correction.into_iter().map(|(r, d)| (r, d.into())).collect());
167                    }
168                }
169            }
170        }
171
172        tracing::info!("shutting down statistics sender task");
173    });
174
175    Box::new(shutdown_tx)
176}
177
178#[derive(Debug)]
180pub(super) struct SourceStatistics {
181    pub source_statistics: BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics>,
184    pub webhook_statistics: BTreeMap<GlobalId, Arc<WebhookStatistics>>,
189}
190
191impl AsStats<ControllerSourceStatistics> for SourceStatistics {
192    fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics> {
193        &self.source_statistics
194    }
195
196    fn as_mut_stats(
197        &mut self,
198    ) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics> {
199        &mut self.source_statistics
200    }
201}
202
203pub(super) fn spawn_webhook_statistics_scraper(
205    shared_stats: Arc<Mutex<SourceStatistics>>,
206    initial_interval: Duration,
207    mut interval_updated: Receiver<Duration>,
208) -> Box<dyn Any + Send + Sync> {
209    let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
210
211    mz_ore::task::spawn(|| "webhook_statistics_scraper", async move {
212        let mut interval = tokio::time::interval(initial_interval);
213        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
214
215        loop {
216            tokio::select! {
217                _msg = &mut shutdown_rx => {
218                    break;
219                }
220
221               _ = interval_updated.changed() => {
222                    let new_interval = *interval_updated.borrow_and_update();
223                    if new_interval != interval.period() {
224                        interval = tokio::time::interval(new_interval);
225                        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
226                        }
228                },
229
230                _ = interval.tick() => {
231                    let mut shared_stats = shared_stats.lock().expect("poisoned");
232                    let shared_stats = &mut *shared_stats;
233
234                    for (id, ws) in shared_stats.webhook_statistics.iter() {
235                        let entry = shared_stats
236                            .source_statistics
237                            .entry((*id, None));
238
239                        let update = ws.drain_into_update(*id);
240
241                        match entry {
242                            btree_map::Entry::Vacant(vacant_entry) => {
243                                let mut stats = ControllerSourceStatistics::new(*id, None);
244                                stats.incorporate(update);
245                                vacant_entry.insert(stats);
246                            }
247                            btree_map::Entry::Occupied(mut occupied_entry) => {
248                                occupied_entry.get_mut().incorporate(update);
249                            }
250                        }
251                    }
252                }
253            }
254        }
255
256        tracing::info!("shutting down statistics sender task");
257    });
258
259    Box::new(shutdown_tx)
260}