Skip to main content

mz_storage_controller/
statistics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A tokio task (and support machinery) for producing storage statistics.
11
12use std::any::Any;
13use std::collections::{BTreeMap, btree_map};
14use std::fmt::Debug;
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, Instant};
17
18use differential_dataflow::consolidation;
19use itertools::Itertools;
20use mz_cluster_client::ReplicaId;
21use mz_repr::Diff;
22use mz_repr::{GlobalId, Row};
23use mz_storage_client::statistics::{
24    ControllerSourceStatistics, ExpirableStats, ZeroInitializedStats,
25};
26use mz_storage_client::statistics::{PackableStats, WebhookStatistics};
27use timely::progress::ChangeBatch;
28use tokio::sync::oneshot;
29use tokio::sync::watch::Receiver;
30
31use crate::collection_mgmt::CollectionManager;
32
/// Conversion trait to allow multiple shapes of data in [`spawn_statistics_scraper`].
///
/// Implementors expose a map of statistics keyed by collection id and an optional replica id.
pub(super) trait AsStats<Stats> {
    /// Returns a shared reference to the keyed statistics map.
    fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), Stats>;
    /// Returns an exclusive reference to the keyed statistics map.
    fn as_mut_stats(&mut self) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), Stats>;
}
38
39impl<Stats> AsStats<Stats> for BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
40    fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
41        self
42    }
43
44    fn as_mut_stats(&mut self) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), Stats> {
45        self
46    }
47}
48
49/// Spawns a task that continually (at an interval) writes statistics from storaged's
50/// that are consolidated in shared memory in the controller.
51pub(super) fn spawn_statistics_scraper<StatsWrapper, Stats>(
52    statistics_collection_id: GlobalId,
53    collection_mgmt: CollectionManager,
54    shared_stats: Arc<Mutex<StatsWrapper>>,
55    previous_values: Vec<Row>,
56    initial_interval: Duration,
57    mut interval_updated: Receiver<Duration>,
58    statistics_retention_duration: Duration,
59    metrics: mz_storage_client::metrics::StorageControllerMetrics,
60) -> Box<dyn Any + Send + Sync>
61where
62    StatsWrapper: AsStats<Stats> + Debug + Send + 'static,
63    Stats: PackableStats + ExpirableStats + ZeroInitializedStats + Clone + Debug + Send + 'static,
64{
65    let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
66
67    mz_ore::task::spawn(|| "statistics_scraper", async move {
68        // Keep track of what we think is the contents of the output
69        // collection, so that we can emit the required retractions/updates
70        // when we learn about new metrics.
71        //
72        // We assume that `shared_stats` is kept up-to-date (and initialized)
73        // by the controller.
74        let mut current_metrics = <ChangeBatch<_>>::new();
75
76        let mut correction = Vec::new();
77        {
78            let mut shared_stats = shared_stats.lock().expect("poisoned");
79            for row in previous_values {
80                let (collection_id, replica_id, current_stats) = Stats::unpack(row, &metrics);
81
82                shared_stats
83                    .as_mut_stats()
84                    .insert((collection_id, replica_id), current_stats);
85            }
86
87            let mut row_buf = Row::default();
88            for (_, stats) in shared_stats.as_stats().iter() {
89                stats.pack(row_buf.packer());
90                correction.push((row_buf.clone(), Diff::ONE));
91            }
92        }
93
94        tracing::debug!(%statistics_collection_id, ?correction, "seeding stats collection");
95        // Make sure that the desired state matches what is already there, when
96        // we start up!
97        if !correction.is_empty() {
98            current_metrics.extend(correction.iter().map(|(r, d)| (r.clone(), d.into_inner())));
99
100            collection_mgmt.differential_append(statistics_collection_id, correction);
101        }
102
103        let mut interval = tokio::time::interval(initial_interval);
104        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
105
106        loop {
107            tokio::select! {
108                _msg = &mut shutdown_rx => {
109                    break;
110                }
111
112               _ = interval_updated.changed() => {
113                    let new_interval = *interval_updated.borrow_and_update();
114                    if new_interval != interval.period() {
115                        interval = tokio::time::interval(new_interval);
116                        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
117                        // Note that the next interval will tick immediately. This is fine.
118                    }
119                }
120
121                _ = interval.tick() => {
122                    let mut row_buf = Row::default();
123                    let mut correction = current_metrics
124                        .iter()
125                        .cloned()
126                        .map(|(row, diff)| (row, -diff))
127                        .collect_vec();
128
129                    // Ideally we move quickly when holding the lock here, as it can hold
130                    // up the coordinator. Because we are just moving some data around, we should
131                    // be fine!
132                    {
133                        let mut shared_stats = shared_stats.lock().expect("poisoned");
134
135                        let now = Instant::now();
136                        shared_stats.as_mut_stats().retain(|_key, stat| {
137                            let inactive_time = now - stat.last_updated();
138                            inactive_time < statistics_retention_duration
139                        });
140
141                        for (_, stats) in shared_stats.as_mut_stats().iter_mut() {
142                            if stats.needs_zero_initialization() {
143                                stats.zero_stat().pack(row_buf.packer());
144                                stats.mark_zero_initialized();
145                            } else {
146                                stats.pack(row_buf.packer());
147                            }
148                            correction.push((row_buf.clone(), 1));
149                        }
150                    }
151
152                    consolidation::consolidate(&mut correction);
153
154                    tracing::trace!(%statistics_collection_id, ?correction, "updating stats collection");
155
156                    // Update our view of the output collection and write updates
157                    // out to the collection.
158                    if !correction.is_empty() {
159                        current_metrics.extend(correction.iter().cloned());
160                        let updates = correction
161                            .into_iter()
162                            .map(|(r, d)| (r, d.into()))
163                            .collect();
164                        collection_mgmt.differential_append(
165                            statistics_collection_id,
166                            updates,
167                        );
168                    }
169                }
170            }
171        }
172
173        tracing::info!("shutting down statistics sender task");
174    });
175
176    Box::new(shutdown_tx)
177}
178
/// A wrapper around source and webhook statistics maps so we can hold them within a single lock.
#[derive(Debug)]
pub(super) struct SourceStatistics {
    /// Statistics-per-source that will be emitted to the source statistics table by
    /// the [`spawn_statistics_scraper`] above.
    pub source_statistics: BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics>,
    /// A shared map with atomics for webhook appenders to update the (currently 4)
    /// statistics that they can meaningfully produce. These are periodically
    /// copied into `source_statistics` by [`spawn_webhook_statistics_scraper`] to avoid
    /// contention.
    pub webhook_statistics: BTreeMap<GlobalId, Arc<WebhookStatistics>>,
}
191
192impl AsStats<ControllerSourceStatistics> for SourceStatistics {
193    fn as_stats(&self) -> &BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics> {
194        &self.source_statistics
195    }
196
197    fn as_mut_stats(
198        &mut self,
199    ) -> &mut BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSourceStatistics> {
200        &mut self.source_statistics
201    }
202}
203
204/// Spawns a task that continually drains webhook statistics into `shared_stats.
205pub(super) fn spawn_webhook_statistics_scraper(
206    shared_stats: Arc<Mutex<SourceStatistics>>,
207    initial_interval: Duration,
208    mut interval_updated: Receiver<Duration>,
209) -> Box<dyn Any + Send + Sync> {
210    let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
211
212    mz_ore::task::spawn(|| "webhook_statistics_scraper", async move {
213        let mut interval = tokio::time::interval(initial_interval);
214        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
215
216        loop {
217            tokio::select! {
218                _msg = &mut shutdown_rx => {
219                    break;
220                }
221
222               _ = interval_updated.changed() => {
223                    let new_interval = *interval_updated.borrow_and_update();
224                    if new_interval != interval.period() {
225                        interval = tokio::time::interval(new_interval);
226                        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
227                        // Note that the next interval will tick immediately. This is fine.
228                    }
229                },
230
231                _ = interval.tick() => {
232                    let mut shared_stats = shared_stats.lock().expect("poisoned");
233                    let shared_stats = &mut *shared_stats;
234
235                    for (id, ws) in shared_stats.webhook_statistics.iter() {
236                        let entry = shared_stats
237                            .source_statistics
238                            .entry((*id, None));
239
240                        let update = ws.drain_into_update(*id);
241
242                        match entry {
243                            btree_map::Entry::Vacant(vacant_entry) => {
244                                let mut stats = ControllerSourceStatistics::new(*id, None);
245                                stats.incorporate(update);
246                                vacant_entry.insert(stats);
247                            }
248                            btree_map::Entry::Occupied(mut occupied_entry) => {
249                                occupied_entry.get_mut().incorporate(update);
250                            }
251                        }
252                    }
253                }
254            }
255        }
256
257        tracing::info!("shutting down statistics sender task");
258    });
259
260    Box::new(shutdown_tx)
261}