1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Metrics for all things storage.
11//!
12//! The structure of this module is designed to make adding new metrics as easy as possible. The
13//! structure and naming conventions are as follows:
14//!
15//! Metrics for X end up in the `x.rs` submodule, unless X fits into one of the existing
16//! submodules. The struct `XMetricsDefs` defines the `CounterVec/GaugeVec/etc`'s that must be
17//! registered with the `MetricsRegistry` to create new metrics. `XMetricsDefs` should be a
18//! sub-field of `StorageMetrics` (or recursively a sub-field). `XMetricsDefs` has a
19//! `register_with` function to create it using a `MetricsRegistry`.
20//!
21//! `XMetrics` contains the actual gauges/counters/etc that are created using `XMetricsDefs`.
22//! Typically these are created with `new` functions that takes a `&XMetricsDefs`, a `GlobalId`,
23//! and a worker id, but sometimes more complex schemes are used, for metrics that are globally
24//! shared, or have some other shape to their labels.
25//!
26//! `StorageMetrics` is the main entry-point to this module, and for convenience, typically
27//! provides a `get_x_metrics` to obtain an `XMetrics` struct. This is to prevent users from
28//! needing to interact with metrics _definitions_ into the code that actually bumps those
29//! metrics.
3031use std::sync::Arc;
3233use mz_ore::metrics::MetricsRegistry;
34use mz_repr::GlobalId;
3536use crate::statistics::{SinkStatisticsMetricDefs, SourceStatisticsMetricDefs};
37use mz_storage_operators::metrics::BackpressureMetrics;
3839pub mod decode;
40pub mod sink;
41pub mod source;
42pub mod upsert;
4344/// A top-level struct holding all various _definitions_ of all metrics
45/// use by the `mz-storage` crate.
46///
47/// Created by registering it with a `MetricsRegistry`, it also provides helpers
48/// to obtain various _instantiated_ time-series, either per-worker, shared globally,
49/// or some more specific labeling scheme.
50///
51/// This struct can be cloned, and the various definitions are shared.
52#[derive(Clone, Debug)]
53pub struct StorageMetrics {
54pub(crate) source_defs: source::SourceMetricDefs,
55pub(crate) decode_defs: decode::DecodeMetricDefs,
56pub(crate) upsert_defs: upsert::UpsertMetricDefs,
57pub(crate) upsert_backpressure_defs: upsert::UpsertBackpressureMetricDefs,
58pub(crate) sink_defs: sink::SinkMetricDefs,
5960// Defined in the `statistics` module, as they are kept in sync with
61 // user-facing data.
62pub(crate) source_statistics: SourceStatisticsMetricDefs,
63pub(crate) sink_statistics: SinkStatisticsMetricDefs,
64}
6566impl StorageMetrics {
67/// Register all metrics with the `MetricsRegistry`.
68pub fn register_with(registry: &MetricsRegistry) -> Self {
69Self {
70 source_defs: source::SourceMetricDefs::register_with(registry),
71 decode_defs: decode::DecodeMetricDefs::register_with(registry),
72 upsert_defs: upsert::UpsertMetricDefs::register_with(registry),
73 upsert_backpressure_defs: upsert::UpsertBackpressureMetricDefs::register_with(registry),
74 sink_defs: sink::SinkMetricDefs::register_with(registry),
75 source_statistics: SourceStatisticsMetricDefs::register_with(registry),
76 sink_statistics: SinkStatisticsMetricDefs::register_with(registry),
77 }
78 }
7980/// Get a `BackpressureMetrics` for the given id and worker id.
81pub(crate) fn get_backpressure_metrics(
82&self,
83 id: GlobalId,
84 index: usize,
85 ) -> BackpressureMetrics {
86 BackpressureMetrics {
87 emitted_bytes: Arc::new(
88self.upsert_backpressure_defs
89 .emitted_bytes
90 .get_delete_on_drop_metric(vec![id.to_string(), index.to_string()]),
91 ),
92 last_backpressured_bytes: Arc::new(
93self.upsert_backpressure_defs
94 .last_backpressured_bytes
95 .get_delete_on_drop_metric(vec![id.to_string(), index.to_string()]),
96 ),
97 retired_bytes: Arc::new(
98self.upsert_backpressure_defs
99 .retired_bytes
100 .get_delete_on_drop_metric(vec![id.to_string(), index.to_string()]),
101 ),
102 }
103 }
104105/// Get an `UpsertMetrics` for the given id and worker id (and optional `BackpressureMetrics`).
106pub(crate) fn get_upsert_metrics(
107&self,
108 id: GlobalId,
109 worker_id: usize,
110 backpressure_metrics: Option<BackpressureMetrics>,
111 ) -> upsert::UpsertMetrics {
112 upsert::UpsertMetrics::new(&self.upsert_defs, id, worker_id, backpressure_metrics)
113 }
114115/// Get a `SourcePersistSinkMetrics` for the given configuration.
116pub(crate) fn get_source_persist_sink_metrics(
117&self,
118 export_id: GlobalId,
119 primary_source_id: GlobalId,
120 worker_id: usize,
121 data_shard: &mz_persist_client::ShardId,
122 ) -> source::SourcePersistSinkMetrics {
123 source::SourcePersistSinkMetrics::new(
124&self.source_defs.source_defs,
125 export_id,
126 primary_source_id,
127 worker_id,
128 data_shard,
129 )
130 }
131132/// Get a `SourceMetrics` for the given id and worker id.
133pub(crate) fn get_source_metrics(
134&self,
135 id: GlobalId,
136 worker_id: usize,
137 ) -> source::SourceMetrics {
138 source::SourceMetrics::new(&self.source_defs.source_defs, id, worker_id)
139 }
140141/// Get a `PgMetrics` for the given id.
142pub(crate) fn get_postgres_source_metrics(
143&self,
144 id: GlobalId,
145 ) -> source::postgres::PgSourceMetrics {
146 source::postgres::PgSourceMetrics::new(&self.source_defs.postgres_defs, id)
147 }
148149/// Get a `MySqlSourceMetrics` for the given id.
150pub(crate) fn get_mysql_source_metrics(
151&self,
152 id: GlobalId,
153 ) -> source::mysql::MySqlSourceMetrics {
154 source::mysql::MySqlSourceMetrics::new(&self.source_defs.mysql_defs, id)
155 }
156157/// Get an `OffsetCommitMetrics` for the given id.
158pub(crate) fn get_offset_commit_metrics(&self, id: GlobalId) -> source::OffsetCommitMetrics {
159 source::OffsetCommitMetrics::new(&self.source_defs.source_defs, id)
160 }
161162/// Get an `KafkaSourceMetrics` for the given configuration.
163pub(crate) fn get_kafka_source_metrics(
164&self,
165 ids: Vec<i32>,
166 topic: String,
167 source_id: GlobalId,
168 ) -> source::kafka::KafkaSourceMetrics {
169 source::kafka::KafkaSourceMetrics::new(
170&self.source_defs.kafka_source_defs,
171 ids,
172 topic,
173 source_id,
174 )
175 }
176177/// Get an `KafkaSinkMetrics` for the given configuration.
178pub(crate) fn get_kafka_sink_metrics(
179&self,
180 sink_id: GlobalId,
181 ) -> sink::kafka::KafkaSinkMetrics {
182 sink::kafka::KafkaSinkMetrics::new(&self.sink_defs.kafka_defs, sink_id)
183 }
184}