1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Metrics for the storage controller components

use std::sync::Arc;

use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::metric;
use mz_ore::metrics::{
    CounterVecExt, DeleteOnDropCounter, DeleteOnDropHistogram, HistogramVecExt, IntCounterVec,
    MetricsRegistry,
};
use mz_ore::stats::HISTOGRAM_BYTE_BUCKETS;
use mz_service::codec::StatsCollector;
use mz_storage_types::instances::StorageInstanceId;

use crate::client::{ProtoStorageCommand, ProtoStorageResponse};

/// Storage controller metrics
#[derive(Debug, Clone)]
pub struct StorageControllerMetrics {
    messages_sent_bytes: prometheus::HistogramVec,
    messages_received_bytes: prometheus::HistogramVec,
    startup_prepared_statements_kept: prometheus::IntGauge,
    regressed_offset_known: IntCounterVec,
}

impl StorageControllerMetrics {
    pub fn new(metrics_registry: MetricsRegistry) -> Self {
        Self {
            messages_sent_bytes: metrics_registry.register(metric!(
                name: "mz_storage_messages_sent_bytes",
                help: "size of storage messages sent",
                var_labels: ["instance"],
                buckets: HISTOGRAM_BYTE_BUCKETS.to_vec()
            )),

            messages_received_bytes: metrics_registry.register(metric!(
                name: "mz_storage_messages_received_bytes",
                help: "size of storage messages received",
                var_labels: ["instance"],
                buckets: HISTOGRAM_BYTE_BUCKETS.to_vec()
            )),

            startup_prepared_statements_kept: metrics_registry.register(metric!(
                name: "mz_storage_startup_prepared_statements_kept",
                help: "number of prepared statements kept on startup",
            )),
            regressed_offset_known: metrics_registry.register(metric!(
                name: "mz_storage_regressed_offset_known",
                help: "number of regressed offset_known stats for this id",
                var_labels: ["id"],
            )),
        }
    }

    pub fn regressed_offset_known(
        &self,
        id: mz_repr::GlobalId,
    ) -> DeleteOnDropCounter<'static, prometheus::core::AtomicU64, Vec<String>> {
        self.regressed_offset_known
            .get_delete_on_drop_counter(vec![id.to_string()])
    }

    pub fn for_instance(&self, id: StorageInstanceId) -> RehydratingStorageClientMetrics {
        let labels = vec![id.to_string()];
        RehydratingStorageClientMetrics {
            inner: Arc::new(RehydratingStorageClientMetricsInner {
                messages_sent_bytes: self
                    .messages_sent_bytes
                    .get_delete_on_drop_histogram(labels.clone()),
                messages_received_bytes: self
                    .messages_received_bytes
                    .get_delete_on_drop_histogram(labels),
            }),
        }
    }

    pub fn set_startup_prepared_statements_kept(&self, n: u64) {
        let n: i64 = n.try_into().expect("realistic number");
        self.startup_prepared_statements_kept.set(n);
    }
}

#[derive(Debug)]
struct RehydratingStorageClientMetricsInner {
    messages_sent_bytes: DeleteOnDropHistogram<'static, Vec<String>>,
    messages_received_bytes: DeleteOnDropHistogram<'static, Vec<String>>,
}

/// Per-instance metrics
#[derive(Debug, Clone)]
pub struct RehydratingStorageClientMetrics {
    inner: Arc<RehydratingStorageClientMetricsInner>,
}

/// Make ReplicaConnectionMetric pluggable into the gRPC connection.
impl StatsCollector<ProtoStorageCommand, ProtoStorageResponse> for RehydratingStorageClientMetrics {
    fn send_event(&self, _item: &ProtoStorageCommand, size: usize) {
        match f64::try_cast_from(u64::cast_from(size)) {
            Some(x) => self.inner.messages_sent_bytes.observe(x),
            None => tracing::warn!(
                "{} has no precise representation as f64, ignoring message",
                size
            ),
        }
    }

    fn receive_event(&self, _item: &ProtoStorageResponse, size: usize) {
        match f64::try_cast_from(u64::cast_from(size)) {
            Some(x) => self.inner.messages_received_bytes.observe(x),
            None => tracing::warn!(
                "{} has no precise representation as f64, ignoring message",
                size
            ),
        }
    }
}