1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! A tokio task (and support machinery) for producing storage statistics.
1112use std::any::Any;
13use std::collections::BTreeMap;
14use std::fmt::Debug;
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
1718use differential_dataflow::consolidation;
19use differential_dataflow::lattice::Lattice;
20use itertools::Itertools;
21use mz_ore::now::EpochMillis;
22use mz_persist_types::Codec64;
23use mz_repr::{Diff, TimestampManipulation};
24use mz_repr::{GlobalId, Row};
25use mz_storage_client::statistics::{PackableStats, SourceStatisticsUpdate, WebhookStatistics};
26use timely::progress::ChangeBatch;
27use timely::progress::Timestamp;
28use tokio::sync::oneshot;
29use tokio::sync::watch::Receiver;
3031use crate::collection_mgmt::CollectionManager;
32use StatsState::*;
/// Tracks whether a statistic has already had an initial value emitted, so that a
/// zero value can be written before the first real update.
#[derive(Debug)]
pub(super) enum StatsState<Stat> {
    /// This stat is fully initialized.
    Initialized(Stat),
    /// This stat has not been initialized. We must write a `zero_value`
    /// in one interval, before writing the first real update.
    NeedsInit {
        /// The value handed out by `stat_to_emit` while in this state.
        zero_value: Stat,
        /// The value handed out by `stat` for incorporating updates while in
        /// this state; `mark_initialized` clones it into `Initialized`.
        future_update: Stat,
    },
}
4546impl<Stat: Clone> StatsState<Stat> {
47pub(super) fn new(zero_value: Stat) -> Self {
48let future_update = zero_value.clone();
49 StatsState::NeedsInit {
50 zero_value,
51 future_update,
52 }
53 }
5455/// Get a copy of the stat value that should be emitted to the statistics table.
56 /// After calling this, you must call `mark_initialized`. (These are separate
57 /// to avoid some clones).
58fn stat_to_emit(&mut self) -> &mut Stat {
59match self {
60 Initialized(stat) => stat,
61 NeedsInit { zero_value, .. } => zero_value,
62 }
63 }
6465/// Make this stat as initialized.
66fn mark_initialized(&mut self) {
67match self {
68 Initialized(_) => {}
69 NeedsInit { future_update, .. } => *self = Initialized(future_update.clone()),
70 }
71 }
7273/// Get a reference to a stat for additional incorporation.
74pub(super) fn stat(&mut self) -> &mut Stat {
75match self {
76 Initialized(stat) => stat,
77 NeedsInit { future_update, .. } => future_update,
78 }
79 }
80}
/// Conversion trait to allow multiple shapes of data in [`spawn_statistics_scraper`].
pub(super) trait AsStats<Stats> {
    /// Shared access to the per-collection statistics map.
    fn as_stats(&self) -> &BTreeMap<GlobalId, StatsState<Stats>>;
    /// Exclusive access to the per-collection statistics map.
    fn as_mut_stats(&mut self) -> &mut BTreeMap<GlobalId, StatsState<Stats>>;
}
8788impl<Stats> AsStats<Stats> for BTreeMap<GlobalId, StatsState<Stats>> {
89fn as_stats(&self) -> &BTreeMap<GlobalId, StatsState<Stats>> {
90self
91}
9293fn as_mut_stats(&mut self) -> &mut BTreeMap<GlobalId, StatsState<Stats>> {
94self
95}
96}
9798/// Spawns a task that continually (at an interval) writes statistics from storaged's
99/// that are consolidated in shared memory in the controller.
100pub(super) fn spawn_statistics_scraper<StatsWrapper, Stats, T>(
101 statistics_collection_id: GlobalId,
102 collection_mgmt: CollectionManager<T>,
103 shared_stats: Arc<Mutex<StatsWrapper>>,
104 previous_values: Vec<Row>,
105 initial_interval: Duration,
106mut interval_updated: Receiver<Duration>,
107 metrics: mz_storage_client::metrics::StorageControllerMetrics,
108) -> Box<dyn Any + Send + Sync>
109where
110StatsWrapper: AsStats<Stats> + Debug + Send + 'static,
111 Stats: PackableStats + Clone + Debug + Send + 'static,
112 T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
113{
114let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
115116 mz_ore::task::spawn(|| "statistics_scraper", async move {
117// Keep track of what we think is the contents of the output
118 // collection, so that we can emit the required retractions/updates
119 // when we learn about new metrics.
120 //
121 // We assume that `shared_stats` is kept up-to-date (and initialized)
122 // by the controller.
123let mut current_metrics = <ChangeBatch<_>>::new();
124125let mut correction = Vec::new();
126 {
127let mut shared_stats = shared_stats.lock().expect("poisoned");
128for row in previous_values {
129let current = Stats::unpack(row, &metrics);
130131 shared_stats
132 .as_mut_stats()
133 .insert(current.0, StatsState::Initialized(current.1));
134 }
135136let mut row_buf = Row::default();
137for (_, stats) in shared_stats.as_stats().iter() {
138if let StatsState::Initialized(stats) = stats {
139 stats.pack(row_buf.packer());
140 correction.push((row_buf.clone(), Diff::ONE));
141 }
142 }
143 }
144145tracing::debug!(%statistics_collection_id, ?correction, "seeding stats collection");
146147// Make sure that the desired state matches what is already there, when
148 // we start up!
149if !correction.is_empty() {
150 current_metrics.extend(correction.iter().map(|(r, d)| (r.clone(), d.into_inner())));
151152 collection_mgmt.differential_append(statistics_collection_id, correction);
153 }
154155let mut interval = tokio::time::interval(initial_interval);
156 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
157158loop {
159tokio::select! {
160 _msg = &mut shutdown_rx => {
161break;
162 }
163164_ = interval_updated.changed() => {
165let new_interval = *interval_updated.borrow_and_update();
166if new_interval != interval.period() {
167 interval = tokio::time::interval(new_interval);
168 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
169// Note that the next interval will tick immediately. This is fine.
170}
171 }
172173_ = interval.tick() => {
174let mut row_buf = Row::default();
175let mut correction = current_metrics
176 .iter()
177 .cloned()
178 .map(|(row, diff)| (row, -diff))
179 .collect_vec();
180181// Ideally we move quickly when holding the lock here, as it can hold
182 // up the coordinator. Because we are just moving some data around, we should
183 // be fine!
184{
185let mut shared_stats = shared_stats.lock().expect("poisoned");
186for (_, stats) in shared_stats.as_mut_stats().iter_mut() {
187let stat = stats.stat_to_emit();
188 stat.pack(row_buf.packer());
189 correction.push((row_buf.clone(), 1));
190 stats.mark_initialized();
191 }
192 }
193194 consolidation::consolidate(&mut correction);
195196tracing::trace!(%statistics_collection_id, ?correction, "updating stats collection");
197198// Update our view of the output collection and write updates
199 // out to the collection.
200if !correction.is_empty() {
201 current_metrics.extend(correction.iter().cloned());
202 collection_mgmt
203 .differential_append(statistics_collection_id, correction.into_iter().map(|(r, d)| (r, d.into())).collect());
204 }
205 }
206 }
207 }
208209tracing::info!("shutting down statistics sender task");
210 });
211212 Box::new(shutdown_tx)
213}
/// A wrapper around source and webhook statistics maps so we can hold them within a single lock.
#[derive(Debug)]
pub(super) struct SourceStatistics {
    /// Statistics-per-source that will be emitted to the source statistics table with
    /// the [`spawn_statistics_scraper`] above. Values are optional as cluster-hosted
    /// sources must initialize the first values, but we need `None` to mark a source
    /// having been created vs dropped (particularly when a cluster can race and report
    /// statistics for a dropped source, which we must ignore).
    ///
    /// NOTE(review): the values stored here are `StatsState`, not `Option`; the
    /// "optional"/`None` wording above looks stale and presumably now corresponds to
    /// the uninitialized state and to a missing map entry. Confirm against callers.
    pub source_statistics: BTreeMap<GlobalId, StatsState<SourceStatisticsUpdate>>,
    /// A shared map with atomics for webhook appenders to update the (currently 4)
    /// statistics that they can meaningfully produce. These are periodically
    /// copied into `source_statistics` by [`spawn_webhook_statistics_scraper`] to avoid
    /// contention.
    pub webhook_statistics: BTreeMap<GlobalId, Arc<WebhookStatistics>>,
}
230231impl AsStats<SourceStatisticsUpdate> for SourceStatistics {
232fn as_stats(&self) -> &BTreeMap<GlobalId, StatsState<SourceStatisticsUpdate>> {
233&self.source_statistics
234 }
235236fn as_mut_stats(&mut self) -> &mut BTreeMap<GlobalId, StatsState<SourceStatisticsUpdate>> {
237&mut self.source_statistics
238 }
239}
240241/// Spawns a task that continually drains webhook statistics into `shared_stats.
242pub(super) fn spawn_webhook_statistics_scraper(
243 shared_stats: Arc<Mutex<SourceStatistics>>,
244 initial_interval: Duration,
245mut interval_updated: Receiver<Duration>,
246) -> Box<dyn Any + Send + Sync> {
247let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
248249 mz_ore::task::spawn(|| "webhook_statistics_scraper", async move {
250let mut interval = tokio::time::interval(initial_interval);
251 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
252253loop {
254tokio::select! {
255 _msg = &mut shutdown_rx => {
256break;
257 }
258259_ = interval_updated.changed() => {
260let new_interval = *interval_updated.borrow_and_update();
261if new_interval != interval.period() {
262 interval = tokio::time::interval(new_interval);
263 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
264// Note that the next interval will tick immediately. This is fine.
265}
266 },
267268_ = interval.tick() => {
269let mut shared_stats = shared_stats.lock().expect("poisoned");
270let shared_stats = &mut *shared_stats;
271for (id, ws) in shared_stats.webhook_statistics.iter() {
272// Don't override it if its been removed.
273shared_stats
274 .source_statistics
275 .entry(*id)
276 .and_modify(|current| current.stat().incorporate(ws.drain_into_update(*id)));
277 }
278 }
279 }
280 }
281282tracing::info!("shutting down statistics sender task");
283 });
284285 Box::new(shutdown_tx)
286}