mz_storage_client/
statistics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! These structures represent a full set of updates for the `mz_source_statistics_raw`
//! and `mz_sink_statistics_raw` tables for a specific source-worker/sink-worker pair.
12//! They are structured like this for simplicity
13//! and efficiency: Each storage worker can individually collect and consolidate metrics,
14//! then control how much `StorageResponse` traffic is produced when sending updates
15//! back to the controller to be written.
16//!
//! The proto conversions for these types are in the `client` module, for now.
18
19use std::sync::LazyLock;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22use serde::{Deserialize, Serialize};
23
24use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
25use mz_repr::{GlobalId, RelationDesc, Row, ScalarType};
26
27include!(concat!(env!("OUT_DIR"), "/mz_storage_client.statistics.rs"));
28
/// The [`RelationDesc`] of the `mz_source_statistics_raw` table.
///
/// Column order here must match the order in which
/// `SourceStatisticsUpdate::pack` pushes datums.
pub static MZ_SOURCE_STATISTICS_RAW_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::builder()
        // Id of the source (or subsource).
        .with_column("id", ScalarType::String.nullable(false))
        //
        // Counters
        //
        // A counter of the messages we have read from upstream for this source.
        // Never resets.
        .with_column("messages_received", ScalarType::UInt64.nullable(false))
        // A counter of the bytes we have read from upstream for this source.
        // Never resets.
        .with_column("bytes_received", ScalarType::UInt64.nullable(false))
        // A counter of the updates we have staged to commit for this source.
        // Never resets.
        .with_column("updates_staged", ScalarType::UInt64.nullable(false))
        // A counter of the updates we have committed for this source.
        // Never resets.
        .with_column("updates_committed", ScalarType::UInt64.nullable(false))
        //
        // Resetting gauges
        //
        // A gauge of the number of records in the envelope state. 0 for sources.
        // Reset when the source is restarted, for any reason.
        .with_column("records_indexed", ScalarType::UInt64.nullable(false))
        // A gauge of the number of bytes in the envelope state. 0 for sources.
        // Reset when the source is restarted, for any reason.
        .with_column("bytes_indexed", ScalarType::UInt64.nullable(false))
        // A gauge that shows the duration of rehydration. `NULL` before rehydration
        // is done.
        // Reset when the source is restarted, for any reason.
        .with_column("rehydration_latency", ScalarType::Interval.nullable(true))
        // A gauge of the number of _values_ (source defined unit) the _snapshot_ of this source
        // contains.
        // Sometimes reset when the source can snapshot new pieces of upstream: sources
        // like Postgres and MySQL may repopulate this column when tables are added.
        //
        // `NULL` while we discover the snapshot size.
        .with_column("snapshot_records_known", ScalarType::UInt64.nullable(true))
        // A gauge of the number of _values_ (source defined unit) we have read of the _snapshot_
        // of this source.
        // Sometimes reset when the source can snapshot new pieces of upstream (like Postgres and
        // MySQL).
        //
        // `NULL` while we discover the snapshot size.
        .with_column("snapshot_records_staged", ScalarType::UInt64.nullable(true))
        //
        // Non-resetting gauges
        //
        // Whether or not the snapshot for the source has been committed. Never resets.
        .with_column("snapshot_committed", ScalarType::Bool.nullable(false))
        // The following are not yet reported by sources and have 0 or `NULL` values.
        // They have been added here to reduce churn changing the schema of this collection.
        //
        // These are left nullable for now in case we want semantics for `NULL` values. We
        // currently never expose null values.
        //
        // A gauge of the number of _values_ (source defined unit) available to be read from upstream.
        // Never resets. Not to be confused with any of the counters above.
        .with_column("offset_known", ScalarType::UInt64.nullable(true))
        // A gauge of the number of _values_ (source defined unit) we have committed.
        // Never resets. Not to be confused with any of the counters above.
        .with_column("offset_committed", ScalarType::UInt64.nullable(true))
        .finish()
});
95
/// The [`RelationDesc`] of the `mz_sink_statistics_raw` table.
///
/// Column order here must match the order in which
/// `SinkStatisticsUpdate::pack` pushes datums. Sinks currently report only
/// counters (no gauges).
pub static MZ_SINK_STATISTICS_RAW_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::builder()
        // Id of the sink.
        .with_column("id", ScalarType::String.nullable(false))
        //
        // Counters
        //
        // A counter of the messages we have staged to upstream.
        // Never resets.
        .with_column("messages_staged", ScalarType::UInt64.nullable(false))
        // A counter of the messages we have committed.
        // Never resets.
        .with_column("messages_committed", ScalarType::UInt64.nullable(false))
        // A counter of the bytes we have staged to upstream.
        // Never resets.
        .with_column("bytes_staged", ScalarType::UInt64.nullable(false))
        // A counter of the bytes we have committed.
        // Never resets.
        .with_column("bytes_committed", ScalarType::UInt64.nullable(false))
        .finish()
});
117
// Types of statistics (counter and various types of gauges), that have different semantics
// when sources/sinks are reset.

/// A trait that defines the semantics storage statistics are able to have.
///
/// Implementors are small value types (counters and gauges); the trait
/// captures how per-worker measurements are combined into one value and how
/// successive reports merge into an existing value.
pub trait StorageMetric {
    /// Summarizes a set of measurements into a single representative value.
    /// Typically this function is used to summarize the measurements collected from each worker.
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a;

    /// Incorporate this value with another.
    ///
    /// `field_name` is used only for diagnostics (e.g. error logs or metrics)
    /// when an implementation detects an invalid transition.
    fn incorporate(&mut self, other: Self, field_name: &'static str);
}
133
134/// A counter that never resets.
135#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
136pub struct Counter(u64);
137
138impl StorageMetric for Counter {
139    fn summarize<'a, I>(values: I) -> Self
140    where
141        I: IntoIterator<Item = &'a Self>,
142        Self: Sized + 'a,
143    {
144        // Sum across workers.
145        Self(values.into_iter().map(|c| c.0).sum())
146    }
147
148    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
149        // Always add the new value to the existing one.
150        self.0 += other.0
151    }
152}
153
154impl From<u64> for Counter {
155    fn from(f: u64) -> Self {
156        Counter(f)
157    }
158}
159
160/// A latency gauge that is reset on every restart.
161#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
162pub struct ResettingLatency(Option<i64>);
163
164impl From<Option<i64>> for ResettingLatency {
165    fn from(f: Option<i64>) -> Self {
166        ResettingLatency(f)
167    }
168}
169
170impl StorageMetric for ResettingLatency {
171    fn summarize<'a, I>(values: I) -> Self
172    where
173        I: IntoIterator<Item = &'a Self>,
174        Self: Sized + 'a,
175    {
176        let mut max = 0;
177        for value in values {
178            match value.0 {
179                // If any of the values are `NULL`, then we don't yet know the max.
180                None => return Self(None),
181                // Pick the worst latency across workers.
182                Some(value) => max = std::cmp::max(max, value),
183            }
184        }
185
186        Self(Some(max))
187    }
188
189    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
190        // Reset to the new value.
191        self.0 = other.0;
192    }
193}
194
195impl ResettingLatency {
196    fn reset(&mut self) {
197        self.0 = None;
198    }
199}
200
/// A numerical gauge that always resets, but can start out as `NULL`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct ResettingNullableTotal(Option<u64>);

impl StorageMetric for ResettingNullableTotal {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        let mut sum = 0;
        for value in values {
            match value.0 {
                // If any of the values are `NULL`, then we merge to `NULL`.
                None => return Self(None),
                // Otherwise, sum the totals across workers.
                Some(value) => sum += value,
            }
        }

        Self(Some(sum))
    }

    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
        match (&mut self.0, other.0) {
            // While we are still `NULL`, adopt whatever the incoming value is.
            (None, other) => {
                self.0 = other;
            }
            // Override the total.
            (Some(this), Some(other)) => *this = other,
            (Some(_), None) => {
                // `NULL`'s don't reset the value.
            }
        }
    }
}

impl From<Option<u64>> for ResettingNullableTotal {
    fn from(f: Option<u64>) -> Self {
        ResettingNullableTotal(f)
    }
}

impl ResettingNullableTotal {
    // Resets back to `NULL`, the "unknown" state.
    fn reset(&mut self) {
        self.0 = None;
    }
}
249
250/// A numerical gauge that is always resets.
251#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
252pub struct ResettingTotal(u64);
253
254impl StorageMetric for ResettingTotal {
255    fn summarize<'a, I>(values: I) -> Self
256    where
257        I: IntoIterator<Item = &'a Self>,
258        Self: Sized + 'a,
259    {
260        // Sum across workers.
261        Self(values.into_iter().map(|c| c.0).sum())
262    }
263
264    fn incorporate(&mut self, other: Self, _field_name: &'static str) {
265        // Reset the pre-existing value.
266        self.0 = other.0;
267    }
268}
269
270impl From<u64> for ResettingTotal {
271    fn from(f: u64) -> Self {
272        ResettingTotal(f)
273    }
274}
275
276impl ResettingTotal {
277    fn reset(&mut self) {
278        self.0 = 0;
279    }
280}
281
282/// A boolean gauge that is never reset.
283#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
284pub struct Boolean(bool);
285
286impl StorageMetric for Boolean {
287    fn summarize<'a, I>(values: I) -> Self
288    where
289        I: IntoIterator<Item = &'a Self>,
290        Self: Sized + 'a,
291    {
292        // All workers must be true for this gauge to be true.
293        Self(values.into_iter().fold(true, |base, new| base & new.0))
294    }
295
296    fn incorporate(&mut self, other: Self, field_name: &'static str) {
297        // A boolean regressing to `false` is a bug.
298        //
299        // Clippy's suggestion here is not good!
300        #[allow(clippy::bool_comparison)]
301        if other.0 < self.0 {
302            tracing::error!(
303                "boolean gauge for field {field_name} erroneously regressed from true to false",
304            );
305            return;
306        }
307        self.0 = other.0;
308    }
309}
310
311impl From<bool> for Boolean {
312    fn from(f: bool) -> Self {
313        Boolean(f)
314    }
315}
316
/// A numerical gauge that never regresses.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Total {
    /// Defaults to 0. Can be skipped on updates from clusterd.
    total: Option<u64>,
    /// If provided, it is bumped on regressions, as opposed to `error!`
    /// logs.
    // Never serialized; re-attached where needed via
    // `SourceStatisticsUpdate::with_metrics` and `unpack`.
    #[serde(skip)]
    regressions:
        Option<mz_ore::metrics::DeleteOnDropCounter<prometheus::core::AtomicU64, Vec<String>>>,
}

impl From<Option<u64>> for Total {
    fn from(f: Option<u64>) -> Self {
        Total {
            total: f,
            regressions: None,
        }
    }
}

impl Clone for Total {
    // Manual impl: the value is copied but the `regressions` metric handle is
    // intentionally not carried over to clones.
    fn clone(&self) -> Self {
        Self {
            total: self.total,
            regressions: None,
        }
    }
}

impl PartialEq for Total {
    // Equality considers only the value; the metric handle is ignored.
    fn eq(&self, other: &Self) -> bool {
        self.total == other.total
    }
}

impl Total {
    /// Pack this `Total` into a `u64`, defaulting to 0.
    fn pack(&self) -> u64 {
        self.total.unwrap_or_default()
    }
}
359
impl StorageMetric for Total {
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        // Sum across workers, but only if all workers have reported
        // a non-`None` value.
        let mut any_none = false;

        let inner = values
            .into_iter()
            .filter_map(|i| {
                any_none |= i.total.is_none();
                i.total.as_ref()
            })
            .sum();

        // If any are none, we can't aggregate.
        // `self.regressions` is only meaningful in incorporation.
        Self {
            total: (!any_none).then_some(inner),
            regressions: None,
        }
    }

    fn incorporate(&mut self, other: Self, field_name: &'static str) {
        match (&mut self.total, other.total) {
            // An incoming `None` never clobbers a known value.
            (_, None) => {}
            (None, Some(other)) => self.total = Some(other),
            (Some(this), Some(other)) => {
                // This gauge must never regress: on a regression, bump the
                // dedicated counter metric if one is attached, otherwise log
                // an error, and keep the previous value either way.
                if other < *this {
                    if let Some(metric) = &self.regressions {
                        metric.inc()
                    } else {
                        tracing::error!(
                            "total gauge {field_name} erroneously regressed from {} to {}",
                            this,
                            other
                        );
                    }
                    return;
                }
                *this = other
            }
        }
    }
}
408
/// A gauge that has semantics based on the `StorageMetric` implementation of its inner.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct Gauge<T>(T);

impl<T> Gauge<T> {
    /// Wraps `f` in a `Gauge`, converting it into the inner metric type first.
    // This can't be a `From` impl because of coherence issues :(
    pub fn gauge<F>(f: F) -> Self
    where
        T: From<F>,
    {
        Gauge(f.into())
    }
}

impl<T: StorageMetric> StorageMetric for Gauge<T> {
    // Both operations simply delegate to the inner metric's semantics.
    fn summarize<'a, I>(values: I) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        Self: Sized + 'a,
    {
        Gauge(T::summarize(values.into_iter().map(|i| &i.0)))
    }

    fn incorporate(&mut self, other: Self, field_name: &'static str) {
        self.0.incorporate(other.0, field_name)
    }
}
436
/// A trait that abstracts over user-facing statistics objects, used
/// by `spawn_statistics_scraper`.
pub trait PackableStats {
    /// Pack `self` into the `Row`.
    fn pack(&self, packer: mz_repr::RowPacker<'_>);
    /// Unpack a `Row` back into a `Self`, also returning the `GlobalId` of
    /// the object the row describes.
    ///
    /// `metrics` lets implementations re-attach prometheus handles that are
    /// not stored in the row itself.
    fn unpack(row: Row, metrics: &crate::metrics::StorageControllerMetrics) -> (GlobalId, Self);
}
445
/// An update as reported from a storage instance. The semantics
/// of each field are documented above in `MZ_SOURCE_STATISTICS_RAW_DESC`,
/// and encoded in the field types.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SourceStatisticsUpdate {
    pub id: GlobalId,

    pub messages_received: Counter,
    pub bytes_received: Counter,
    pub updates_staged: Counter,
    pub updates_committed: Counter,

    pub records_indexed: Gauge<ResettingTotal>,
    pub bytes_indexed: Gauge<ResettingTotal>,
    pub rehydration_latency_ms: Gauge<ResettingLatency>,
    pub snapshot_records_known: Gauge<ResettingNullableTotal>,
    pub snapshot_records_staged: Gauge<ResettingNullableTotal>,

    pub snapshot_committed: Gauge<Boolean>,
    // `offset_known` is enriched with a counter in `unpack` and `with_metrics` that is
    // bumped whenever it regresses. This is distinct from `offset_committed`, which
    // `error!` logs.
    //
    // `offset_committed` is entirely in our control: it is calculated from source frontiers
    // that are guaranteed to never go backwards. Therefore, if it regresses, that is a bug
    // in how we calculate it.
    //
    // `offset_known` is calculated based on information the upstream service of the source gives
    // us. This is meaningfully less reliable, and can cause regressions in the value. Some known
    // cases that cause this are:
    // - A Kafka topic being deleted and recreated.
    // - A Postgres source being restored to a backup.
    //
    // We attempt to communicate both of these to the user using the source status system tables.
    // While emitting a regressed `offset_known` can be at least partially avoided in the source
    // implementation, we avoid noisy sentry alerts by instead bumping a counter that can be used
    // if a scenario requires more investigation.
    pub offset_known: Gauge<Total>,
    pub offset_committed: Gauge<Total>,
}
486
impl SourceStatisticsUpdate {
    /// Creates an update for `id` with all counters and gauges defaulted.
    pub fn new(id: GlobalId) -> Self {
        Self {
            id,
            messages_received: Default::default(),
            bytes_received: Default::default(),
            updates_staged: Default::default(),
            updates_committed: Default::default(),
            records_indexed: Default::default(),
            bytes_indexed: Default::default(),
            rehydration_latency_ms: Default::default(),
            snapshot_records_known: Default::default(),
            snapshot_records_staged: Default::default(),
            snapshot_committed: Default::default(),
            offset_known: Default::default(),
            offset_committed: Default::default(),
        }
    }

    /// Summarizes the per-worker updates into a single update, combining each
    /// field according to its `StorageMetric` semantics.
    ///
    /// `values` is a closure (rather than an iterator) because each field
    /// requires its own fresh pass over the inputs.
    ///
    /// # Panics
    /// Panics if `values()` yields no items: the `id` is taken from the first
    /// element.
    pub fn summarize<'a, I, F>(values: F) -> Self
    where
        I: IntoIterator<Item = &'a Self>,
        F: Fn() -> I,
        Self: 'a,
    {
        SourceStatisticsUpdate {
            id: values().into_iter().next().unwrap().id,
            messages_received: Counter::summarize(
                values().into_iter().map(|s| &s.messages_received),
            ),
            bytes_received: Counter::summarize(values().into_iter().map(|s| &s.bytes_received)),
            updates_staged: Counter::summarize(values().into_iter().map(|s| &s.updates_staged)),
            updates_committed: Counter::summarize(
                values().into_iter().map(|s| &s.updates_committed),
            ),
            records_indexed: Gauge::summarize(values().into_iter().map(|s| &s.records_indexed)),
            bytes_indexed: Gauge::summarize(values().into_iter().map(|s| &s.bytes_indexed)),
            rehydration_latency_ms: Gauge::summarize(
                values().into_iter().map(|s| &s.rehydration_latency_ms),
            ),
            snapshot_records_known: Gauge::summarize(
                values().into_iter().map(|s| &s.snapshot_records_known),
            ),
            snapshot_records_staged: Gauge::summarize(
                values().into_iter().map(|s| &s.snapshot_records_staged),
            ),
            snapshot_committed: Gauge::summarize(
                values().into_iter().map(|s| &s.snapshot_committed),
            ),
            offset_known: Gauge::summarize(values().into_iter().map(|s| &s.offset_known)),
            offset_committed: Gauge::summarize(values().into_iter().map(|s| &s.offset_committed)),
        }
    }

    /// Reset counters so that we continue to ship diffs to the controller.
    pub fn reset_counters(&mut self) {
        self.messages_received.0 = 0;
        self.bytes_received.0 = 0;
        self.updates_staged.0 = 0;
        self.updates_committed.0 = 0;
    }

    /// Reset all _resetable_ gauges to their default values.
    ///
    /// `snapshot_committed`, `offset_known`, and `offset_committed` are
    /// non-resetting gauges (see `MZ_SOURCE_STATISTICS_RAW_DESC`) and are
    /// intentionally left untouched.
    pub fn reset_gauges(&mut self) {
        self.records_indexed.0.reset();
        self.bytes_indexed.0.reset();
        self.rehydration_latency_ms.0.reset();
        self.snapshot_records_known.0.reset();
        self.snapshot_records_staged.0.reset();
    }

    /// Incorporate the given update into `self`, merging every field
    /// according to its `StorageMetric` semantics. The `id` is not touched.
    pub fn incorporate(&mut self, other: SourceStatisticsUpdate) {
        let SourceStatisticsUpdate {
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            records_indexed,
            bytes_indexed,
            rehydration_latency_ms,
            snapshot_records_known,
            snapshot_records_staged,
            snapshot_committed,
            offset_known,
            offset_committed,
            ..
        } = self;

        messages_received.incorporate(other.messages_received, "messages_received");
        bytes_received.incorporate(other.bytes_received, "bytes_received");
        updates_staged.incorporate(other.updates_staged, "updates_staged");
        updates_committed.incorporate(other.updates_committed, "updates_committed");
        records_indexed.incorporate(other.records_indexed, "records_indexed");
        bytes_indexed.incorporate(other.bytes_indexed, "bytes_indexed");
        rehydration_latency_ms.incorporate(other.rehydration_latency_ms, "rehydration_latency_ms");
        snapshot_records_known.incorporate(other.snapshot_records_known, "snapshot_records_known");
        snapshot_records_staged
            .incorporate(other.snapshot_records_staged, "snapshot_records_staged");
        snapshot_committed.incorporate(other.snapshot_committed, "snapshot_committed");
        offset_known.incorporate(other.offset_known, "offset_known");
        offset_committed.incorporate(other.offset_committed, "offset_committed");
    }

    /// Incorporate only the counters of the given update, ignoring gauge values.
    pub fn incorporate_counters(&mut self, other: SourceStatisticsUpdate) {
        let SourceStatisticsUpdate {
            messages_received,
            bytes_received,
            updates_staged,
            updates_committed,
            ..
        } = self;

        messages_received.incorporate(other.messages_received, "messages_received");
        bytes_received.incorporate(other.bytes_received, "bytes_received");
        updates_staged.incorporate(other.updates_staged, "updates_staged");
        updates_committed.incorporate(other.updates_committed, "updates_committed");
    }

    /// Enrich statistics that use prometheus metrics.
    ///
    /// Attaches the regression counter to `offset_known` so regressions bump
    /// a metric instead of emitting `error!` logs.
    pub fn with_metrics(mut self, metrics: &crate::metrics::StorageControllerMetrics) -> Self {
        self.offset_known.0.regressions = Some(metrics.regressed_offset_known(self.id));
        self
    }
}
612
impl PackableStats for SourceStatisticsUpdate {
    /// Packs the statistics in exactly the column order declared by
    /// `MZ_SOURCE_STATISTICS_RAW_DESC`; `unpack` below must mirror this order.
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        // id
        packer.push(Datum::from(self.id.to_string().as_str()));
        // Counters.
        packer.push(Datum::from(self.messages_received.0));
        packer.push(Datum::from(self.bytes_received.0));
        packer.push(Datum::from(self.updates_staged.0));
        packer.push(Datum::from(self.updates_committed.0));
        // Resetting gauges.
        packer.push(Datum::from(self.records_indexed.0.0));
        packer.push(Datum::from(self.bytes_indexed.0.0));
        // Internally the latency is tracked in milliseconds; the column is an
        // interval built from microseconds, hence `ms * 1000`.
        let rehydration_latency = self
            .rehydration_latency_ms
            .0
            .0
            .map(|ms| mz_repr::adt::interval::Interval::new(0, 0, ms * 1000));
        packer.push(Datum::from(rehydration_latency));
        packer.push(Datum::from(self.snapshot_records_known.0.0));
        packer.push(Datum::from(self.snapshot_records_staged.0.0));
        // Gauges
        packer.push(Datum::from(self.snapshot_committed.0.0));
        packer.push(Datum::from(self.offset_known.0.pack()));
        packer.push(Datum::from(self.offset_committed.0.pack()));
    }

    /// Rebuilds a `SourceStatisticsUpdate` from a row produced by `pack`,
    /// re-attaching the `offset_known` regression metric (which is never
    /// stored in the row).
    fn unpack(row: Row, metrics: &crate::metrics::StorageControllerMetrics) -> (GlobalId, Self) {
        let mut iter = row.iter();
        let mut s = Self {
            id: iter.next().unwrap().unwrap_str().parse().unwrap(),

            messages_received: iter.next().unwrap().unwrap_uint64().into(),
            bytes_received: iter.next().unwrap().unwrap_uint64().into(),
            updates_staged: iter.next().unwrap().unwrap_uint64().into(),
            updates_committed: iter.next().unwrap().unwrap_uint64().into(),

            records_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            bytes_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()),
            // Convert the interval's microseconds back to milliseconds.
            rehydration_latency_ms: Gauge::gauge(
                <Option<mz_repr::adt::interval::Interval>>::try_from(iter.next().unwrap())
                    .unwrap()
                    .map(|int| int.micros / 1000),
            ),
            snapshot_records_known: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),
            snapshot_records_staged: Gauge::gauge(
                <Option<u64>>::try_from(iter.next().unwrap()).unwrap(),
            ),

            snapshot_committed: Gauge::gauge(iter.next().unwrap().unwrap_bool()),
            offset_known: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
            offset_committed: Gauge::gauge(Some(iter.next().unwrap().unwrap_uint64())),
        };

        s.offset_known.0.regressions = Some(metrics.regressed_offset_known(s.id));
        (s.id, s)
    }
}
673
impl RustType<ProtoSourceStatisticsUpdate> for SourceStatisticsUpdate {
    /// Converts to the proto representation by unwrapping each metric to its
    /// raw inner value.
    fn into_proto(&self) -> ProtoSourceStatisticsUpdate {
        ProtoSourceStatisticsUpdate {
            id: Some(self.id.into_proto()),

            messages_received: self.messages_received.0,
            bytes_received: self.bytes_received.0,
            updates_staged: self.updates_staged.0,
            updates_committed: self.updates_committed.0,

            records_indexed: self.records_indexed.0.0,
            bytes_indexed: self.bytes_indexed.0.0,
            rehydration_latency_ms: self.rehydration_latency_ms.0.0,
            snapshot_records_known: self.snapshot_records_known.0.0,
            snapshot_records_staged: self.snapshot_records_staged.0.0,

            snapshot_committed: self.snapshot_committed.0.0,
            offset_known: self.offset_known.0.total,
            offset_committed: self.offset_committed.0.total,
        }
    }

    /// Converts back from the proto representation, rewrapping each raw value
    /// in its metric type. The `regressions` metric handle is not transported
    /// over proto and starts out unset.
    fn from_proto(proto: ProtoSourceStatisticsUpdate) -> Result<Self, TryFromProtoError> {
        Ok(SourceStatisticsUpdate {
            id: proto
                .id
                .into_rust_if_some("ProtoSourceStatisticsUpdate::id")?,

            messages_received: Counter(proto.messages_received),
            bytes_received: Counter(proto.bytes_received),
            updates_staged: Counter(proto.updates_staged),
            updates_committed: Counter(proto.updates_committed),

            records_indexed: Gauge::gauge(proto.records_indexed),
            bytes_indexed: Gauge::gauge(proto.bytes_indexed),
            rehydration_latency_ms: Gauge::gauge(proto.rehydration_latency_ms),
            snapshot_records_known: Gauge::gauge(proto.snapshot_records_known),
            snapshot_records_staged: Gauge::gauge(proto.snapshot_records_staged),

            snapshot_committed: Gauge::gauge(proto.snapshot_committed),
            offset_known: Gauge::gauge(proto.offset_known),
            offset_committed: Gauge::gauge(proto.offset_committed),
        })
    }
}
719
/// An update as reported from a storage instance. The semantics
/// of each field are documented above in `MZ_SINK_STATISTICS_RAW_DESC`,
/// and encoded in the field types.
///
/// Sinks currently report only counters; there are no gauge fields.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SinkStatisticsUpdate {
    pub id: GlobalId,

    pub messages_staged: Counter,
    pub messages_committed: Counter,
    pub bytes_staged: Counter,
    pub bytes_committed: Counter,
}
732
733impl SinkStatisticsUpdate {
734    pub fn new(id: GlobalId) -> Self {
735        Self {
736            id,
737            messages_staged: Default::default(),
738            messages_committed: Default::default(),
739            bytes_staged: Default::default(),
740            bytes_committed: Default::default(),
741        }
742    }
743
744    pub fn incorporate(&mut self, other: SinkStatisticsUpdate) {
745        let SinkStatisticsUpdate {
746            messages_staged,
747            messages_committed,
748            bytes_staged,
749            bytes_committed,
750            ..
751        } = self;
752
753        messages_staged.incorporate(other.messages_staged, "messages_staged");
754        messages_committed.incorporate(other.messages_committed, "messages_committed");
755        bytes_staged.incorporate(other.bytes_staged, "bytes_staged");
756        bytes_committed.incorporate(other.bytes_committed, "bytes_committed");
757    }
758
759    /// Incorporate only the counters of the given update, ignoring gauge values.
760    pub fn incorporate_counters(&mut self, other: SinkStatisticsUpdate) {
761        let SinkStatisticsUpdate {
762            messages_staged,
763            messages_committed,
764            bytes_staged,
765            bytes_committed,
766            ..
767        } = self;
768
769        messages_staged.incorporate(other.messages_staged, "messages_staged");
770        messages_committed.incorporate(other.messages_committed, "messages_committed");
771        bytes_staged.incorporate(other.bytes_staged, "bytes_staged");
772        bytes_committed.incorporate(other.bytes_committed, "bytes_committed");
773    }
774
775    pub fn summarize<'a, I, F>(values: F) -> Self
776    where
777        I: IntoIterator<Item = &'a Self>,
778        F: Fn() -> I,
779        Self: 'a,
780    {
781        SinkStatisticsUpdate {
782            id: values().into_iter().next().unwrap().id,
783            messages_staged: Counter::summarize(values().into_iter().map(|s| &s.messages_staged)),
784            messages_committed: Counter::summarize(
785                values().into_iter().map(|s| &s.messages_committed),
786            ),
787            bytes_staged: Counter::summarize(values().into_iter().map(|s| &s.bytes_staged)),
788            bytes_committed: Counter::summarize(values().into_iter().map(|s| &s.bytes_committed)),
789        }
790    }
791
792    /// Reset counters so that we continue to ship diffs to the controller.
793    pub fn reset_counters(&mut self) {
794        self.messages_staged.0 = 0;
795        self.messages_committed.0 = 0;
796        self.bytes_staged.0 = 0;
797        self.bytes_committed.0 = 0;
798    }
799
800    /// Reset all _resetable_ gauges to their default values.
801    pub fn reset_gauges(&self) {}
802}
803
impl PackableStats for SinkStatisticsUpdate {
    /// Packs the statistics in exactly the column order declared by
    /// `MZ_SINK_STATISTICS_RAW_DESC`; `unpack` below must mirror this order.
    fn pack(&self, mut packer: mz_repr::RowPacker<'_>) {
        use mz_repr::Datum;
        packer.push(Datum::from(self.id.to_string().as_str()));
        packer.push(Datum::from(self.messages_staged.0));
        packer.push(Datum::from(self.messages_committed.0));
        packer.push(Datum::from(self.bytes_staged.0));
        packer.push(Datum::from(self.bytes_committed.0));
    }

    /// Rebuilds a `SinkStatisticsUpdate` from a row produced by `pack`.
    /// Sinks attach no prometheus metrics, so `_metrics` is unused.
    fn unpack(row: Row, _metrics: &crate::metrics::StorageControllerMetrics) -> (GlobalId, Self) {
        let mut iter = row.iter();
        let s = Self {
            // Id
            id: iter.next().unwrap().unwrap_str().parse().unwrap(),
            // Counters
            messages_staged: iter.next().unwrap().unwrap_uint64().into(),
            messages_committed: iter.next().unwrap().unwrap_uint64().into(),
            bytes_staged: iter.next().unwrap().unwrap_uint64().into(),
            bytes_committed: iter.next().unwrap().unwrap_uint64().into(),
        };

        (s.id, s)
    }
}
829
830impl RustType<ProtoSinkStatisticsUpdate> for SinkStatisticsUpdate {
831    fn into_proto(&self) -> ProtoSinkStatisticsUpdate {
832        ProtoSinkStatisticsUpdate {
833            id: Some(self.id.into_proto()),
834
835            messages_staged: self.messages_staged.0,
836            messages_committed: self.messages_committed.0,
837            bytes_staged: self.bytes_staged.0,
838            bytes_committed: self.bytes_committed.0,
839        }
840    }
841
842    fn from_proto(proto: ProtoSinkStatisticsUpdate) -> Result<Self, TryFromProtoError> {
843        Ok(SinkStatisticsUpdate {
844            id: proto
845                .id
846                .into_rust_if_some("ProtoSinkStatisticsUpdate::id")?,
847
848            messages_staged: Counter(proto.messages_staged),
849            messages_committed: Counter(proto.messages_committed),
850            bytes_staged: Counter(proto.bytes_staged),
851            bytes_committed: Counter(proto.bytes_committed),
852        })
853    }
854}
855
/// Statistics for webhooks.
///
/// Fields are atomics so they can be bumped concurrently; they are drained
/// (and reset to 0) by `drain_into_update`. Semantics of each field mirror
/// the counter of the same name on `SourceStatisticsUpdate`.
#[derive(Default, Debug)]
pub struct WebhookStatistics {
    pub messages_received: AtomicU64,
    pub bytes_received: AtomicU64,
    pub updates_staged: AtomicU64,
    pub updates_committed: AtomicU64,
}
864
impl WebhookStatistics {
    /// Drain the current statistics into a `SourceStatisticsUpdate` with
    /// other values defaulted, resetting the atomic counters.
    pub fn drain_into_update(&self, id: GlobalId) -> SourceStatisticsUpdate {
        SourceStatisticsUpdate {
            id,
            // `swap(0, ..)` atomically reads each counter and resets it to 0.
            messages_received: self.messages_received.swap(0, Ordering::Relaxed).into(),
            bytes_received: self.bytes_received.swap(0, Ordering::Relaxed).into(),
            updates_staged: self.updates_staged.swap(0, Ordering::Relaxed).into(),
            updates_committed: self.updates_committed.swap(0, Ordering::Relaxed).into(),
            // Webhooks track no indexing/snapshot/offset state, so the gauges
            // carry fixed defaults; the snapshot is reported as committed.
            records_indexed: Gauge::gauge(0),
            bytes_indexed: Gauge::gauge(0),
            rehydration_latency_ms: Gauge::gauge(None),
            snapshot_records_known: Gauge::gauge(None),
            snapshot_records_staged: Gauge::gauge(None),
            snapshot_committed: Gauge::gauge(true),
            offset_known: Gauge::gauge(None::<u64>),
            offset_committed: Gauge::gauge(None::<u64>),
        }
    }
}
885}