// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! These structures represent a full set of updates for the `mz_source_statistics_raw`
//! and `mz_sink_statistics_raw` tables for a specific source-worker/sink-worker pair.
//! They are structured like this for simplicity
//! and efficiency: each storage worker can individually collect and consolidate metrics,
//! then control how much `StorageResponse` traffic is produced when sending updates
//! back to the controller to be written.
//!
//! The proto conversions for these types are in the `client` module, for now.

use std::sync::LazyLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use mz_controller_types::ReplicaId;
use serde::{Deserialize, Serialize};

use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
use mz_repr::{GlobalId, RelationDesc, Row, ScalarType};

include!(concat!(env!("OUT_DIR"), "/mz_storage_client.statistics.rs"));

pub static MZ_SOURCE_STATISTICS_RAW_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::builder()
        // Id of the source (or subsource).
        .with_column("id", ScalarType::String.nullable(false))
        // Id of the replica producing these statistics.
        .with_column("replica_id", ScalarType::String.nullable(true))
        //
        // Counters
        //
        // A counter of the messages we have read from upstream for this source.
        // Never resets.
        .with_column("messages_received", ScalarType::UInt64.nullable(false))
        // A counter of the bytes we have read from upstream for this source.
        // Never resets.
        .with_column("bytes_received", ScalarType::UInt64.nullable(false))
        // A counter of the updates we have staged to commit for this source.
        // Never resets.
        .with_column("updates_staged", ScalarType::UInt64.nullable(false))
        // A counter of the updates we have committed for this source.
        // Never resets.
        .with_column("updates_committed", ScalarType::UInt64.nullable(false))
        //
        // Resetting gauges
        //
        // A gauge of the number of records in the envelope state. 0 for sources
        // without an envelope state.
        // Reset when the source is restarted, for any reason.
        .with_column("records_indexed", ScalarType::UInt64.nullable(false))
        // A gauge of the number of bytes in the envelope state. 0 for sources
        // without an envelope state.
        // Reset when the source is restarted, for any reason.
        .with_column("bytes_indexed", ScalarType::UInt64.nullable(false))
        // A gauge that shows the duration of rehydration. `NULL` before rehydration
        // is done.
        // Reset when the source is restarted, for any reason.
        .with_column("rehydration_latency", ScalarType::Interval.nullable(true))
        // A gauge of the number of _values_ (source-defined unit) the _snapshot_ of this source
        // contains.
        // Sometimes reset when the source can snapshot new pieces of upstream: sources
        // like Postgres and MySQL may repopulate this column when tables are added.
        //
        // `NULL` while we discover the snapshot size.
        .with_column("snapshot_records_known", ScalarType::UInt64.nullable(true))
        // A gauge of the number of _values_ (source-defined unit) we have read of the _snapshot_
        // of this source.
        // Sometimes reset when the source can snapshot new pieces of upstream (like Postgres
        // and MySQL).
        //
        // `NULL` while we discover the snapshot size.
        .with_column("snapshot_records_staged", ScalarType::UInt64.nullable(true))
        //
        // Non-resetting gauges
        //
        // Whether or not the snapshot for the source has been committed. Never resets.
        .with_column("snapshot_committed", ScalarType::Bool.nullable(false))
        // The following are not yet reported by sources and have 0 or `NULL` values.
        // They have been added here to reduce churn when changing the schema of this collection.
        //
        // These are left nullable for now in case we want semantics for `NULL` values. We
        // currently never expose null values.
        //
        // A gauge of the number of _values_ (source-defined unit) available to be read from upstream.
        // Never resets. Not to be confused with any of the counters above.
        .with_column("offset_known", ScalarType::UInt64.nullable(true))
        // A gauge of the number of _values_ (source-defined unit) we have committed.
        // Never resets. Not to be confused with any of the counters above.
        .with_column("offset_committed", ScalarType::UInt64.nullable(true))
        .finish()
});

pub static MZ_SINK_STATISTICS_RAW_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::builder()
        // Id of the sink.
        .with_column("id", ScalarType::String.nullable(false))
        // Id of the replica producing these statistics.
        .with_column("replica_id", ScalarType::String.nullable(true))
        //
        // Counters
        //
        // A counter of the messages we have staged to upstream.
        // Never resets.
        .with_column("messages_staged", ScalarType::UInt64.nullable(false))
        // A counter of the messages we have committed.
        // Never resets.
        .with_column("messages_committed", ScalarType::UInt64.nullable(false))
        // A counter of the bytes we have staged to upstream.
        // Never resets.
        .with_column("bytes_staged", ScalarType::UInt64.nullable(false))
        // A counter of the bytes we have committed.
        // Never resets.
        .with_column("bytes_committed", ScalarType::UInt64.nullable(false))
        .finish()
});

// Types of statistics (counters and various types of gauges) that have different semantics
// when sources/sinks are reset.

/// A trait that defines the semantics that storage statistics can have.
pub trait StorageMetric {
    /// Summarizes a set of measurements into a single representative value.
    /// Typically this function is used to summarize the measurements collected from each worker.
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a;

    /// Incorporate this value with another.
    fn incorporate(&mut self, other: Self, field_name: &'static str);
}
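
// A minimal sketch of how these two operations compose, using the `Counter`
// type defined below (the two-worker values are hypothetical):
//
//     // Summarize across workers: counters sum, yielding Counter(7).
//     let per_worker = [Counter::from(3), Counter::from(4)];
//     let summarized = Counter::summarize(per_worker.iter());
//     // Incorporate into a running value: counters accumulate, leaving
//     // `running` at Counter(17).
//     let mut running = Counter::from(10);
//     running.incorporate(summarized, "messages_received");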

/// A counter that never resets.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct Counter(u64);

impl StorageMetric for Counter {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // Sum across workers.
        Self(values.into_iter().map(|c| c.0).sum())
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        // Always add the new value to the existing one.
        self.0 += other.0
    }
}

impl From<u64> for Counter {
    fn from(f: u64) -> Self {
        Counter(f)
    }
}

/// A latency gauge that is reset on every restart.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct ResettingLatency(Option<i64>);

impl From<Option<i64>> for ResettingLatency {
    fn from(f: Option<i64>) -> Self {
        ResettingLatency(f)
    }
}

impl StorageMetric for ResettingLatency {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        let mut max = 0;
        for value in values {
            match value.0 {
                // If any of the values are `NULL`, then we don't yet know the max.
                None => return Self(None),
                // Pick the worst latency across workers.
                Some(value) => max = std::cmp::max(max, value),
            }
        }

        Self(Some(max))
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        // Reset to the new value.
        self.0 = other.0;
    }
}

impl ResettingLatency {
    fn reset(&mut self) {
        self.0 = None;
    }
}
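
// A sketch of the `NULL` propagation in `summarize` (hypothetical values): if
// any worker has not yet finished rehydrating, the summarized latency is
// still unknown.
//
//     let done = ResettingLatency::from(Some(1_500));
//     let pending = ResettingLatency::from(None);
//     // Yields ResettingLatency(None): one worker is still rehydrating.
//     let summary = ResettingLatency::summarize([&done, &pending]);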

/// A numerical gauge that always resets, but can start out as `NULL`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct ResettingNullableTotal(Option<u64>);

impl StorageMetric for ResettingNullableTotal {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        let mut sum = 0;
        for value in values {
            match value.0 {
                // If any of the values are `NULL`, then we merge to `NULL`.
                None => return Self(None),
                // Sum across workers.
                Some(value) => sum += value,
            }
        }

        Self(Some(sum))
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        match (&mut self.0, other.0) {
            (None, other) => {
                self.0 = other;
            }
            // Override the total.
            (Some(this), Some(other)) => *this = other,
            (Some(_), None) => {
                // `NULL`s don't reset the value.
            }
        }
    }
}

impl From<Option<u64>> for ResettingNullableTotal {
    fn from(f: Option<u64>) -> Self {
        ResettingNullableTotal(f)
    }
}

impl ResettingNullableTotal {
    fn reset(&mut self) {
        self.0 = None;
    }
}
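
// A sketch of the `incorporate` semantics (hypothetical values): a known total
// is overridden by a newer known total, but an incoming `NULL` is ignored.
//
//     let mut total = ResettingNullableTotal::from(Some(10));
//     // `total` is now Some(7).
//     total.incorporate(ResettingNullableTotal::from(Some(7)), "snapshot_records_known");
//     // A later `None` leaves it at Some(7).
//     total.incorporate(ResettingNullableTotal::from(None), "snapshot_records_known");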

/// A numerical gauge that always resets.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct ResettingTotal(u64);

impl StorageMetric for ResettingTotal {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // Sum across workers.
        Self(values.into_iter().map(|c| c.0).sum())
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        // Reset the pre-existing value.
        self.0 = other.0;
    }
}

impl From<u64> for ResettingTotal {
    fn from(f: u64) -> Self {
        ResettingTotal(f)
    }
}

impl ResettingTotal {
    fn reset(&mut self) {
        self.0 = 0;
    }
}

/// A boolean gauge that is never reset.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct Boolean(bool);

impl StorageMetric for Boolean {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // All workers must be true for this gauge to be true.
        Self(values.into_iter().fold(true, |base, new| base & new.0))
    }

    fn incorporate(&mut self, other: Self, field_name: &'static str) {
        // A boolean regressing to `false` is a bug.
        //
        // Clippy's suggestion here is not good!
        #[allow(clippy::bool_comparison)]
        if other.0 < self.0 {
            tracing::error!(
                "boolean gauge for field {field_name} erroneously regressed from true to false",
            );
            return;
        }
        self.0 = other.0;
    }
}

impl From<bool> for Boolean {
    fn from(f: bool) -> Self {
        Boolean(f)
    }
}
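
// A sketch of `summarize` for `Boolean` (hypothetical values): the snapshot
// only counts as committed once every worker reports it committed.
//
//     let committed = Boolean::from(true);
//     let lagging = Boolean::from(false);
//     // Yields Boolean(false): one worker has not committed its share yet.
//     let summary = Boolean::summarize([&committed, &lagging]);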

/// A numerical gauge that never regresses.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Total {
    /// Defaults to 0. Can be skipped on updates from clusterd.
    total: Option<u64>,
    /// If provided, it is bumped on regressions, as opposed to `error!`
    /// logs.
    #[serde(skip)]
    regressions:
        Option<mz_ore::metrics::DeleteOnDropCounter<prometheus::core::AtomicU64, Vec<String>>>,
}

impl From<Option<u64>> for Total {
    fn from(f: Option<u64>) -> Self {
        Total {
            total: f,
            regressions: None,
        }
    }
}

impl Clone for Total {
    fn clone(&self) -> Self {
        Self {
            total: self.total,
            regressions: None,
        }
    }
}

impl PartialEq for Total {
    fn eq(&self, other: &Self) -> bool {
        self.total == other.total
    }
}

impl Total {
    /// Pack this `Total` into a `u64`, defaulting to 0.
    fn pack(&self) -> u64 {
        self.total.unwrap_or_default()
    }
}

impl StorageMetric for Total {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // Sum across workers, but only if all workers have contributed
        // a non-`None` value.
        let mut any_none = false;

        let inner = values
            .into_iter()
            .filter_map(|i| {
                any_none |= i.total.is_none();
                i.total.as_ref()
            })
            .sum();

        // If any are `None`, we can't aggregate.
        // `self.regressions` is only meaningful in incorporation.
        Self {
            total: (!any_none).then_some(inner),
            regressions: None,
        }
    }

    fn incorporate(&mut self, other: Self, field_name: &'static str) {
        match (&mut self.total, other.total) {
            (_, None) => {}
            (None, Some(other)) => self.total = Some(other),
            (Some(this), Some(other)) => {
                if other < *this {
                    if let Some(metric) = &self.regressions {
                        metric.inc()
                    } else {
                        tracing::error!(
                            "total gauge {field_name} erroneously regressed from {} to {}",
                            this,
                            other
                        );
                    }
                    return;
                }
                *this = other
            }
        }
    }
}
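
// A sketch of the regression handling in `incorporate` (hypothetical values):
// a regressed value is dropped, and either bumps the `regressions` counter (if
// one was attached via `with_metrics`/`unpack`) or `error!` logs.
//
//     let mut offset_known = Total::from(Some(100));
//     // 90 < 100: the update is ignored and the regression is recorded.
//     offset_known.incorporate(Total::from(Some(90)), "offset_known");
//     assert_eq!(offset_known, Total::from(Some(100)));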

/// A gauge that has semantics based on the `StorageMetric` implementation of its inner.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct Gauge<T>(T);

impl<T> Gauge<T> {
    // This can't be a `From` impl because of coherence issues :(
    pub fn gauge<F>(f: F) -> Self
    where
        T: From<F>,
    {
        Gauge(f.into())
    }
}

impl<T: StorageMetric> StorageMetric for Gauge<T> {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        Gauge(T::summarize(values.into_iter().map(|i| &i.0)))
    }

    fn incorporate(&mut self, other: Self, field_name: &'static str) {
        self.0.incorporate(other.0, field_name)
    }
}
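
// A sketch of constructing gauges (hypothetical values): `Gauge::gauge`
// accepts anything the inner metric type has a `From` impl for.
//
//     let indexed: Gauge<ResettingTotal> = Gauge::gauge(42u64);
//     let latency: Gauge<ResettingLatency> = Gauge::gauge(Some(250i64));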

/// A trait that abstracts over user-facing statistics objects that can be
/// packed into a `Row` and unpacked again. That is, they round-trip.
pub trait PackableStats {
    /// Pack `self` into the `Row`.
    fn pack(&self, packer: mz_repr::RowPacker<'_>);
    /// Unpack a `Row` back into a `Self`.
    fn unpack(
        row: Row,
        metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self);
}
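
// A sketch of the round-trip this trait promises (`stats` and `metrics` are
// hypothetical):
//
//     let mut row = Row::default();
//     stats.pack(row.packer());
//     let (_id, _replica_id, unpacked) = SinkStatisticsUpdate::unpack(row, &metrics);
//     assert_eq!(stats, unpacked);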

/// Stats that can get out-of-date and should then get reaped.
pub trait ExpirableStats {
    /// The last time this bag of stats was updated (via an update coming in
    /// from a replica, presumably).
    fn last_updated(&self) -> Instant;
}

/// Stats that should be initialized with a row of zeroes, that is we want the
/// first data to show up in the builtin stats collection to be zeroes.
///
/// We do the tracking within the stat object itself instead of adding external
/// tracking. That's a bit of a roundabout way of doing it, but it works.
///
/// This is an odd requirement, and you would think we should just report what
/// is there when we get it, but this was consciously changed a while back and
/// the current Console incarnation depends on this behavior.
pub trait ZeroInitializedStats {
    /// `True` if we have not emitted a "zero update" for this stats blob.
    fn needs_zero_initialization(&self) -> bool;

    /// Marks that we have emitted a "zero update" for this stats blob.
    fn mark_zero_initialized(&mut self);

    /// Returns an "all zero" instance of this stat, with collection id and/or
    /// replica id retained.
    fn zero_stat(&self) -> Self;
}
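
// A sketch of how a caller might drive this trait (`stat` and `emit` are
// hypothetical):
//
//     if stat.needs_zero_initialization() {
//         // First report an all-zero row so the collection starts from zero...
//         emit(stat.zero_stat());
//         stat.mark_zero_initialized();
//     }
//     // ...then report the real values.
//     emit(stat.clone());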

/// An update as reported from a storage instance. The semantics
/// of each field are documented above in `MZ_SOURCE_STATISTICS_RAW_DESC`,
/// and encoded in the field types.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SourceStatisticsUpdate {
    pub id: GlobalId,

    pub messages_received: Counter,
    pub bytes_received: Counter,
    pub updates_staged: Counter,
    pub updates_committed: Counter,

    pub records_indexed: Gauge<ResettingTotal>,
    pub bytes_indexed: Gauge<ResettingTotal>,
    pub rehydration_latency_ms: Gauge<ResettingLatency>,
    pub snapshot_records_known: Gauge<ResettingNullableTotal>,
    pub snapshot_records_staged: Gauge<ResettingNullableTotal>,

    pub snapshot_committed: Gauge<Boolean>,
    // `offset_known` is enriched with a counter in `unpack` and `with_metrics` that is
    // bumped whenever it regresses. This is distinct from `offset_committed`, which
    // `error!` logs on regressions.
    //
    // `offset_committed` is entirely in our control: it is calculated from source frontiers
    // that are guaranteed to never go backwards. Therefore, a regression there is a bug in
    // how we calculate it.
    //
    // `offset_known` is calculated based on information the upstream service of the source gives
    // us. This is meaningfully less reliable, and can cause regressions in the value. Some known
    // cases that cause this are:
    // - A Kafka topic being deleted and recreated.
    // - A Postgres source being restored to a backup.
    //
    // We attempt to communicate both of these to the user using the source status system tables.
    // While emitting a regressed `offset_known` can be at least partially avoided in the source
    // implementation, we avoid noisy sentry alerts by instead bumping a counter that can be used
    // if a scenario requires more investigation.
    pub offset_known: Gauge<Total>,
    pub offset_committed: Gauge<Total>,
}

impl SourceStatisticsUpdate {
    pub fn new(id: GlobalId) -> Self {
        Self {
            id,
            messages_received: Default::default(),
            bytes_received: Default::default(),
            updates_staged: Default::default(),
            updates_committed: Default::default(),
            records_indexed: Default::default(),
            bytes_indexed: Default::default(),
            rehydration_latency_ms: Default::default(),
            snapshot_records_known: Default::default(),
            snapshot_records_staged: Default::default(),
            snapshot_committed: Default::default(),
            offset_known: Default::default(),
            offset_committed: Default::default(),
        }
    }

    pub fn summarize<'a, I, F>(values: F) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        F: Fn() -> I,
        Self: 'a,
    {
        SourceStatisticsUpdate {
            id: values().into_iter().next().unwrap().id,
            messages_received: Counter::summarize(
                values().into_iter().map(|s| &s.messages_received),
            ),
            bytes_received: Counter::summarize(values().into_iter().map(|s| &s.bytes_received)),
            updates_staged: Counter::summarize(values().into_iter().map(|s| &s.updates_staged)),
            updates_committed: Counter::summarize(
                values().into_iter().map(|s| &s.updates_committed),
            ),
            records_indexed: Gauge::summarize(values().into_iter().map(|s| &s.records_indexed)),
            bytes_indexed: Gauge::summarize(values().into_iter().map(|s| &s.bytes_indexed)),
            rehydration_latency_ms: Gauge::summarize(
                values().into_iter().map(|s| &s.rehydration_latency_ms),
            ),
            snapshot_records_known: Gauge::summarize(
                values().into_iter().map(|s| &s.snapshot_records_known),
            ),
            snapshot_records_staged: Gauge::summarize(
                values().into_iter().map(|s| &s.snapshot_records_staged),
            ),
            snapshot_committed: Gauge::summarize(
                values().into_iter().map(|s| &s.snapshot_committed),
            ),
            offset_known: Gauge::summarize(values().into_iter().map(|s| &s.offset_known)),
            offset_committed: Gauge::summarize(values().into_iter().map(|s| &s.offset_committed)),
        }
    }
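
    // `summarize` takes a closure that produces a fresh iterator, rather than
    // an iterator itself, because it must traverse the input once per field. A
    // hypothetical call over collected (non-empty) per-worker stats:
    //
    //     let summary = SourceStatisticsUpdate::summarize(|| worker_stats.iter());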

    /// Reset counters so that we continue to ship diffs to the controller.
    pub fn reset_counters(&mut self) {
        self.messages_received.0 = 0;
        self.bytes_received.0 = 0;
        self.updates_staged.0 = 0;
        self.updates_committed.0 = 0;
    }

    /// Reset all _resettable_ gauges to their default values.
    pub fn reset_gauges(&mut self) {
        self.records_indexed.0.reset();
        self.bytes_indexed.0.reset();
        self.rehydration_latency_ms.0.reset();
        self.snapshot_records_known.0.reset();
        self.snapshot_records_staged.0.reset();
    }

    pub fn incorporate(&mut self, other: SourceStatisticsUpdate) {
        let SourceStatisticsUpdate {
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            records_indexed,
            bytes_indexed,
            rehydration_latency_ms,
            snapshot_records_known,
            snapshot_records_staged,
            snapshot_committed,
            offset_known,
            offset_committed,
            ..
        } = self;

        messages_received.incorporate(other.messages_received, "messages_received");
        bytes_received.incorporate(other.bytes_received, "bytes_received");
        updates_staged.incorporate(other.updates_staged, "updates_staged");
        updates_committed.incorporate(other.updates_committed, "updates_committed");
        records_indexed.incorporate(other.records_indexed, "records_indexed");
        bytes_indexed.incorporate(other.bytes_indexed, "bytes_indexed");
        rehydration_latency_ms.incorporate(other.rehydration_latency_ms, "rehydration_latency_ms");
        snapshot_records_known.incorporate(other.snapshot_records_known, "snapshot_records_known");
        snapshot_records_staged
            .incorporate(other.snapshot_records_staged, "snapshot_records_staged");
        snapshot_committed.incorporate(other.snapshot_committed, "snapshot_committed");
        offset_known.incorporate(other.offset_known, "offset_known");
        offset_committed.incorporate(other.offset_committed, "offset_committed");
    }

    /// Incorporate only the counters of the given update, ignoring gauge values.
    pub fn incorporate_counters(&mut self, other: SourceStatisticsUpdate) {
        let SourceStatisticsUpdate {
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            ..
        } = self;

        messages_received.incorporate(other.messages_received, "messages_received");
        bytes_received.incorporate(other.bytes_received, "bytes_received");
        updates_staged.incorporate(other.updates_staged, "updates_staged");
        updates_committed.incorporate(other.updates_committed, "updates_committed");
    }

    /// Enrich statistics that use prometheus metrics.
    pub fn with_metrics(mut self, metrics: &crate::metrics::StorageControllerMetrics) -> Self {
        self.offset_known.0.regressions = Some(metrics.regressed_offset_known(self.id));
        self
    }
}

impl PackableStats for SourceStatisticsUpdate {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        // id
        packer.push(Datum::from(self.id.to_string().as_str()));
        // Counters.
        packer.push(Datum::from(self.messages_received.0));
        packer.push(Datum::from(self.bytes_received.0));
        packer.push(Datum::from(self.updates_staged.0));
        packer.push(Datum::from(self.updates_committed.0));
        // Resetting gauges.
        packer.push(Datum::from(self.records_indexed.0.0));
        packer.push(Datum::from(self.bytes_indexed.0.0));
        let rehydration_latency = self
            .rehydration_latency_ms
            .0
            .0
            .map(|ms| mz_repr::adt::interval::Interval::new(0, 0, ms * 1000));
        packer.push(Datum::from(rehydration_latency));
        packer.push(Datum::from(self.snapshot_records_known.0.0));
        packer.push(Datum::from(self.snapshot_records_staged.0.0));
        // Non-resetting gauges.
        packer.push(Datum::from(self.snapshot_committed.0.0));
        packer.push(Datum::from(self.offset_known.0.pack()));
        packer.push(Datum::from(self.offset_committed.0.pack()));
    }

    fn unpack(
        row: Row,
        metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self) {
        let mut iter = row.iter();
        let mut s = Self {
            id: iter.next().unwrap().unwrap_str().parse().unwrap(),

            messages_received: iter.next().unwrap().unwrap_uint64().into(),
            bytes_received: iter.next().unwrap().unwrap_uint64().into(),
            updates_staged: iter.next().unwrap().unwrap_uint64().into(),
            updates_committed: iter.next().unwrap().unwrap_uint64().into(),

            records_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            bytes_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            rehydration_latency_ms: Gauge::gauge(
                <Option<mz_repr::adt::interval::Interval>>::try_from(iter.next().unwrap())
                    .unwrap()
                    .map(|int| int.micros / 1000),
            ),
            snapshot_records_known: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),
            snapshot_records_staged: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),

            snapshot_committed: Gauge::gauge(iter.next().unwrap().unwrap_bool()),
            offset_known: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
            offset_committed: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
        };

        s.offset_known.0.regressions = Some(metrics.regressed_offset_known(s.id));
        (s.id, None, s)
    }
}

impl RustType<ProtoSourceStatisticsUpdate> for SourceStatisticsUpdate {
    fn into_proto(&self) -> ProtoSourceStatisticsUpdate {
        ProtoSourceStatisticsUpdate {
            id: Some(self.id.into_proto()),

            messages_received: self.messages_received.0,
            bytes_received: self.bytes_received.0,
            updates_staged: self.updates_staged.0,
            updates_committed: self.updates_committed.0,

            records_indexed: self.records_indexed.0.0,
            bytes_indexed: self.bytes_indexed.0.0,
            rehydration_latency_ms: self.rehydration_latency_ms.0.0,
            snapshot_records_known: self.snapshot_records_known.0.0,
            snapshot_records_staged: self.snapshot_records_staged.0.0,

            snapshot_committed: self.snapshot_committed.0.0,
            offset_known: self.offset_known.0.total,
            offset_committed: self.offset_committed.0.total,
        }
    }

    fn from_proto(proto: ProtoSourceStatisticsUpdate) -> Result<Self, TryFromProtoError> {
        Ok(SourceStatisticsUpdate {
            id: proto
                .id
                .into_rust_if_some("ProtoSourceStatisticsUpdate::id")?,

            messages_received: Counter(proto.messages_received),
            bytes_received: Counter(proto.bytes_received),
            updates_staged: Counter(proto.updates_staged),
            updates_committed: Counter(proto.updates_committed),

            records_indexed: Gauge::gauge(proto.records_indexed),
            bytes_indexed: Gauge::gauge(proto.bytes_indexed),
            rehydration_latency_ms: Gauge::gauge(proto.rehydration_latency_ms),
            snapshot_records_known: Gauge::gauge(proto.snapshot_records_known),
            snapshot_records_staged: Gauge::gauge(proto.snapshot_records_staged),

            snapshot_committed: Gauge::gauge(proto.snapshot_committed),
            offset_known: Gauge::gauge(proto.offset_known),
            offset_committed: Gauge::gauge(proto.offset_committed),
        })
    }
}

/// Statistics that we keep in the controller about a given collection
/// (identified by id) on a given replica (identified by replica_id).
///
/// This mirrors [SourceStatisticsUpdate], and we update ourselves by
/// incorporating updates using [ControllerSourceStatistics::incorporate].
#[derive(Clone, Debug, PartialEq)]
pub struct ControllerSourceStatistics {
    pub id: GlobalId,

    // This is an Option because of Webhook "sources", which don't really run on
    // a cluster, but we decided a while ago that they are "sources", so here we
    // are and these show up in the source statistics collection.
    pub replica_id: Option<ReplicaId>,

    pub messages_received: Counter,
    pub bytes_received: Counter,
    pub updates_staged: Counter,
    pub updates_committed: Counter,

    pub records_indexed: Gauge<ResettingTotal>,
    pub bytes_indexed: Gauge<ResettingTotal>,
    pub rehydration_latency_ms: Gauge<ResettingLatency>,
    pub snapshot_records_known: Gauge<ResettingNullableTotal>,
    pub snapshot_records_staged: Gauge<ResettingNullableTotal>,

    pub snapshot_committed: Gauge<Boolean>,
    pub offset_known: Gauge<Total>,
    pub offset_committed: Gauge<Total>,

    // We don't try to be cute with updating and removing stats when dropping
    // replicas, which might run into problems with race conditions or when
    // crashes occur.
    //
    // Instead, we mark when a statistic was last updated and periodically prune
    // old statistics. It's fine if this is not 100% accurate or when the
    // "counter" resets when the controller restarts. Eventually things will get
    // dropped.
    pub last_updated: Instant,

    // For implementing `ZeroInitializedStats`.
    needs_zero_initialization: bool,
}

impl ControllerSourceStatistics {
    pub fn new(id: GlobalId, replica_id: Option<ReplicaId>) -> Self {
        Self {
            id,
            replica_id,
            messages_received: Default::default(),
            bytes_received: Default::default(),
            updates_staged: Default::default(),
            updates_committed: Default::default(),
            records_indexed: Default::default(),
            bytes_indexed: Default::default(),
            rehydration_latency_ms: Default::default(),
            snapshot_records_known: Default::default(),
            snapshot_records_staged: Default::default(),
            snapshot_committed: Default::default(),
            offset_known: Default::default(),
            offset_committed: Default::default(),
            last_updated: Instant::now(),
            needs_zero_initialization: true,
        }
    }

    /// Incorporate updates from the given [SourceStatisticsUpdate] into
    /// ourselves.
    pub fn incorporate(&mut self, update: SourceStatisticsUpdate) {
        let ControllerSourceStatistics {
            id: _,
            replica_id: _,
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            records_indexed,
            bytes_indexed,
            rehydration_latency_ms,
            snapshot_records_known,
            snapshot_records_staged,
            snapshot_committed,
            offset_known,
            offset_committed,
            last_updated,
            needs_zero_initialization: _,
        } = self;

        messages_received.incorporate(update.messages_received, "messages_received");
        bytes_received.incorporate(update.bytes_received, "bytes_received");
        updates_staged.incorporate(update.updates_staged, "updates_staged");
        updates_committed.incorporate(update.updates_committed, "updates_committed");
        records_indexed.incorporate(update.records_indexed, "records_indexed");
        bytes_indexed.incorporate(update.bytes_indexed, "bytes_indexed");
        rehydration_latency_ms.incorporate(update.rehydration_latency_ms, "rehydration_latency_ms");
        snapshot_records_known.incorporate(update.snapshot_records_known, "snapshot_records_known");
        snapshot_records_staged
            .incorporate(update.snapshot_records_staged, "snapshot_records_staged");
        snapshot_committed.incorporate(update.snapshot_committed, "snapshot_committed");
        offset_known.incorporate(update.offset_known, "offset_known");
        offset_committed.incorporate(update.offset_committed, "offset_committed");

        *last_updated = Instant::now();
    }
}

impl PackableStats for ControllerSourceStatistics {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        // id
        packer.push(Datum::from(self.id.to_string().as_str()));
        // replica_id
        if let Some(replica_id) = self.replica_id {
            packer.push(Datum::from(replica_id.to_string().as_str()));
        } else {
            packer.push(Datum::Null);
        }
        // Counters.
        packer.push(Datum::from(self.messages_received.0));
        packer.push(Datum::from(self.bytes_received.0));
        packer.push(Datum::from(self.updates_staged.0));
        packer.push(Datum::from(self.updates_committed.0));
        // Resetting gauges.
        packer.push(Datum::from(self.records_indexed.0.0));
        packer.push(Datum::from(self.bytes_indexed.0.0));
        let rehydration_latency = self
            .rehydration_latency_ms
            .0
            .0
            .map(|ms| mz_repr::adt::interval::Interval::new(0, 0, ms * 1000));
        packer.push(Datum::from(rehydration_latency));
        packer.push(Datum::from(self.snapshot_records_known.0.0));
        packer.push(Datum::from(self.snapshot_records_staged.0.0));
        // Non-resetting gauges.
        packer.push(Datum::from(self.snapshot_committed.0.0));
        packer.push(Datum::from(self.offset_known.0.pack()));
        packer.push(Datum::from(self.offset_committed.0.pack()));
    }

    fn unpack(
        row: Row,
        metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self) {
        let mut iter = row.iter();
        let id = iter.next().unwrap().unwrap_str().parse().unwrap();
        let replica_id_or_null = iter.next().unwrap();

        let replica_id = if replica_id_or_null.is_null() {
            None
        } else {
            Some(
                replica_id_or_null
                    .unwrap_str()
                    .parse::<ReplicaId>()
                    .unwrap(),
            )
        };

        let mut s = Self {
            id,
            replica_id,

            messages_received: iter.next().unwrap().unwrap_uint64().into(),
            bytes_received: iter.next().unwrap().unwrap_uint64().into(),
            updates_staged: iter.next().unwrap().unwrap_uint64().into(),
            updates_committed: iter.next().unwrap().unwrap_uint64().into(),

            records_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            bytes_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            rehydration_latency_ms: Gauge::gauge(
                <Option<mz_repr::adt::interval::Interval>>::try_from(iter.next().unwrap())
                    .unwrap()
                    .map(|int| int.micros / 1000),
            ),
            snapshot_records_known: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),
            snapshot_records_staged: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),

            snapshot_committed: Gauge::gauge(iter.next().unwrap().unwrap_bool()),
            offset_known: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
            offset_committed: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
            last_updated: Instant::now(),
            // This is coming from a Row, so we have definitely already written
            // out a "zero update" at some point.
            needs_zero_initialization: false,
        };

        s.offset_known.0.regressions = Some(metrics.regressed_offset_known(s.id));
        (s.id, replica_id, s)
    }
}

impl ExpirableStats for ControllerSourceStatistics {
    fn last_updated(&self) -> Instant {
        self.last_updated
    }
}

impl ZeroInitializedStats for ControllerSourceStatistics {
    fn needs_zero_initialization(&self) -> bool {
        self.needs_zero_initialization
    }

    fn mark_zero_initialized(&mut self) {
        self.needs_zero_initialization = false;
    }

    fn zero_stat(&self) -> Self {
        ControllerSourceStatistics::new(self.id, self.replica_id)
    }
}

/// Statistics that we keep in the controller about a given collection
/// (identified by id) on a given replica (identified by replica_id).
///
/// This mirrors [SinkStatisticsUpdate], and we update ourselves by
/// incorporating updates using [ControllerSinkStatistics::incorporate].
#[derive(Clone, Debug, PartialEq)]
pub struct ControllerSinkStatistics {
    pub id: GlobalId,
    pub replica_id: ReplicaId,

    pub messages_staged: Counter,
    pub messages_committed: Counter,
    pub bytes_staged: Counter,
    pub bytes_committed: Counter,

    // See comment on ControllerSourceStatistics.last_updated.
    pub last_updated: Instant,

    // For implementing `ZeroInitializedStats`.
    needs_zero_initialization: bool,
}

impl ControllerSinkStatistics {
    pub fn new(id: GlobalId, replica_id: ReplicaId) -> Self {
        Self {
            id,
            replica_id,
            messages_staged: Default::default(),
            messages_committed: Default::default(),
            bytes_staged: Default::default(),
            bytes_committed: Default::default(),
            last_updated: Instant::now(),
            needs_zero_initialization: true,
        }
    }

    /// Incorporate updates from the given [SinkStatisticsUpdate] into ourselves.
    pub fn incorporate(&mut self, update: SinkStatisticsUpdate) {
        let ControllerSinkStatistics {
            id: _,
            replica_id: _,
            messages_staged,
            messages_committed,
            bytes_staged,
            bytes_committed,
            last_updated,
            needs_zero_initialization: _,
        } = self;

        messages_staged.incorporate(update.messages_staged, "messages_staged");
        bytes_staged.incorporate(update.bytes_staged, "bytes_staged");
        messages_committed.incorporate(update.messages_committed, "messages_committed");
        bytes_committed.incorporate(update.bytes_committed, "bytes_committed");

        *last_updated = Instant::now();
    }
}

impl PackableStats for ControllerSinkStatistics {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        packer.push(Datum::from(self.id.to_string().as_str()));
        packer.push(Datum::from(self.replica_id.to_string().as_str()));
        packer.push(Datum::from(self.messages_staged.0));
        packer.push(Datum::from(self.messages_committed.0));
        packer.push(Datum::from(self.bytes_staged.0));
        packer.push(Datum::from(self.bytes_committed.0));
    }

    fn unpack(
        row: Row,
        _metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self) {
        let mut iter = row.iter();
        let s = Self {
            id: iter.next().unwrap().unwrap_str().parse().unwrap(),
            replica_id: iter.next().unwrap().unwrap_str().parse().unwrap(),
            messages_staged: iter.next().unwrap().unwrap_uint64().into(),
            messages_committed: iter.next().unwrap().unwrap_uint64().into(),
            bytes_staged: iter.next().unwrap().unwrap_uint64().into(),
            bytes_committed: iter.next().unwrap().unwrap_uint64().into(),
            last_updated: Instant::now(),
            // This is coming from a Row, so we have definitely already written
            // out a "zero update" at some point.
            needs_zero_initialization: false,
        };
        (s.id, Some(s.replica_id), s)
    }
}

impl ExpirableStats for ControllerSinkStatistics {
    fn last_updated(&self) -> Instant {
        self.last_updated
    }
}

impl ZeroInitializedStats for ControllerSinkStatistics {
    fn needs_zero_initialization(&self) -> bool {
        self.needs_zero_initialization
    }

    fn mark_zero_initialized(&mut self) {
        self.needs_zero_initialization = false;
    }

    fn zero_stat(&self) -> Self {
        ControllerSinkStatistics::new(self.id, self.replica_id)
    }
}

/// An update as reported from a storage instance. The semantics
/// of each field are documented above in `MZ_SINK_STATISTICS_RAW_DESC`,
/// and encoded in the field types.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SinkStatisticsUpdate {
    pub id: GlobalId,

    pub messages_staged: Counter,
    pub messages_committed: Counter,
    pub bytes_staged: Counter,
    pub bytes_committed: Counter,
}

impl SinkStatisticsUpdate {
    pub fn new(id: GlobalId) -> Self {
        Self {
            id,
            messages_staged: Default::default(),
            messages_committed: Default::default(),
            bytes_staged: Default::default(),
            bytes_committed: Default::default(),
        }
    }

    pub fn incorporate(&mut self, other: SinkStatisticsUpdate) {
        let SinkStatisticsUpdate {
            messages_staged,
            messages_committed,
            bytes_staged,
            bytes_committed,
            ..
        } = self;

        messages_staged.incorporate(other.messages_staged, "messages_staged");
        messages_committed.incorporate(other.messages_committed, "messages_committed");
        bytes_staged.incorporate(other.bytes_staged, "bytes_staged");
        bytes_committed.incorporate(other.bytes_committed, "bytes_committed");
    }

    /// Incorporate only the counters of the given update, ignoring gauge values.
    ///
    /// Sinks currently report only counters, so this is equivalent to
    /// [SinkStatisticsUpdate::incorporate].
    pub fn incorporate_counters(&mut self, other: SinkStatisticsUpdate) {
        let SinkStatisticsUpdate {
            messages_staged,
            messages_committed,
            bytes_staged,
            bytes_committed,
            ..
        } = self;

        messages_staged.incorporate(other.messages_staged, "messages_staged");
        messages_committed.incorporate(other.messages_committed, "messages_committed");
        bytes_staged.incorporate(other.bytes_staged, "bytes_staged");
        bytes_committed.incorporate(other.bytes_committed, "bytes_committed");
    }

    pub fn summarize<'a, I, F>(values: F) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        F: Fn() -> I,
        Self: 'a,
    {
        SinkStatisticsUpdate {
            id: values().into_iter().next().unwrap().id,
            messages_staged: Counter::summarize(values().into_iter().map(|s| &s.messages_staged)),
            messages_committed: Counter::summarize(
                values().into_iter().map(|s| &s.messages_committed),
            ),
            bytes_staged: Counter::summarize(values().into_iter().map(|s| &s.bytes_staged)),
            bytes_committed: Counter::summarize(values().into_iter().map(|s| &s.bytes_committed)),
        }
    }

    /// Reset counters so that we continue to ship diffs to the controller.
    pub fn reset_counters(&mut self) {
        self.messages_staged.0 = 0;
        self.messages_committed.0 = 0;
        self.bytes_staged.0 = 0;
        self.bytes_committed.0 = 0;
    }

    /// Reset all _resettable_ gauges to their default values. Sinks currently
    /// have no resettable gauges, so this is a no-op.
    pub fn reset_gauges(&self) {}
}

impl PackableStats for SinkStatisticsUpdate {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        packer.push(Datum::from(self.id.to_string().as_str()));
        packer.push(Datum::from(self.messages_staged.0));
        packer.push(Datum::from(self.messages_committed.0));
        packer.push(Datum::from(self.bytes_staged.0));
        packer.push(Datum::from(self.bytes_committed.0));
    }

    fn unpack(
        row: Row,
        _metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self) {
        let mut iter = row.iter();
        let s = Self {
            // Id
            id: iter.next().unwrap().unwrap_str().parse().unwrap(),
            // Counters
            messages_staged: iter.next().unwrap().unwrap_uint64().into(),
            messages_committed: iter.next().unwrap().unwrap_uint64().into(),
            bytes_staged: iter.next().unwrap().unwrap_uint64().into(),
            bytes_committed: iter.next().unwrap().unwrap_uint64().into(),
        };
        (s.id, None, s)
    }
}

impl RustType<ProtoSinkStatisticsUpdate> for SinkStatisticsUpdate {
    fn into_proto(&self) -> ProtoSinkStatisticsUpdate {
        ProtoSinkStatisticsUpdate {
            id: Some(self.id.into_proto()),

            messages_staged: self.messages_staged.0,
            messages_committed: self.messages_committed.0,
            bytes_staged: self.bytes_staged.0,
            bytes_committed: self.bytes_committed.0,
        }
    }

    fn from_proto(proto: ProtoSinkStatisticsUpdate) -> Result<Self, TryFromProtoError> {
        Ok(SinkStatisticsUpdate {
            id: proto
                .id
                .into_rust_if_some("ProtoSinkStatisticsUpdate::id")?,

            messages_staged: Counter(proto.messages_staged),
            messages_committed: Counter(proto.messages_committed),
            bytes_staged: Counter(proto.bytes_staged),
            bytes_committed: Counter(proto.bytes_committed),
        })
    }
}

/// Statistics for webhooks.
#[derive(Default, Debug)]
pub struct WebhookStatistics {
    pub messages_received: AtomicU64,
    pub bytes_received: AtomicU64,
    pub updates_staged: AtomicU64,
    pub updates_committed: AtomicU64,
}

impl WebhookStatistics {
    /// Drain the current statistics into a `SourceStatisticsUpdate` with
    /// other values defaulted, resetting the atomic counters.
    pub fn drain_into_update(&self, id: GlobalId) -> SourceStatisticsUpdate {
        SourceStatisticsUpdate {
            id,
            messages_received: self.messages_received.swap(0, Ordering::Relaxed).into(),
            bytes_received: self.bytes_received.swap(0, Ordering::Relaxed).into(),
            updates_staged: self.updates_staged.swap(0, Ordering::Relaxed).into(),
            updates_committed: self.updates_committed.swap(0, Ordering::Relaxed).into(),
            records_indexed: Gauge::gauge(0),
            bytes_indexed: Gauge::gauge(0),
            rehydration_latency_ms: Gauge::gauge(None),
            snapshot_records_known: Gauge::gauge(None),
            snapshot_records_staged: Gauge::gauge(None),
            snapshot_committed: Gauge::gauge(true),
            offset_known: Gauge::gauge(None::<u64>),
            offset_committed: Gauge::gauge(None::<u64>),
        }
    }
}
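
// A sketch of the intended use (the surrounding webhook machinery and the
// `GlobalId` value are hypothetical): request handlers bump the atomics
// cheaply, and a periodic task drains them into an update for the controller.
//
//     use std::sync::atomic::Ordering;
//
//     let stats = WebhookStatistics::default();
//     stats.messages_received.fetch_add(1, Ordering::Relaxed);
//     stats.bytes_received.fetch_add(256, Ordering::Relaxed);
//     // Later, on the statistics interval:
//     let update = stats.drain_into_update(GlobalId::User(1));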