mz_storage_controller/statistics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A tokio task (and support machinery) for producing storage statistics.

use std::any::Any;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use differential_dataflow::consolidation;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_ore::now::EpochMillis;
use mz_persist_types::Codec64;
use mz_repr::{Diff, TimestampManipulation};
use mz_repr::{GlobalId, Row};
use mz_storage_client::statistics::{PackableStats, SourceStatisticsUpdate, WebhookStatistics};
use timely::progress::ChangeBatch;
use timely::progress::Timestamp;
use tokio::sync::oneshot;
use tokio::sync::watch::Receiver;

use crate::collection_mgmt::CollectionManager;
use StatsState::*;
#[derive(Debug)]
pub(super) enum StatsState<Stat> {
    /// This stat is fully initialized.
    Initialized(Stat),
    /// This stat has not been initialized. We must write a `zero_value`
    /// during one interval before writing the first real update.
    NeedsInit {
        zero_value: Stat,
        future_update: Stat,
    },
}

impl<Stat: Clone> StatsState<Stat> {
    pub(super) fn new(zero_value: Stat) -> Self {
        let future_update = zero_value.clone();
        StatsState::NeedsInit {
            zero_value,
            future_update,
        }
    }

    /// Get the stat value that should be emitted to the statistics table.
    /// After calling this, you must call `mark_initialized`. (These are separate
    /// to avoid some clones).
    fn stat_to_emit(&mut self) -> &mut Stat {
        match self {
            Initialized(stat) => stat,
            NeedsInit { zero_value, .. } => zero_value,
        }
    }

    /// Mark this stat as initialized.
    fn mark_initialized(&mut self) {
        match self {
            Initialized(_) => {}
            NeedsInit { future_update, .. } => *self = Initialized(future_update.clone()),
        }
    }

    /// Get a mutable reference to the stat for incorporating additional updates.
    pub(super) fn stat(&mut self) -> &mut Stat {
        match self {
            Initialized(stat) => stat,
            NeedsInit { future_update, .. } => future_update,
        }
    }
}
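
// Illustrative lifecycle sketch (comments only, not part of the API; `incorporate` and
// `pack` are the methods used on the concrete stats types elsewhere in this file):
//
//     let mut state = StatsState::new(zero_value);   // starts in `NeedsInit`
//     state.stat().incorporate(update);              // buffered into `future_update`
//     state.stat_to_emit().pack(row.packer());       // first emission is the zero value
//     state.mark_initialized();                      // becomes `Initialized(future_update)`
//
// The first row written for a new object is therefore always the zero value; real
// updates only show up from the following interval onwards.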

/// Conversion trait to allow multiple shapes of data in [`spawn_statistics_scraper`].
pub(super) trait AsStats<Stats> {
    fn as_stats(&self) -> &BTreeMap<GlobalId, StatsState<Stats>>;
    fn as_mut_stats(&mut self) -> &mut BTreeMap<GlobalId, StatsState<Stats>>;
}

impl<Stats> AsStats<Stats> for BTreeMap<GlobalId, StatsState<Stats>> {
    fn as_stats(&self) -> &BTreeMap<GlobalId, StatsState<Stats>> {
        self
    }

    fn as_mut_stats(&mut self) -> &mut BTreeMap<GlobalId, StatsState<Stats>> {
        self
    }
}
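
// Both a bare stats map (via the blanket impl above) and the `SourceStatistics` wrapper
// defined below implement `AsStats`, so `spawn_statistics_scraper` can drive either.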

/// Spawns a task that continually (at an interval) writes out statistics that are
/// reported by storaged instances and consolidated in shared memory in the controller.
pub(super) fn spawn_statistics_scraper<StatsWrapper, Stats, T>(
    statistics_collection_id: GlobalId,
    collection_mgmt: CollectionManager<T>,
    shared_stats: Arc<Mutex<StatsWrapper>>,
    previous_values: Vec<Row>,
    initial_interval: Duration,
    mut interval_updated: Receiver<Duration>,
    metrics: mz_storage_client::metrics::StorageControllerMetrics,
) -> Box<dyn Any + Send + Sync>
where
    StatsWrapper: AsStats<Stats> + Debug + Send + 'static,
    Stats: PackableStats + Clone + Debug + Send + 'static,
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();

    mz_ore::task::spawn(|| "statistics_scraper", async move {
        // Keep track of what we believe the output collection contains, so that
        // we can emit the required retractions/updates when we learn about new
        // metrics.
        //
        // We assume that `shared_stats` is kept up-to-date (and initialized)
        // by the controller.
        let mut current_metrics = <ChangeBatch<_>>::new();
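
        // Seed `shared_stats` and our in-memory view of the output collection from the
        // statistics rows that were previously written out, as handed to us by the caller.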
        let mut correction = Vec::new();
        {
            let mut shared_stats = shared_stats.lock().expect("poisoned");
            for row in previous_values {
                let current = Stats::unpack(row, &metrics);

                shared_stats
                    .as_mut_stats()
                    .insert(current.0, StatsState::Initialized(current.1));
            }

            let mut row_buf = Row::default();
            for (_, stats) in shared_stats.as_stats().iter() {
                if let StatsState::Initialized(stats) = stats {
                    stats.pack(row_buf.packer());
                    correction.push((row_buf.clone(), Diff::ONE));
                }
            }
        }

        tracing::debug!(%statistics_collection_id, ?correction, "seeding stats collection");

        // Make sure that the desired state matches what is already there, when
        // we start up!
        if !correction.is_empty() {
            current_metrics.extend(correction.iter().map(|(r, d)| (r.clone(), d.into_inner())));

            collection_mgmt.differential_append(statistics_collection_id, correction);
        }

        let mut interval = tokio::time::interval(initial_interval);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _msg = &mut shutdown_rx => {
                    break;
                }

                _ = interval_updated.changed() => {
                    let new_interval = *interval_updated.borrow_and_update();
                    if new_interval != interval.period() {
                        interval = tokio::time::interval(new_interval);
                        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
                        // Note that the next interval will tick immediately. This is fine.
                    }
                }

                _ = interval.tick() => {
                    let mut row_buf = Row::default();
                    let mut correction = current_metrics
                        .iter()
                        .cloned()
                        .map(|(row, diff)| (row, -diff))
                        .collect_vec();

                    // Ideally we move quickly when holding the lock here, as it can hold
                    // up the coordinator. Because we are just moving some data around, we should
                    // be fine!
                    {
                        let mut shared_stats = shared_stats.lock().expect("poisoned");
                        for (_, stats) in shared_stats.as_mut_stats().iter_mut() {
                            let stat = stats.stat_to_emit();
                            stat.pack(row_buf.packer());
                            correction.push((row_buf.clone(), 1));
                            stats.mark_initialized();
                        }
                    }
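
                    // `correction` now holds a retraction for every previously emitted row
                    // plus an insertion for every current row; consolidation cancels the
                    // pairs that did not change, so only genuinely updated statistics are
                    // appended below.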
                    consolidation::consolidate(&mut correction);

                    tracing::trace!(%statistics_collection_id, ?correction, "updating stats collection");

                    // Update our view of the output collection and write updates
                    // out to the collection.
                    if !correction.is_empty() {
                        current_metrics.extend(correction.iter().cloned());
                        collection_mgmt.differential_append(
                            statistics_collection_id,
                            correction.into_iter().map(|(r, d)| (r, d.into())).collect(),
                        );
                    }
                }
            }
        }

        tracing::info!("shutting down statistics sender task");
    });
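
    // Dropping the returned box drops the `oneshot::Sender`, which completes the
    // `shutdown_rx` future in the select loop above and shuts the scraper task down.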
    Box::new(shutdown_tx)
}

/// A wrapper around source and webhook statistics maps so we can hold them within a single lock.
#[derive(Debug)]
pub(super) struct SourceStatistics {
    /// Statistics-per-source that will be emitted to the source statistics table by
    /// [`spawn_statistics_scraper`] above. Cluster-hosted sources must initialize the
    /// first real values, and we must distinguish a source that has been created from
    /// one that has been dropped (particularly because a cluster can race and report
    /// statistics for a dropped source, which we must ignore); the presence or absence
    /// of an entry in this map encodes that distinction.
    pub source_statistics: BTreeMap<GlobalId, StatsState<SourceStatisticsUpdate>>,
    /// A shared map with atomics for webhook appenders to update the (currently 4)
    /// statistics that they can meaningfully produce. These are periodically drained
    /// into `source_statistics` by [`spawn_webhook_statistics_scraper`] to avoid
    /// contention.
    pub webhook_statistics: BTreeMap<GlobalId, Arc<WebhookStatistics>>,
}

impl AsStats<SourceStatisticsUpdate> for SourceStatistics {
    fn as_stats(&self) -> &BTreeMap<GlobalId, StatsState<SourceStatisticsUpdate>> {
        &self.source_statistics
    }

    fn as_mut_stats(&mut self) -> &mut BTreeMap<GlobalId, StatsState<SourceStatisticsUpdate>> {
        &mut self.source_statistics
    }
}

/// Spawns a task that continually drains webhook statistics into `shared_stats`.
pub(super) fn spawn_webhook_statistics_scraper(
    shared_stats: Arc<Mutex<SourceStatistics>>,
    initial_interval: Duration,
    mut interval_updated: Receiver<Duration>,
) -> Box<dyn Any + Send + Sync> {
    let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();

    mz_ore::task::spawn(|| "webhook_statistics_scraper", async move {
        let mut interval = tokio::time::interval(initial_interval);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _msg = &mut shutdown_rx => {
                    break;
                }

                _ = interval_updated.changed() => {
                    let new_interval = *interval_updated.borrow_and_update();
                    if new_interval != interval.period() {
                        interval = tokio::time::interval(new_interval);
                        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
                        // Note that the next interval will tick immediately. This is fine.
                    }
                },

                _ = interval.tick() => {
                    let mut shared_stats = shared_stats.lock().expect("poisoned");
                    let shared_stats = &mut *shared_stats;
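                    // Fold each webhook's drained counters into the matching source's
                    // stats; they are written out by the statistics scraper task above.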
                    for (id, ws) in shared_stats.webhook_statistics.iter() {
                        // Don't overwrite the entry if it's been removed.
                        shared_stats
                            .source_statistics
                            .entry(*id)
                            .and_modify(|current| current.stat().incorporate(ws.drain_into_update(*id)));
                    }
                }
            }
        }

        tracing::info!("shutting down webhook statistics scraper task");
    });

    Box::new(shutdown_tx)
}