mz_storage_client/statistics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! These structures represent a full set of updates for the `mz_source_statistics_raw`
//! and `mz_sink_statistics_raw` tables for a specific source-worker/sink-worker pair.
//! They are structured like this for simplicity and efficiency: each storage worker
//! can individually collect and consolidate metrics, then control how much
//! `StorageResponse` traffic is produced when sending updates back to the controller
//! to be written.
//!
//! The proto conversions for these types are, for now, in the `client` module.
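//!
//! A sketch of the intended flow (illustrative only; `per_worker_stats` and
//! `controller_stats` are hypothetical bindings, not part of this module):
//!
//! ```ignore
//! // On the cluster side, per-worker measurements are merged into one update...
//! let summary = SourceStatisticsUpdate::summarize(|| per_worker_stats.iter());
//! // ...which is shipped in a `StorageResponse` and folded into the
//! // controller's running statistics for that collection/replica.
//! controller_stats.incorporate(summary);
//! ```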

use std::sync::LazyLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use mz_controller_types::ReplicaId;
use serde::{Deserialize, Serialize};

use mz_repr::{GlobalId, RelationDesc, Row, SqlScalarType};

pub static MZ_SOURCE_STATISTICS_RAW_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::builder()
        // Id of the source (or subsource).
        .with_column("id", SqlScalarType::String.nullable(false))
        // Id of the replica producing these statistics.
        .with_column("replica_id", SqlScalarType::String.nullable(true))
        //
        // Counters
        //
        // A counter of the messages we have read from upstream for this source.
        // Never resets.
        .with_column("messages_received", SqlScalarType::UInt64.nullable(false))
        // A counter of the bytes we have read from upstream for this source.
        // Never resets.
        .with_column("bytes_received", SqlScalarType::UInt64.nullable(false))
        // A counter of the updates we have staged to commit for this source.
        // Never resets.
        .with_column("updates_staged", SqlScalarType::UInt64.nullable(false))
        // A counter of the updates we have committed for this source.
        // Never resets.
        .with_column("updates_committed", SqlScalarType::UInt64.nullable(false))
        //
        // Resetting gauges
        //
        // A gauge of the number of records in the envelope state; 0 for sources
        // without envelope state.
        // Reset when the source is restarted, for any reason.
        .with_column("records_indexed", SqlScalarType::UInt64.nullable(false))
        // A gauge of the number of bytes in the envelope state; 0 for sources
        // without envelope state.
        // Reset when the source is restarted, for any reason.
        .with_column("bytes_indexed", SqlScalarType::UInt64.nullable(false))
        // A gauge that shows the duration of rehydration. `NULL` before rehydration
        // is done.
        // Reset when the source is restarted, for any reason.
        .with_column(
            "rehydration_latency",
            SqlScalarType::Interval.nullable(true),
        )
        // A gauge of the number of _values_ (source-defined unit) the _snapshot_ of this source
        // contains.
        // Sometimes reset when the source can snapshot new pieces of upstream: sources
        // like Postgres and MySQL may repopulate this column when tables are added.
        //
        // `NULL` while we discover the snapshot size.
        .with_column(
            "snapshot_records_known",
            SqlScalarType::UInt64.nullable(true),
        )
        // A gauge of the number of _values_ (source-defined unit) we have read of the _snapshot_
        // of this source.
        // Sometimes reset when the source can snapshot new pieces of upstream (like Postgres and
        // MySQL).
        //
        // `NULL` while we discover the snapshot size.
        .with_column(
            "snapshot_records_staged",
            SqlScalarType::UInt64.nullable(true),
        )
        //
        // Non-resetting gauges
        //
        // Whether or not the snapshot for the source has been committed. Never resets.
        .with_column("snapshot_committed", SqlScalarType::Bool.nullable(false))
        // The following are not yet reported by sources and have 0 or `NULL` values.
        // They have been added here to reduce churn changing the schema of this collection.
        //
        // These are left nullable for now in case we want semantics for `NULL` values. We
        // currently never expose null values.
        //
        // A gauge of the number of _values_ (source-defined unit) available to be read from upstream.
        // Never resets. Not to be confused with any of the counters above.
        .with_column("offset_known", SqlScalarType::UInt64.nullable(true))
        // A gauge of the number of _values_ (source-defined unit) we have committed.
        // Never resets. Not to be confused with any of the counters above.
        .with_column("offset_committed", SqlScalarType::UInt64.nullable(true))
        .finish()
});

pub static MZ_SINK_STATISTICS_RAW_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::builder()
        // Id of the sink.
        .with_column("id", SqlScalarType::String.nullable(false))
        // Id of the replica producing these statistics.
        .with_column("replica_id", SqlScalarType::String.nullable(true))
        //
        // Counters
        //
        // A counter of the messages we have staged to upstream.
        // Never resets.
        .with_column("messages_staged", SqlScalarType::UInt64.nullable(false))
        // A counter of the messages we have committed.
        // Never resets.
        .with_column("messages_committed", SqlScalarType::UInt64.nullable(false))
        // A counter of the bytes we have staged to upstream.
        // Never resets.
        .with_column("bytes_staged", SqlScalarType::UInt64.nullable(false))
        // A counter of the bytes we have committed.
        // Never resets.
        .with_column("bytes_committed", SqlScalarType::UInt64.nullable(false))
        .finish()
});

// Types of statistics (counters and various types of gauges) that have different
// semantics when sources/sinks are reset.

/// A trait defining the semantics a storage statistic can have.
pub trait StorageMetric {
    /// Summarizes a set of measurements into a single representative value.
    /// Typically this function is used to summarize the measurements collected from each worker.
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a;

    /// Incorporate this value with another.
    fn incorporate(&mut self, other: Self, field_name: &'static str);
}

/// A counter that never resets.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct Counter(u64);

impl StorageMetric for Counter {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // Sum across workers.
        Self(values.into_iter().map(|c| c.0).sum())
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        // Always add the new value to the existing one.
        self.0 += other.0
    }
}

impl From<u64> for Counter {
    fn from(f: u64) -> Self {
        Counter(f)
    }
}
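
// Illustrative sketch of `Counter` semantics (hypothetical values, not from
// the original file): `summarize` sums one measurement per worker, while
// `incorporate` accumulates successive updates into a running total.
//
//     let w1 = Counter::from(3);
//     let w2 = Counter::from(4);
//     let mut total = Counter::summarize([&w1, &w2]); // Counter(7)
//     total.incorporate(Counter::from(5), "messages_received"); // Counter(12)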

/// A latency gauge that is reset on every restart.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct ResettingLatency(Option<i64>);

impl From<Option<i64>> for ResettingLatency {
    fn from(f: Option<i64>) -> Self {
        ResettingLatency(f)
    }
}

impl StorageMetric for ResettingLatency {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        let mut max = 0;
        for value in values {
            match value.0 {
                // If any of the values are `NULL`, then we don't yet know the max.
                None => return Self(None),
                // Pick the worst latency across workers.
                Some(value) => max = std::cmp::max(max, value),
            }
        }

        Self(Some(max))
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        // Reset to the new value.
        self.0 = other.0;
    }
}

impl ResettingLatency {
    fn reset(&mut self) {
        self.0 = None;
    }
}
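
// Illustrative sketch (hypothetical values): one unreported worker poisons the
// summary to `NULL`; otherwise the worst (highest) latency wins.
//
//     let fast = ResettingLatency::from(Some(250));
//     let slow = ResettingLatency::from(Some(900));
//     let worst = ResettingLatency::summarize([&fast, &slow]); // Some(900)
//     let unknown = ResettingLatency::summarize([&fast, &ResettingLatency::from(None)]); // None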

/// A numerical gauge that always resets, but can start out as `NULL`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct ResettingNullableTotal(Option<u64>);

impl StorageMetric for ResettingNullableTotal {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        let mut sum = 0;
        for value in values {
            match value.0 {
                // If any of the values are `NULL`, then we merge to `NULL`.
                None => return Self(None),
                // Sum across workers.
                Some(value) => sum += value,
            }
        }

        Self(Some(sum))
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        match (&mut self.0, other.0) {
            (None, other) => {
                self.0 = other;
            }
            // Override the total.
            (Some(this), Some(other)) => *this = other,
            (Some(_), None) => {
                // `NULL`s don't reset the value.
            }
        }
    }
}

impl From<Option<u64>> for ResettingNullableTotal {
    fn from(f: Option<u64>) -> Self {
        ResettingNullableTotal(f)
    }
}

impl ResettingNullableTotal {
    fn reset(&mut self) {
        self.0 = None;
    }
}
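
// Illustrative sketch (hypothetical values): a `NULL` update never clobbers a
// known value, while a concrete update resets the gauge outright.
//
//     let mut known = ResettingNullableTotal::from(Some(10));
//     known.incorporate(ResettingNullableTotal::from(None), "snapshot_records_known");
//     // Still Some(10): `NULL`s don't reset the value.
//     known.incorporate(ResettingNullableTotal::from(Some(7)), "snapshot_records_known");
//     // Now Some(7): gauges reset to the newest value.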

/// A numerical gauge that always resets.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct ResettingTotal(u64);

impl StorageMetric for ResettingTotal {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // Sum across workers.
        Self(values.into_iter().map(|c| c.0).sum())
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        // Reset the pre-existing value.
        self.0 = other.0;
    }
}

impl From<u64> for ResettingTotal {
    fn from(f: u64) -> Self {
        ResettingTotal(f)
    }
}

impl ResettingTotal {
    fn reset(&mut self) {
        self.0 = 0;
    }
}

/// A boolean gauge that is never reset.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct Boolean(bool);

impl StorageMetric for Boolean {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // All workers must be true for this gauge to be true.
        Self(values.into_iter().fold(true, |base, new| base & new.0))
    }

    fn incorporate(&mut self, other: Self, field_name: &'static str) {
        // A boolean regressing to `false` is a bug.
        //
        // Clippy's suggestion here is not good!
        #[allow(clippy::bool_comparison)]
        if other.0 < self.0 {
            tracing::error!(
                "boolean gauge for field {field_name} erroneously regressed from true to false",
            );
            return;
        }
        self.0 = other.0;
    }
}

impl From<bool> for Boolean {
    fn from(f: bool) -> Self {
        Boolean(f)
    }
}
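
// Illustrative sketch (hypothetical values): `snapshot_committed` only flips to
// true once *every* worker reports true, and never legitimately goes back.
//
//     let done = Boolean::from(true);
//     let pending = Boolean::from(false);
//     let all_done = Boolean::summarize([&done, &pending]); // Boolean(false)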

/// A numerical gauge that never regresses.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Total {
    /// Defaults to 0. Can be skipped on updates from clusterd.
    total: Option<u64>,
    /// If provided, it is bumped on regressions, instead of emitting `error!`
    /// logs.
    #[serde(skip)]
    regressions:
        Option<mz_ore::metrics::DeleteOnDropCounter<prometheus::core::AtomicU64, Vec<String>>>,
}

impl From<Option<u64>> for Total {
    fn from(f: Option<u64>) -> Self {
        Total {
            total: f,
            regressions: None,
        }
    }
}

impl Clone for Total {
    fn clone(&self) -> Self {
        Self {
            total: self.total,
            regressions: None,
        }
    }
}

impl PartialEq for Total {
    fn eq(&self, other: &Self) -> bool {
        self.total == other.total
    }
}

impl Total {
    /// Pack this `Total` into a `u64`, defaulting to 0.
    fn pack(&self) -> u64 {
        self.total.unwrap_or_default()
    }
}

impl StorageMetric for Total {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // Sum across workers, but only if all workers have contributed
        // a non-`None` value.
        let mut any_none = false;

        let inner = values
            .into_iter()
            .filter_map(|i| {
                any_none |= i.total.is_none();
                i.total.as_ref()
            })
            .sum();

        // If any are `None`, we can't aggregate.
        // `self.regressions` is only meaningful during incorporation.
        Self {
            total: (!any_none).then_some(inner),
            regressions: None,
        }
    }

    fn incorporate(&mut self, other: Self, field_name: &'static str) {
        match (&mut self.total, other.total) {
            (_, None) => {}
            (None, Some(other)) => self.total = Some(other),
            (Some(this), Some(other)) => {
                if other < *this {
                    if let Some(metric) = &self.regressions {
                        metric.inc()
                    } else {
                        tracing::error!(
                            "total gauge {field_name} erroneously regressed from {} to {}",
                            this,
                            other
                        );
                    }
                    return;
                }
                *this = other
            }
        }
    }
}
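
// Illustrative sketch (hypothetical values): a regressed value is dropped
// (and logged, or counted via `regressions`) rather than applied.
//
//     let mut offset_known = Total::from(Some(100));
//     offset_known.incorporate(Total::from(Some(90)), "offset_known");
//     assert_eq!(offset_known, Total::from(Some(100))); // regression ignored
//     offset_known.incorporate(Total::from(Some(110)), "offset_known");
//     assert_eq!(offset_known, Total::from(Some(110)));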

/// A gauge that has semantics based on the `StorageMetric` implementation of its inner.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct Gauge<T>(T);

impl<T> Gauge<T> {
    // This can't be a `From` impl because of coherence issues :(
    pub fn gauge<F>(f: F) -> Self
    where
        T: From<F>,
    {
        Gauge(f.into())
    }
}
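
// Illustrative sketch: `Gauge::gauge` lifts any value convertible into the
// inner metric type, so the wrapped semantics come from the type parameter.
//
//     let bytes_indexed: Gauge<ResettingTotal> = Gauge::gauge(1024u64);
//     let latency: Gauge<ResettingLatency> = Gauge::gauge(Some(42i64));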

impl<T: StorageMetric> StorageMetric for Gauge<T> {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        Gauge(T::summarize(values.into_iter().map(|i| &i.0)))
    }

    fn incorporate(&mut self, other: Self, field_name: &'static str) {
        self.0.incorporate(other.0, field_name)
    }
}

/// A trait that abstracts over user-facing statistics objects that can be
/// packed into a `Row` and unpacked again. That is, they round-trip.
pub trait PackableStats {
    /// Pack `self` into the `Row`.
    fn pack(&self, packer: mz_repr::RowPacker<'_>);
    /// Unpack a `Row` back into a `Self`.
    fn unpack(
        row: Row,
        metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self);
}
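
// Illustrative round-trip sketch (assumes a `stats: SourceStatisticsUpdate`
// value and a `metrics: StorageControllerMetrics` handle; both hypothetical):
//
//     let mut row = Row::default();
//     stats.pack(row.packer());
//     let (id, replica_id, unpacked) = SourceStatisticsUpdate::unpack(row, &metrics);
//     assert_eq!(stats, unpacked);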

/// Stats that can get out-of-date and should then get reaped.
pub trait ExpirableStats {
    /// The last time this bag of stats was updated (via an update coming in
    /// from a replica, presumably).
    fn last_updated(&self) -> Instant;
}

/// Stats that should be initialized with a row of zeroes, that is, we want the
/// first data that shows up in the builtin stats collection to be zeroes.
///
/// We do the tracking within the stat object itself instead of adding external
/// tracking. That's a bit of a roundabout way of doing it, but it works.
///
/// This is an odd requirement, and you would think we should just report what
/// is there when we get it, but this was consciously changed a while back and
/// the current Console incarnation depends on this behavior.
pub trait ZeroInitializedStats {
    /// `true` if we have not emitted a "zero update" for this stats blob.
    fn needs_zero_initialization(&self) -> bool;

    /// Marks that we have emitted a "zero update" for this stats blob.
    fn mark_zero_initialized(&mut self);

    /// Returns an "all zero" instance of this stat, with collection id and/or
    /// replica id retained.
    fn zero_stat(&self) -> Self;
}

/// An update as reported from a storage instance. The semantics
/// of each field are documented above in `MZ_SOURCE_STATISTICS_RAW_DESC`,
/// and encoded in the field types.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SourceStatisticsUpdate {
    pub id: GlobalId,

    pub messages_received: Counter,
    pub bytes_received: Counter,
    pub updates_staged: Counter,
    pub updates_committed: Counter,

    pub records_indexed: Gauge<ResettingTotal>,
    pub bytes_indexed: Gauge<ResettingTotal>,
    pub rehydration_latency_ms: Gauge<ResettingLatency>,
    pub snapshot_records_known: Gauge<ResettingNullableTotal>,
    pub snapshot_records_staged: Gauge<ResettingNullableTotal>,

    pub snapshot_committed: Gauge<Boolean>,
    // `offset_known` is enriched with a counter in `unpack` and `with_metrics` that is
    // bumped whenever it regresses. This is distinct from `offset_committed`, which
    // logs an `error!` instead.
    //
    // `offset_committed` is entirely in our control: it is calculated from source frontiers
    // that are guaranteed to never go backwards. Therefore, a regression there is a bug in
    // how we calculate it.
    //
    // `offset_known` is calculated based on information the upstream service of the source gives
    // us. This is meaningfully less reliable, and can cause regressions in the value. Some known
    // cases that cause this are:
    // - A Kafka topic being deleted and recreated.
    // - A Postgres source being restored to a backup.
    //
    // We attempt to communicate both of these to the user using the source status system tables.
    // While emitting a regressed `offset_known` can be at least partially avoided in the source
    // implementation, we avoid noisy sentry alerts by instead bumping a counter that can be used
    // if a scenario requires more investigation.
    pub offset_known: Gauge<Total>,
    pub offset_committed: Gauge<Total>,
}

impl SourceStatisticsUpdate {
    pub fn new(id: GlobalId) -> Self {
        Self {
            id,
            messages_received: Default::default(),
            bytes_received: Default::default(),
            updates_staged: Default::default(),
            updates_committed: Default::default(),
            records_indexed: Default::default(),
            bytes_indexed: Default::default(),
            rehydration_latency_ms: Default::default(),
            snapshot_records_known: Default::default(),
            snapshot_records_staged: Default::default(),
            snapshot_committed: Default::default(),
            offset_known: Default::default(),
            offset_committed: Default::default(),
        }
    }

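    /// Summarizes a set of per-worker updates into a single update.
    ///
    /// `values` is a closure producing a fresh iterator because each field is
    /// summarized in its own pass over the updates.
    ///
    /// A minimal sketch (illustrative; `per_worker` is a hypothetical
    /// `Vec<SourceStatisticsUpdate>` with at least one element, since an empty
    /// set has no id to summarize):
    ///
    /// ```ignore
    /// let summary = SourceStatisticsUpdate::summarize(|| per_worker.iter());
    /// ```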
    pub fn summarize<'a, I, F>(values: F) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        F: Fn() -> I,
        Self: 'a,
    {
        SourceStatisticsUpdate {
            id: values().into_iter().next().unwrap().id,
            messages_received: Counter::summarize(
                values().into_iter().map(|s| &s.messages_received),
            ),
            bytes_received: Counter::summarize(values().into_iter().map(|s| &s.bytes_received)),
            updates_staged: Counter::summarize(values().into_iter().map(|s| &s.updates_staged)),
            updates_committed: Counter::summarize(
                values().into_iter().map(|s| &s.updates_committed),
            ),
            records_indexed: Gauge::summarize(values().into_iter().map(|s| &s.records_indexed)),
            bytes_indexed: Gauge::summarize(values().into_iter().map(|s| &s.bytes_indexed)),
            rehydration_latency_ms: Gauge::summarize(
                values().into_iter().map(|s| &s.rehydration_latency_ms),
            ),
            snapshot_records_known: Gauge::summarize(
                values().into_iter().map(|s| &s.snapshot_records_known),
            ),
            snapshot_records_staged: Gauge::summarize(
                values().into_iter().map(|s| &s.snapshot_records_staged),
            ),
            snapshot_committed: Gauge::summarize(
                values().into_iter().map(|s| &s.snapshot_committed),
            ),
            offset_known: Gauge::summarize(values().into_iter().map(|s| &s.offset_known)),
            offset_committed: Gauge::summarize(values().into_iter().map(|s| &s.offset_committed)),
        }
    }

    /// Reset counters so that we continue to ship diffs to the controller.
    pub fn reset_counters(&mut self) {
        self.messages_received.0 = 0;
        self.bytes_received.0 = 0;
        self.updates_staged.0 = 0;
        self.updates_committed.0 = 0;
    }

    /// Reset all _resettable_ gauges to their default values.
    pub fn reset_gauges(&mut self) {
        self.records_indexed.0.reset();
        self.bytes_indexed.0.reset();
        self.rehydration_latency_ms.0.reset();
        self.snapshot_records_known.0.reset();
        self.snapshot_records_staged.0.reset();
    }

    pub fn incorporate(&mut self, other: SourceStatisticsUpdate) {
        let SourceStatisticsUpdate {
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            records_indexed,
            bytes_indexed,
            rehydration_latency_ms,
            snapshot_records_known,
            snapshot_records_staged,
            snapshot_committed,
            offset_known,
            offset_committed,
            ..
        } = self;

        messages_received.incorporate(other.messages_received, "messages_received");
        bytes_received.incorporate(other.bytes_received, "bytes_received");
        updates_staged.incorporate(other.updates_staged, "updates_staged");
        updates_committed.incorporate(other.updates_committed, "updates_committed");
        records_indexed.incorporate(other.records_indexed, "records_indexed");
        bytes_indexed.incorporate(other.bytes_indexed, "bytes_indexed");
        rehydration_latency_ms.incorporate(other.rehydration_latency_ms, "rehydration_latency_ms");
        snapshot_records_known.incorporate(other.snapshot_records_known, "snapshot_records_known");
        snapshot_records_staged
            .incorporate(other.snapshot_records_staged, "snapshot_records_staged");
        snapshot_committed.incorporate(other.snapshot_committed, "snapshot_committed");
        offset_known.incorporate(other.offset_known, "offset_known");
        offset_committed.incorporate(other.offset_committed, "offset_committed");
    }

    /// Incorporate only the counters of the given update, ignoring gauge values.
    pub fn incorporate_counters(&mut self, other: SourceStatisticsUpdate) {
        let SourceStatisticsUpdate {
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            ..
        } = self;

        messages_received.incorporate(other.messages_received, "messages_received");
        bytes_received.incorporate(other.bytes_received, "bytes_received");
        updates_staged.incorporate(other.updates_staged, "updates_staged");
        updates_committed.incorporate(other.updates_committed, "updates_committed");
    }

    /// Enrich statistics that use prometheus metrics.
    pub fn with_metrics(mut self, metrics: &crate::metrics::StorageControllerMetrics) -> Self {
        self.offset_known.0.regressions = Some(metrics.regressed_offset_known(self.id));
        self
    }
}

impl PackableStats for SourceStatisticsUpdate {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        // id
        packer.push(Datum::from(self.id.to_string().as_str()));
        // Counters.
        packer.push(Datum::from(self.messages_received.0));
        packer.push(Datum::from(self.bytes_received.0));
        packer.push(Datum::from(self.updates_staged.0));
        packer.push(Datum::from(self.updates_committed.0));
        // Resetting gauges.
        packer.push(Datum::from(self.records_indexed.0.0));
        packer.push(Datum::from(self.bytes_indexed.0.0));
        let rehydration_latency = self
            .rehydration_latency_ms
            .0
            .0
            .map(|ms| mz_repr::adt::interval::Interval::new(0, 0, ms * 1000));
        packer.push(Datum::from(rehydration_latency));
        packer.push(Datum::from(self.snapshot_records_known.0.0));
        packer.push(Datum::from(self.snapshot_records_staged.0.0));
        // Gauges
        packer.push(Datum::from(self.snapshot_committed.0.0));
        packer.push(Datum::from(self.offset_known.0.pack()));
        packer.push(Datum::from(self.offset_committed.0.pack()));
    }

    fn unpack(
        row: Row,
        metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self) {
        let mut iter = row.iter();
        let mut s = Self {
            id: iter.next().unwrap().unwrap_str().parse().unwrap(),

            messages_received: iter.next().unwrap().unwrap_uint64().into(),
            bytes_received: iter.next().unwrap().unwrap_uint64().into(),
            updates_staged: iter.next().unwrap().unwrap_uint64().into(),
            updates_committed: iter.next().unwrap().unwrap_uint64().into(),

            records_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            bytes_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            rehydration_latency_ms: Gauge::gauge(
                <Option<mz_repr::adt::interval::Interval>>::try_from(iter.next().unwrap())
                    .unwrap()
                    .map(|int| int.micros / 1000),
            ),
            snapshot_records_known: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),
            snapshot_records_staged: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),

            snapshot_committed: Gauge::gauge(iter.next().unwrap().unwrap_bool()),
            offset_known: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
            offset_committed: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
        };

        s.offset_known.0.regressions = Some(metrics.regressed_offset_known(s.id));
        (s.id, None, s)
    }
}

/// Statistics that we keep in the controller about a given collection
/// (identified by id) on a given replica (identified by replica_id).
///
/// This mirrors [SourceStatisticsUpdate], and we update ourselves by
/// incorporating those updates using [ControllerSourceStatistics::incorporate].
#[derive(Clone, Debug, PartialEq)]
pub struct ControllerSourceStatistics {
    pub id: GlobalId,

    // This is an Option because of Webhook "sources", which don't really run on
    // a cluster, but we decided a while ago that they are "sources", so here we
    // are and they show up in the source statistics collection.
    pub replica_id: Option<ReplicaId>,

    pub messages_received: Counter,
    pub bytes_received: Counter,
    pub updates_staged: Counter,
    pub updates_committed: Counter,

    pub records_indexed: Gauge<ResettingTotal>,
    pub bytes_indexed: Gauge<ResettingTotal>,
    pub rehydration_latency_ms: Gauge<ResettingLatency>,
    pub snapshot_records_known: Gauge<ResettingNullableTotal>,
    pub snapshot_records_staged: Gauge<ResettingNullableTotal>,

    pub snapshot_committed: Gauge<Boolean>,
    pub offset_known: Gauge<Total>,
    pub offset_committed: Gauge<Total>,

    // We don't try to be cute with updating and removing stats when dropping
    // replicas, which might run into problems with race conditions or when
    // crashes occur.
    //
    // Instead, we mark when a statistic was last updated and periodically prune
    // old statistics. It's fine if this is not 100% accurate or when the
    // "counter" resets when the controller restarts. Eventually things will get
    // dropped.
    pub last_updated: Instant,

    // For implementing `ZeroInitializedStats`.
    needs_zero_initialization: bool,
}

impl ControllerSourceStatistics {
    pub fn new(id: GlobalId, replica_id: Option<ReplicaId>) -> Self {
        Self {
            id,
            replica_id,
            messages_received: Default::default(),
            bytes_received: Default::default(),
            updates_staged: Default::default(),
            updates_committed: Default::default(),
            records_indexed: Default::default(),
            bytes_indexed: Default::default(),
            rehydration_latency_ms: Default::default(),
            snapshot_records_known: Default::default(),
            snapshot_records_staged: Default::default(),
            snapshot_committed: Default::default(),
            offset_known: Default::default(),
            offset_committed: Default::default(),
            last_updated: Instant::now(),
            needs_zero_initialization: true,
        }
    }

    /// Incorporate updates from the given [SourceStatisticsUpdate] into
    /// ourselves.
    pub fn incorporate(&mut self, update: SourceStatisticsUpdate) {
        let ControllerSourceStatistics {
            id: _,
            replica_id: _,
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            records_indexed,
            bytes_indexed,
            rehydration_latency_ms,
            snapshot_records_known,
            snapshot_records_staged,
            snapshot_committed,
            offset_known,
            offset_committed,
            last_updated,
            needs_zero_initialization: _,
        } = self;

        messages_received.incorporate(update.messages_received, "messages_received");
        bytes_received.incorporate(update.bytes_received, "bytes_received");
        updates_staged.incorporate(update.updates_staged, "updates_staged");
        updates_committed.incorporate(update.updates_committed, "updates_committed");
        records_indexed.incorporate(update.records_indexed, "records_indexed");
        bytes_indexed.incorporate(update.bytes_indexed, "bytes_indexed");
        rehydration_latency_ms.incorporate(update.rehydration_latency_ms, "rehydration_latency_ms");
        snapshot_records_known.incorporate(update.snapshot_records_known, "snapshot_records_known");
        snapshot_records_staged
            .incorporate(update.snapshot_records_staged, "snapshot_records_staged");
        snapshot_committed.incorporate(update.snapshot_committed, "snapshot_committed");
        offset_known.incorporate(update.offset_known, "offset_known");
        offset_committed.incorporate(update.offset_committed, "offset_committed");

        *last_updated = Instant::now();
    }
}
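
// Illustrative controller-side flow (a sketch; `id`, `replica_id`, and
// `update_from_replica` are hypothetical bindings):
//
//     let mut stats = ControllerSourceStatistics::new(id, Some(replica_id));
//     stats.incorporate(update_from_replica);
//     // `last_updated` was refreshed, so this entry won't be reaped as expired.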

impl PackableStats for ControllerSourceStatistics {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        // id
        packer.push(Datum::from(self.id.to_string().as_str()));
        // replica_id
        if let Some(replica_id) = self.replica_id {
            packer.push(Datum::from(replica_id.to_string().as_str()));
        } else {
            packer.push(Datum::Null);
        }
        // Counters.
        packer.push(Datum::from(self.messages_received.0));
        packer.push(Datum::from(self.bytes_received.0));
        packer.push(Datum::from(self.updates_staged.0));
        packer.push(Datum::from(self.updates_committed.0));
        // Resetting gauges.
        packer.push(Datum::from(self.records_indexed.0.0));
        packer.push(Datum::from(self.bytes_indexed.0.0));
        let rehydration_latency = self
            .rehydration_latency_ms
            .0
            .0
            .map(|ms| mz_repr::adt::interval::Interval::new(0, 0, ms * 1000));
        packer.push(Datum::from(rehydration_latency));
        packer.push(Datum::from(self.snapshot_records_known.0.0));
        packer.push(Datum::from(self.snapshot_records_staged.0.0));
        // Gauges
        packer.push(Datum::from(self.snapshot_committed.0.0));
        packer.push(Datum::from(self.offset_known.0.pack()));
        packer.push(Datum::from(self.offset_committed.0.pack()));
    }

    fn unpack(
        row: Row,
        metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self) {
        let mut iter = row.iter();
        let id = iter.next().unwrap().unwrap_str().parse().unwrap();
        let replica_id_or_null = iter.next().unwrap();

        let replica_id = if replica_id_or_null.is_null() {
            None
        } else {
            Some(
                replica_id_or_null
                    .unwrap_str()
                    .parse::<ReplicaId>()
                    .unwrap(),
            )
        };

        let mut s = Self {
            id,
            replica_id,

            messages_received: iter.next().unwrap().unwrap_uint64().into(),
            bytes_received: iter.next().unwrap().unwrap_uint64().into(),
            updates_staged: iter.next().unwrap().unwrap_uint64().into(),
            updates_committed: iter.next().unwrap().unwrap_uint64().into(),

            records_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            bytes_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            rehydration_latency_ms: Gauge::gauge(
                <Option<mz_repr::adt::interval::Interval>>::try_from(iter.next().unwrap())
                    .unwrap()
                    .map(|int| int.micros / 1000),
            ),
            snapshot_records_known: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),
            snapshot_records_staged: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),

            snapshot_committed: Gauge::gauge(iter.next().unwrap().unwrap_bool()),
            offset_known: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
            offset_committed: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
            last_updated: Instant::now(),
            // This is coming from a Row, so we have definitely already written
            // out a "zero update" at some point.
            needs_zero_initialization: false,
        };

        s.offset_known.0.regressions = Some(metrics.regressed_offset_known(s.id));
        (s.id, replica_id, s)
    }
}

impl ExpirableStats for ControllerSourceStatistics {
    fn last_updated(&self) -> Instant {
        self.last_updated
    }
}

impl ZeroInitializedStats for ControllerSourceStatistics {
    fn needs_zero_initialization(&self) -> bool {
        self.needs_zero_initialization
    }

    fn mark_zero_initialized(&mut self) {
        self.needs_zero_initialization = false;
    }

    fn zero_stat(&self) -> Self {
        ControllerSourceStatistics::new(self.id, self.replica_id)
    }
}

/// Statistics that we keep in the controller about a given collection
/// (identified by id) on a given replica (identified by replica_id).
///
/// This mirrors [SinkStatisticsUpdate], and we update ourselves by incorporating
/// those updates using [ControllerSinkStatistics::incorporate].
#[derive(Clone, Debug, PartialEq)]
pub struct ControllerSinkStatistics {
    pub id: GlobalId,
    pub replica_id: ReplicaId,

    pub messages_staged: Counter,
    pub messages_committed: Counter,
    pub bytes_staged: Counter,
    pub bytes_committed: Counter,

    // See comment on ControllerSourceStatistics.last_updated.
    pub last_updated: Instant,

    // For implementing `ZeroInitializedStats`.
    needs_zero_initialization: bool,
}

impl ControllerSinkStatistics {
    pub fn new(id: GlobalId, replica_id: ReplicaId) -> Self {
        Self {
            id,
            replica_id,
            messages_staged: Default::default(),
            messages_committed: Default::default(),
            bytes_staged: Default::default(),
            bytes_committed: Default::default(),
            last_updated: Instant::now(),
            needs_zero_initialization: true,
        }
    }

    /// Incorporate updates from the given [SinkStatisticsUpdate] into ourselves.
    pub fn incorporate(&mut self, update: SinkStatisticsUpdate) {
        let ControllerSinkStatistics {
            id: _,
            replica_id: _,
            messages_staged,
            messages_committed,
            bytes_staged,
            bytes_committed,
            last_updated,
            needs_zero_initialization: _,
        } = self;

        messages_staged.incorporate(update.messages_staged, "messages_staged");
        bytes_staged.incorporate(update.bytes_staged, "bytes_staged");
        messages_committed.incorporate(update.messages_committed, "messages_committed");
        bytes_committed.incorporate(update.bytes_committed, "bytes_committed");

        *last_updated = Instant::now();
    }
}

impl PackableStats for ControllerSinkStatistics {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        packer.push(Datum::from(self.id.to_string().as_str()));
        packer.push(Datum::from(self.replica_id.to_string().as_str()));
        packer.push(Datum::from(self.messages_staged.0));
        packer.push(Datum::from(self.messages_committed.0));
        packer.push(Datum::from(self.bytes_staged.0));
        packer.push(Datum::from(self.bytes_committed.0));
    }

    fn unpack(
        row: Row,
        _metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self) {
        let mut iter = row.iter();
        let s = Self {
            id: iter.next().unwrap().unwrap_str().parse().unwrap(),
            replica_id: iter.next().unwrap().unwrap_str().parse().unwrap(),
            messages_staged: iter.next().unwrap().unwrap_uint64().into(),
            messages_committed: iter.next().unwrap().unwrap_uint64().into(),
            bytes_staged: iter.next().unwrap().unwrap_uint64().into(),
            bytes_committed: iter.next().unwrap().unwrap_uint64().into(),
            last_updated: Instant::now(),
            // This is coming from a Row, so we have definitely already written
            // out a "zero update" at some point.
            needs_zero_initialization: false,
        };
        (s.id, Some(s.replica_id), s)
    }
}

impl ExpirableStats for ControllerSinkStatistics {
    fn last_updated(&self) -> Instant {
        self.last_updated
    }
}

impl ZeroInitializedStats for ControllerSinkStatistics {
    fn needs_zero_initialization(&self) -> bool {
        self.needs_zero_initialization
    }

    fn mark_zero_initialized(&mut self) {
        self.needs_zero_initialization = false;
    }

    fn zero_stat(&self) -> Self {
        ControllerSinkStatistics::new(self.id, self.replica_id)
    }
}

/// An update as reported from a storage instance. The semantics
/// of each field are documented above in `MZ_SINK_STATISTICS_RAW_DESC`,
/// and encoded in the field types.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SinkStatisticsUpdate {
    pub id: GlobalId,

    pub messages_staged: Counter,
    pub messages_committed: Counter,
    pub bytes_staged: Counter,
    pub bytes_committed: Counter,
}

impl SinkStatisticsUpdate {
    pub fn new(id: GlobalId) -> Self {
        Self {
            id,
            messages_staged: Default::default(),
            messages_committed: Default::default(),
            bytes_staged: Default::default(),
            bytes_committed: Default::default(),
        }
    }

    pub fn incorporate(&mut self, other: SinkStatisticsUpdate) {
        let SinkStatisticsUpdate {
            messages_staged,
            messages_committed,
            bytes_staged,
            bytes_committed,
            ..
        } = self;

        messages_staged.incorporate(other.messages_staged, "messages_staged");
        messages_committed.incorporate(other.messages_committed, "messages_committed");
        bytes_staged.incorporate(other.bytes_staged, "bytes_staged");
        bytes_committed.incorporate(other.bytes_committed, "bytes_committed");
    }

    /// Incorporate only the counters of the given update, ignoring gauge values.
    pub fn incorporate_counters(&mut self, other: SinkStatisticsUpdate) {
        let SinkStatisticsUpdate {
            messages_staged,
            messages_committed,
            bytes_staged,
            bytes_committed,
            ..
        } = self;

        messages_staged.incorporate(other.messages_staged, "messages_staged");
        messages_committed.incorporate(other.messages_committed, "messages_committed");
        bytes_staged.incorporate(other.bytes_staged, "bytes_staged");
        bytes_committed.incorporate(other.bytes_committed, "bytes_committed");
    }

    pub fn summarize<'a, I, F>(values: F) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        F: Fn() -> I,
        Self: 'a,
    {
        SinkStatisticsUpdate {
            id: values().into_iter().next().unwrap().id,
            messages_staged: Counter::summarize(values().into_iter().map(|s| &s.messages_staged)),
            messages_committed: Counter::summarize(
                values().into_iter().map(|s| &s.messages_committed),
            ),
            bytes_staged: Counter::summarize(values().into_iter().map(|s| &s.bytes_staged)),
            bytes_committed: Counter::summarize(values().into_iter().map(|s| &s.bytes_committed)),
        }
    }

    /// Reset counters so that we continue to ship diffs to the controller.
    pub fn reset_counters(&mut self) {
        self.messages_staged.0 = 0;
        self.messages_committed.0 = 0;
        self.bytes_staged.0 = 0;
        self.bytes_committed.0 = 0;
    }

    /// Reset all _resettable_ gauges to their default values.
    pub fn reset_gauges(&self) {}
}

impl PackableStats for SinkStatisticsUpdate {
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        packer.push(Datum::from(self.id.to_string().as_str()));
        packer.push(Datum::from(self.messages_staged.0));
        packer.push(Datum::from(self.messages_committed.0));
        packer.push(Datum::from(self.bytes_staged.0));
        packer.push(Datum::from(self.bytes_committed.0));
    }

    fn unpack(
        row: Row,
        _metrics: &crate::metrics::StorageControllerMetrics,
    ) -> (GlobalId, Option<ReplicaId>, Self) {
        let mut iter = row.iter();
        let s = Self {
            // Id
            id: iter.next().unwrap().unwrap_str().parse().unwrap(),
            // Counters
            messages_staged: iter.next().unwrap().unwrap_uint64().into(),
            messages_committed: iter.next().unwrap().unwrap_uint64().into(),
            bytes_staged: iter.next().unwrap().unwrap_uint64().into(),
            bytes_committed: iter.next().unwrap().unwrap_uint64().into(),
        };
        (s.id, None, s)
    }
}

/// Statistics for webhooks.
#[derive(Default, Debug)]
pub struct WebhookStatistics {
    pub messages_received: AtomicU64,
    pub bytes_received: AtomicU64,
    pub updates_staged: AtomicU64,
    pub updates_committed: AtomicU64,
}

impl WebhookStatistics {
    /// Drain the current statistics into a `SourceStatisticsUpdate` with
    /// other values defaulted, resetting the atomic counters.
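    ///
    /// A minimal sketch (illustrative; `id` is a hypothetical `GlobalId`):
    ///
    /// ```ignore
    /// use std::sync::atomic::Ordering;
    ///
    /// let webhook_stats = WebhookStatistics::default();
    /// webhook_stats.messages_received.fetch_add(1, Ordering::Relaxed);
    /// let update = webhook_stats.drain_into_update(id);
    /// // The atomic counters are now zero, ready for the next drain.
    /// ```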
    pub fn drain_into_update(&self, id: GlobalId) -> SourceStatisticsUpdate {
        SourceStatisticsUpdate {
            id,
            messages_received: self.messages_received.swap(0, Ordering::Relaxed).into(),
            bytes_received: self.bytes_received.swap(0, Ordering::Relaxed).into(),
            updates_staged: self.updates_staged.swap(0, Ordering::Relaxed).into(),
            updates_committed: self.updates_committed.swap(0, Ordering::Relaxed).into(),
            records_indexed: Gauge::gauge(0),
            bytes_indexed: Gauge::gauge(0),
            rehydration_latency_ms: Gauge::gauge(None),
            snapshot_records_known: Gauge::gauge(None),
            snapshot_records_staged: Gauge::gauge(None),
            snapshot_committed: Gauge::gauge(true),
            offset_known: Gauge::gauge(None::<u64>),
            offset_committed: Gauge::gauge(None::<u64>),
        }
    }
}