1use std::sync::Arc;
32
33use mz_ore::metrics::MetricsRegistry;
34use mz_repr::GlobalId;
35
36use crate::statistics::{SinkStatisticsMetricDefs, SourceStatisticsMetricDefs};
37use mz_storage_operators::metrics::BackpressureMetrics;
38
39pub mod decode;
40pub mod sink;
41pub mod source;
42pub mod upsert;
43
/// Bundle of the metric-definition groups used by the storage layer.
///
/// An instance is built by `StorageMetrics::register_with`, which hands one
/// `MetricsRegistry` to the `register_with` constructor of every group below.
/// The `get_*` methods on this type then derive per-object metric structs
/// (per source, sink, worker, ...) from these definitions.
#[derive(Clone, Debug)]
pub struct StorageMetrics {
    /// Source metric definitions. Its `source_defs`, `postgres_defs`,
    /// `mysql_defs` and `kafka_source_defs` sub-fields feed the general,
    /// Postgres, MySQL and Kafka source metric constructors respectively.
    pub(crate) source_defs: source::SourceMetricDefs,
    /// Decode metric definitions (not read by any method visible in this file
    /// section).
    pub(crate) decode_defs: decode::DecodeMetricDefs,
    /// Upsert metric definitions, consumed by `get_upsert_metrics`.
    pub(crate) upsert_defs: upsert::UpsertMetricDefs,
    /// Upsert backpressure metric definitions (`emitted_bytes`,
    /// `last_backpressured_bytes`, `retired_bytes`), consumed by
    /// `get_backpressure_metrics`.
    pub(crate) upsert_backpressure_defs: upsert::UpsertBackpressureMetricDefs,
    /// Sink metric definitions; its `kafka_defs` sub-field feeds
    /// `get_kafka_sink_metrics`.
    pub(crate) sink_defs: sink::SinkMetricDefs,

    /// Metric definitions for source statistics (see `crate::statistics`).
    pub(crate) source_statistics: SourceStatisticsMetricDefs,
    /// Metric definitions for sink statistics (see `crate::statistics`).
    pub(crate) sink_statistics: SinkStatisticsMetricDefs,
}
65
66impl StorageMetrics {
67 pub fn register_with(registry: &MetricsRegistry) -> Self {
69 Self {
70 source_defs: source::SourceMetricDefs::register_with(registry),
71 decode_defs: decode::DecodeMetricDefs::register_with(registry),
72 upsert_defs: upsert::UpsertMetricDefs::register_with(registry),
73 upsert_backpressure_defs: upsert::UpsertBackpressureMetricDefs::register_with(registry),
74 sink_defs: sink::SinkMetricDefs::register_with(registry),
75 source_statistics: SourceStatisticsMetricDefs::register_with(registry),
76 sink_statistics: SinkStatisticsMetricDefs::register_with(registry),
77 }
78 }
79
80 pub(crate) fn get_backpressure_metrics(
82 &self,
83 id: GlobalId,
84 index: usize,
85 ) -> BackpressureMetrics {
86 BackpressureMetrics {
87 emitted_bytes: Arc::new(
88 self.upsert_backpressure_defs
89 .emitted_bytes
90 .get_delete_on_drop_metric(vec![id.to_string(), index.to_string()]),
91 ),
92 last_backpressured_bytes: Arc::new(
93 self.upsert_backpressure_defs
94 .last_backpressured_bytes
95 .get_delete_on_drop_metric(vec![id.to_string(), index.to_string()]),
96 ),
97 retired_bytes: Arc::new(
98 self.upsert_backpressure_defs
99 .retired_bytes
100 .get_delete_on_drop_metric(vec![id.to_string(), index.to_string()]),
101 ),
102 }
103 }
104
105 pub(crate) fn get_upsert_metrics(
107 &self,
108 id: GlobalId,
109 worker_id: usize,
110 backpressure_metrics: Option<BackpressureMetrics>,
111 ) -> upsert::UpsertMetrics {
112 upsert::UpsertMetrics::new(&self.upsert_defs, id, worker_id, backpressure_metrics)
113 }
114
115 pub(crate) fn get_source_persist_sink_metrics(
117 &self,
118 export_id: GlobalId,
119 primary_source_id: GlobalId,
120 worker_id: usize,
121 data_shard: &mz_persist_client::ShardId,
122 ) -> source::SourcePersistSinkMetrics {
123 source::SourcePersistSinkMetrics::new(
124 &self.source_defs.source_defs,
125 export_id,
126 primary_source_id,
127 worker_id,
128 data_shard,
129 )
130 }
131
132 pub(crate) fn get_source_metrics(
134 &self,
135 id: GlobalId,
136 worker_id: usize,
137 ) -> source::SourceMetrics {
138 source::SourceMetrics::new(&self.source_defs.source_defs, id, worker_id)
139 }
140
141 pub(crate) fn get_postgres_source_metrics(
143 &self,
144 id: GlobalId,
145 ) -> source::postgres::PgSourceMetrics {
146 source::postgres::PgSourceMetrics::new(&self.source_defs.postgres_defs, id)
147 }
148
149 pub(crate) fn get_mysql_source_metrics(
151 &self,
152 id: GlobalId,
153 ) -> source::mysql::MySqlSourceMetrics {
154 source::mysql::MySqlSourceMetrics::new(&self.source_defs.mysql_defs, id)
155 }
156
157 pub(crate) fn get_offset_commit_metrics(&self, id: GlobalId) -> source::OffsetCommitMetrics {
159 source::OffsetCommitMetrics::new(&self.source_defs.source_defs, id)
160 }
161
162 pub(crate) fn get_kafka_source_metrics(
164 &self,
165 ids: Vec<i32>,
166 topic: String,
167 source_id: GlobalId,
168 ) -> source::kafka::KafkaSourceMetrics {
169 source::kafka::KafkaSourceMetrics::new(
170 &self.source_defs.kafka_source_defs,
171 ids,
172 topic,
173 source_id,
174 )
175 }
176
177 pub(crate) fn get_kafka_sink_metrics(
179 &self,
180 sink_id: GlobalId,
181 ) -> sink::kafka::KafkaSinkMetrics {
182 sink::kafka::KafkaSinkMetrics::new(&self.sink_defs.kafka_defs, sink_id)
183 }
184}