mz_storage/
statistics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Helpers for managing storage statistics.
11//!
12//!
13//! This module collects statistics related to sources and sinks. Statistics, as exposed
14//! to their respective system tables have strong semantics, defined within the
15//! `mz_storage_types::statistics` module. This module collects and aggregates metrics
16//! across workers according to those semantics.
17//!
//! Note that it _simultaneously_ collects prometheus metrics for the given statistics. Those
19//! metrics _do not have the same strong semantics_, which is _by design_ to ensure we
20//! are able to categorically debug sources and sinks during complex failures (or bugs
21//! with statistics collection itself). Prometheus metrics are
22//! - Never dropped or reset until a source/sink is dropped.
23//! - Entirely independent across workers.
24
25use std::cell::RefCell;
26use std::collections::BTreeMap;
27use std::rc::Rc;
28use std::time::Instant;
29
30use mz_ore::metric;
31use mz_ore::metrics::{
32    DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, IntGaugeVec, MetricsRegistry,
33    UIntGaugeVec,
34};
35use mz_repr::{GlobalId, Timestamp};
36use mz_storage_client::statistics::{Gauge, SinkStatisticsUpdate, SourceStatisticsUpdate};
37use mz_storage_types::sources::SourceEnvelope;
38use prometheus::core::{AtomicI64, AtomicU64};
39use serde::{Deserialize, Serialize};
40use timely::PartialOrder;
41use timely::progress::frontier::Antichain;
42
43// Note(guswynn): ordinarily these metric structs would be in the `metrics` modules, but we
44// put them here so they can be near the user-facing definitions as well.
45
/// Definitions (metric-vector families) of the per-source prometheus metrics.
///
/// NOTE(review): the `offset_*` and `snapshot_records_*` families are registered with a
/// `shard_id` label, but `SourceStatisticsMetrics::new` passes the parent source id as the
/// third label value — confirm which side is intended.
#[derive(Clone, Debug)]
pub(crate) struct SourceStatisticsMetricDefs {
    // Counters
    pub(crate) messages_received: IntCounterVec,
    pub(crate) updates_staged: IntCounterVec,
    pub(crate) updates_committed: IntCounterVec,
    pub(crate) bytes_received: IntCounterVec,

    // Gauges
    pub(crate) snapshot_committed: UIntGaugeVec,
    pub(crate) bytes_indexed: UIntGaugeVec,
    pub(crate) records_indexed: UIntGaugeVec,
    pub(crate) rehydration_latency_ms: IntGaugeVec,

    pub(crate) offset_known: UIntGaugeVec,
    pub(crate) offset_committed: UIntGaugeVec,
    pub(crate) snapshot_records_known: UIntGaugeVec,
    pub(crate) snapshot_records_staged: UIntGaugeVec,

    // Just prometheus.
    pub(crate) envelope_state_tombstones: UIntGaugeVec,
}
68
69impl SourceStatisticsMetricDefs {
70    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
71        Self {
72            snapshot_committed: registry.register(metric!(
73                name: "mz_source_snapshot_committed",
74                help: "Whether or not the worker has committed the initial snapshot for a source.",
75                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
76            )),
77            messages_received: registry.register(metric!(
78                name: "mz_source_messages_received",
79                help: "The number of raw messages the worker has received from upstream.",
80                var_labels: ["source_id", "worker_id", "parent_source_id"],
81            )),
82            updates_staged: registry.register(metric!(
83                name: "mz_source_updates_staged",
84                help: "The number of updates (inserts + deletes) the worker has written but not yet committed to the storage layer.",
85                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
86            )),
87            updates_committed: registry.register(metric!(
88                name: "mz_source_updates_committed",
89                help: "The number of updates (inserts + deletes) the worker has committed into the storage layer.",
90                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
91            )),
92            bytes_received: registry.register(metric!(
93                name: "mz_source_bytes_received",
94                help: "The number of bytes worth of messages the worker has received from upstream. The way the bytes are counted is source-specific.",
95                var_labels: ["source_id", "worker_id", "parent_source_id"],
96            )),
97            bytes_indexed: registry.register(metric!(
98                name: "mz_source_bytes_indexed",
99                help: "The number of bytes of the source envelope state kept. This will be specific to the envelope in use.",
100                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
101            )),
102            records_indexed: registry.register(metric!(
103                name: "mz_source_records_indexed",
104                help: "The number of records in the source envelope state. This will be specific to the envelope in use",
105                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
106            )),
107            envelope_state_tombstones: registry.register(metric!(
108                name: "mz_source_envelope_state_tombstones",
109                help: "The number of outstanding tombstones in the source envelope state. This will be specific to the envelope in use",
110                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"],
111            )),
112            rehydration_latency_ms: registry.register(metric!(
113                name: "mz_source_rehydration_latency_ms",
114                help: "The amount of time in milliseconds it took for the worker to rehydrate the source envelope state. This will be specific to the envelope in use.",
115                var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id", "envelope"],
116            )),
117            offset_known: registry.register(metric!(
118                name: "mz_source_offset_known",
119                help: "The total number of _values_ (source-defined unit) present in upstream.",
120                var_labels: ["source_id", "worker_id", "shard_id"],
121            )),
122            offset_committed: registry.register(metric!(
123                name: "mz_source_offset_committed",
124                help: "The total number of _values_ (source-defined unit) we have fully processed, and storage and committed.",
125                var_labels: ["source_id", "worker_id", "shard_id"],
126            )),
127            snapshot_records_known: registry.register(metric!(
128                name: "mz_source_snapshot_records_known",
129                help: "The total number of records in the source's snapshot",
130                var_labels: ["source_id", "worker_id", "shard_id"],
131            )),
132            snapshot_records_staged: registry.register(metric!(
133                name: "mz_source_snapshot_records_staged",
134                help: "The total number of records read from the source's snapshot",
135                var_labels: ["source_id", "worker_id", "shard_id"],
136            )),
137        }
138    }
139}
140
/// Prometheus metrics for user-facing source metrics.
///
/// Each field is a `DeleteOnDrop*` handle, so the corresponding timeseries is
/// removed from the registry when this struct is dropped.
#[derive(Debug)]
pub struct SourceStatisticsMetrics {
    // Counters
    pub(crate) messages_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) updates_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) updates_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_received: DeleteOnDropCounter<AtomicU64, Vec<String>>,

    // Gauges
    pub(crate) snapshot_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) bytes_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) records_indexed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    // Signed: the latency is reported as `-1`-style sentinel-free i64 set from a
    // `try_into` of elapsed millis (see `update_rehydration_latency_ms`).
    pub(crate) rehydration_latency_ms: DeleteOnDropGauge<AtomicI64, Vec<String>>,

    pub(crate) offset_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) offset_committed: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) snapshot_records_known: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub(crate) snapshot_records_staged: DeleteOnDropGauge<AtomicU64, Vec<String>>,

    // Just prometheus.
    pub(crate) envelope_state_tombstones: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}
164
165impl SourceStatisticsMetrics {
166    pub(crate) fn new(
167        defs: &SourceStatisticsMetricDefs,
168        id: GlobalId,
169        worker_id: usize,
170        parent_source_id: GlobalId,
171        shard_id: &mz_persist_client::ShardId,
172        envelope: SourceEnvelope,
173    ) -> SourceStatisticsMetrics {
174        let shard = shard_id.to_string();
175        let envelope = match envelope {
176            SourceEnvelope::None(_) => "none",
177            SourceEnvelope::Upsert(_) => "upsert",
178            SourceEnvelope::CdcV2 => "cdcv2",
179        };
180
181        SourceStatisticsMetrics {
182            snapshot_committed: defs.snapshot_committed.get_delete_on_drop_metric(vec![
183                id.to_string(),
184                worker_id.to_string(),
185                parent_source_id.to_string(),
186                shard.clone(),
187            ]),
188            messages_received: defs.messages_received.get_delete_on_drop_metric(vec![
189                id.to_string(),
190                worker_id.to_string(),
191                parent_source_id.to_string(),
192            ]),
193            updates_staged: defs.updates_staged.get_delete_on_drop_metric(vec![
194                id.to_string(),
195                worker_id.to_string(),
196                parent_source_id.to_string(),
197                shard.clone(),
198            ]),
199            updates_committed: defs.updates_committed.get_delete_on_drop_metric(vec![
200                id.to_string(),
201                worker_id.to_string(),
202                parent_source_id.to_string(),
203                shard.clone(),
204            ]),
205            bytes_received: defs.bytes_received.get_delete_on_drop_metric(vec![
206                id.to_string(),
207                worker_id.to_string(),
208                parent_source_id.to_string(),
209            ]),
210            bytes_indexed: defs.bytes_indexed.get_delete_on_drop_metric(vec![
211                id.to_string(),
212                worker_id.to_string(),
213                parent_source_id.to_string(),
214                shard.clone(),
215            ]),
216            records_indexed: defs.records_indexed.get_delete_on_drop_metric(vec![
217                id.to_string(),
218                worker_id.to_string(),
219                parent_source_id.to_string(),
220                shard.clone(),
221            ]),
222            envelope_state_tombstones: defs.envelope_state_tombstones.get_delete_on_drop_metric(
223                vec![
224                    id.to_string(),
225                    worker_id.to_string(),
226                    parent_source_id.to_string(),
227                    shard.clone(),
228                ],
229            ),
230            rehydration_latency_ms: defs.rehydration_latency_ms.get_delete_on_drop_metric(vec![
231                id.to_string(),
232                worker_id.to_string(),
233                parent_source_id.to_string(),
234                shard.clone(),
235                envelope.to_string(),
236            ]),
237            offset_known: defs.offset_known.get_delete_on_drop_metric(vec![
238                id.to_string(),
239                worker_id.to_string(),
240                parent_source_id.to_string(),
241            ]),
242            offset_committed: defs.offset_committed.get_delete_on_drop_metric(vec![
243                id.to_string(),
244                worker_id.to_string(),
245                parent_source_id.to_string(),
246            ]),
247            snapshot_records_known: defs.snapshot_records_known.get_delete_on_drop_metric(vec![
248                id.to_string(),
249                worker_id.to_string(),
250                parent_source_id.to_string(),
251            ]),
252            snapshot_records_staged: defs.snapshot_records_staged.get_delete_on_drop_metric(vec![
253                id.to_string(),
254                worker_id.to_string(),
255                parent_source_id.to_string(),
256            ]),
257        }
258    }
259}
260
/// Definitions (metric-vector families) of the per-sink prometheus metrics.
#[derive(Clone, Debug)]
pub(crate) struct SinkStatisticsMetricDefs {
    // Counters (sinks currently expose no gauges).
    pub(crate) messages_staged: IntCounterVec,
    pub(crate) messages_committed: IntCounterVec,
    pub(crate) bytes_staged: IntCounterVec,
    pub(crate) bytes_committed: IntCounterVec,
}
269
impl SinkStatisticsMetricDefs {
    /// Register all per-sink metric families with `registry`, labeled by sink and worker.
    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
        Self {
            messages_staged: registry.register(metric!(
                name: "mz_sink_messages_staged",
                help: "The number of messages staged but possibly not committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
            messages_committed: registry.register(metric!(
                name: "mz_sink_messages_committed",
                help: "The number of messages committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
            bytes_staged: registry.register(metric!(
                name: "mz_sink_bytes_staged",
                help: "The number of bytes staged but possibly not committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
            bytes_committed: registry.register(metric!(
                name: "mz_sink_bytes_committed",
                help: "The number of bytes committed to the sink.",
                var_labels: ["sink_id", "worker_id"],
            )),
        }
    }
}
296
/// Prometheus metrics for user-facing sink metrics.
///
/// Each field is a `DeleteOnDropCounter`, so the corresponding timeseries is
/// removed from the registry when this struct is dropped.
#[derive(Debug)]
pub struct SinkStatisticsMetrics {
    // Counters (sinks currently expose no gauges).
    pub(crate) messages_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) messages_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_staged: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub(crate) bytes_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}
306
307impl SinkStatisticsMetrics {
308    pub(crate) fn new(
309        defs: &SinkStatisticsMetricDefs,
310        id: GlobalId,
311        worker_id: usize,
312    ) -> SinkStatisticsMetrics {
313        SinkStatisticsMetrics {
314            messages_staged: defs
315                .messages_staged
316                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
317            messages_committed: defs
318                .messages_committed
319                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
320            bytes_staged: defs
321                .bytes_staged
322                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
323            bytes_committed: defs
324                .bytes_committed
325                .get_delete_on_drop_metric(vec![id.to_string(), worker_id.to_string()]),
326        }
327    }
328}
329
/// Meta data needed to maintain source statistics.
#[derive(Debug, Clone)]
pub struct SourceStatisticsMetadata {
    /// The resumption upper of the source.
    resume_upper: Antichain<Timestamp>,
    /// Time at which the source (more precisely: this metadata object) was created.
    /// Used as the baseline for computing `rehydration_latency_ms`.
    created_at: Instant,
}
338
339impl SourceStatisticsMetadata {
340    /// Create a new `SourceStatisticsMetadata` object.
341    pub fn new(resume_upper: Antichain<Timestamp>) -> Self {
342        Self {
343            resume_upper,
344            created_at: Instant::now(),
345        }
346    }
347}
348
/// The user-facing statistics record paired with its prometheus counterpart;
/// both live behind one `RefCell` so they are updated together.
#[derive(Debug)]
struct StatsInner<Stats, Metrics> {
    // The SQL-visible statistics record for this worker.
    stats: Stats,
    // The prometheus metrics mirroring `stats` (weaker semantics; see module docs).
    prom: Metrics,
}
354
/// A helper struct designed to make it easy for operators to update user-facing metrics.
/// This struct also ensures that each stat is also incremented in prometheus.
///
/// Caveats:
/// - There is one Prometheus timeseries-per-worker, and we label it with the source id and the
/// source id of the parent source (if there is no parent, these labels have the same value).
///     - Some metrics also have the shard id we are writing metrics for.
/// - The prometheus metrics do not have the same timestamps as the ones exposed in sql, because
/// they are written at different times.
///     - This may be fixed in the future when we write the metrics from storaged directly.
///     - The values also eventually converge to the same value.
#[derive(Debug)]
pub struct StorageStatistics<Stats, Metrics, Meta> {
    // Note also that the `DeleteOnDropCounter`'s in the `SourceStatisticsMetrics`
    // already are in an `Arc`, so this is a bit of extra wrapping, but it
    // shouldn't cost too much.
    stats: Rc<RefCell<StatsInner<Stats, Metrics>>>,
    /// Meta data needed to maintain statistics.
    meta: Meta,
}
375
376impl<Stats, Metrics, Meta: Clone> Clone for StorageStatistics<Stats, Metrics, Meta> {
377    fn clone(&self) -> Self {
378        Self {
379            stats: Rc::clone(&self.stats),
380            meta: self.meta.clone(),
381        }
382    }
383}
384
/// Statistics maintained for sources. Gauges can be uninitialized.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SourceStatisticsRecord {
    // The id of the source these statistics belong to.
    id: GlobalId,
    // The timely worker that produced this record.
    worker_id: usize,
    // Counters
    messages_received: u64,
    bytes_received: u64,
    updates_staged: u64,
    updates_committed: u64,

    // Gauges are always wrapped in an `Option` that represents if that gauge has been
    // initialized by that worker.
    records_indexed: Option<u64>,
    bytes_indexed: Option<u64>,

    // This field is nullable, so its value is an `Option`.
    rehydration_latency_ms: Option<Option<i64>>,

    // The following fields are able to be unset when shipped to the controller, so their
    // values are `Option`'s
    snapshot_records_known: Option<Option<u64>>,
    snapshot_records_staged: Option<Option<u64>>,
    snapshot_committed: Option<bool>,
    offset_known: Option<Option<u64>>,
    offset_committed: Option<Option<u64>>,

    // Just prometheus.
    envelope_state_tombstones: u64,
}
415
416impl SourceStatisticsRecord {
417    fn reset_gauges(&mut self) {
418        // These gauges MUST be initialized across all workers before we aggregate and
419        // report statistics.
420        self.rehydration_latency_ms = None;
421        self.snapshot_committed = None;
422
423        // We consider these gauges always initialized
424        self.bytes_indexed = Some(0);
425        self.records_indexed = Some(0);
426
427        // We don't yet populate these, so we consider the initialized (with an empty value).
428        self.snapshot_records_known = Some(None);
429        self.snapshot_records_staged = Some(None);
430        self.offset_known = Some(None);
431        self.offset_committed = Some(None);
432
433        self.envelope_state_tombstones = 0;
434    }
435
436    /// Reset counters so that we continue to ship diffs to the controller.
437    fn reset_counters(&mut self) {
438        self.messages_received = 0;
439        self.bytes_received = 0;
440        self.updates_staged = 0;
441        self.updates_committed = 0;
442    }
443
444    /// Convert this record into an `SourceStatisticsUpdate` to be merged
445    /// across workers. Gauges must be initialized.
446    fn as_update(&self) -> SourceStatisticsUpdate {
447        let SourceStatisticsRecord {
448            id,
449            worker_id: _,
450            messages_received,
451            bytes_received,
452            updates_staged,
453            updates_committed,
454            records_indexed,
455            bytes_indexed,
456            rehydration_latency_ms,
457            snapshot_records_known,
458            snapshot_records_staged,
459            snapshot_committed,
460            offset_known,
461            offset_committed,
462            envelope_state_tombstones: _,
463        } = self.clone();
464
465        SourceStatisticsUpdate {
466            id,
467            messages_received: messages_received.into(),
468            bytes_received: bytes_received.into(),
469            updates_staged: updates_staged.into(),
470            updates_committed: updates_committed.into(),
471            records_indexed: Gauge::gauge(records_indexed.unwrap()),
472            bytes_indexed: Gauge::gauge(bytes_indexed.unwrap()),
473            rehydration_latency_ms: Gauge::gauge(rehydration_latency_ms.unwrap()),
474            snapshot_records_known: Gauge::gauge(snapshot_records_known.unwrap()),
475            snapshot_records_staged: Gauge::gauge(snapshot_records_staged.unwrap()),
476            snapshot_committed: Gauge::gauge(snapshot_committed.unwrap()),
477            offset_known: Gauge::gauge(offset_known.unwrap()),
478            offset_committed: Gauge::gauge(offset_committed.unwrap()),
479        }
480    }
481}
482
/// Statistics maintained for sinks. Gauges can be uninitialized.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SinkStatisticsRecord {
    // The id of the sink these statistics belong to.
    id: GlobalId,
    // The timely worker that produced this record.
    worker_id: usize,
    // Counters
    messages_staged: u64,
    messages_committed: u64,
    bytes_staged: u64,
    bytes_committed: u64,
}
494
495impl SinkStatisticsRecord {
496    fn reset_gauges(&self) {}
497
498    /// Reset counters so that we continue to ship diffs to the controller.
499    fn reset_counters(&mut self) {
500        self.messages_staged = 0;
501        self.messages_committed = 0;
502        self.bytes_staged = 0;
503        self.bytes_committed = 0;
504    }
505
506    /// Convert this record into an `SinkStatisticsUpdate` to be merged
507    /// across workers. Gauges must be initialized.
508    fn as_update(&self) -> SinkStatisticsUpdate {
509        let SinkStatisticsRecord {
510            id,
511            worker_id: _,
512            messages_staged,
513            messages_committed,
514            bytes_staged,
515            bytes_committed,
516        } = self.clone();
517
518        SinkStatisticsUpdate {
519            id,
520            messages_staged: messages_staged.into(),
521            messages_committed: messages_committed.into(),
522            bytes_staged: bytes_staged.into(),
523            bytes_committed: bytes_committed.into(),
524        }
525    }
526}
527
/// Statistics maintained for sources.
///
/// The metadata carries the resume upper and creation time, which are used to
/// compute `rehydration_latency_ms`.
pub type SourceStatistics =
    StorageStatistics<SourceStatisticsRecord, SourceStatisticsMetrics, SourceStatisticsMetadata>;

/// Statistics maintained for sinks.
///
/// Sinks need no extra metadata, so the `Meta` parameter is `()`.
pub type SinkStatistics = StorageStatistics<SinkStatisticsRecord, SinkStatisticsMetrics, ()>;
534
535impl SourceStatistics {
    /// Create a new `SourceStatistics` for source `id` on worker `worker_id`,
    /// registering the matching prometheus metrics as a side effect.
    ///
    /// Counters start at 0 and gauges start in the same state that
    /// `SourceStatisticsRecord::reset_gauges` produces.
    pub(crate) fn new(
        id: GlobalId,
        worker_id: usize,
        metrics: &SourceStatisticsMetricDefs,
        parent_source_id: GlobalId,
        shard_id: &mz_persist_client::ShardId,
        envelope: SourceEnvelope,
        resume_upper: Antichain<Timestamp>,
    ) -> Self {
        Self {
            stats: Rc::new(RefCell::new(StatsInner {
                stats: SourceStatisticsRecord {
                    id,
                    worker_id,
                    messages_received: 0,
                    updates_staged: 0,
                    updates_committed: 0,
                    bytes_received: 0,
                    records_indexed: Some(0),
                    bytes_indexed: Some(0),
                    // Must be initialized by the source before snapshots are produced.
                    rehydration_latency_ms: None,
                    snapshot_records_staged: Some(None),
                    snapshot_records_known: Some(None),
                    // Must be initialized by the source before snapshots are produced.
                    snapshot_committed: None,
                    offset_known: Some(None),
                    offset_committed: Some(None),
                    envelope_state_tombstones: 0,
                },
                prom: SourceStatisticsMetrics::new(
                    metrics,
                    id,
                    worker_id,
                    parent_source_id,
                    shard_id,
                    envelope,
                ),
            })),
            meta: SourceStatisticsMetadata::new(resume_upper),
        }
    }
576
577    /// Reset gauges to uninitialized state.
578    /// This does not reset prometheus gauges, which will be overwritten with new values.
579    pub fn reset_gauges(&self) {
580        let mut cur = self.stats.borrow_mut();
581        cur.stats.reset_gauges();
582    }
583
584    /// Get a snapshot of the data, returning `None` if all gauges are not initialized.
585    ///
586    /// This also resets counters, so that we continue to move diffs around.
587    pub fn snapshot(&self) -> Option<SourceStatisticsRecord> {
588        let mut cur = self.stats.borrow_mut();
589
590        match &cur.stats {
591            SourceStatisticsRecord {
592                records_indexed: Some(_),
593                bytes_indexed: Some(_),
594                rehydration_latency_ms: Some(_),
595                snapshot_records_known: Some(_),
596                snapshot_records_staged: Some(_),
597                snapshot_committed: Some(_),
598                offset_known: Some(_),
599                offset_committed: Some(_),
600                ..
601            } => {
602                let ret = Some(cur.stats.clone());
603                cur.stats.reset_counters();
604                ret
605            }
606            _ => None,
607        }
608    }
609
    /// Set the `snapshot_committed` stat based on the reported upper, and
    /// mark the stats as initialized.
    ///
    /// - In sql, we ensure that we _never_ reset `snapshot_committed` to `false`, but gauges and
    /// counters are ordinarily reset to 0 in Prometheus, so on restarts this value may be inconsistent.
    // TODO(guswynn): Actually test that this initialization logic works.
    pub fn initialize_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
        // Delegates to the regular update path; initializing differs only in intent.
        self.update_snapshot_committed(upper);
    }
619
620    /// Set the `snapshot_committed` stat based on the reported upper.
621    pub fn update_snapshot_committed(&self, upper: &Antichain<Timestamp>) {
622        let value = *upper != Antichain::from_elem(Timestamp::MIN);
623        let mut cur = self.stats.borrow_mut();
624        cur.stats.snapshot_committed = Some(value);
625        cur.prom.snapshot_committed.set(if value { 1 } else { 0 });
626    }
627
628    /// Increment the `messages_received` stat.
629    pub fn inc_messages_received_by(&self, value: u64) {
630        let mut cur = self.stats.borrow_mut();
631        cur.stats.messages_received = cur.stats.messages_received + value;
632        cur.prom.messages_received.inc_by(value);
633    }
634
635    /// Increment the `updates` stat.
636    pub fn inc_updates_staged_by(&self, value: u64) {
637        let mut cur = self.stats.borrow_mut();
638        cur.stats.updates_staged = cur.stats.updates_staged + value;
639        cur.prom.updates_staged.inc_by(value);
640    }
641
642    /// Increment the `messages_committed` stat.
643    pub fn inc_updates_committed_by(&self, value: u64) {
644        let mut cur = self.stats.borrow_mut();
645        cur.stats.updates_committed = cur.stats.updates_committed + value;
646        cur.prom.updates_committed.inc_by(value);
647    }
648
649    /// Increment the `bytes_received` stat.
650    pub fn inc_bytes_received_by(&self, value: u64) {
651        let mut cur = self.stats.borrow_mut();
652        cur.stats.bytes_received = cur.stats.bytes_received + value;
653        cur.prom.bytes_received.inc_by(value);
654    }
655
656    /// Update the `bytes_indexed` stat.
657    /// A positive value will add and a negative value will subtract.
658    pub fn update_bytes_indexed_by(&self, value: i64) {
659        let mut cur = self.stats.borrow_mut();
660        if let Some(updated) = cur
661            .stats
662            .bytes_indexed
663            .unwrap_or(0)
664            .checked_add_signed(value)
665        {
666            cur.stats.bytes_indexed = Some(updated);
667            cur.prom.bytes_indexed.set(updated);
668        } else {
669            let bytes_indexed = cur.stats.bytes_indexed.unwrap_or(0);
670            tracing::warn!(
671                "Unexpected u64 overflow while updating bytes_indexed value {} with {}",
672                bytes_indexed,
673                value
674            );
675            cur.stats.bytes_indexed = Some(0);
676            cur.prom.bytes_indexed.set(0);
677        }
678    }
679
680    /// Set the `bytes_indexed` to the given value
681    pub fn set_bytes_indexed(&self, value: i64) {
682        let mut cur = self.stats.borrow_mut();
683        let value = if value < 0 {
684            tracing::warn!("Unexpected negative value for bytes_indexed {}", value);
685            0
686        } else {
687            value.unsigned_abs()
688        };
689        cur.stats.bytes_indexed = Some(value);
690        cur.prom.bytes_indexed.set(value);
691    }
692
693    /// Update the `records_indexed` stat.
694    /// A positive value will add and a negative value will subtract.
695    pub fn update_records_indexed_by(&self, value: i64) {
696        let mut cur = self.stats.borrow_mut();
697        if let Some(updated) = cur
698            .stats
699            .records_indexed
700            .unwrap_or(0)
701            .checked_add_signed(value)
702        {
703            cur.stats.records_indexed = Some(updated);
704            cur.prom.records_indexed.set(updated);
705        } else {
706            let records_indexed = cur.stats.records_indexed.unwrap_or(0);
707            tracing::warn!(
708                "Unexpected u64 overflow while updating records_indexed value {} with {}",
709                records_indexed,
710                value
711            );
712            cur.stats.records_indexed = Some(0);
713            cur.prom.records_indexed.set(0);
714        }
715    }
716
717    /// Set the `records_indexed` to the given value
718    pub fn set_records_indexed(&self, value: i64) {
719        let mut cur = self.stats.borrow_mut();
720        let value = if value < 0 {
721            tracing::warn!("Unexpected negative value for records_indexed {}", value);
722            0
723        } else {
724            value.unsigned_abs()
725        };
726        cur.stats.records_indexed = Some(value);
727        cur.prom.records_indexed.set(value);
728    }
729
730    /// Initialize the `rehydration_latency_ms` stat as `NULL`.
731    pub fn initialize_rehydration_latency_ms(&self) {
732        let mut cur = self.stats.borrow_mut();
733        cur.stats.rehydration_latency_ms = Some(None);
734    }
735
736    /// Update the `envelope_state_tombstones` stat.
737    /// A positive value will add and a negative value will subtract.
738    // TODO(guswynn): consider exposing this to users
739    pub fn update_envelope_state_tombstones_by(&self, value: i64) {
740        let mut cur = self.stats.borrow_mut();
741        if let Some(updated) = cur
742            .stats
743            .envelope_state_tombstones
744            .checked_add_signed(value)
745        {
746            cur.stats.envelope_state_tombstones = updated;
747            cur.prom.envelope_state_tombstones.set(updated);
748        } else {
749            let envelope_state_tombstones = cur.stats.envelope_state_tombstones;
750            tracing::warn!(
751                "Unexpected u64 overflow while updating envelope_state_tombstones value {} with {}",
752                envelope_state_tombstones,
753                value
754            );
755            cur.stats.envelope_state_tombstones = 0;
756            cur.prom.envelope_state_tombstones.set(0);
757        }
758    }
759
760    /// Set the `rehydration_latency_ms` stat based on the reported upper.
761    pub fn update_rehydration_latency_ms(&self, upper: &Antichain<Timestamp>) {
762        let mut cur = self.stats.borrow_mut();
763
764        if matches!(cur.stats.rehydration_latency_ms, Some(Some(_))) {
765            return; // source was already hydrated before
766        }
767        if !PartialOrder::less_than(&self.meta.resume_upper, upper) {
768            return; // source is not yet hydrated
769        }
770
771        let elapsed = self.meta.created_at.elapsed();
772        let value = elapsed
773            .as_millis()
774            .try_into()
775            .expect("Rehydration took more than ~584 million years!");
776        cur.stats.rehydration_latency_ms = Some(Some(value));
777        cur.prom.rehydration_latency_ms.set(value);
778    }
779
780    /// Set the `offset_known` stat to the given value.
781    pub fn set_offset_known(&self, value: u64) {
782        let mut cur = self.stats.borrow_mut();
783        cur.stats.offset_known = Some(Some(value));
784        cur.prom.offset_known.set(value);
785    }
786
787    /// Set the `offset_committed` stat to the given value.
788    pub fn set_offset_committed(&self, value: u64) {
789        let mut cur = self.stats.borrow_mut();
790        cur.stats.offset_committed = Some(Some(value));
791        cur.prom.offset_committed.set(value);
792    }
793
794    /// Set the `snapshot_records_known` stat to the given value.
795    pub fn set_snapshot_records_known(&self, value: u64) {
796        let mut cur = self.stats.borrow_mut();
797        cur.stats.snapshot_records_known = Some(Some(value));
798        cur.prom.snapshot_records_known.set(value);
799    }
800
801    /// Set the `snapshot_records_known` stat to the given value.
802    pub fn set_snapshot_records_staged(&self, value: u64) {
803        let mut cur = self.stats.borrow_mut();
804        cur.stats.snapshot_records_staged = Some(Some(value));
805        cur.prom.snapshot_records_staged.set(value);
806    }
807}
808
809impl SinkStatistics {
810    pub(crate) fn new(id: GlobalId, worker_id: usize, metrics: &SinkStatisticsMetricDefs) -> Self {
811        Self {
812            stats: Rc::new(RefCell::new(StatsInner {
813                stats: SinkStatisticsRecord {
814                    id,
815                    worker_id,
816                    messages_staged: 0,
817                    messages_committed: 0,
818                    bytes_staged: 0,
819                    bytes_committed: 0,
820                },
821                prom: SinkStatisticsMetrics::new(metrics, id, worker_id),
822            })),
823            meta: (),
824        }
825    }
826
827    /// Reset gauges to uninitialized state.
828    /// This does not reset prometheus gauges, which will be overwritten with new values.
829    pub fn reset_gauges(&self) {
830        let cur = self.stats.borrow_mut();
831        cur.stats.reset_gauges();
832    }
833
834    /// Get a snapshot of the data, returning `None` if all gauges are not initialized.
835    ///
836    /// This also resets counters, so that we continue to move diffs around.
837    pub fn snapshot(&self) -> Option<SinkStatisticsRecord> {
838        let mut cur = self.stats.borrow_mut();
839
840        match &cur.stats {
841            SinkStatisticsRecord { .. } => {
842                let ret = Some(cur.stats.clone());
843                cur.stats.reset_counters();
844                ret
845            }
846        }
847    }
848
849    /// Increment the `messages_staged` stat.
850    pub fn inc_messages_staged_by(&self, value: u64) {
851        let mut cur = self.stats.borrow_mut();
852        cur.stats.messages_staged = cur.stats.messages_staged + value;
853        cur.prom.messages_staged.inc_by(value);
854    }
855
856    /// Increment the `bytes_received` stat.
857    pub fn inc_bytes_staged_by(&self, value: u64) {
858        let mut cur = self.stats.borrow_mut();
859        cur.stats.bytes_staged = cur.stats.bytes_staged + value;
860        cur.prom.bytes_staged.inc_by(value);
861    }
862
863    /// Increment the `messages_committed` stat.
864    pub fn inc_messages_committed_by(&self, value: u64) {
865        let mut cur = self.stats.borrow_mut();
866        cur.stats.messages_committed = cur.stats.messages_committed + value;
867        cur.prom.messages_committed.inc_by(value);
868    }
869
870    /// Increment the `bytes_committed` stat.
871    pub fn inc_bytes_committed_by(&self, value: u64) {
872        let mut cur = self.stats.borrow_mut();
873        cur.stats.bytes_committed = cur.stats.bytes_committed + value;
874        cur.prom.bytes_committed.inc_by(value);
875    }
876}
877
/// A structure that keeps track of _local_ statistics, as well as aggregating
/// statistics into a single worker (currently worker 0).
///
/// This is because we ONLY want to emit statistics for _gauge-style-statistics_ when
/// ALL workers have caught up to the _currently running instance of a source/sink_ and have
/// reported an accurate gauge. This prevents issues like 1 worker reporting
/// `snapshot_committed=true` before the other workers have caught up to report that they haven't
/// finished snapshotting.
///
/// The API flow is:
/// - Initialize sources/sinks with `initialize_source` or `initialize_sink`. This advances the
///   local epoch.
/// - Advance the _global epoch_ with `advance_global_epoch`. This should always be called strictly
///   before reinitializing objects.
/// - Emit a snapshot of local data with `emit_local`, which can be broadcasted to other workers.
/// - Ingest data from other workers with `ingest`.
/// - Emit a `snapshot` of _global data_ (i.e. skipping objects that don't have all workers caught
///   up) with `snapshot`.
///
/// All functions can and should be called from ALL workers.
///
/// Note that we hand-roll a statistics epoch here, but in the future, when the storage
/// _controller_ has top-level support for _dataflow epochs_, this logic can be entirely moved
/// there.
pub struct AggregatedStatistics {
    // This worker's index and the total number of timely workers.
    worker_id: usize,
    worker_count: usize,
    // This worker's statistics for each object, paired with the object's _local_ epoch.
    local_source_statistics: BTreeMap<GlobalId, (usize, SourceStatistics)>,
    local_sink_statistics: BTreeMap<GlobalId, (usize, SinkStatistics)>,

    // Populated on worker 0 only: for each object, the _global_ epoch and one slot per
    // worker. A slot stays `None` until that worker has reported for the object.
    global_source_statistics: BTreeMap<GlobalId, (usize, Vec<Option<SourceStatisticsUpdate>>)>,
    global_sink_statistics: BTreeMap<GlobalId, (usize, Vec<Option<SinkStatisticsUpdate>>)>,
}
911
912impl AggregatedStatistics {
913    /// Initialize a new `AggregatedStatistics`.
914    pub fn new(worker_id: usize, worker_count: usize) -> Self {
915        AggregatedStatistics {
916            worker_id,
917            worker_count,
918            local_source_statistics: Default::default(),
919            local_sink_statistics: Default::default(),
920            global_source_statistics: Default::default(),
921            global_sink_statistics: Default::default(),
922        }
923    }
924
925    /// Get a `SourceStatistics` for an id, if it exists.
926    pub fn get_source(&self, id: &GlobalId) -> Option<&SourceStatistics> {
927        self.local_source_statistics.get(id).map(|(_, s)| s)
928    }
929
930    /// Get a `SinkStatistics` for an id, if it exists.
931    pub fn get_sink(&self, id: &GlobalId) -> Option<&SinkStatistics> {
932        self.local_sink_statistics.get(id).map(|(_, s)| s)
933    }
934
935    /// Deinitialize an object. Other methods other than `initialize_source` and `initialize_sink`
936    /// will never overwrite this.
937    pub fn deinitialize(&mut self, id: GlobalId) {
938        self.local_source_statistics.remove(&id);
939        self.local_sink_statistics.remove(&id);
940        self.global_source_statistics.remove(&id);
941        self.global_sink_statistics.remove(&id);
942    }
943
944    /// Advance the _global epoch_ for statistics.
945    ///
946    /// Gauge values from previous epochs will be ignored. Counter values from previous epochs will
947    /// still be applied as usual.
948    pub fn advance_global_epoch(&mut self, id: GlobalId) {
949        if let Some((epoch, stats)) = self.global_source_statistics.get_mut(&id) {
950            *epoch += 1;
951            for worker_stats in stats {
952                if let Some(update) = worker_stats {
953                    update.reset_gauges();
954                }
955            }
956        }
957        if let Some((epoch, stats)) = self.global_sink_statistics.get_mut(&id) {
958            *epoch += 1;
959            for worker_stats in stats {
960                if let Some(update) = worker_stats {
961                    update.reset_gauges();
962                }
963            }
964        }
965    }
966
967    /// Re-initialize a source. If it already exists, then its local epoch is advanced.
968    pub fn initialize_source<F: FnOnce() -> SourceStatistics>(
969        &mut self,
970        id: GlobalId,
971        resume_upper: Antichain<Timestamp>,
972        stats: F,
973    ) {
974        self.local_source_statistics
975            .entry(id)
976            .and_modify(|(epoch, stats)| {
977                *epoch += 1;
978                stats.reset_gauges();
979                stats.meta = SourceStatisticsMetadata::new(resume_upper);
980            })
981            .or_insert_with(|| (0, stats()));
982
983        if self.worker_id == 0 {
984            self.global_source_statistics
985                .entry(id)
986                .or_insert_with(|| (0, vec![None; self.worker_count]));
987        }
988    }
989
990    /// Re-initialize a sink. If it already exists, then its local epoch is advanced.
991    pub fn initialize_sink<F: FnOnce() -> SinkStatistics>(&mut self, id: GlobalId, stats: F) {
992        self.local_sink_statistics
993            .entry(id)
994            .and_modify(|(epoch, stats)| {
995                *epoch += 1;
996                stats.reset_gauges();
997            })
998            .or_insert_with(|| (0, stats()));
999        if self.worker_id == 0 {
1000            self.global_sink_statistics
1001                .entry(id)
1002                .or_insert_with(|| (0, vec![None; self.worker_count]));
1003        }
1004    }
1005
1006    /// Ingest data from other workers.
1007    pub fn ingest(
1008        &mut self,
1009        source_statistics: Vec<(usize, SourceStatisticsRecord)>,
1010        sink_statistics: Vec<(usize, SinkStatisticsRecord)>,
1011    ) {
1012        // Currently, only worker 0 ingest data from other workers.
1013        if self.worker_id != 0 {
1014            return;
1015        }
1016
1017        for (epoch, stat) in source_statistics {
1018            if let Some((global_epoch, stats)) = self.global_source_statistics.get_mut(&stat.id) {
1019                // The record might be from a previous incarnation of the source. If that's the
1020                // case, we only incorporate its counter values and ignore its gauge values.
1021                let epoch_match = epoch >= *global_epoch;
1022                let mut update = stat.as_update();
1023                match (&mut stats[stat.worker_id], epoch_match) {
1024                    (None, true) => stats[stat.worker_id] = Some(update),
1025                    (None, false) => {
1026                        update.reset_gauges();
1027                        stats[stat.worker_id] = Some(update);
1028                    }
1029                    (Some(occupied), true) => occupied.incorporate(update),
1030                    (Some(occupied), false) => occupied.incorporate_counters(update),
1031                }
1032            }
1033        }
1034
1035        for (epoch, stat) in sink_statistics {
1036            if let Some((global_epoch, stats)) = self.global_sink_statistics.get_mut(&stat.id) {
1037                // The record might be from a previous incarnation of the sink. If that's the
1038                // case, we only incorporate its counter values and ignore its gauge values.
1039                let epoch_match = epoch >= *global_epoch;
1040                let update = stat.as_update();
1041                match (&mut stats[stat.worker_id], epoch_match) {
1042                    (None, true) => stats[stat.worker_id] = Some(update),
1043                    (None, false) => {
1044                        update.reset_gauges();
1045                        stats[stat.worker_id] = Some(update);
1046                    }
1047                    (Some(occupied), true) => occupied.incorporate(update),
1048                    (Some(occupied), false) => occupied.incorporate_counters(update),
1049                }
1050            }
1051        }
1052    }
1053
1054    fn _emit_local(
1055        &mut self,
1056    ) -> (
1057        Vec<(usize, SourceStatisticsRecord)>,
1058        Vec<(usize, SinkStatisticsRecord)>,
1059    ) {
1060        let sources = self
1061            .local_source_statistics
1062            .values_mut()
1063            .flat_map(|(epoch, s)| s.snapshot().map(|v| (*epoch, v)))
1064            .collect();
1065
1066        let sinks = self
1067            .local_sink_statistics
1068            .values_mut()
1069            .flat_map(|(epoch, s)| s.snapshot().map(|v| (*epoch, v)))
1070            .collect();
1071
1072        (sources, sinks)
1073    }
1074
1075    /// Emit a snapshot of this workers local data.
1076    pub fn emit_local(
1077        &mut self,
1078    ) -> (
1079        Vec<(usize, SourceStatisticsRecord)>,
1080        Vec<(usize, SinkStatisticsRecord)>,
1081    ) {
1082        // As an optimization, worker 0 does not broadcast it data. It ingests
1083        // it in `snapshot`.
1084        if self.worker_id == 0 {
1085            return (Vec::new(), Vec::new());
1086        }
1087
1088        self._emit_local()
1089    }
1090
1091    /// Emit a _global_ snapshot of data. This does not include objects whose workers have not
1092    /// initialized gauges for the current epoch.
1093    pub fn snapshot(&mut self) -> (Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>) {
1094        if !self.worker_id == 0 {
1095            return (Vec::new(), Vec::new());
1096        }
1097
1098        let (sources, sinks) = self._emit_local();
1099        self.ingest(sources, sinks);
1100
1101        let sources = self
1102            .global_source_statistics
1103            .iter_mut()
1104            .filter_map(|(_, (_, s))| {
1105                if s.iter().all(|s| s.is_some()) {
1106                    let ret = Some(SourceStatisticsUpdate::summarize(|| {
1107                        s.iter().filter_map(Option::as_ref)
1108                    }));
1109
1110                    // Reset the counters so we only report diffs.
1111                    s.iter_mut().for_each(|s| {
1112                        if let Some(s) = s {
1113                            s.reset_counters();
1114                        }
1115                    });
1116                    ret
1117                } else {
1118                    None
1119                }
1120            })
1121            .collect();
1122
1123        let sinks = self
1124            .global_sink_statistics
1125            .iter_mut()
1126            .filter_map(|(_, (_, s))| {
1127                if s.iter().all(|s| s.is_some()) {
1128                    let ret = Some(SinkStatisticsUpdate::summarize(|| {
1129                        s.iter().filter_map(Option::as_ref)
1130                    }));
1131
1132                    // Reset the counters so we only report diffs.
1133                    s.iter_mut().for_each(|s| {
1134                        if let Some(s) = s {
1135                            s.reset_counters();
1136                        }
1137                    });
1138                    ret
1139                } else {
1140                    None
1141                }
1142            })
1143            .collect();
1144
1145        (sources, sinks)
1146    }
1147}