mz_storage/
statistics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Helpers for managing storage statistics.
11//!
12//!
13//! This module collects statistics related to sources and sinks. Statistics, as exposed
//! to their respective system tables, have strong semantics, defined within the
15//! `mz_storage_types::statistics` module. This module collects and aggregates metrics
16//! across workers according to those semantics.
17//!
//! Note that it _simultaneously_ collects prometheus metrics for the given statistics. Those
19//! metrics _do not have the same strong semantics_, which is _by design_ to ensure we
20//! are able to categorically debug sources and sinks during complex failures (or bugs
21//! with statistics collection itself). Prometheus metrics are
22//! - Never dropped or reset until a source/sink is dropped.
23//! - Entirely independent across workers.
24
25use std::cell::RefCell;
26use std::collections::BTreeMap;
27use std::rc::Rc;
28use std::time::Instant;
29
30use mz_ore::metric;
31use mz_ore::metrics::{
32    DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, IntGaugeVec, MetricsRegistry,
33    UIntGaugeVec,
34};
35use mz_repr::{GlobalId, Timestamp};
36use mz_storage_client::statistics::{Gauge, SinkStatisticsUpdate, SourceStatisticsUpdate};
37use mz_storage_types::sources::SourceEnvelope;
38use prometheus::core::{AtomicI64, AtomicU64};
39use serde::{Deserialize, Serialize};
40use timely::PartialOrder;
41use timely::progress::frontier::Antichain;
42
43// Note(guswynn): ordinarily these metric structs would be in the `metrics` modules, but we
44// put them here so they can be near the user-facing definitions as well.
45
/// Definitions of the prometheus metric *vectors* backing user-facing source statistics.
///
/// These vectors are shared; concrete per-source/per-worker timeseries are carved out
/// of them in `SourceStatisticsMetrics::new`.
#[derive(Clone, Debug)]
pub(crate) struct SourceStatisticsMetricDefs {
    // Counters
    pub(crate) messages_received: IntCounterVec,
    pub(crate) updates_staged: IntCounterVec,
    pub(crate) updates_committed: IntCounterVec,
    pub(crate) bytes_received: IntCounterVec,

    // Gauges
    pub(crate) snapshot_committed: UIntGaugeVec,
    pub(crate) bytes_indexed: UIntGaugeVec,
    pub(crate) records_indexed: UIntGaugeVec,
    pub(crate) rehydration_latency_ms: IntGaugeVec,

    pub(crate) offset_known: UIntGaugeVec,
    pub(crate) offset_committed: UIntGaugeVec,
    pub(crate) snapshot_records_known: UIntGaugeVec,
    pub(crate) snapshot_records_staged: UIntGaugeVec,

    // Just prometheus (not mirrored into the user-facing statistics record).
    pub(crate) envelope_state_tombstones: UIntGaugeVec,
}
68
impl SourceStatisticsMetricDefs {
    /// Register every source-statistics metric vector with `registry`.
    ///
    /// Note that the label sets differ per metric: raw-ingest counters omit `shard_id`,
    /// and the `offset_*`/`snapshot_records_*` metrics declare their labels in a
    /// different order — `SourceStatisticsMetrics::new` must supply values in the
    /// matching order.
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        Self {
            snapshot_committed: registry.register(metric!(
                name: "mz_source_snapshot_committed",
                help: "Whether or not the worker has committed the initial snapshot for a source.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            messages_received: registry.register(metric!(
                name: "mz_source_messages_received",
                help: "The number of raw messages the worker has received from upstream.",
                var_labels: ["source_id", "worker_id", "parent_source_id"],
            )),
            updates_staged: registry.register(metric!(
                name: "mz_source_updates_staged",
                help: "The number of updates (inserts + deletes) the worker has written but not yet committed to the storage layer.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            updates_committed: registry.register(metric!(
                name: "mz_source_updates_committed",
                help: "The number of updates (inserts + deletes) the worker has committed into the storage layer.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            bytes_received: registry.register(metric!(
                name: "mz_source_bytes_received",
                help: "The number of bytes worth of messages the worker has received from upstream. The way the bytes are counted is source-specific.",
                var_labels: ["source_id", "worker_id", "parent_source_id"],
            )),
            bytes_indexed: registry.register(metric!(
                name: "mz_source_bytes_indexed",
                help: "The number of bytes of the source envelope state kept. This will be specific to the envelope in use.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            records_indexed: registry.register(metric!(
                name: "mz_source_records_indexed",
                help: "The number of records in the source envelope state. This will be specific to the envelope in use",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            envelope_state_tombstones: registry.register(metric!(
                name: "mz_source_envelope_state_tombstones",
                help: "The number of outstanding tombstones in the source envelope state. This will be specific to the envelope in use",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
            )),
            rehydration_latency_ms: registry.register(metric!(
                name: "mz_source_rehydration_latency_ms",
                help: "The amount of time in milliseconds it took for the worker to rehydrate the source envelope state. This will be specific to the envelope in use.",
                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id", "envelope"],
            )),
            offset_known: registry.register(metric!(
                name: "mz_source_offset_known",
                help: "The total number of _values_ (source-defined unit) present in upstream.",
                var_labels: ["source_id", "worker_id", "shard_id"],
            )),
            offset_committed: registry.register(metric!(
                name: "mz_source_offset_committed",
                help: "The total number of _values_ (source-defined unit) we have fully processed, and storage and committed.",
                var_labels: ["source_id", "worker_id", "shard_id"],
            )),
            snapshot_records_known: registry.register(metric!(
                name: "mz_source_snapshot_records_known",
                help: "The total number of records in the source's snapshot",
                var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"],
            )),
            snapshot_records_staged: registry.register(metric!(
                name: "mz_source_snapshot_records_staged",
                help: "The total number of records read from the source's snapshot",
                var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"],
            )),
        }
    }
}
140
/// Prometheus metrics for user-facing source metrics.
///
/// These are `DeleteOnDrop*` handles: dropping this struct removes the corresponding
/// per-source/per-worker timeseries from the registry.
#[derive(Debug)]
pub struct SourceStatisticsMetrics {
    // Counters
    pub(crate) messages_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) updates_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) updates_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,

    // Gauges
    pub(crate) snapshot_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) bytes_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) records_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    // Signed: the value is `-1` semantics aside, an `i64` gauge — see
    // `rehydration_latency_ms` updates, which store a signed millisecond count.
    pub(crate) rehydration_latency_ms: DeleteOnDropGauge<AtomicI64, Vec<String>>,

    pub(crate) offset_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) offset_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) snapshot_records_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) snapshot_records_staged: DeleteOnDropGauge<AtomicU64, Vec<String>>,

    // Just prometheus (not mirrored into the user-facing statistics record).
    pub(crate) envelope_state_tombstones: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}
164
165impl SourceStatisticsMetrics {
166    pub(crate) fn new(
167        defs: &SourceStatisticsMetricDefs,
168        id: GlobalId,
169        worker_id: usize,
170        parent_source_id: GlobalId,
171        shard_id: &mz_persist_client::ShardId,
172        envelope: SourceEnvelope,
173    ) -> SourceStatisticsMetrics {
174        let shard = shard_id.to_string();
175        let envelope = match envelope {
176            SourceEnvelope::None(_) => "none",
177            SourceEnvelope::Upsert(_) => "upsert",
178            SourceEnvelope::CdcV2 => "cdcv2",
179        };
180
181        SourceStatisticsMetrics {
182            snapshot_committed: defs.snapshot_committed.get_delete_on_drop_metric(vec![
183                id.to_string(),
184                worker_id.to_string(),
185                parent_source_id.to_string(),
186                shard.clone(),
187            ]),
188            messages_received: defs.messages_received.get_delete_on_drop_metric(vec![
189                id.to_string(),
190                worker_id.to_string(),
191                parent_source_id.to_string(),
192            ]),
193            updates_staged: defs.updates_staged.get_delete_on_drop_metric(vec![
194                id.to_string(),
195                worker_id.to_string(),
196                parent_source_id.to_string(),
197                shard.clone(),
198            ]),
199            updates_committed: defs.updates_committed.get_delete_on_drop_metric(vec![
200                id.to_string(),
201                worker_id.to_string(),
202                parent_source_id.to_string(),
203                shard.clone(),
204            ]),
205            bytes_received: defs.bytes_received.get_delete_on_drop_metric(vec![
206                id.to_string(),
207                worker_id.to_string(),
208                parent_source_id.to_string(),
209            ]),
210            bytes_indexed: defs.bytes_indexed.get_delete_on_drop_metric(vec![
211                id.to_string(),
212                worker_id.to_string(),
213                parent_source_id.to_string(),
214                shard.clone(),
215            ]),
216            records_indexed: defs.records_indexed.get_delete_on_drop_metric(vec![
217                id.to_string(),
218                worker_id.to_string(),
219                parent_source_id.to_string(),
220                shard.clone(),
221            ]),
222            envelope_state_tombstones: defs.envelope_state_tombstones.get_delete_on_drop_metric(
223                vec![
224                    id.to_string(),
225                    worker_id.to_string(),
226                    parent_source_id.to_string(),
227                    shard.clone(),
228                ],
229            ),
230            rehydration_latency_ms: defs.rehydration_latency_ms.get_delete_on_drop_metric(vec![
231                id.to_string(),
232                worker_id.to_string(),
233                parent_source_id.to_string(),
234                shard.clone(),
235                envelope.to_string(),
236            ]),
237            offset_known: defs.offset_known.get_delete_on_drop_metric(vec![
238                id.to_string(),
239                worker_id.to_string(),
240                shard.clone(),
241            ]),
242            offset_committed: defs.offset_committed.get_delete_on_drop_metric(vec![
243                id.to_string(),
244                worker_id.to_string(),
245                shard.clone(),
246            ]),
247            snapshot_records_known: defs.snapshot_records_known.get_delete_on_drop_metric(vec![
248                id.to_string(),
249                worker_id.to_string(),
250                shard.clone(),
251                parent_source_id.to_string(),
252            ]),
253            snapshot_records_staged: defs.snapshot_records_staged.get_delete_on_drop_metric(vec![
254                id.to_string(),
255                worker_id.to_string(),
256                shard.clone(),
257                parent_source_id.to_string(),
258            ]),
259        }
260    }
261}
262
/// Definitions of the prometheus metric vectors backing user-facing sink statistics.
///
/// Sinks currently expose counters only; per-sink timeseries are carved out of these
/// in `SinkStatisticsMetrics::new`.
#[derive(Clone, Debug)]
pub(crate) struct SinkStatisticsMetricDefs {
    // Counters
    pub(crate) messages_staged: IntCounterVec,
    pub(crate) messages_committed: IntCounterVec,
    pub(crate) bytes_staged: IntCounterVec,
    pub(crate) bytes_committed: IntCounterVec,
}
271
impl SinkStatisticsMetricDefs {
    /// Register every sink-statistics metric vector with `registry`.
    ///
    /// All sink metrics share the same two labels: `sink_id` and `worker_id`.
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        Self {
            messages_staged: registry.register(metric!(
                name: "mz_sink_messages_staged",
                help: "The number of messages staged but possibly not committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
            messages_committed: registry.register(metric!(
                name: "mz_sink_messages_committed",
                help: "The number of messages committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
            bytes_staged: registry.register(metric!(
                name: "mz_sink_bytes_staged",
                help: "The number of bytes staged but possibly not committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
            bytes_committed: registry.register(metric!(
                name: "mz_sink_bytes_committed",
                help: "The number of bytes committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
        }
    }
}
298
/// Prometheus metrics for user-facing sink metrics.
///
/// These are `DeleteOnDrop*` handles: dropping this struct removes the corresponding
/// per-sink/per-worker timeseries from the registry.
#[derive(Debug)]
pub struct SinkStatisticsMetrics {
    // Counters
    pub(crate) messages_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) messages_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}
308
309impl SinkStatisticsMetrics {
310    pub(crate) fn new(
311        defs: &SinkStatisticsMetricDefs,
312        id: GlobalId,
313        worker_id: usize,
314    ) -> SinkStatisticsMetrics {
315        SinkStatisticsMetrics {
316            messages_staged: defs
317                .messages_staged
318                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
319            messages_committed: defs
320                .messages_committed
321                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
322            bytes_staged: defs
323                .bytes_staged
324                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
325            bytes_committed: defs
326                .bytes_committed
327                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
328        }
329    }
330}
331
/// Meta data needed to maintain source statistics.
#[derive(Debug, Clone)]
pub struct SourceStatisticsMetadata {
    /// The resumption upper of the source. Used to decide when the source counts as
    /// rehydrated (see `SourceStatistics::update_rehydration_latency_ms`).
    resume_upper: Antichain<Timestamp>,
    /// Time at which the source (more precisely: this metadata object) was created.
    /// The rehydration latency is measured relative to this instant.
    created_at: Instant,
}
340
341impl SourceStatisticsMetadata {
342    /// Create a new `SourceStatisticsMetadata` object.
343    pub fn new(resume_upper: Antichain<Timestamp>) -> Self {
344        Self {
345            resume_upper,
346            created_at: Instant::now(),
347        }
348    }
349}
350
/// A statistics record paired with its prometheus twin, kept together behind one
/// `RefCell` so that every update touches both in the same borrow.
#[derive(Debug)]
struct StatsInner<Stats, Metrics> {
    // The user-facing statistics record (shipped to the controller).
    stats: Stats,
    // The corresponding per-worker prometheus metrics.
    prom: Metrics,
}
356
/// A helper struct designed to make it easy for operators to update user-facing metrics.
/// This struct also ensures that each stat is also incremented in prometheus.
///
/// Caveats:
/// - There is one Prometheus timeseries-per-worker, and we label it with the source id and the
/// source id of the parent source (if there is no parent, these labels have the same value).
///     - Some metrics also have the shard id we are writing metrics for.
/// - The prometheus metrics do not have the same timestamps as the ones exposed in sql, because
/// they are written at different times.
///     - This may be fixed in the future when we write the metrics from storaged directly.
///     - The values also eventually converge to the same value.
#[derive(Debug)]
pub struct StorageStatistics<Stats, Metrics, Meta> {
    // Note also that the `DeleteOnDropCounter`'s in the `SourceStatisticsMetrics`
    // already are in an `Arc`, so this is a bit of extra wrapping, but the cost
    // shouldn't be too much.
    stats: Rc<RefCell<StatsInner<Stats, Metrics>>>,
    /// Meta data needed to maintain statistics.
    meta: Meta,
}
377
378impl<Stats, Metrics, Meta: Clone> Clone for StorageStatistics<Stats, Metrics, Meta> {
379    fn clone(&self) -> Self {
380        Self {
381            stats: Rc::clone(&self.stats),
382            meta: self.meta.clone(),
383        }
384    }
385}
386
/// Statistics maintained for sources. Gauges can be uninitialized.
///
/// One record exists per source per worker; records are aggregated across workers via
/// `as_update` according to the semantics in `mz_storage_types::statistics`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SourceStatisticsRecord {
    id: GlobalId,
    worker_id: usize,
    // Counters
    messages_received: u64,
    bytes_received: u64,
    updates_staged: u64,
    updates_committed: u64,

    // Gauges are always wrapped in an `Option` that represents if that gauge has been
    // initialized by that worker.
    records_indexed: Option<u64>,
    bytes_indexed: Option<u64>,

    // This field is nullable, so its value is an `Option`.
    // (Outer `Option` = initialized?; inner `Option` = SQL NULL-ability.)
    rehydration_latency_ms: Option<Option<i64>>,

    // The following fields are able to be unset when shipped to the controller, so their
    // values are `Option`'s
    snapshot_records_known: Option<Option<u64>>,
    snapshot_records_staged: Option<Option<u64>>,
    snapshot_committed: Option<bool>,
    offset_known: Option<Option<u64>>,
    offset_committed: Option<Option<u64>>,

    // Just prometheus (never shipped to the controller; see `as_update`).
    envelope_state_tombstones: u64,
}
417
impl SourceStatisticsRecord {
    /// Reset the gauges to their initial (partially uninitialized) state.
    fn reset_gauges(&mut self) {
        // These gauges MUST be initialized across all workers before we aggregate and
        // report statistics.
        self.rehydration_latency_ms = None;
        self.snapshot_committed = None;

        // We consider these gauges always initialized
        self.bytes_indexed = Some(0);
        self.records_indexed = Some(0);

        // We don't yet populate these, so we consider them initialized (with an empty value).
        self.snapshot_records_known = Some(None);
        self.snapshot_records_staged = Some(None);
        self.offset_known = Some(None);
        self.offset_committed = Some(None);

        self.envelope_state_tombstones = 0;
    }

    /// Reset counters so that we continue to ship diffs to the controller.
    fn reset_counters(&mut self) {
        self.messages_received = 0;
        self.bytes_received = 0;
        self.updates_staged = 0;
        self.updates_committed = 0;
    }

    /// Convert this record into an `SourceStatisticsUpdate` to be merged
    /// across workers. Gauges must be initialized.
    ///
    /// # Panics
    ///
    /// Panics if any gauge is still uninitialized (`None`). Callers must only invoke
    /// this after all gauges have been initialized (see `SourceStatistics::snapshot`,
    /// which checks exactly this).
    fn as_update(&self) -> SourceStatisticsUpdate {
        // Exhaustively destructure so that adding a field to the record forces this
        // conversion to be revisited.
        let SourceStatisticsRecord {
            id,
            worker_id: _,
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            records_indexed,
            bytes_indexed,
            rehydration_latency_ms,
            snapshot_records_known,
            snapshot_records_staged,
            snapshot_committed,
            offset_known,
            offset_committed,
            // Prometheus-only; intentionally not shipped to the controller.
            envelope_state_tombstones: _,
        } = self.clone();

        SourceStatisticsUpdate {
            id,
            messages_received: messages_received.into(),
            bytes_received: bytes_received.into(),
            updates_staged: updates_staged.into(),
            updates_committed: updates_committed.into(),
            records_indexed: Gauge::gauge(records_indexed.unwrap()),
            bytes_indexed: Gauge::gauge(bytes_indexed.unwrap()),
            rehydration_latency_ms: Gauge::gauge(rehydration_latency_ms.unwrap()),
            snapshot_records_known: Gauge::gauge(snapshot_records_known.unwrap()),
            snapshot_records_staged: Gauge::gauge(snapshot_records_staged.unwrap()),
            snapshot_committed: Gauge::gauge(snapshot_committed.unwrap()),
            offset_known: Gauge::gauge(offset_known.unwrap()),
            offset_committed: Gauge::gauge(offset_committed.unwrap()),
        }
    }
}
484
/// Statistics maintained for sinks. Gauges can be uninitialized.
///
/// Sinks currently track counters only; there are no gauge fields.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SinkStatisticsRecord {
    id: GlobalId,
    worker_id: usize,
    // Counters
    messages_staged: u64,
    messages_committed: u64,
    bytes_staged: u64,
    bytes_committed: u64,
}
496
impl SinkStatisticsRecord {
    /// Sinks have no gauges, so resetting them is a no-op (kept for symmetry with
    /// `SourceStatisticsRecord::reset_gauges`).
    fn reset_gauges(&self) {}

    /// Reset counters so that we continue to ship diffs to the controller.
    fn reset_counters(&mut self) {
        self.messages_staged = 0;
        self.messages_committed = 0;
        self.bytes_staged = 0;
        self.bytes_committed = 0;
    }

    /// Convert this record into an `SinkStatisticsUpdate` to be merged
    /// across workers. Gauges must be initialized.
    fn as_update(&self) -> SinkStatisticsUpdate {
        // Exhaustively destructure so that adding a field to the record forces this
        // conversion to be revisited.
        let SinkStatisticsRecord {
            id,
            worker_id: _,
            messages_staged,
            messages_committed,
            bytes_staged,
            bytes_committed,
        } = self.clone();

        SinkStatisticsUpdate {
            id,
            messages_staged: messages_staged.into(),
            messages_committed: messages_committed.into(),
            bytes_staged: bytes_staged.into(),
            bytes_committed: bytes_committed.into(),
        }
    }
}
529
/// Statistics maintained for sources.
///
/// Combines the per-worker record, its prometheus twin, and rehydration metadata.
pub type SourceStatistics =
    StorageStatistics<SourceStatisticsRecord, SourceStatisticsMetrics, SourceStatisticsMetadata>;

/// Statistics maintained for sinks.
///
/// Sinks need no extra metadata, so the `Meta` parameter is `()`.
pub type SinkStatistics = StorageStatistics<SinkStatisticsRecord, SinkStatisticsMetrics, ()>;
536
537impl SourceStatistics {
    /// Create a new `SourceStatistics` for the given source (or subsource) and worker.
    ///
    /// The record starts in the same state that `SourceStatisticsRecord::reset_gauges`
    /// produces: `rehydration_latency_ms` and `snapshot_committed` are uninitialized
    /// (`None`) and must be initialized before `snapshot` returns `Some`.
    pub(crate) fn new(
        id: GlobalId,
        worker_id: usize,
        metrics: &SourceStatisticsMetricDefs,
        parent_source_id: GlobalId,
        shard_id: &mz_persist_client::ShardId,
        envelope: SourceEnvelope,
        resume_upper: Antichain<Timestamp>,
    ) -> Self {
        Self {
            stats: Rc::new(RefCell::new(StatsInner {
                stats: SourceStatisticsRecord {
                    id,
                    worker_id,
                    messages_received: 0,
                    updates_staged: 0,
                    updates_committed: 0,
                    bytes_received: 0,
                    records_indexed: Some(0),
                    bytes_indexed: Some(0),
                    rehydration_latency_ms: None,
                    snapshot_records_staged: Some(None),
                    snapshot_records_known: Some(None),
                    snapshot_committed: None,
                    offset_known: Some(None),
                    offset_committed: Some(None),
                    envelope_state_tombstones: 0,
                },
                prom: SourceStatisticsMetrics::new(
                    metrics,
                    id,
                    worker_id,
                    parent_source_id,
                    shard_id,
                    envelope,
                ),
            })),
            meta: SourceStatisticsMetadata::new(resume_upper),
        }
    }
578
579    /// Reset gauges to uninitialized state.
580    /// This does not reset prometheus gauges, which will be overwritten with new values.
581    pub fn reset_gauges(&self) {
582        let mut cur = self.stats.borrow_mut();
583        cur.stats.reset_gauges();
584    }
585
586    /// Get a snapshot of the data, returning `None` if all gauges are not initialized.
587    ///
588    /// This also resets counters, so that we continue to move diffs around.
589    pub fn snapshot(&self) -> Option<SourceStatisticsRecord> {
590        let mut cur = self.stats.borrow_mut();
591
592        match &cur.stats {
593            SourceStatisticsRecord {
594                records_indexed: Some(_),
595                bytes_indexed: Some(_),
596                rehydration_latency_ms: Some(_),
597                snapshot_records_known: Some(_),
598                snapshot_records_staged: Some(_),
599                snapshot_committed: Some(_),
600                offset_known: Some(_),
601                offset_committed: Some(_),
602                ..
603            } => {
604                let ret = Some(cur.stats.clone());
605                cur.stats.reset_counters();
606                ret
607            }
608            _ => None,
609        }
610    }
611
    /// Set the `snapshot_committed` stat based on the reported upper, and
    /// mark the stats as initialized.
    ///
    /// Currently a pure delegation to `update_snapshot_committed`; kept as a separate
    /// entry point to make the initialization call-site explicit.
    ///
    /// - In sql, we ensure that we _never_ reset `snapshot_committed` to `false`, but gauges and
    /// counters are ordinarily reset to 0 in Prometheus, so on restarts this value may be inconsistent.
    // TODO(guswynn): Actually test that this initialization logic works.
    pub fn initialize_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
        self.update_snapshot_committed(upper);
    }
621
622    /// Set the `snapshot_committed` stat based on the reported upper.
623    pub fn update_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
624        let value = *upper != Antichain::from_elem(Timestamp::MIN);
625        let mut cur = self.stats.borrow_mut();
626        cur.stats.snapshot_committed = Some(value);
627        cur.prom.snapshot_committed.set(if value { 1 } else { 0 });
628    }
629
630    /// Increment the `messages_received` stat.
631    pub fn inc_messages_received_by(&self, value: u64) {
632        let mut cur = self.stats.borrow_mut();
633        cur.stats.messages_received = cur.stats.messages_received + value;
634        cur.prom.messages_received.inc_by(value);
635    }
636
637    /// Increment the `updates` stat.
638    pub fn inc_updates_staged_by(&self, value: u64) {
639        let mut cur = self.stats.borrow_mut();
640        cur.stats.updates_staged = cur.stats.updates_staged + value;
641        cur.prom.updates_staged.inc_by(value);
642    }
643
644    /// Increment the `messages_committed` stat.
645    pub fn inc_updates_committed_by(&self, value: u64) {
646        let mut cur = self.stats.borrow_mut();
647        cur.stats.updates_committed = cur.stats.updates_committed + value;
648        cur.prom.updates_committed.inc_by(value);
649    }
650
651    /// Increment the `bytes_received` stat.
652    pub fn inc_bytes_received_by(&self, value: u64) {
653        let mut cur = self.stats.borrow_mut();
654        cur.stats.bytes_received = cur.stats.bytes_received + value;
655        cur.prom.bytes_received.inc_by(value);
656    }
657
658    /// Update the `bytes_indexed` stat.
659    /// A positive value will add and a negative value will subtract.
660    pub fn update_bytes_indexed_by(&self, value: i64) {
661        let mut cur = self.stats.borrow_mut();
662        if let Some(updated) = cur
663            .stats
664            .bytes_indexed
665            .unwrap_or(0)
666            .checked_add_signed(value)
667        {
668            cur.stats.bytes_indexed = Some(updated);
669            cur.prom.bytes_indexed.set(updated);
670        } else {
671            let bytes_indexed = cur.stats.bytes_indexed.unwrap_or(0);
672            tracing::warn!(
673                "Unexpected u64 overflow while updating bytes_indexed value {} with {}",
674                bytes_indexed,
675                value
676            );
677            cur.stats.bytes_indexed = Some(0);
678            cur.prom.bytes_indexed.set(0);
679        }
680    }
681
682    /// Set the `bytes_indexed` to the given value
683    pub fn set_bytes_indexed(&self, value: i64) {
684        let mut cur = self.stats.borrow_mut();
685        let value = if value < 0 {
686            tracing::warn!("Unexpected negative value for bytes_indexed {}", value);
687            0
688        } else {
689            value.unsigned_abs()
690        };
691        cur.stats.bytes_indexed = Some(value);
692        cur.prom.bytes_indexed.set(value);
693    }
694
695    /// Update the `records_indexed` stat.
696    /// A positive value will add and a negative value will subtract.
697    pub fn update_records_indexed_by(&self, value: i64) {
698        let mut cur = self.stats.borrow_mut();
699        if let Some(updated) = cur
700            .stats
701            .records_indexed
702            .unwrap_or(0)
703            .checked_add_signed(value)
704        {
705            cur.stats.records_indexed = Some(updated);
706            cur.prom.records_indexed.set(updated);
707        } else {
708            let records_indexed = cur.stats.records_indexed.unwrap_or(0);
709            tracing::warn!(
710                "Unexpected u64 overflow while updating records_indexed value {} with {}",
711                records_indexed,
712                value
713            );
714            cur.stats.records_indexed = Some(0);
715            cur.prom.records_indexed.set(0);
716        }
717    }
718
719    /// Set the `records_indexed` to the given value
720    pub fn set_records_indexed(&self, value: i64) {
721        let mut cur = self.stats.borrow_mut();
722        let value = if value < 0 {
723            tracing::warn!("Unexpected negative value for records_indexed {}", value);
724            0
725        } else {
726            value.unsigned_abs()
727        };
728        cur.stats.records_indexed = Some(value);
729        cur.prom.records_indexed.set(value);
730    }
731
732    /// Initialize the `rehydration_latency_ms` stat as `NULL`.
733    pub fn initialize_rehydration_latency_ms(&self) {
734        let mut cur = self.stats.borrow_mut();
735        cur.stats.rehydration_latency_ms = Some(None);
736    }
737
738    /// Update the `envelope_state_tombstones` stat.
739    /// A positive value will add and a negative value will subtract.
740    // TODO(guswynn): consider exposing this to users
741    pub fn update_envelope_state_tombstones_by(&self, value: i64) {
742        let mut cur = self.stats.borrow_mut();
743        if let Some(updated) = cur
744            .stats
745            .envelope_state_tombstones
746            .checked_add_signed(value)
747        {
748            cur.stats.envelope_state_tombstones = updated;
749            cur.prom.envelope_state_tombstones.set(updated);
750        } else {
751            let envelope_state_tombstones = cur.stats.envelope_state_tombstones;
752            tracing::warn!(
753                "Unexpected u64 overflow while updating envelope_state_tombstones value {} with {}",
754                envelope_state_tombstones,
755                value
756            );
757            cur.stats.envelope_state_tombstones = 0;
758            cur.prom.envelope_state_tombstones.set(0);
759        }
760    }
761
762    /// Set the `rehydration_latency_ms` stat based on the reported upper.
763    pub fn update_rehydration_latency_ms(&self, upper: &Antichain<Timestamp>) {
764        let mut cur = self.stats.borrow_mut();
765
766        if matches!(cur.stats.rehydration_latency_ms, Some(Some(_))) {
767            return; // source was already hydrated before
768        }
769        if !PartialOrder::less_than(&self.meta.resume_upper, upper) {
770            return; // source is not yet hydrated
771        }
772
773        let elapsed = self.meta.created_at.elapsed();
774        let value = elapsed
775            .as_millis()
776            .try_into()
777            .expect("Rehydration took more than ~584 million years!");
778        cur.stats.rehydration_latency_ms = Some(Some(value));
779        cur.prom.rehydration_latency_ms.set(value);
780    }
781
782    /// Set the `offset_known` stat to the given value.
783    pub fn set_offset_known(&self, value: u64) {
784        let mut cur = self.stats.borrow_mut();
785        cur.stats.offset_known = Some(Some(value));
786        cur.prom.offset_known.set(value);
787    }
788
789    /// Set the `offset_committed` stat to the given value.
790    pub fn set_offset_committed(&self, value: u64) {
791        let mut cur = self.stats.borrow_mut();
792        cur.stats.offset_committed = Some(Some(value));
793        cur.prom.offset_committed.set(value);
794    }
795
796    /// Set the `snapshot_records_known` stat to the given value.
797    pub fn set_snapshot_records_known(&self, value: u64) {
798        let mut cur = self.stats.borrow_mut();
799        cur.stats.snapshot_records_known = Some(Some(value));
800        cur.prom.snapshot_records_known.set(value);
801    }
802
    /// Set the `snapshot_records_staged` stat to the given value.
    pub fn set_snapshot_records_staged(&self, value: u64) {
        let mut cur = self.stats.borrow_mut();
        cur.stats.snapshot_records_staged = Some(Some(value));
        cur.prom.snapshot_records_staged.set(value);
    }
809}
810
811impl SinkStatistics {
812    pub(crate) fn new(id: GlobalId, worker_id: usize, metrics: &SinkStatisticsMetricDefs) -> Self {
813        Self {
814            stats: Rc::new(RefCell::new(StatsInner {
815                stats: SinkStatisticsRecord {
816                    id,
817                    worker_id,
818                    messages_staged: 0,
819                    messages_committed: 0,
820                    bytes_staged: 0,
821                    bytes_committed: 0,
822                },
823                prom: SinkStatisticsMetrics::new(metrics, id, worker_id),
824            })),
825            meta: (),
826        }
827    }
828
829    /// Reset gauges to uninitialized state.
830    /// This does not reset prometheus gauges, which will be overwritten with new values.
831    pub fn reset_gauges(&self) {
832        let cur = self.stats.borrow_mut();
833        cur.stats.reset_gauges();
834    }
835
836    /// Get a snapshot of the data, returning `None` if all gauges are not initialized.
837    ///
838    /// This also resets counters, so that we continue to move diffs around.
839    pub fn snapshot(&self) -> Option<SinkStatisticsRecord> {
840        let mut cur = self.stats.borrow_mut();
841
842        match &cur.stats {
843            SinkStatisticsRecord { .. } => {
844                let ret = Some(cur.stats.clone());
845                cur.stats.reset_counters();
846                ret
847            }
848        }
849    }
850
851    /// Increment the `messages_staged` stat.
852    pub fn inc_messages_staged_by(&self, value: u64) {
853        let mut cur = self.stats.borrow_mut();
854        cur.stats.messages_staged = cur.stats.messages_staged + value;
855        cur.prom.messages_staged.inc_by(value);
856    }
857
858    /// Increment the `bytes_received` stat.
859    pub fn inc_bytes_staged_by(&self, value: u64) {
860        let mut cur = self.stats.borrow_mut();
861        cur.stats.bytes_staged = cur.stats.bytes_staged + value;
862        cur.prom.bytes_staged.inc_by(value);
863    }
864
865    /// Increment the `messages_committed` stat.
866    pub fn inc_messages_committed_by(&self, value: u64) {
867        let mut cur = self.stats.borrow_mut();
868        cur.stats.messages_committed = cur.stats.messages_committed + value;
869        cur.prom.messages_committed.inc_by(value);
870    }
871
872    /// Increment the `bytes_committed` stat.
873    pub fn inc_bytes_committed_by(&self, value: u64) {
874        let mut cur = self.stats.borrow_mut();
875        cur.stats.bytes_committed = cur.stats.bytes_committed + value;
876        cur.prom.bytes_committed.inc_by(value);
877    }
878}
879
/// A structure that keeps track of _local_ statistics, as well as aggregating
/// statistics into a single worker (currently worker 0).
///
/// This is because we ONLY want to emit statistics for _gauge-style-statistics_ when
/// ALL workers have caught up to the _currently running instance of a source/sink_ and have
/// reported an accurate gauge. This prevents issues like 1 worker reporting
/// `snapshot_committed=true` before the other workers have caught up to report that they haven't
/// finished snapshotting.
///
/// The API flow is:
/// - Initialize sources/sinks with `initialize_source` or `initialize_sink`. This advances the
/// local epoch.
/// - Advance the _global epoch_ with `advance_global_epoch`. This should always be called strictly
/// before reinitializing objects.
/// - Emit a snapshot of local data with `emit_local`, which can be broadcasted to other workers.
/// - Ingest data from other workers with `ingest`.
/// - Emit a `snapshot` of _global data_ (i.e. skipping objects that don't have all workers caught
/// up) with `snapshot`.
///
/// All functions can and should be called from ALL workers.
///
/// Note that we hand-roll a statistics epoch here, but in the future, when the storage
/// _controller_ has top-level support for _dataflow epochs_, this logic can be entirely moved
/// there.
pub struct AggregatedStatistics {
    /// The index of this timely worker.
    worker_id: usize,
    /// The total number of timely workers; used to size the per-worker slots
    /// in the global maps below.
    worker_count: usize,
    /// Local per-source statistics: id -> (local epoch, ingestion id, stats).
    local_source_statistics: BTreeMap<GlobalId, (usize, GlobalId, SourceStatistics)>,
    /// Local per-sink statistics: id -> (local epoch, stats).
    local_sink_statistics: BTreeMap<GlobalId, (usize, SinkStatistics)>,

    /// Aggregated source statistics, populated only on worker 0:
    /// id -> (global epoch, ingestion id, one update slot per worker).
    global_source_statistics:
        BTreeMap<GlobalId, (usize, GlobalId, Vec<Option<SourceStatisticsUpdate>>)>,
    /// Aggregated sink statistics, populated only on worker 0:
    /// id -> (global epoch, one update slot per worker).
    global_sink_statistics: BTreeMap<GlobalId, (usize, Vec<Option<SinkStatisticsUpdate>>)>,
}
914
915impl AggregatedStatistics {
916    /// Initialize a new `AggregatedStatistics`.
917    pub fn new(worker_id: usize, worker_count: usize) -> Self {
918        AggregatedStatistics {
919            worker_id,
920            worker_count,
921            local_source_statistics: Default::default(),
922            local_sink_statistics: Default::default(),
923            global_source_statistics: Default::default(),
924            global_sink_statistics: Default::default(),
925        }
926    }
927
928    /// Get a collection of `SourceStatistics` for the ingestion `ingestion_id`.
929    pub fn get_ingestion_stats(
930        &self,
931        ingestion_id: &GlobalId,
932    ) -> BTreeMap<GlobalId, SourceStatistics> {
933        let mut ingestion_stats = BTreeMap::new();
934        for (id, (_epoch, ingestion_id2, stats)) in self.local_source_statistics.iter() {
935            if ingestion_id == ingestion_id2 {
936                ingestion_stats.insert(id.clone(), stats.clone());
937            }
938        }
939        ingestion_stats
940    }
941
942    /// Get a `SourceStatistics` for an id, if it exists.
943    pub fn get_source(&self, id: &GlobalId) -> Option<&SourceStatistics> {
944        self.local_source_statistics.get(id).map(|(_, _, s)| s)
945    }
946
947    /// Get a `SinkStatistics` for an id, if it exists.
948    pub fn get_sink(&self, id: &GlobalId) -> Option<&SinkStatistics> {
949        self.local_sink_statistics.get(id).map(|(_, s)| s)
950    }
951
952    /// Deinitialize an object. Other methods other than `initialize_source` and `initialize_sink`
953    /// will never overwrite this.
954    pub fn deinitialize(&mut self, id: GlobalId) {
955        self.local_source_statistics.remove(&id);
956        self.local_sink_statistics.remove(&id);
957        self.global_source_statistics.remove(&id);
958        self.global_sink_statistics.remove(&id);
959    }
960
961    /// Advance the _global epoch_ for statistics.
962    ///
963    /// Gauge values from previous epochs will be ignored. Counter values from previous epochs will
964    /// still be applied as usual.
965    pub fn advance_global_epoch(&mut self, id: GlobalId) {
966        if let Some((epoch, _ingestion_id, stats)) = self.global_source_statistics.get_mut(&id) {
967            *epoch += 1;
968            for worker_stats in stats {
969                if let Some(update) = worker_stats {
970                    update.reset_gauges();
971                }
972            }
973        }
974        if let Some((epoch, stats)) = self.global_sink_statistics.get_mut(&id) {
975            *epoch += 1;
976            for worker_stats in stats {
977                if let Some(update) = worker_stats {
978                    update.reset_gauges();
979                }
980            }
981        }
982    }
983
984    /// Re-initialize a source. If it already exists, then its local epoch is advanced.
985    pub fn initialize_source<F: FnOnce() -> SourceStatistics>(
986        &mut self,
987        id: GlobalId,
988        ingestion_id: GlobalId,
989        resume_upper: Antichain<Timestamp>,
990        stats: F,
991    ) {
992        self.local_source_statistics
993            .entry(id)
994            .and_modify(|(epoch, ingestion_id2, stats)| {
995                assert_eq!(ingestion_id, *ingestion_id2);
996                *epoch += 1;
997                stats.reset_gauges();
998                stats.meta = SourceStatisticsMetadata::new(resume_upper);
999            })
1000            .or_insert_with(|| (0, ingestion_id, stats()));
1001
1002        if self.worker_id == 0 {
1003            self.global_source_statistics
1004                .entry(id)
1005                .or_insert_with(|| (0, ingestion_id, vec![None; self.worker_count]));
1006        }
1007    }
1008
1009    /// Re-initialize a sink. If it already exists, then its local epoch is advanced.
1010    pub fn initialize_sink<F: FnOnce() -> SinkStatistics>(&mut self, id: GlobalId, stats: F) {
1011        self.local_sink_statistics
1012            .entry(id)
1013            .and_modify(|(epoch, stats)| {
1014                *epoch += 1;
1015                stats.reset_gauges();
1016            })
1017            .or_insert_with(|| (0, stats()));
1018        if self.worker_id == 0 {
1019            self.global_sink_statistics
1020                .entry(id)
1021                .or_insert_with(|| (0, vec![None; self.worker_count]));
1022        }
1023    }
1024
1025    /// Ingest data from other workers.
1026    pub fn ingest(
1027        &mut self,
1028        source_statistics: Vec<(usize, SourceStatisticsRecord)>,
1029        sink_statistics: Vec<(usize, SinkStatisticsRecord)>,
1030    ) {
1031        // Currently, only worker 0 ingest data from other workers.
1032        if self.worker_id != 0 {
1033            return;
1034        }
1035
1036        for (epoch, stat) in source_statistics {
1037            if let Some((global_epoch, _, stats)) = self.global_source_statistics.get_mut(&stat.id)
1038            {
1039                // The record might be from a previous incarnation of the source. If that's the
1040                // case, we only incorporate its counter values and ignore its gauge values.
1041                let epoch_match = epoch >= *global_epoch;
1042                let mut update = stat.as_update();
1043                match (&mut stats[stat.worker_id], epoch_match) {
1044                    (None, true) => stats[stat.worker_id] = Some(update),
1045                    (None, false) => {
1046                        update.reset_gauges();
1047                        stats[stat.worker_id] = Some(update);
1048                    }
1049                    (Some(occupied), true) => occupied.incorporate(update),
1050                    (Some(occupied), false) => occupied.incorporate_counters(update),
1051                }
1052            }
1053        }
1054
1055        for (epoch, stat) in sink_statistics {
1056            if let Some((global_epoch, stats)) = self.global_sink_statistics.get_mut(&stat.id) {
1057                // The record might be from a previous incarnation of the sink. If that's the
1058                // case, we only incorporate its counter values and ignore its gauge values.
1059                let epoch_match = epoch >= *global_epoch;
1060                let update = stat.as_update();
1061                match (&mut stats[stat.worker_id], epoch_match) {
1062                    (None, true) => stats[stat.worker_id] = Some(update),
1063                    (None, false) => {
1064                        update.reset_gauges();
1065                        stats[stat.worker_id] = Some(update);
1066                    }
1067                    (Some(occupied), true) => occupied.incorporate(update),
1068                    (Some(occupied), false) => occupied.incorporate_counters(update),
1069                }
1070            }
1071        }
1072    }
1073
1074    fn _emit_local(
1075        &mut self,
1076    ) -> (
1077        Vec<(usize, SourceStatisticsRecord)>,
1078        Vec<(usize, SinkStatisticsRecord)>,
1079    ) {
1080        let sources = self
1081            .local_source_statistics
1082            .values_mut()
1083            .flat_map(|(epoch, _, s)| s.snapshot().map(|v| (*epoch, v)))
1084            .collect();
1085
1086        let sinks = self
1087            .local_sink_statistics
1088            .values_mut()
1089            .flat_map(|(epoch, s)| s.snapshot().map(|v| (*epoch, v)))
1090            .collect();
1091
1092        (sources, sinks)
1093    }
1094
1095    /// Emit a snapshot of this workers local data.
1096    pub fn emit_local(
1097        &mut self,
1098    ) -> (
1099        Vec<(usize, SourceStatisticsRecord)>,
1100        Vec<(usize, SinkStatisticsRecord)>,
1101    ) {
1102        // As an optimization, worker 0 does not broadcast it data. It ingests
1103        // it in `snapshot`.
1104        if self.worker_id == 0 {
1105            return (Vec::new(), Vec::new());
1106        }
1107
1108        self._emit_local()
1109    }
1110
1111    /// Emit a _global_ snapshot of data. This does not include objects whose workers have not
1112    /// initialized gauges for the current epoch.
1113    pub fn snapshot(&mut self) -> (Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>) {
1114        if !self.worker_id == 0 {
1115            return (Vec::new(), Vec::new());
1116        }
1117
1118        let (sources, sinks) = self._emit_local();
1119        self.ingest(sources, sinks);
1120
1121        let sources = self
1122            .global_source_statistics
1123            .iter_mut()
1124            .filter_map(|(_, (_, _, s))| {
1125                if s.iter().all(|s| s.is_some()) {
1126                    let ret = Some(SourceStatisticsUpdate::summarize(|| {
1127                        s.iter().filter_map(Option::as_ref)
1128                    }));
1129
1130                    // Reset the counters so we only report diffs.
1131                    s.iter_mut().for_each(|s| {
1132                        if let Some(s) = s {
1133                            s.reset_counters();
1134                        }
1135                    });
1136                    ret
1137                } else {
1138                    None
1139                }
1140            })
1141            .collect();
1142
1143        let sinks = self
1144            .global_sink_statistics
1145            .iter_mut()
1146            .filter_map(|(_, (_, s))| {
1147                if s.iter().all(|s| s.is_some()) {
1148                    let ret = Some(SinkStatisticsUpdate::summarize(|| {
1149                        s.iter().filter_map(Option::as_ref)
1150                    }));
1151
1152                    // Reset the counters so we only report diffs.
1153                    s.iter_mut().for_each(|s| {
1154                        if let Some(s) = s {
1155                            s.reset_counters();
1156                        }
1157                    });
1158                    ret
1159                } else {
1160                    None
1161                }
1162            })
1163            .collect();
1164
1165        (sources, sinks)
1166    }
1167}