Skip to main content

mz_storage/
statistics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Helpers for managing storage statistics.
11//!
12//!
//! This module collects statistics related to sources and sinks. Statistics, as exposed
//! to their respective system tables, have strong semantics, defined within the
15//! `mz_storage_types::statistics` module. This module collects and aggregates metrics
16//! across workers according to those semantics.
17//!
//! Note that it _simultaneously_ collects Prometheus metrics for the given statistics. Those
19//! metrics _do not have the same strong semantics_, which is _by design_ to ensure we
20//! are able to categorically debug sources and sinks during complex failures (or bugs
21//! with statistics collection itself). Prometheus metrics are
22//! - Never dropped or reset until a source/sink is dropped.
23//! - Entirely independent across workers.
24
25use std::cell::RefCell;
26use std::collections::BTreeMap;
27use std::rc::Rc;
28use std::time::Instant;
29
30use mz_ore::metric;
31use mz_ore::metrics::{
32    DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, IntGaugeVec, MetricVisibility,
33    MetricsRegistry, UIntGaugeVec,
34};
35use mz_repr::{GlobalId, Timestamp};
36use mz_storage_client::statistics::{Gauge, SinkStatisticsUpdate, SourceStatisticsUpdate};
37use mz_storage_types::sources::SourceEnvelope;
38use prometheus::core::{AtomicI64, AtomicU64};
39use serde::{Deserialize, Serialize};
40use timely::PartialOrder;
41use timely::progress::frontier::Antichain;
42
43// Note(guswynn): ordinarily these metric structs would be in the `metrics` modules, but we
44// put them here so they can be near the user-facing definitions as well.
45
46#[derive(Clone, Debug)]
47pub(crate) struct SourceStatisticsMetricDefs {
48    // Counters
49    pub(crate) messages_received: IntCounterVec,
50    pub(crate) updates_staged: IntCounterVec,
51    pub(crate) updates_committed: IntCounterVec,
52    pub(crate) bytes_received: IntCounterVec,
53
54    // Gauges
55    pub(crate) snapshot_committed: UIntGaugeVec,
56    pub(crate) bytes_indexed: UIntGaugeVec,
57    pub(crate) records_indexed: UIntGaugeVec,
58    pub(crate) rehydration_latency_ms: IntGaugeVec,
59
60    pub(crate) offset_known: UIntGaugeVec,
61    pub(crate) offset_committed: UIntGaugeVec,
62    pub(crate) snapshot_records_known: UIntGaugeVec,
63    pub(crate) snapshot_records_staged: UIntGaugeVec,
64
65    // Just prometheus.
66    pub(crate) envelope_state_tombstones: UIntGaugeVec,
67}
68
69impl SourceStatisticsMetricDefs {
70    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
71        Self {
72            snapshot_committed: registry.register(metric!(
73                name: "mz_source_snapshot_committed",
74                help: "Whether or not the worker has committed the initial snapshot for a source.",
75                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
76            )),
77            messages_received: registry.register(metric!(
78                name: "mz_source_messages_received",
79                help: "The number of raw messages the worker has received from upstream.",
80                var_labels: ["source_id", "worker_id", "parent_source_id"],
81                visibility: MetricVisibility::Public,
82            )),
83            updates_staged: registry.register(metric!(
84                name: "mz_source_updates_staged",
85                help: "The number of updates (inserts + deletes) the worker has written but not yet committed to the storage layer.",
86                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
87            )),
88            updates_committed: registry.register(metric!(
89                name: "mz_source_updates_committed",
90                help: "The number of updates (inserts + deletes) the worker has committed into the storage layer.",
91                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
92            )),
93            bytes_received: registry.register(metric!(
94                name: "mz_source_bytes_received",
95                help: "The number of bytes worth of messages the worker has received from upstream. The way the bytes are counted is source-specific.",
96                var_labels: ["source_id", "worker_id", "parent_source_id"],
97                visibility: MetricVisibility::Public,
98            )),
99            bytes_indexed: registry.register(metric!(
100                name: "mz_source_bytes_indexed",
101                help: "The number of bytes of the source envelope state kept. This will be specific to the envelope in use.",
102                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
103            )),
104            records_indexed: registry.register(metric!(
105                name: "mz_source_records_indexed",
106                help: "The number of records in the source envelope state. This will be specific to the envelope in use",
107                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
108            )),
109            envelope_state_tombstones: registry.register(metric!(
110                name: "mz_source_envelope_state_tombstones",
111                help: "The number of outstanding tombstones in the source envelope state. This will be specific to the envelope in use",
112                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
113            )),
114            rehydration_latency_ms: registry.register(metric!(
115                name: "mz_source_rehydration_latency_ms",
116                help: "The amount of time in milliseconds it took for the worker to rehydrate the source envelope state. This will be specific to the envelope in use.",
117                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id", "envelope"],
118            )),
119            offset_known: registry.register(metric!(
120                name: "mz_source_offset_known",
121                help: "The total number of _values_ (source-defined unit) present in upstream.",
122                var_labels: ["source_id", "worker_id", "shard_id"],
123                visibility: MetricVisibility::Public,
124            )),
125            offset_committed: registry.register(metric!(
126                name: "mz_source_offset_committed",
127                help: "The total number of _values_ (source-defined unit) we have fully processed, and storage and committed.",
128                var_labels: ["source_id", "worker_id", "shard_id"],
129                visibility: MetricVisibility::Public,
130            )),
131            snapshot_records_known: registry.register(metric!(
132                name: "mz_source_snapshot_records_known",
133                help: "The total number of records in the source's snapshot",
134                var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"],
135            )),
136            snapshot_records_staged: registry.register(metric!(
137                name: "mz_source_snapshot_records_staged",
138                help: "The total number of records read from the source's snapshot",
139                var_labels: ["source_id", "worker_id", "shard_id", "parent_source_id"],
140            )),
141        }
142    }
143}
144
145/// Prometheus metrics for user-facing source metrics.
146#[derive(Debug)]
147pub struct SourceStatisticsMetrics {
148    // Counters
149    pub(crate) messages_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,
150    pub(crate) updates_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
151    pub(crate) updates_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
152    pub(crate) bytes_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,
153
154    // Gauges
155    pub(crate) snapshot_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
156    pub(crate) bytes_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
157    pub(crate) records_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
158    pub(crate) rehydration_latency_ms: DeleteOnDropGauge<AtomicI64, Vec<String>>,
159
160    pub(crate) offset_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
161    pub(crate) offset_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
162    pub(crate) snapshot_records_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
163    pub(crate) snapshot_records_staged: DeleteOnDropGauge<AtomicU64, Vec<String>>,
164
165    // Just prometheus.
166    pub(crate) envelope_state_tombstones: DeleteOnDropGauge<AtomicU64, Vec<String>>,
167}
168
169impl SourceStatisticsMetrics {
170    pub(crate) fn new(
171        defs: &SourceStatisticsMetricDefs,
172        id: GlobalId,
173        worker_id: usize,
174        parent_source_id: GlobalId,
175        shard_id: &mz_persist_client::ShardId,
176        envelope: SourceEnvelope,
177    ) -> SourceStatisticsMetrics {
178        let shard = shard_id.to_string();
179        let envelope = match envelope {
180            SourceEnvelope::None(_) => "none",
181            SourceEnvelope::Upsert(_) => "upsert",
182            SourceEnvelope::CdcV2 => "cdcv2",
183        };
184
185        SourceStatisticsMetrics {
186            snapshot_committed: defs.snapshot_committed.get_delete_on_drop_metric(vec![
187                id.to_string(),
188                worker_id.to_string(),
189                parent_source_id.to_string(),
190                shard.clone(),
191            ]),
192            messages_received: defs.messages_received.get_delete_on_drop_metric(vec![
193                id.to_string(),
194                worker_id.to_string(),
195                parent_source_id.to_string(),
196            ]),
197            updates_staged: defs.updates_staged.get_delete_on_drop_metric(vec![
198                id.to_string(),
199                worker_id.to_string(),
200                parent_source_id.to_string(),
201                shard.clone(),
202            ]),
203            updates_committed: defs.updates_committed.get_delete_on_drop_metric(vec![
204                id.to_string(),
205                worker_id.to_string(),
206                parent_source_id.to_string(),
207                shard.clone(),
208            ]),
209            bytes_received: defs.bytes_received.get_delete_on_drop_metric(vec![
210                id.to_string(),
211                worker_id.to_string(),
212                parent_source_id.to_string(),
213            ]),
214            bytes_indexed: defs.bytes_indexed.get_delete_on_drop_metric(vec![
215                id.to_string(),
216                worker_id.to_string(),
217                parent_source_id.to_string(),
218                shard.clone(),
219            ]),
220            records_indexed: defs.records_indexed.get_delete_on_drop_metric(vec![
221                id.to_string(),
222                worker_id.to_string(),
223                parent_source_id.to_string(),
224                shard.clone(),
225            ]),
226            envelope_state_tombstones: defs.envelope_state_tombstones.get_delete_on_drop_metric(
227                vec![
228                    id.to_string(),
229                    worker_id.to_string(),
230                    parent_source_id.to_string(),
231                    shard.clone(),
232                ],
233            ),
234            rehydration_latency_ms: defs.rehydration_latency_ms.get_delete_on_drop_metric(vec![
235                id.to_string(),
236                worker_id.to_string(),
237                parent_source_id.to_string(),
238                shard.clone(),
239                envelope.to_string(),
240            ]),
241            offset_known: defs.offset_known.get_delete_on_drop_metric(vec![
242                id.to_string(),
243                worker_id.to_string(),
244                shard.clone(),
245            ]),
246            offset_committed: defs.offset_committed.get_delete_on_drop_metric(vec![
247                id.to_string(),
248                worker_id.to_string(),
249                shard.clone(),
250            ]),
251            snapshot_records_known: defs.snapshot_records_known.get_delete_on_drop_metric(vec![
252                id.to_string(),
253                worker_id.to_string(),
254                shard.clone(),
255                parent_source_id.to_string(),
256            ]),
257            snapshot_records_staged: defs.snapshot_records_staged.get_delete_on_drop_metric(vec![
258                id.to_string(),
259                worker_id.to_string(),
260                shard.clone(),
261                parent_source_id.to_string(),
262            ]),
263        }
264    }
265}
266
267#[derive(Clone, Debug)]
268pub(crate) struct SinkStatisticsMetricDefs {
269    // Counters
270    pub(crate) messages_staged: IntCounterVec,
271    pub(crate) messages_committed: IntCounterVec,
272    pub(crate) bytes_staged: IntCounterVec,
273    pub(crate) bytes_committed: IntCounterVec,
274}
275
276impl SinkStatisticsMetricDefs {
277    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
278        Self {
279            messages_staged: registry.register(metric!(
280                name: "mz_sink_messages_staged",
281                help: "The number of messages staged but possibly not committed to the sink.",
282                var_labels: ["sink_id", "worker_id"],
283            )),
284            messages_committed: registry.register(metric!(
285                name: "mz_sink_messages_committed",
286                help: "The number of messages committed to the sink.",
287                var_labels: ["sink_id", "worker_id"],
288            )),
289            bytes_staged: registry.register(metric!(
290                name: "mz_sink_bytes_staged",
291                help: "The number of bytes staged but possibly not committed to the sink.",
292                var_labels: ["sink_id", "worker_id"],
293                visibility: MetricVisibility::Public,
294            )),
295            bytes_committed: registry.register(metric!(
296                name: "mz_sink_bytes_committed",
297                help: "The number of bytes committed to the sink.",
298                var_labels: ["sink_id", "worker_id"],
299                visibility: MetricVisibility::Public,
300            )),
301        }
302    }
303}
304
305/// Prometheus metrics for user-facing sink metrics.
306#[derive(Debug)]
307pub struct SinkStatisticsMetrics {
308    // Counters
309    pub(crate) messages_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
310    pub(crate) messages_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
311    pub(crate) bytes_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
312    pub(crate) bytes_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
313}
314
315impl SinkStatisticsMetrics {
316    pub(crate) fn new(
317        defs: &SinkStatisticsMetricDefs,
318        id: GlobalId,
319        worker_id: usize,
320    ) -> SinkStatisticsMetrics {
321        SinkStatisticsMetrics {
322            messages_staged: defs
323                .messages_staged
324                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
325            messages_committed: defs
326                .messages_committed
327                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
328            bytes_staged: defs
329                .bytes_staged
330                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
331            bytes_committed: defs
332                .bytes_committed
333                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
334        }
335    }
336}
337
338/// Meta data needed to maintain source statistics.
339#[derive(Debug, Clone)]
340pub struct SourceStatisticsMetadata {
341    /// The resumption upper of the source.
342    resume_upper: Antichain<Timestamp>,
343    /// Time at which the source (more precisely: this metadata object) was created.
344    created_at: Instant,
345}
346
347impl SourceStatisticsMetadata {
348    /// Create a new `SourceStatisticsMetadata` object.
349    pub fn new(resume_upper: Antichain<Timestamp>) -> Self {
350        Self {
351            resume_upper,
352            created_at: Instant::now(),
353        }
354    }
355}
356
357#[derive(Debug)]
358struct StatsInner<Stats, Metrics> {
359    stats: Stats,
360    prom: Metrics,
361}
362
363/// A helper struct designed to make it easy for operators to update user-facing metrics.
364/// This struct also ensures that each stack is also incremented in prometheus.
365///
366/// Caveats:
367/// - There is one Prometheus timeseries-per-worker, and we label it with the source id and the
368/// source id of the parent source (if there is no parent, these labels have the same value).
369///     - Some metrics also have the shard id we are writing metrics for.
370/// - The prometheus metrics do not have the same timestamps as the ones exposed in sql, because
371/// they are written at different times.
372///     - This may be fixed in the future when we write the metrics from storaged directly.
373///     - The value also eventually converge to the same value.
374#[derive(Debug)]
375pub struct StorageStatistics<Stats, Metrics, Meta> {
376    // Note also that the `DeleteOnDropCounter`'s in the `SourceStatisticsMetrics`
377    // already are in an `Arc`, so this is a bit of extra wrapping, but the cost
378    // shouldn't cost too much.
379    stats: Rc<RefCell<StatsInner<Stats, Metrics>>>,
380    /// Meta data needed to maintain statistics.
381    meta: Meta,
382}
383
384impl<Stats, Metrics, Meta: Clone> Clone for StorageStatistics<Stats, Metrics, Meta> {
385    fn clone(&self) -> Self {
386        Self {
387            stats: Rc::clone(&self.stats),
388            meta: self.meta.clone(),
389        }
390    }
391}
392
393/// Statistics maintained for sources. Gauges can be uninitialized.
394#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
395pub struct SourceStatisticsRecord {
396    id: GlobalId,
397    worker_id: usize,
398    // Counters
399    messages_received: u64,
400    bytes_received: u64,
401    updates_staged: u64,
402    updates_committed: u64,
403
404    // Gauges are always wrapped in an `Option` that represents if that gauge has been
405    // initialized by that worker.
406    records_indexed: Option<u64>,
407    bytes_indexed: Option<u64>,
408
409    // This field is nullable, so its value is an `Option`.
410    rehydration_latency_ms: Option<Option<i64>>,
411
412    // The following fields are able to be unset when shipped to the controller, so their
413    // values are `Option`'s
414    snapshot_records_known: Option<Option<u64>>,
415    snapshot_records_staged: Option<Option<u64>>,
416    snapshot_committed: Option<bool>,
417    offset_known: Option<Option<u64>>,
418    offset_committed: Option<Option<u64>>,
419
420    // Just prometheus.
421    envelope_state_tombstones: u64,
422}
423
424impl SourceStatisticsRecord {
425    fn reset_gauges(&mut self) {
426        // These gauges MUST be initialized across all workers before we aggregate and
427        // report statistics.
428        self.rehydration_latency_ms = None;
429        self.snapshot_committed = None;
430
431        // We consider these gauges always initialized
432        self.bytes_indexed = Some(0);
433        self.records_indexed = Some(0);
434
435        // We don't yet populate these, so we consider the initialized (with an empty value).
436        self.snapshot_records_known = Some(None);
437        self.snapshot_records_staged = Some(None);
438        self.offset_known = Some(None);
439        self.offset_committed = Some(None);
440
441        self.envelope_state_tombstones = 0;
442    }
443
444    /// Reset counters so that we continue to ship diffs to the controller.
445    fn reset_counters(&mut self) {
446        self.messages_received = 0;
447        self.bytes_received = 0;
448        self.updates_staged = 0;
449        self.updates_committed = 0;
450    }
451
452    /// Convert this record into an `SourceStatisticsUpdate` to be merged
453    /// across workers. Gauges must be initialized.
454    fn as_update(&self) -> SourceStatisticsUpdate {
455        let SourceStatisticsRecord {
456            id,
457            worker_id: _,
458            messages_received,
459            bytes_received,
460            updates_staged,
461            updates_committed,
462            records_indexed,
463            bytes_indexed,
464            rehydration_latency_ms,
465            snapshot_records_known,
466            snapshot_records_staged,
467            snapshot_committed,
468            offset_known,
469            offset_committed,
470            envelope_state_tombstones: _,
471        } = self.clone();
472
473        SourceStatisticsUpdate {
474            id,
475            messages_received: messages_received.into(),
476            bytes_received: bytes_received.into(),
477            updates_staged: updates_staged.into(),
478            updates_committed: updates_committed.into(),
479            records_indexed: Gauge::gauge(records_indexed.unwrap()),
480            bytes_indexed: Gauge::gauge(bytes_indexed.unwrap()),
481            rehydration_latency_ms: Gauge::gauge(rehydration_latency_ms.unwrap()),
482            snapshot_records_known: Gauge::gauge(snapshot_records_known.unwrap()),
483            snapshot_records_staged: Gauge::gauge(snapshot_records_staged.unwrap()),
484            snapshot_committed: Gauge::gauge(snapshot_committed.unwrap()),
485            offset_known: Gauge::gauge(offset_known.unwrap()),
486            offset_committed: Gauge::gauge(offset_committed.unwrap()),
487        }
488    }
489}
490
491/// Statistics maintained for sinks. Gauges can be uninitialized.
492#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
493pub struct SinkStatisticsRecord {
494    id: GlobalId,
495    worker_id: usize,
496    // Counters
497    messages_staged: u64,
498    messages_committed: u64,
499    bytes_staged: u64,
500    bytes_committed: u64,
501}
502
503impl SinkStatisticsRecord {
504    fn reset_gauges(&self) {}
505
506    /// Reset counters so that we continue to ship diffs to the controller.
507    fn reset_counters(&mut self) {
508        self.messages_staged = 0;
509        self.messages_committed = 0;
510        self.bytes_staged = 0;
511        self.bytes_committed = 0;
512    }
513
514    /// Convert this record into an `SinkStatisticsUpdate` to be merged
515    /// across workers. Gauges must be initialized.
516    fn as_update(&self) -> SinkStatisticsUpdate {
517        let SinkStatisticsRecord {
518            id,
519            worker_id: _,
520            messages_staged,
521            messages_committed,
522            bytes_staged,
523            bytes_committed,
524        } = self.clone();
525
526        SinkStatisticsUpdate {
527            id,
528            messages_staged: messages_staged.into(),
529            messages_committed: messages_committed.into(),
530            bytes_staged: bytes_staged.into(),
531            bytes_committed: bytes_committed.into(),
532        }
533    }
534}
535
536/// Statistics maintained for sources.
537pub type SourceStatistics =
538    StorageStatistics<SourceStatisticsRecord, SourceStatisticsMetrics, SourceStatisticsMetadata>;
539
540/// Statistics maintained for sinks.
541pub type SinkStatistics = StorageStatistics<SinkStatisticsRecord, SinkStatisticsMetrics, ()>;
542
543impl SourceStatistics {
544    pub(crate) fn new(
545        id: GlobalId,
546        worker_id: usize,
547        metrics: &SourceStatisticsMetricDefs,
548        parent_source_id: GlobalId,
549        shard_id: &mz_persist_client::ShardId,
550        envelope: SourceEnvelope,
551        resume_upper: Antichain<Timestamp>,
552    ) -> Self {
553        Self {
554            stats: Rc::new(RefCell::new(StatsInner {
555                stats: SourceStatisticsRecord {
556                    id,
557                    worker_id,
558                    messages_received: 0,
559                    updates_staged: 0,
560                    updates_committed: 0,
561                    bytes_received: 0,
562                    records_indexed: Some(0),
563                    bytes_indexed: Some(0),
564                    rehydration_latency_ms: None,
565                    snapshot_records_staged: Some(None),
566                    snapshot_records_known: Some(None),
567                    snapshot_committed: None,
568                    offset_known: Some(None),
569                    offset_committed: Some(None),
570                    envelope_state_tombstones: 0,
571                },
572                prom: SourceStatisticsMetrics::new(
573                    metrics,
574                    id,
575                    worker_id,
576                    parent_source_id,
577                    shard_id,
578                    envelope,
579                ),
580            })),
581            meta: SourceStatisticsMetadata::new(resume_upper),
582        }
583    }
584
585    /// Reset gauges to uninitialized state.
586    /// This does not reset prometheus gauges, which will be overwritten with new values.
587    pub fn reset_gauges(&self) {
588        let mut cur = self.stats.borrow_mut();
589        cur.stats.reset_gauges();
590    }
591
592    /// Get a snapshot of the data, returning `None` if all gauges are not initialized.
593    ///
594    /// This also resets counters, so that we continue to move diffs around.
595    pub fn snapshot(&self) -> Option<SourceStatisticsRecord> {
596        let mut cur = self.stats.borrow_mut();
597
598        match &cur.stats {
599            SourceStatisticsRecord {
600                records_indexed: Some(_),
601                bytes_indexed: Some(_),
602                rehydration_latency_ms: Some(_),
603                snapshot_records_known: Some(_),
604                snapshot_records_staged: Some(_),
605                snapshot_committed: Some(_),
606                offset_known: Some(_),
607                offset_committed: Some(_),
608                ..
609            } => {
610                let ret = Some(cur.stats.clone());
611                cur.stats.reset_counters();
612                ret
613            }
614            _ => None,
615        }
616    }
617
618    /// Set the `snapshot_committed` stat based on the reported upper, and
619    /// mark the stats as initialized.
620    ///
621    /// - In sql, we ensure that we _never_ reset `snapshot_committed` to `false`, but gauges and
622    /// counters are ordinarily reset to 0 in Prometheus, so on restarts this value may be inconsistent.
623    // TODO(guswynn): Actually test that this initialization logic works.
624    pub fn initialize_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
625        self.update_snapshot_committed(upper);
626    }
627
628    /// Set the `snapshot_committed` stat based on the reported upper.
629    pub fn update_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
630        let value = *upper != Antichain::from_elem(Timestamp::MIN);
631        let mut cur = self.stats.borrow_mut();
632        cur.stats.snapshot_committed = Some(value);
633        cur.prom.snapshot_committed.set(if value { 1 } else { 0 });
634    }
635
636    /// Increment the `messages_received` stat.
637    pub fn inc_messages_received_by(&self, value: u64) {
638        let mut cur = self.stats.borrow_mut();
639        cur.stats.messages_received = cur.stats.messages_received + value;
640        cur.prom.messages_received.inc_by(value);
641    }
642
643    /// Increment the `updates` stat.
644    pub fn inc_updates_staged_by(&self, value: u64) {
645        let mut cur = self.stats.borrow_mut();
646        cur.stats.updates_staged = cur.stats.updates_staged + value;
647        cur.prom.updates_staged.inc_by(value);
648    }
649
650    /// Increment the `messages_committed` stat.
651    pub fn inc_updates_committed_by(&self, value: u64) {
652        let mut cur = self.stats.borrow_mut();
653        cur.stats.updates_committed = cur.stats.updates_committed + value;
654        cur.prom.updates_committed.inc_by(value);
655    }
656
657    /// Increment the `bytes_received` stat.
658    pub fn inc_bytes_received_by(&self, value: u64) {
659        let mut cur = self.stats.borrow_mut();
660        cur.stats.bytes_received = cur.stats.bytes_received + value;
661        cur.prom.bytes_received.inc_by(value);
662    }
663
664    /// Update the `bytes_indexed` stat.
665    /// A positive value will add and a negative value will subtract.
666    pub fn update_bytes_indexed_by(&self, value: i64) {
667        let mut cur = self.stats.borrow_mut();
668        if let Some(updated) = cur
669            .stats
670            .bytes_indexed
671            .unwrap_or(0)
672            .checked_add_signed(value)
673        {
674            cur.stats.bytes_indexed = Some(updated);
675            cur.prom.bytes_indexed.set(updated);
676        } else {
677            let bytes_indexed = cur.stats.bytes_indexed.unwrap_or(0);
678            tracing::warn!(
679                "Unexpected u64 overflow while updating bytes_indexed value {} with {}",
680                bytes_indexed,
681                value
682            );
683            cur.stats.bytes_indexed = Some(0);
684            cur.prom.bytes_indexed.set(0);
685        }
686    }
687
688    /// Set the `bytes_indexed` to the given value
689    pub fn set_bytes_indexed(&self, value: i64) {
690        let mut cur = self.stats.borrow_mut();
691        let value = if value < 0 {
692            tracing::warn!("Unexpected negative value for bytes_indexed {}", value);
693            0
694        } else {
695            value.unsigned_abs()
696        };
697        cur.stats.bytes_indexed = Some(value);
698        cur.prom.bytes_indexed.set(value);
699    }
700
701    /// Update the `records_indexed` stat.
702    /// A positive value will add and a negative value will subtract.
703    pub fn update_records_indexed_by(&self, value: i64) {
704        let mut cur = self.stats.borrow_mut();
705        if let Some(updated) = cur
706            .stats
707            .records_indexed
708            .unwrap_or(0)
709            .checked_add_signed(value)
710        {
711            cur.stats.records_indexed = Some(updated);
712            cur.prom.records_indexed.set(updated);
713        } else {
714            let records_indexed = cur.stats.records_indexed.unwrap_or(0);
715            tracing::warn!(
716                "Unexpected u64 overflow while updating records_indexed value {} with {}",
717                records_indexed,
718                value
719            );
720            cur.stats.records_indexed = Some(0);
721            cur.prom.records_indexed.set(0);
722        }
723    }
724
725    /// Set the `records_indexed` to the given value
726    pub fn set_records_indexed(&self, value: i64) {
727        let mut cur = self.stats.borrow_mut();
728        let value = if value < 0 {
729            tracing::warn!("Unexpected negative value for records_indexed {}", value);
730            0
731        } else {
732            value.unsigned_abs()
733        };
734        cur.stats.records_indexed = Some(value);
735        cur.prom.records_indexed.set(value);
736    }
737
738    /// Initialize the `rehydration_latency_ms` stat as `NULL`.
739    pub fn initialize_rehydration_latency_ms(&self) {
740        let mut cur = self.stats.borrow_mut();
741        cur.stats.rehydration_latency_ms = Some(None);
742    }
743
744    /// Update the `envelope_state_tombstones` stat.
745    /// A positive value will add and a negative value will subtract.
746    // TODO(guswynn): consider exposing this to users
747    pub fn update_envelope_state_tombstones_by(&self, value: i64) {
748        let mut cur = self.stats.borrow_mut();
749        if let Some(updated) = cur
750            .stats
751            .envelope_state_tombstones
752            .checked_add_signed(value)
753        {
754            cur.stats.envelope_state_tombstones = updated;
755            cur.prom.envelope_state_tombstones.set(updated);
756        } else {
757            let envelope_state_tombstones = cur.stats.envelope_state_tombstones;
758            tracing::warn!(
759                "Unexpected u64 overflow while updating envelope_state_tombstones value {} with {}",
760                envelope_state_tombstones,
761                value
762            );
763            cur.stats.envelope_state_tombstones = 0;
764            cur.prom.envelope_state_tombstones.set(0);
765        }
766    }
767
768    /// Set the `rehydration_latency_ms` stat based on the reported upper.
769    pub fn update_rehydration_latency_ms(&self, upper: &Antichain<Timestamp>) {
770        let mut cur = self.stats.borrow_mut();
771
772        if matches!(cur.stats.rehydration_latency_ms, Some(Some(_))) {
773            return; // source was already hydrated before
774        }
775        if !PartialOrder::less_than(&self.meta.resume_upper, upper) {
776            return; // source is not yet hydrated
777        }
778
779        let elapsed = self.meta.created_at.elapsed();
780        let value = elapsed
781            .as_millis()
782            .try_into()
783            .expect("Rehydration took more than ~584 million years!");
784        cur.stats.rehydration_latency_ms = Some(Some(value));
785        cur.prom.rehydration_latency_ms.set(value);
786    }
787
788    /// Set the `offset_known` stat to the given value.
789    pub fn set_offset_known(&self, value: u64) {
790        let mut cur = self.stats.borrow_mut();
791        cur.stats.offset_known = Some(Some(value));
792        cur.prom.offset_known.set(value);
793    }
794
795    /// Set the `offset_committed` stat to the given value.
796    pub fn set_offset_committed(&self, value: u64) {
797        let mut cur = self.stats.borrow_mut();
798        cur.stats.offset_committed = Some(Some(value));
799        cur.prom.offset_committed.set(value);
800    }
801
802    /// Set the `snapshot_records_known` stat to the given value.
803    pub fn set_snapshot_records_known(&self, value: u64) {
804        let mut cur = self.stats.borrow_mut();
805        cur.stats.snapshot_records_known = Some(Some(value));
806        cur.prom.snapshot_records_known.set(value);
807    }
808
809    /// Set the `snapshot_records_known` stat to the given value.
810    pub fn set_snapshot_records_staged(&self, value: u64) {
811        let mut cur = self.stats.borrow_mut();
812        cur.stats.snapshot_records_staged = Some(Some(value));
813        cur.prom.snapshot_records_staged.set(value);
814    }
815}
816
817impl SinkStatistics {
818    pub(crate) fn new(id: GlobalId, worker_id: usize, metrics: &SinkStatisticsMetricDefs) -> Self {
819        Self {
820            stats: Rc::new(RefCell::new(StatsInner {
821                stats: SinkStatisticsRecord {
822                    id,
823                    worker_id,
824                    messages_staged: 0,
825                    messages_committed: 0,
826                    bytes_staged: 0,
827                    bytes_committed: 0,
828                },
829                prom: SinkStatisticsMetrics::new(metrics, id, worker_id),
830            })),
831            meta: (),
832        }
833    }
834
835    /// Reset gauges to uninitialized state.
836    /// This does not reset prometheus gauges, which will be overwritten with new values.
837    pub fn reset_gauges(&self) {
838        let cur = self.stats.borrow_mut();
839        cur.stats.reset_gauges();
840    }
841
842    /// Get a snapshot of the data, returning `None` if all gauges are not initialized.
843    ///
844    /// This also resets counters, so that we continue to move diffs around.
845    pub fn snapshot(&self) -> Option<SinkStatisticsRecord> {
846        let mut cur = self.stats.borrow_mut();
847
848        match &cur.stats {
849            SinkStatisticsRecord { .. } => {
850                let ret = Some(cur.stats.clone());
851                cur.stats.reset_counters();
852                ret
853            }
854        }
855    }
856
857    /// Increment the `messages_staged` stat.
858    pub fn inc_messages_staged_by(&self, value: u64) {
859        let mut cur = self.stats.borrow_mut();
860        cur.stats.messages_staged = cur.stats.messages_staged + value;
861        cur.prom.messages_staged.inc_by(value);
862    }
863
864    /// Increment the `bytes_received` stat.
865    pub fn inc_bytes_staged_by(&self, value: u64) {
866        let mut cur = self.stats.borrow_mut();
867        cur.stats.bytes_staged = cur.stats.bytes_staged + value;
868        cur.prom.bytes_staged.inc_by(value);
869    }
870
871    /// Increment the `messages_committed` stat.
872    pub fn inc_messages_committed_by(&self, value: u64) {
873        let mut cur = self.stats.borrow_mut();
874        cur.stats.messages_committed = cur.stats.messages_committed + value;
875        cur.prom.messages_committed.inc_by(value);
876    }
877
878    /// Increment the `bytes_committed` stat.
879    pub fn inc_bytes_committed_by(&self, value: u64) {
880        let mut cur = self.stats.borrow_mut();
881        cur.stats.bytes_committed = cur.stats.bytes_committed + value;
882        cur.prom.bytes_committed.inc_by(value);
883    }
884}
885
886/// A structure that keeps track of _local_ statistics, as well as aggregating
887/// statistics into a single worker (currently worker 0).
888///
889/// This is because we ONLY want to emit statistics for _gauge-style-statistics_ when
890/// ALL workers have caught up to the _currently running instance of a source/sink_ and have
891/// reported an accurate gauge. This prevents issues like 1 worker reporting
892/// `snapshot_committed=true` before the others workers have caught up to report that they haven't
893/// finished snapshotting.
894///
895/// The API flow is:
896/// - Initialize sources/sinks with `initialize_source` or `initialize_sink`. This advances the
897/// local epoch.
898/// - Advance the _global epoch_ with `advance_global_epoch`. This should always be called strictly
899/// before reinitializing objects.
900/// - Emit a snapshot of local data with `emit_local`, which can be broadcasted to other workers.
901/// - Ingest data from other workers with `ingest`.
902/// - Emit a `snapshot` of _global data_ (i.e. skipping objects that don't have all workers caught
903/// up) with `snapshot`.
904///
905/// All functions can and should be called from ALL workers.
906///
907/// Note that we hand-roll a statistics epoch here, but in the future, when the storage
908/// _controller_ has top-level support for _dataflow epochs_, this logic can be entirely moved
909/// there.
910pub struct AggregatedStatistics {
911    worker_id: usize,
912    worker_count: usize,
913    local_source_statistics: BTreeMap<GlobalId, (usize, GlobalId, SourceStatistics)>,
914    local_sink_statistics: BTreeMap<GlobalId, (usize, SinkStatistics)>,
915
916    global_source_statistics:
917        BTreeMap<GlobalId, (usize, GlobalId, Vec<Option<SourceStatisticsUpdate>>)>,
918    global_sink_statistics: BTreeMap<GlobalId, (usize, Vec<Option<SinkStatisticsUpdate>>)>,
919}
920
921impl AggregatedStatistics {
922    /// Initialize a new `AggregatedStatistics`.
923    pub fn new(worker_id: usize, worker_count: usize) -> Self {
924        AggregatedStatistics {
925            worker_id,
926            worker_count,
927            local_source_statistics: Default::default(),
928            local_sink_statistics: Default::default(),
929            global_source_statistics: Default::default(),
930            global_sink_statistics: Default::default(),
931        }
932    }
933
934    /// Get a collection of `SourceStatistics` for the ingestion `ingestion_id`.
935    pub fn get_ingestion_stats(
936        &self,
937        ingestion_id: &GlobalId,
938    ) -> BTreeMap<GlobalId, SourceStatistics> {
939        let mut ingestion_stats = BTreeMap::new();
940        for (id, (_epoch, ingestion_id2, stats)) in self.local_source_statistics.iter() {
941            if ingestion_id == ingestion_id2 {
942                ingestion_stats.insert(id.clone(), stats.clone());
943            }
944        }
945        ingestion_stats
946    }
947
948    /// Get a `SourceStatistics` for an id, if it exists.
949    pub fn get_source(&self, id: &GlobalId) -> Option<&SourceStatistics> {
950        self.local_source_statistics.get(id).map(|(_, _, s)| s)
951    }
952
953    /// Get a `SinkStatistics` for an id, if it exists.
954    pub fn get_sink(&self, id: &GlobalId) -> Option<&SinkStatistics> {
955        self.local_sink_statistics.get(id).map(|(_, s)| s)
956    }
957
958    /// Deinitialize an object. Other methods other than `initialize_source` and `initialize_sink`
959    /// will never overwrite this.
960    pub fn deinitialize(&mut self, id: GlobalId) {
961        self.local_source_statistics.remove(&id);
962        self.local_sink_statistics.remove(&id);
963        self.global_source_statistics.remove(&id);
964        self.global_sink_statistics.remove(&id);
965    }
966
967    /// Advance the _global epoch_ for statistics.
968    ///
969    /// Gauge values from previous epochs will be ignored. Counter values from previous epochs will
970    /// still be applied as usual.
971    pub fn advance_global_epoch(&mut self, id: GlobalId) {
972        if let Some((epoch, _ingestion_id, stats)) = self.global_source_statistics.get_mut(&id) {
973            *epoch += 1;
974            for worker_stats in stats {
975                if let Some(update) = worker_stats {
976                    update.reset_gauges();
977                }
978            }
979        }
980        if let Some((epoch, stats)) = self.global_sink_statistics.get_mut(&id) {
981            *epoch += 1;
982            for worker_stats in stats {
983                if let Some(update) = worker_stats {
984                    update.reset_gauges();
985                }
986            }
987        }
988    }
989
990    /// Re-initialize a source. If it already exists, then its local epoch is advanced.
991    pub fn initialize_source<F: FnOnce() -> SourceStatistics>(
992        &mut self,
993        id: GlobalId,
994        ingestion_id: GlobalId,
995        resume_upper: Antichain<Timestamp>,
996        stats: F,
997    ) {
998        self.local_source_statistics
999            .entry(id)
1000            .and_modify(|(epoch, ingestion_id2, stats)| {
1001                assert_eq!(ingestion_id, *ingestion_id2);
1002                *epoch += 1;
1003                stats.reset_gauges();
1004                stats.meta = SourceStatisticsMetadata::new(resume_upper);
1005            })
1006            .or_insert_with(|| (0, ingestion_id, stats()));
1007
1008        if self.worker_id == 0 {
1009            self.global_source_statistics
1010                .entry(id)
1011                .or_insert_with(|| (0, ingestion_id, vec![None; self.worker_count]));
1012        }
1013    }
1014
1015    /// Re-initialize a sink. If it already exists, then its local epoch is advanced.
1016    pub fn initialize_sink<F: FnOnce() -> SinkStatistics>(&mut self, id: GlobalId, stats: F) {
1017        self.local_sink_statistics
1018            .entry(id)
1019            .and_modify(|(epoch, stats)| {
1020                *epoch += 1;
1021                stats.reset_gauges();
1022            })
1023            .or_insert_with(|| (0, stats()));
1024        if self.worker_id == 0 {
1025            self.global_sink_statistics
1026                .entry(id)
1027                .or_insert_with(|| (0, vec![None; self.worker_count]));
1028        }
1029    }
1030
1031    /// Ingest data from other workers.
1032    pub fn ingest(
1033        &mut self,
1034        source_statistics: Vec<(usize, SourceStatisticsRecord)>,
1035        sink_statistics: Vec<(usize, SinkStatisticsRecord)>,
1036    ) {
1037        // Currently, only worker 0 ingest data from other workers.
1038        if self.worker_id != 0 {
1039            return;
1040        }
1041
1042        for (epoch, stat) in source_statistics {
1043            if let Some((global_epoch, _, stats)) = self.global_source_statistics.get_mut(&stat.id)
1044            {
1045                // The record might be from a previous incarnation of the source. If that's the
1046                // case, we only incorporate its counter values and ignore its gauge values.
1047                let epoch_match = epoch >= *global_epoch;
1048                let mut update = stat.as_update();
1049                match (&mut stats[stat.worker_id], epoch_match) {
1050                    (None, true) => stats[stat.worker_id] = Some(update),
1051                    (None, false) => {
1052                        update.reset_gauges();
1053                        stats[stat.worker_id] = Some(update);
1054                    }
1055                    (Some(occupied), true) => occupied.incorporate(update),
1056                    (Some(occupied), false) => occupied.incorporate_counters(update),
1057                }
1058            }
1059        }
1060
1061        for (epoch, stat) in sink_statistics {
1062            if let Some((global_epoch, stats)) = self.global_sink_statistics.get_mut(&stat.id) {
1063                // The record might be from a previous incarnation of the sink. If that's the
1064                // case, we only incorporate its counter values and ignore its gauge values.
1065                let epoch_match = epoch >= *global_epoch;
1066                let update = stat.as_update();
1067                match (&mut stats[stat.worker_id], epoch_match) {
1068                    (None, true) => stats[stat.worker_id] = Some(update),
1069                    (None, false) => {
1070                        update.reset_gauges();
1071                        stats[stat.worker_id] = Some(update);
1072                    }
1073                    (Some(occupied), true) => occupied.incorporate(update),
1074                    (Some(occupied), false) => occupied.incorporate_counters(update),
1075                }
1076            }
1077        }
1078    }
1079
1080    fn _emit_local(
1081        &mut self,
1082    ) -> (
1083        Vec<(usize, SourceStatisticsRecord)>,
1084        Vec<(usize, SinkStatisticsRecord)>,
1085    ) {
1086        let sources = self
1087            .local_source_statistics
1088            .values_mut()
1089            .flat_map(|(epoch, _, s)| s.snapshot().map(|v| (*epoch, v)))
1090            .collect();
1091
1092        let sinks = self
1093            .local_sink_statistics
1094            .values_mut()
1095            .flat_map(|(epoch, s)| s.snapshot().map(|v| (*epoch, v)))
1096            .collect();
1097
1098        (sources, sinks)
1099    }
1100
1101    /// Emit a snapshot of this workers local data.
1102    pub fn emit_local(
1103        &mut self,
1104    ) -> (
1105        Vec<(usize, SourceStatisticsRecord)>,
1106        Vec<(usize, SinkStatisticsRecord)>,
1107    ) {
1108        // As an optimization, worker 0 does not broadcast it data. It ingests
1109        // it in `snapshot`.
1110        if self.worker_id == 0 {
1111            return (Vec::new(), Vec::new());
1112        }
1113
1114        self._emit_local()
1115    }
1116
1117    /// Emit a _global_ snapshot of data. This does not include objects whose workers have not
1118    /// initialized gauges for the current epoch.
1119    pub fn snapshot(&mut self) -> (Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>) {
1120        if !self.worker_id == 0 {
1121            return (Vec::new(), Vec::new());
1122        }
1123
1124        let (sources, sinks) = self._emit_local();
1125        self.ingest(sources, sinks);
1126
1127        let sources = self
1128            .global_source_statistics
1129            .iter_mut()
1130            .filter_map(|(_, (_, _, s))| {
1131                if s.iter().all(|s| s.is_some()) {
1132                    let ret = Some(SourceStatisticsUpdate::summarize(|| {
1133                        s.iter().filter_map(Option::as_ref)
1134                    }));
1135
1136                    // Reset the counters so we only report diffs.
1137                    s.iter_mut().for_each(|s| {
1138                        if let Some(s) = s {
1139                            s.reset_counters();
1140                        }
1141                    });
1142                    ret
1143                } else {
1144                    None
1145                }
1146            })
1147            .collect();
1148
1149        let sinks = self
1150            .global_sink_statistics
1151            .iter_mut()
1152            .filter_map(|(_, (_, s))| {
1153                if s.iter().all(|s| s.is_some()) {
1154                    let ret = Some(SinkStatisticsUpdate::summarize(|| {
1155                        s.iter().filter_map(Option::as_ref)
1156                    }));
1157
1158                    // Reset the counters so we only report diffs.
1159                    s.iter_mut().for_each(|s| {
1160                        if let Some(s) = s {
1161                            s.reset_counters();
1162                        }
1163                    });
1164                    ret
1165                } else {
1166                    None
1167                }
1168            })
1169            .collect();
1170
1171        (sources, sinks)
1172    }
1173}