Skip to main content

mz_storage/
statistics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Helpers for managing storage statistics.
11//!
12//!
//! This module collects statistics related to sources and sinks. Statistics, as exposed
//! to their respective system tables, have strong semantics, defined within the
//! `mz_storage_types::statistics` module. This module collects and aggregates metrics
//! across workers according to those semantics.
//!
//! Note that it _simultaneously_ collects Prometheus metrics for the given statistics. Those
//! metrics _do not have the same strong semantics_, which is _by design_ to ensure we
//! are able to categorically debug sources and sinks during complex failures (or bugs
//! with statistics collection itself). Prometheus metrics are:
//! - Never dropped or reset until a source/sink is dropped.
//! - Entirely independent across workers.
24
25use std::cell::RefCell;
26use std::collections::BTreeMap;
27use std::rc::Rc;
28use std::time::Instant;
29
30use mz_ore::metric;
31use mz_ore::metrics::{
32    DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, IntGaugeVec, MetricTag,
33    MetricVisibility, MetricsRegistry, UIntGaugeVec,
34};
35use mz_repr::{GlobalId, Timestamp};
36use mz_storage_client::statistics::{Gauge, SinkStatisticsUpdate, SourceStatisticsUpdate};
37use mz_storage_types::sources::SourceEnvelope;
38use prometheus::core::{AtomicI64, AtomicU64};
39use serde::{Deserialize, Serialize};
40use timely::PartialOrder;
41use timely::progress::frontier::Antichain;
42
// Note(guswynn): ordinarily these metric structs would be in the `metrics` module, but we
// put them here so they can be near the user-facing definitions as well.
45
/// Prometheus metric families for source statistics, created by
/// [`SourceStatisticsMetricDefs::register_with`].
///
/// Per-source, per-worker series are derived from these families in
/// `SourceStatisticsMetrics::new`.
#[derive(Clone, Debug)]
pub(crate) struct SourceStatisticsMetricDefs {
    // Counters
    pub(crate) messages_received: IntCounterVec,
    pub(crate) updates_staged: IntCounterVec,
    pub(crate) updates_committed: IntCounterVec,
    pub(crate) bytes_received: IntCounterVec,

    // Gauges
    pub(crate) snapshot_committed: UIntGaugeVec,
    pub(crate) bytes_indexed: UIntGaugeVec,
    pub(crate) records_indexed: UIntGaugeVec,
    // Signed gauge, unlike the other (unsigned) gauges.
    pub(crate) rehydration_latency_ms: IntGaugeVec,

    pub(crate) offset_known: UIntGaugeVec,
    pub(crate) offset_committed: UIntGaugeVec,
    pub(crate) snapshot_records_known: UIntGaugeVec,
    pub(crate) snapshot_records_staged: UIntGaugeVec,

    // Just prometheus: this value is not part of the statistics shipped as a
    // `SourceStatisticsUpdate`.
    pub(crate) envelope_state_tombstones: UIntGaugeVec,
}
68
69impl SourceStatisticsMetricDefs {
70    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
71        Self {
72            snapshot_committed: registry.register(metric!(
73                name: "mz_source_snapshot_committed",
74                help: "Whether or not the worker has committed the initial snapshot for a source.",
75                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
76            )),
77            messages_received: registry.register(metric!(
78                name: "mz_source_messages_received",
79                help: "The number of raw messages the worker has received from upstream.",
80                var_labels: ["source_id", "worker_id", "parent_source_id"],
81                visibility: MetricVisibility::Public,
82                tags: [MetricTag::Source],
83            )),
84            updates_staged: registry.register(metric!(
85                name: "mz_source_updates_staged",
86                help: "The number of updates (inserts + deletes) the worker has written but not yet committed to the storage layer.",
87                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
88            )),
89            updates_committed: registry.register(metric!(
90                name: "mz_source_updates_committed",
91                help: "The number of updates (inserts + deletes) the worker has committed into the storage layer.",
92                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
93            )),
94            bytes_received: registry.register(metric!(
95                name: "mz_source_bytes_received",
96                help: "The number of bytes worth of messages the worker has received from upstream. The way the bytes are counted is source-specific.",
97                var_labels: ["source_id", "worker_id", "parent_source_id"],
98                visibility: MetricVisibility::Public,
99                tags: [MetricTag::Source],
100            )),
101            bytes_indexed: registry.register(metric!(
102                name: "mz_source_bytes_indexed",
103                help: "The number of bytes of the source envelope state kept. This will be specific to the envelope in use.",
104                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
105            )),
106            records_indexed: registry.register(metric!(
107                name: "mz_source_records_indexed",
108                help: "The number of records in the source envelope state. This will be specific to the envelope in use",
109                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
110            )),
111            envelope_state_tombstones: registry.register(metric!(
112                name: "mz_source_envelope_state_tombstones",
113                help: "The number of outstanding tombstones in the source envelope state. This will be specific to the envelope in use",
114                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
115            )),
116            rehydration_latency_ms: registry.register(metric!(
117                name: "mz_source_rehydration_latency_ms",
118                help: "The amount of time in milliseconds it took for the worker to rehydrate the source envelope state. This will be specific to the envelope in use.",
119                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id", "envelope"],
120            )),
121            offset_known: registry.register(metric!(
122                name: "mz_source_offset_known",
123                help: "The total number of _values_ (source-defined unit) present in upstream.",
124                var_labels: ["source_id", "worker_id", "shard_id"],
125                visibility: MetricVisibility::Public,
126                tags: [MetricTag::Source],
127            )),
128            offset_committed: registry.register(metric!(
129                name: "mz_source_offset_committed",
130                help: "The total number of _values_ (source-defined unit) we have fully processed, and storage and committed.",
131                var_labels: ["source_id", "worker_id", "shard_id"],
132                visibility: MetricVisibility::Public,
133                tags: [MetricTag::Source],
134            )),
135            snapshot_records_known: registry.register(metric!(
136                name: "mz_source_snapshot_records_known",
137                help: "The total number of records in the source's snapshot",
138                var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"],
139            )),
140            snapshot_records_staged: registry.register(metric!(
141                name: "mz_source_snapshot_records_staged",
142                help: "The total number of records read from the source's snapshot",
143                var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"],
144            )),
145        }
146    }
147}
148
/// Prometheus metrics for user-facing source metrics.
///
/// One instance holds the series for a single source on a single worker, each obtained as a
/// delete-on-drop handle from the families in `SourceStatisticsMetricDefs`.
#[derive(Debug)]
pub struct SourceStatisticsMetrics {
    // Counters
    pub(crate) messages_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) updates_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) updates_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,

    // Gauges
    pub(crate) snapshot_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) bytes_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) records_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    // Signed, matching the `IntGaugeVec` family it comes from.
    pub(crate) rehydration_latency_ms: DeleteOnDropGauge<AtomicI64, Vec<String>>,

    pub(crate) offset_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) offset_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) snapshot_records_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) snapshot_records_staged: DeleteOnDropGauge<AtomicU64, Vec<String>>,

    // Just prometheus.
    pub(crate) envelope_state_tombstones: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}
172
173impl SourceStatisticsMetrics {
174    pub(crate) fn new(
175        defs: &SourceStatisticsMetricDefs,
176        id: GlobalId,
177        worker_id: usize,
178        parent_source_id: GlobalId,
179        shard_id: &mz_persist_client::ShardId,
180        envelope: SourceEnvelope,
181    ) -> SourceStatisticsMetrics {
182        let shard = shard_id.to_string();
183        let envelope = match envelope {
184            SourceEnvelope::None(_) => "none",
185            SourceEnvelope::Upsert(_) => "upsert",
186            SourceEnvelope::CdcV2 => "cdcv2",
187        };
188
189        SourceStatisticsMetrics {
190            snapshot_committed: defs.snapshot_committed.get_delete_on_drop_metric(vec![
191                id.to_string(),
192                worker_id.to_string(),
193                parent_source_id.to_string(),
194                shard.clone(),
195            ]),
196            messages_received: defs.messages_received.get_delete_on_drop_metric(vec![
197                id.to_string(),
198                worker_id.to_string(),
199                parent_source_id.to_string(),
200            ]),
201            updates_staged: defs.updates_staged.get_delete_on_drop_metric(vec![
202                id.to_string(),
203                worker_id.to_string(),
204                parent_source_id.to_string(),
205                shard.clone(),
206            ]),
207            updates_committed: defs.updates_committed.get_delete_on_drop_metric(vec![
208                id.to_string(),
209                worker_id.to_string(),
210                parent_source_id.to_string(),
211                shard.clone(),
212            ]),
213            bytes_received: defs.bytes_received.get_delete_on_drop_metric(vec![
214                id.to_string(),
215                worker_id.to_string(),
216                parent_source_id.to_string(),
217            ]),
218            bytes_indexed: defs.bytes_indexed.get_delete_on_drop_metric(vec![
219                id.to_string(),
220                worker_id.to_string(),
221                parent_source_id.to_string(),
222                shard.clone(),
223            ]),
224            records_indexed: defs.records_indexed.get_delete_on_drop_metric(vec![
225                id.to_string(),
226                worker_id.to_string(),
227                parent_source_id.to_string(),
228                shard.clone(),
229            ]),
230            envelope_state_tombstones: defs.envelope_state_tombstones.get_delete_on_drop_metric(
231                vec![
232                    id.to_string(),
233                    worker_id.to_string(),
234                    parent_source_id.to_string(),
235                    shard.clone(),
236                ],
237            ),
238            rehydration_latency_ms: defs.rehydration_latency_ms.get_delete_on_drop_metric(vec![
239                id.to_string(),
240                worker_id.to_string(),
241                parent_source_id.to_string(),
242                shard.clone(),
243                envelope.to_string(),
244            ]),
245            offset_known: defs.offset_known.get_delete_on_drop_metric(vec![
246                id.to_string(),
247                worker_id.to_string(),
248                shard.clone(),
249            ]),
250            offset_committed: defs.offset_committed.get_delete_on_drop_metric(vec![
251                id.to_string(),
252                worker_id.to_string(),
253                shard.clone(),
254            ]),
255            snapshot_records_known: defs.snapshot_records_known.get_delete_on_drop_metric(vec![
256                id.to_string(),
257                worker_id.to_string(),
258                shard.clone(),
259                parent_source_id.to_string(),
260            ]),
261            snapshot_records_staged: defs.snapshot_records_staged.get_delete_on_drop_metric(vec![
262                id.to_string(),
263                worker_id.to_string(),
264                shard.clone(),
265                parent_source_id.to_string(),
266            ]),
267        }
268    }
269}
270
/// Prometheus metric families for sink statistics, created by
/// [`SinkStatisticsMetricDefs::register_with`].
///
/// Per-sink, per-worker series are derived from these families in
/// `SinkStatisticsMetrics::new`.
#[derive(Clone, Debug)]
pub(crate) struct SinkStatisticsMetricDefs {
    // Counters
    pub(crate) messages_staged: IntCounterVec,
    pub(crate) messages_committed: IntCounterVec,
    pub(crate) bytes_staged: IntCounterVec,
    pub(crate) bytes_committed: IntCounterVec,
}
279
280impl SinkStatisticsMetricDefs {
281    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
282        Self {
283            messages_staged: registry.register(metric!(
284                name: "mz_sink_messages_staged",
285                help: "The number of messages staged but possibly not committed to the sink.",
286                var_labels: ["sink_id", "worker_id"],
287            )),
288            messages_committed: registry.register(metric!(
289                name: "mz_sink_messages_committed",
290                help: "The number of messages committed to the sink.",
291                var_labels: ["sink_id", "worker_id"],
292            )),
293            bytes_staged: registry.register(metric!(
294                name: "mz_sink_bytes_staged",
295                help: "The number of bytes staged but possibly not committed to the sink.",
296                var_labels: ["sink_id", "worker_id"],
297                visibility: MetricVisibility::Public,
298                tags: [MetricTag::Sink],
299            )),
300            bytes_committed: registry.register(metric!(
301                name: "mz_sink_bytes_committed",
302                help: "The number of bytes committed to the sink.",
303                var_labels: ["sink_id", "worker_id"],
304                visibility: MetricVisibility::Public,
305                tags: [MetricTag::Sink],
306            )),
307        }
308    }
309}
310
/// Prometheus metrics for user-facing sink metrics.
///
/// One instance holds the series for a single sink on a single worker, each obtained as a
/// delete-on-drop handle from the families in `SinkStatisticsMetricDefs`.
#[derive(Debug)]
pub struct SinkStatisticsMetrics {
    // Counters
    pub(crate) messages_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) messages_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}
320
321impl SinkStatisticsMetrics {
322    pub(crate) fn new(
323        defs: &SinkStatisticsMetricDefs,
324        id: GlobalId,
325        worker_id: usize,
326    ) -> SinkStatisticsMetrics {
327        SinkStatisticsMetrics {
328            messages_staged: defs
329                .messages_staged
330                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
331            messages_committed: defs
332                .messages_committed
333                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
334            bytes_staged: defs
335                .bytes_staged
336                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
337            bytes_committed: defs
338                .bytes_committed
339                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
340        }
341    }
342}
343
/// Meta data needed to maintain source statistics.
///
/// `SourceStatistics` consults this when deciding whether (and after how long) a source
/// has rehydrated.
#[derive(Debug, Clone)]
pub struct SourceStatisticsMetadata {
    /// The resumption upper of the source. The source is treated as hydrated once a reported
    /// upper is strictly beyond this frontier.
    resume_upper: Antichain<Timestamp>,
    /// Time at which the source (more precisely: this metadata object) was created.
    created_at: Instant,
}
352
353impl SourceStatisticsMetadata {
354    /// Create a new `SourceStatisticsMetadata` object.
355    pub fn new(resume_upper: Antichain<Timestamp>) -> Self {
356        Self {
357            resume_upper,
358            created_at: Instant::now(),
359        }
360    }
361}
362
/// The shared, mutable state behind a [`StorageStatistics`] handle.
///
/// It pairs the statistics record shipped to the controller with the Prometheus series that
/// the update methods keep in step with it.
#[derive(Debug)]
struct StatsInner<Stats, Metrics> {
    /// The statistics record, e.g. a `SourceStatisticsRecord`.
    stats: Stats,
    /// The Prometheus series mirroring `stats`, e.g. `SourceStatisticsMetrics`.
    prom: Metrics,
}
368
/// A helper struct designed to make it easy for operators to update user-facing metrics.
/// This struct also ensures that each stat is also incremented in Prometheus.
///
/// Caveats:
/// - There is one Prometheus timeseries per worker, and we label it with the source id and the
///   source id of the parent source (if there is no parent, these labels have the same value).
///     - Some metrics are also labeled with the shard id we are writing metrics for.
/// - The Prometheus metrics do not have the same timestamps as the ones exposed in SQL, because
///   they are written at different times.
///     - This may be fixed in the future when we write the metrics from storaged directly.
///     - The values do eventually converge to the same value.
///
/// Cloning a handle is cheap: clones share the same underlying state through an `Rc`. Because
/// that state lives in `Rc<RefCell<..>>`, handles are not `Send`/`Sync`.
#[derive(Debug)]
pub struct StorageStatistics<Stats, Metrics, Meta> {
    // Note also that the `DeleteOnDropCounter`s in the `SourceStatisticsMetrics`
    // already are in an `Arc`, so this is a bit of extra wrapping, but the cost
    // should be small.
    stats: Rc<RefCell<StatsInner<Stats, Metrics>>>,
    /// Meta data needed to maintain statistics.
    meta: Meta,
}
389
390impl<Stats, Metrics, Meta: Clone> Clone for StorageStatistics<Stats, Metrics, Meta> {
391    fn clone(&self) -> Self {
392        Self {
393            stats: Rc::clone(&self.stats),
394            meta: self.meta.clone(),
395        }
396    }
397}
398
/// Statistics maintained for sources by a single worker. Gauges can be uninitialized.
///
/// Counters hold the increments accumulated since they were last reset with
/// `reset_counters`; gauges hold absolute values.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SourceStatisticsRecord {
    id: GlobalId,
    worker_id: usize,
    // Counters
    messages_received: u64,
    bytes_received: u64,
    updates_staged: u64,
    updates_committed: u64,

    // Gauges are always wrapped in an `Option` that represents if that gauge has been
    // initialized by that worker.
    records_indexed: Option<u64>,
    bytes_indexed: Option<u64>,

    // The outer `Option` tracks initialization, like every gauge; the inner `Option` is the
    // value itself, which is nullable.
    rehydration_latency_ms: Option<Option<i64>>,

    // As above, the outer `Option` tracks initialization. Where there is an inner `Option`,
    // it holds a value that is able to be unset when shipped to the controller.
    snapshot_records_known: Option<Option<u64>>,
    snapshot_records_staged: Option<Option<u64>>,
    snapshot_committed: Option<bool>,
    offset_known: Option<Option<u64>>,
    offset_committed: Option<Option<u64>>,

    // Just prometheus: not included in the `SourceStatisticsUpdate` built from this record.
    envelope_state_tombstones: u64,
}
429
430impl SourceStatisticsRecord {
431    fn reset_gauges(&mut self) {
432        // These gauges MUST be initialized across all workers before we aggregate and
433        // report statistics.
434        self.rehydration_latency_ms = None;
435        self.snapshot_committed = None;
436
437        // We consider these gauges always initialized
438        self.bytes_indexed = Some(0);
439        self.records_indexed = Some(0);
440
441        // We don't yet populate these, so we consider the initialized (with an empty value).
442        self.snapshot_records_known = Some(None);
443        self.snapshot_records_staged = Some(None);
444        self.offset_known = Some(None);
445        self.offset_committed = Some(None);
446
447        self.envelope_state_tombstones = 0;
448    }
449
450    /// Reset counters so that we continue to ship diffs to the controller.
451    fn reset_counters(&mut self) {
452        self.messages_received = 0;
453        self.bytes_received = 0;
454        self.updates_staged = 0;
455        self.updates_committed = 0;
456    }
457
458    /// Convert this record into an `SourceStatisticsUpdate` to be merged
459    /// across workers. Gauges must be initialized.
460    fn as_update(&self) -> SourceStatisticsUpdate {
461        let SourceStatisticsRecord {
462            id,
463            worker_id: _,
464            messages_received,
465            bytes_received,
466            updates_staged,
467            updates_committed,
468            records_indexed,
469            bytes_indexed,
470            rehydration_latency_ms,
471            snapshot_records_known,
472            snapshot_records_staged,
473            snapshot_committed,
474            offset_known,
475            offset_committed,
476            envelope_state_tombstones: _,
477        } = self.clone();
478
479        SourceStatisticsUpdate {
480            id,
481            messages_received: messages_received.into(),
482            bytes_received: bytes_received.into(),
483            updates_staged: updates_staged.into(),
484            updates_committed: updates_committed.into(),
485            records_indexed: Gauge::gauge(records_indexed.unwrap()),
486            bytes_indexed: Gauge::gauge(bytes_indexed.unwrap()),
487            rehydration_latency_ms: Gauge::gauge(rehydration_latency_ms.unwrap()),
488            snapshot_records_known: Gauge::gauge(snapshot_records_known.unwrap()),
489            snapshot_records_staged: Gauge::gauge(snapshot_records_staged.unwrap()),
490            snapshot_committed: Gauge::gauge(snapshot_committed.unwrap()),
491            offset_known: Gauge::gauge(offset_known.unwrap()),
492            offset_committed: Gauge::gauge(offset_committed.unwrap()),
493        }
494    }
495}
496
/// Statistics maintained for sinks by a single worker.
///
/// Sinks currently track only counters, which hold the increments accumulated since they were
/// last reset with `reset_counters`; there are no gauges that can be uninitialized.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SinkStatisticsRecord {
    id: GlobalId,
    worker_id: usize,
    // Counters
    messages_staged: u64,
    messages_committed: u64,
    bytes_staged: u64,
    bytes_committed: u64,
}
508
509impl SinkStatisticsRecord {
510    fn reset_gauges(&self) {}
511
512    /// Reset counters so that we continue to ship diffs to the controller.
513    fn reset_counters(&mut self) {
514        self.messages_staged = 0;
515        self.messages_committed = 0;
516        self.bytes_staged = 0;
517        self.bytes_committed = 0;
518    }
519
520    /// Convert this record into an `SinkStatisticsUpdate` to be merged
521    /// across workers. Gauges must be initialized.
522    fn as_update(&self) -> SinkStatisticsUpdate {
523        let SinkStatisticsRecord {
524            id,
525            worker_id: _,
526            messages_staged,
527            messages_committed,
528            bytes_staged,
529            bytes_committed,
530        } = self.clone();
531
532        SinkStatisticsUpdate {
533            id,
534            messages_staged: messages_staged.into(),
535            messages_committed: messages_committed.into(),
536            bytes_staged: bytes_staged.into(),
537            bytes_committed: bytes_committed.into(),
538        }
539    }
540}
541
/// Statistics maintained for sources: a per-worker [`SourceStatisticsRecord`], its Prometheus
/// mirror, and the [`SourceStatisticsMetadata`] used to track rehydration.
pub type SourceStatistics =
    StorageStatistics<SourceStatisticsRecord, SourceStatisticsMetrics, SourceStatisticsMetadata>;

/// Statistics maintained for sinks. Sinks carry no extra metadata, hence `()`.
pub type SinkStatistics = StorageStatistics<SinkStatisticsRecord, SinkStatisticsMetrics, ()>;
548
549impl SourceStatistics {
550    pub(crate) fn new(
551        id: GlobalId,
552        worker_id: usize,
553        metrics: &SourceStatisticsMetricDefs,
554        parent_source_id: GlobalId,
555        shard_id: &mz_persist_client::ShardId,
556        envelope: SourceEnvelope,
557        resume_upper: Antichain<Timestamp>,
558    ) -> Self {
559        Self {
560            stats: Rc::new(RefCell::new(StatsInner {
561                stats: SourceStatisticsRecord {
562                    id,
563                    worker_id,
564                    messages_received: 0,
565                    updates_staged: 0,
566                    updates_committed: 0,
567                    bytes_received: 0,
568                    records_indexed: Some(0),
569                    bytes_indexed: Some(0),
570                    rehydration_latency_ms: None,
571                    snapshot_records_staged: Some(None),
572                    snapshot_records_known: Some(None),
573                    snapshot_committed: None,
574                    offset_known: Some(None),
575                    offset_committed: Some(None),
576                    envelope_state_tombstones: 0,
577                },
578                prom: SourceStatisticsMetrics::new(
579                    metrics,
580                    id,
581                    worker_id,
582                    parent_source_id,
583                    shard_id,
584                    envelope,
585                ),
586            })),
587            meta: SourceStatisticsMetadata::new(resume_upper),
588        }
589    }
590
591    /// Reset gauges to uninitialized state.
592    /// This does not reset prometheus gauges, which will be overwritten with new values.
593    pub fn reset_gauges(&self) {
594        let mut cur = self.stats.borrow_mut();
595        cur.stats.reset_gauges();
596    }
597
598    /// Get a snapshot of the data, returning `None` if all gauges are not initialized.
599    ///
600    /// This also resets counters, so that we continue to move diffs around.
601    pub fn snapshot(&self) -> Option<SourceStatisticsRecord> {
602        let mut cur = self.stats.borrow_mut();
603
604        match &cur.stats {
605            SourceStatisticsRecord {
606                records_indexed: Some(_),
607                bytes_indexed: Some(_),
608                rehydration_latency_ms: Some(_),
609                snapshot_records_known: Some(_),
610                snapshot_records_staged: Some(_),
611                snapshot_committed: Some(_),
612                offset_known: Some(_),
613                offset_committed: Some(_),
614                ..
615            } => {
616                let ret = Some(cur.stats.clone());
617                cur.stats.reset_counters();
618                ret
619            }
620            _ => None,
621        }
622    }
623
624    /// Set the `snapshot_committed` stat based on the reported upper, and
625    /// mark the stats as initialized.
626    ///
627    /// - In sql, we ensure that we _never_ reset `snapshot_committed` to `false`, but gauges and
628    /// counters are ordinarily reset to 0 in Prometheus, so on restarts this value may be inconsistent.
629    // TODO(guswynn): Actually test that this initialization logic works.
630    pub fn initialize_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
631        self.update_snapshot_committed(upper);
632    }
633
634    /// Set the `snapshot_committed` stat based on the reported upper.
635    pub fn update_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
636        let value = *upper != Antichain::from_elem(Timestamp::MIN);
637        let mut cur = self.stats.borrow_mut();
638        cur.stats.snapshot_committed = Some(value);
639        cur.prom.snapshot_committed.set(if value { 1 } else { 0 });
640    }
641
642    /// Increment the `messages_received` stat.
643    pub fn inc_messages_received_by(&self, value: u64) {
644        let mut cur = self.stats.borrow_mut();
645        cur.stats.messages_received = cur.stats.messages_received + value;
646        cur.prom.messages_received.inc_by(value);
647    }
648
649    /// Increment the `updates` stat.
650    pub fn inc_updates_staged_by(&self, value: u64) {
651        let mut cur = self.stats.borrow_mut();
652        cur.stats.updates_staged = cur.stats.updates_staged + value;
653        cur.prom.updates_staged.inc_by(value);
654    }
655
656    /// Increment the `messages_committed` stat.
657    pub fn inc_updates_committed_by(&self, value: u64) {
658        let mut cur = self.stats.borrow_mut();
659        cur.stats.updates_committed = cur.stats.updates_committed + value;
660        cur.prom.updates_committed.inc_by(value);
661    }
662
663    /// Increment the `bytes_received` stat.
664    pub fn inc_bytes_received_by(&self, value: u64) {
665        let mut cur = self.stats.borrow_mut();
666        cur.stats.bytes_received = cur.stats.bytes_received + value;
667        cur.prom.bytes_received.inc_by(value);
668    }
669
670    /// Update the `bytes_indexed` stat.
671    /// A positive value will add and a negative value will subtract.
672    pub fn update_bytes_indexed_by(&self, value: i64) {
673        let mut cur = self.stats.borrow_mut();
674        if let Some(updated) = cur
675            .stats
676            .bytes_indexed
677            .unwrap_or(0)
678            .checked_add_signed(value)
679        {
680            cur.stats.bytes_indexed = Some(updated);
681            cur.prom.bytes_indexed.set(updated);
682        } else {
683            let bytes_indexed = cur.stats.bytes_indexed.unwrap_or(0);
684            tracing::warn!(
685                "Unexpected u64 overflow while updating bytes_indexed value {} with {}",
686                bytes_indexed,
687                value
688            );
689            cur.stats.bytes_indexed = Some(0);
690            cur.prom.bytes_indexed.set(0);
691        }
692    }
693
694    /// Set the `bytes_indexed` to the given value
695    pub fn set_bytes_indexed(&self, value: i64) {
696        let mut cur = self.stats.borrow_mut();
697        let value = if value < 0 {
698            tracing::warn!("Unexpected negative value for bytes_indexed {}", value);
699            0
700        } else {
701            value.unsigned_abs()
702        };
703        cur.stats.bytes_indexed = Some(value);
704        cur.prom.bytes_indexed.set(value);
705    }
706
707    /// Update the `records_indexed` stat.
708    /// A positive value will add and a negative value will subtract.
709    pub fn update_records_indexed_by(&self, value: i64) {
710        let mut cur = self.stats.borrow_mut();
711        if let Some(updated) = cur
712            .stats
713            .records_indexed
714            .unwrap_or(0)
715            .checked_add_signed(value)
716        {
717            cur.stats.records_indexed = Some(updated);
718            cur.prom.records_indexed.set(updated);
719        } else {
720            let records_indexed = cur.stats.records_indexed.unwrap_or(0);
721            tracing::warn!(
722                "Unexpected u64 overflow while updating records_indexed value {} with {}",
723                records_indexed,
724                value
725            );
726            cur.stats.records_indexed = Some(0);
727            cur.prom.records_indexed.set(0);
728        }
729    }
730
731    /// Set the `records_indexed` to the given value
732    pub fn set_records_indexed(&self, value: i64) {
733        let mut cur = self.stats.borrow_mut();
734        let value = if value < 0 {
735            tracing::warn!("Unexpected negative value for records_indexed {}", value);
736            0
737        } else {
738            value.unsigned_abs()
739        };
740        cur.stats.records_indexed = Some(value);
741        cur.prom.records_indexed.set(value);
742    }
743
744    /// Initialize the `rehydration_latency_ms` stat as `NULL`.
745    pub fn initialize_rehydration_latency_ms(&self) {
746        let mut cur = self.stats.borrow_mut();
747        cur.stats.rehydration_latency_ms = Some(None);
748    }
749
750    /// Update the `envelope_state_tombstones` stat.
751    /// A positive value will add and a negative value will subtract.
752    // TODO(guswynn): consider exposing this to users
753    pub fn update_envelope_state_tombstones_by(&self, value: i64) {
754        let mut cur = self.stats.borrow_mut();
755        if let Some(updated) = cur
756            .stats
757            .envelope_state_tombstones
758            .checked_add_signed(value)
759        {
760            cur.stats.envelope_state_tombstones = updated;
761            cur.prom.envelope_state_tombstones.set(updated);
762        } else {
763            let envelope_state_tombstones = cur.stats.envelope_state_tombstones;
764            tracing::warn!(
765                "Unexpected u64 overflow while updating envelope_state_tombstones value {} with {}",
766                envelope_state_tombstones,
767                value
768            );
769            cur.stats.envelope_state_tombstones = 0;
770            cur.prom.envelope_state_tombstones.set(0);
771        }
772    }
773
774    /// Set the `rehydration_latency_ms` stat based on the reported upper.
775    pub fn update_rehydration_latency_ms(&self, upper: &Antichain<Timestamp>) {
776        let mut cur = self.stats.borrow_mut();
777
778        if matches!(cur.stats.rehydration_latency_ms, Some(Some(_))) {
779            return; // source was already hydrated before
780        }
781        if !PartialOrder::less_than(&self.meta.resume_upper, upper) {
782            return; // source is not yet hydrated
783        }
784
785        let elapsed = self.meta.created_at.elapsed();
786        let value = elapsed
787            .as_millis()
788            .try_into()
789            .expect("Rehydration took more than ~584 million years!");
790        cur.stats.rehydration_latency_ms = Some(Some(value));
791        cur.prom.rehydration_latency_ms.set(value);
792    }
793
794    /// Set the `offset_known` stat to the given value.
795    pub fn set_offset_known(&self, value: u64) {
796        let mut cur = self.stats.borrow_mut();
797        cur.stats.offset_known = Some(Some(value));
798        cur.prom.offset_known.set(value);
799    }
800
801    /// Set the `offset_committed` stat to the given value.
802    pub fn set_offset_committed(&self, value: u64) {
803        let mut cur = self.stats.borrow_mut();
804        cur.stats.offset_committed = Some(Some(value));
805        cur.prom.offset_committed.set(value);
806    }
807
808    /// Set the `snapshot_records_known` stat to the given value.
809    pub fn set_snapshot_records_known(&self, value: u64) {
810        let mut cur = self.stats.borrow_mut();
811        cur.stats.snapshot_records_known = Some(Some(value));
812        cur.prom.snapshot_records_known.set(value);
813    }
814
815    /// Set the `snapshot_records_known` stat to the given value.
816    pub fn set_snapshot_records_staged(&self, value: u64) {
817        let mut cur = self.stats.borrow_mut();
818        cur.stats.snapshot_records_staged = Some(Some(value));
819        cur.prom.snapshot_records_staged.set(value);
820    }
821}
822
823impl SinkStatistics {
824    pub(crate) fn new(id: GlobalId, worker_id: usize, metrics: &SinkStatisticsMetricDefs) -> Self {
825        Self {
826            stats: Rc::new(RefCell::new(StatsInner {
827                stats: SinkStatisticsRecord {
828                    id,
829                    worker_id,
830                    messages_staged: 0,
831                    messages_committed: 0,
832                    bytes_staged: 0,
833                    bytes_committed: 0,
834                },
835                prom: SinkStatisticsMetrics::new(metrics, id, worker_id),
836            })),
837            meta: (),
838        }
839    }
840
841    /// Reset gauges to uninitialized state.
842    /// This does not reset prometheus gauges, which will be overwritten with new values.
843    pub fn reset_gauges(&self) {
844        let cur = self.stats.borrow_mut();
845        cur.stats.reset_gauges();
846    }
847
848    /// Get a snapshot of the data, returning `None` if all gauges are not initialized.
849    ///
850    /// This also resets counters, so that we continue to move diffs around.
851    pub fn snapshot(&self) -> Option<SinkStatisticsRecord> {
852        let mut cur = self.stats.borrow_mut();
853
854        match &cur.stats {
855            SinkStatisticsRecord { .. } => {
856                let ret = Some(cur.stats.clone());
857                cur.stats.reset_counters();
858                ret
859            }
860        }
861    }
862
863    /// Increment the `messages_staged` stat.
864    pub fn inc_messages_staged_by(&self, value: u64) {
865        let mut cur = self.stats.borrow_mut();
866        cur.stats.messages_staged = cur.stats.messages_staged + value;
867        cur.prom.messages_staged.inc_by(value);
868    }
869
870    /// Increment the `bytes_received` stat.
871    pub fn inc_bytes_staged_by(&self, value: u64) {
872        let mut cur = self.stats.borrow_mut();
873        cur.stats.bytes_staged = cur.stats.bytes_staged + value;
874        cur.prom.bytes_staged.inc_by(value);
875    }
876
877    /// Increment the `messages_committed` stat.
878    pub fn inc_messages_committed_by(&self, value: u64) {
879        let mut cur = self.stats.borrow_mut();
880        cur.stats.messages_committed = cur.stats.messages_committed + value;
881        cur.prom.messages_committed.inc_by(value);
882    }
883
884    /// Increment the `bytes_committed` stat.
885    pub fn inc_bytes_committed_by(&self, value: u64) {
886        let mut cur = self.stats.borrow_mut();
887        cur.stats.bytes_committed = cur.stats.bytes_committed + value;
888        cur.prom.bytes_committed.inc_by(value);
889    }
890}
891
/// A structure that keeps track of _local_ statistics, as well as aggregating
/// statistics into a single worker (currently worker 0).
///
/// This is because we ONLY want to emit statistics for _gauge-style-statistics_ when
/// ALL workers have caught up to the _currently running instance of a source/sink_ and have
/// reported an accurate gauge. This prevents issues like 1 worker reporting
/// `snapshot_committed=true` before the other workers have caught up to report that they haven't
/// finished snapshotting.
///
/// The API flow is:
/// - Initialize sources/sinks with `initialize_source` or `initialize_sink`. This advances the
/// local epoch.
/// - Advance the _global epoch_ with `advance_global_epoch`. This should always be called strictly
/// before reinitializing objects.
/// - Emit a snapshot of local data with `emit_local`, which can be broadcast to other workers.
/// - Ingest data from other workers with `ingest`.
/// - Emit a `snapshot` of _global data_ (i.e. skipping objects that don't have all workers caught
/// up) with `snapshot`.
///
/// All functions can and should be called from ALL workers.
///
/// Note that we hand-roll a statistics epoch here, but in the future, when the storage
/// _controller_ has top-level support for _dataflow epochs_, this logic can be entirely moved
/// there.
pub struct AggregatedStatistics {
    /// Index of the worker that owns this instance.
    worker_id: usize,
    /// Total number of workers; also the length of each per-worker `Vec` in the global maps.
    worker_count: usize,
    /// This worker's source statistics, keyed by source id. The tuple holds the local epoch,
    /// the id of the owning ingestion, and the statistics object.
    local_source_statistics: BTreeMap<GlobalId, (usize, GlobalId, SourceStatistics)>,
    /// This worker's sink statistics, keyed by sink id and paired with the local epoch.
    local_sink_statistics: BTreeMap<GlobalId, (usize, SinkStatistics)>,

    /// Aggregated source statistics; only populated on worker 0. The tuple holds the global
    /// epoch, the owning ingestion id, and the accumulated update from each worker, indexed by
    /// worker id (`None` until that worker has reported).
    global_source_statistics:
        BTreeMap<GlobalId, (usize, GlobalId, Vec<Option<SourceStatisticsUpdate>>)>,
    /// Aggregated sink statistics; only populated on worker 0. The tuple holds the global epoch
    /// and the accumulated update from each worker, indexed by worker id.
    global_sink_statistics: BTreeMap<GlobalId, (usize, Vec<Option<SinkStatisticsUpdate>>)>,
}
926
927impl AggregatedStatistics {
928    /// Initialize a new `AggregatedStatistics`.
929    pub fn new(worker_id: usize, worker_count: usize) -> Self {
930        AggregatedStatistics {
931            worker_id,
932            worker_count,
933            local_source_statistics: Default::default(),
934            local_sink_statistics: Default::default(),
935            global_source_statistics: Default::default(),
936            global_sink_statistics: Default::default(),
937        }
938    }
939
940    /// Get a collection of `SourceStatistics` for the ingestion `ingestion_id`.
941    pub fn get_ingestion_stats(
942        &self,
943        ingestion_id: &GlobalId,
944    ) -> BTreeMap<GlobalId, SourceStatistics> {
945        let mut ingestion_stats = BTreeMap::new();
946        for (id, (_epoch, ingestion_id2, stats)) in self.local_source_statistics.iter() {
947            if ingestion_id == ingestion_id2 {
948                ingestion_stats.insert(id.clone(), stats.clone());
949            }
950        }
951        ingestion_stats
952    }
953
954    /// Get a `SourceStatistics` for an id, if it exists.
955    pub fn get_source(&self, id: &GlobalId) -> Option<&SourceStatistics> {
956        self.local_source_statistics.get(id).map(|(_, _, s)| s)
957    }
958
959    /// Get a `SinkStatistics` for an id, if it exists.
960    pub fn get_sink(&self, id: &GlobalId) -> Option<&SinkStatistics> {
961        self.local_sink_statistics.get(id).map(|(_, s)| s)
962    }
963
964    /// Deinitialize an object. Other methods other than `initialize_source` and `initialize_sink`
965    /// will never overwrite this.
966    pub fn deinitialize(&mut self, id: GlobalId) {
967        self.local_source_statistics.remove(&id);
968        self.local_sink_statistics.remove(&id);
969        self.global_source_statistics.remove(&id);
970        self.global_sink_statistics.remove(&id);
971    }
972
973    /// Advance the _global epoch_ for statistics.
974    ///
975    /// Gauge values from previous epochs will be ignored. Counter values from previous epochs will
976    /// still be applied as usual.
977    pub fn advance_global_epoch(&mut self, id: GlobalId) {
978        if let Some((epoch, _ingestion_id, stats)) = self.global_source_statistics.get_mut(&id) {
979            *epoch += 1;
980            for worker_stats in stats {
981                if let Some(update) = worker_stats {
982                    update.reset_gauges();
983                }
984            }
985        }
986        if let Some((epoch, stats)) = self.global_sink_statistics.get_mut(&id) {
987            *epoch += 1;
988            for worker_stats in stats {
989                if let Some(update) = worker_stats {
990                    update.reset_gauges();
991                }
992            }
993        }
994    }
995
996    /// Re-initialize a source. If it already exists, then its local epoch is advanced.
997    pub fn initialize_source<F: FnOnce() -> SourceStatistics>(
998        &mut self,
999        id: GlobalId,
1000        ingestion_id: GlobalId,
1001        resume_upper: Antichain<Timestamp>,
1002        stats: F,
1003    ) {
1004        self.local_source_statistics
1005            .entry(id)
1006            .and_modify(|(epoch, ingestion_id2, stats)| {
1007                assert_eq!(ingestion_id, *ingestion_id2);
1008                *epoch += 1;
1009                stats.reset_gauges();
1010                stats.meta = SourceStatisticsMetadata::new(resume_upper);
1011            })
1012            .or_insert_with(|| (0, ingestion_id, stats()));
1013
1014        if self.worker_id == 0 {
1015            self.global_source_statistics
1016                .entry(id)
1017                .or_insert_with(|| (0, ingestion_id, vec![None; self.worker_count]));
1018        }
1019    }
1020
1021    /// Re-initialize a sink. If it already exists, then its local epoch is advanced.
1022    pub fn initialize_sink<F: FnOnce() -> SinkStatistics>(&mut self, id: GlobalId, stats: F) {
1023        self.local_sink_statistics
1024            .entry(id)
1025            .and_modify(|(epoch, stats)| {
1026                *epoch += 1;
1027                stats.reset_gauges();
1028            })
1029            .or_insert_with(|| (0, stats()));
1030        if self.worker_id == 0 {
1031            self.global_sink_statistics
1032                .entry(id)
1033                .or_insert_with(|| (0, vec![None; self.worker_count]));
1034        }
1035    }
1036
1037    /// Ingest data from other workers.
1038    pub fn ingest(
1039        &mut self,
1040        source_statistics: Vec<(usize, SourceStatisticsRecord)>,
1041        sink_statistics: Vec<(usize, SinkStatisticsRecord)>,
1042    ) {
1043        // Currently, only worker 0 ingest data from other workers.
1044        if self.worker_id != 0 {
1045            return;
1046        }
1047
1048        for (epoch, stat) in source_statistics {
1049            if let Some((global_epoch, _, stats)) = self.global_source_statistics.get_mut(&stat.id)
1050            {
1051                // The record might be from a previous incarnation of the source. If that's the
1052                // case, we only incorporate its counter values and ignore its gauge values.
1053                let epoch_match = epoch >= *global_epoch;
1054                let mut update = stat.as_update();
1055                match (&mut stats[stat.worker_id], epoch_match) {
1056                    (None, true) => stats[stat.worker_id] = Some(update),
1057                    (None, false) => {
1058                        update.reset_gauges();
1059                        stats[stat.worker_id] = Some(update);
1060                    }
1061                    (Some(occupied), true) => occupied.incorporate(update),
1062                    (Some(occupied), false) => occupied.incorporate_counters(update),
1063                }
1064            }
1065        }
1066
1067        for (epoch, stat) in sink_statistics {
1068            if let Some((global_epoch, stats)) = self.global_sink_statistics.get_mut(&stat.id) {
1069                // The record might be from a previous incarnation of the sink. If that's the
1070                // case, we only incorporate its counter values and ignore its gauge values.
1071                let epoch_match = epoch >= *global_epoch;
1072                let update = stat.as_update();
1073                match (&mut stats[stat.worker_id], epoch_match) {
1074                    (None, true) => stats[stat.worker_id] = Some(update),
1075                    (None, false) => {
1076                        update.reset_gauges();
1077                        stats[stat.worker_id] = Some(update);
1078                    }
1079                    (Some(occupied), true) => occupied.incorporate(update),
1080                    (Some(occupied), false) => occupied.incorporate_counters(update),
1081                }
1082            }
1083        }
1084    }
1085
1086    fn _emit_local(
1087        &mut self,
1088    ) -> (
1089        Vec<(usize, SourceStatisticsRecord)>,
1090        Vec<(usize, SinkStatisticsRecord)>,
1091    ) {
1092        let sources = self
1093            .local_source_statistics
1094            .values_mut()
1095            .flat_map(|(epoch, _, s)| s.snapshot().map(|v| (*epoch, v)))
1096            .collect();
1097
1098        let sinks = self
1099            .local_sink_statistics
1100            .values_mut()
1101            .flat_map(|(epoch, s)| s.snapshot().map(|v| (*epoch, v)))
1102            .collect();
1103
1104        (sources, sinks)
1105    }
1106
1107    /// Emit a snapshot of this workers local data.
1108    pub fn emit_local(
1109        &mut self,
1110    ) -> (
1111        Vec<(usize, SourceStatisticsRecord)>,
1112        Vec<(usize, SinkStatisticsRecord)>,
1113    ) {
1114        // As an optimization, worker 0 does not broadcast it data. It ingests
1115        // it in `snapshot`.
1116        if self.worker_id == 0 {
1117            return (Vec::new(), Vec::new());
1118        }
1119
1120        self._emit_local()
1121    }
1122
1123    /// Emit a _global_ snapshot of data. This does not include objects whose workers have not
1124    /// initialized gauges for the current epoch.
1125    pub fn snapshot(&mut self) -> (Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>) {
1126        if !self.worker_id == 0 {
1127            return (Vec::new(), Vec::new());
1128        }
1129
1130        let (sources, sinks) = self._emit_local();
1131        self.ingest(sources, sinks);
1132
1133        let sources = self
1134            .global_source_statistics
1135            .iter_mut()
1136            .filter_map(|(_, (_, _, s))| {
1137                if s.iter().all(|s| s.is_some()) {
1138                    let ret = Some(SourceStatisticsUpdate::summarize(|| {
1139                        s.iter().filter_map(Option::as_ref)
1140                    }));
1141
1142                    // Reset the counters so we only report diffs.
1143                    s.iter_mut().for_each(|s| {
1144                        if let Some(s) = s {
1145                            s.reset_counters();
1146                        }
1147                    });
1148                    ret
1149                } else {
1150                    None
1151                }
1152            })
1153            .collect();
1154
1155        let sinks = self
1156            .global_sink_statistics
1157            .iter_mut()
1158            .filter_map(|(_, (_, s))| {
1159                if s.iter().all(|s| s.is_some()) {
1160                    let ret = Some(SinkStatisticsUpdate::summarize(|| {
1161                        s.iter().filter_map(Option::as_ref)
1162                    }));
1163
1164                    // Reset the counters so we only report diffs.
1165                    s.iter_mut().for_each(|s| {
1166                        if let Some(s) = s {
1167                            s.reset_counters();
1168                        }
1169                    });
1170                    ret
1171                } else {
1172                    None
1173                }
1174            })
1175            .collect();
1176
1177        (sources, sinks)
1178    }
1179}