1use mz_ore::metric;
11use mz_ore::metrics::MetricsRegistry;
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::core::{AtomicU64, GenericCounter};
17use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
18
/// Handles to the Prometheus metrics recorded by the adapter (coordinator,
/// pgwire frontend, catalog, webhooks, and statement logging).
///
/// Every field is populated by [`Metrics::register_into`]. Each field's doc
/// comment names the metric it is registered as and its variable labels, if any.
#[derive(Debug, Clone)]
pub struct Metrics {
    /// `mz_query_total`, labeled by `session_type` and `statement_type`.
    pub query_total: IntCounterVec,
    /// `mz_active_sessions`, labeled by `session_type`.
    pub active_sessions: IntGaugeVec,
    /// `mz_active_subscribes`, labeled by `session_type`.
    pub active_subscribes: IntGaugeVec,
    /// `mz_active_copy_tos`, labeled by `session_type`.
    pub active_copy_tos: IntGaugeVec,
    /// `mz_coord_queue_busy_seconds` (a sampled metric, per its help text).
    pub queue_busy_seconds: Histogram,
    /// `mz_determine_timestamp`, labeled by `respond_immediately`,
    /// `isolation_level`, `compute_instance`, and `determination_method`.
    pub determine_timestamp: IntCounterVec,
    /// `mz_timestamp_difference_for_strict_serializable_ms`, labeled by
    /// `compute_instance` and `determination_method`.
    pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
    /// `mz_adapter_commands`, labeled by `command_type`, `status`, and
    /// `application_name`.
    pub commands: IntCounterVec,
    /// `mz_storage_usage_collection_time_seconds`.
    pub storage_usage_collection_time_seconds: Histogram,
    /// `mz_subscribe_outputs`, labeled by `session_type` and `subscribe_output`.
    pub subscribe_outputs: IntCounterVec,
    /// `mz_canceled_peeks_total`.
    pub canceled_peeks: IntCounter,
    /// `mz_linearize_message_seconds`, labeled by `type` and
    /// `immediately_handled`.
    pub linearize_message_seconds: HistogramVec,
    /// `mz_time_to_first_row_seconds`, labeled by `instance_id`,
    /// `isolation_level`, and `strategy`.
    pub time_to_first_row_seconds: HistogramVec,
    /// `mz_statement_logging_record_count`, labeled by `sample`.
    pub statement_logging_records: IntCounterVec,
    /// `mz_statement_logging_unsampled_bytes`.
    pub statement_logging_unsampled_bytes: IntCounter,
    /// `mz_statement_logging_actual_bytes`.
    pub statement_logging_actual_bytes: IntCounter,
    /// `mz_coordinator_message_batch_size`. Observations are batch sizes,
    /// not durations.
    pub message_batch: Histogram,
    /// `mz_slow_message_handling`, labeled by `message_kind`. Despite the
    /// name, this covers all coordinator messages.
    pub message_handling: HistogramVec,
    /// `mz_optimization_notices`, labeled by `notice_type`.
    pub optimization_notices: IntCounterVec,
    /// `mz_append_table_duration_seconds`.
    pub append_table_duration_seconds: Histogram,
    /// `mz_webhook_validation_reduce_failures`, labeled by `reason`.
    pub webhook_validation_reduce_failures: IntCounterVec,
    /// `mz_webhook_get_appender_count`.
    pub webhook_get_appender: IntCounter,
    /// `mz_check_scheduling_policies_seconds`, labeled by `policy` and `thread`.
    pub check_scheduling_policies_seconds: HistogramVec,
    /// `mz_handle_scheduling_decisions_seconds`, labeled by `altered_a_cluster`.
    pub handle_scheduling_decisions_seconds: HistogramVec,
    /// `mz_row_set_finishing_seconds`.
    pub row_set_finishing_seconds: Histogram,
    /// `mz_session_startup_table_writes_seconds`.
    pub session_startup_table_writes_seconds: Histogram,
    /// `mz_parse_seconds`.
    pub parse_seconds: Histogram,
    /// `mz_pgwire_message_processing_seconds`, labeled by `message_type`.
    pub pgwire_message_processing_seconds: HistogramVec,
    /// `mz_result_rows_first_to_last_byte_seconds`, labeled by `statement_type`.
    pub result_rows_first_to_last_byte_seconds: HistogramVec,
    /// `mz_pgwire_ensure_transaction_seconds`, labeled by `message_type`.
    pub pgwire_ensure_transaction_seconds: HistogramVec,
    /// `mz_catalog_snapshot_seconds`, labeled by `context`.
    pub catalog_snapshot_seconds: HistogramVec,
    /// `mz_pgwire_recv_scheduling_delay_ms`, labeled by `message_type`.
    /// Recorded in milliseconds, unlike most of the histograms here.
    pub pgwire_recv_scheduling_delay_ms: HistogramVec,
    /// `mz_catalog_transact_seconds`, labeled by `method`.
    pub catalog_transact_seconds: HistogramVec,
    /// `mz_apply_catalog_implications_seconds`.
    pub apply_catalog_implications_seconds: Histogram,
    /// `mz_group_commit_confirm_leadership_seconds`.
    pub group_commit_confirm_leadership_seconds: Histogram,
    /// `mz_group_commit_table_advancement_seconds`.
    pub group_commit_table_advancement_seconds: Histogram,
}
58
impl Metrics {
    /// Registers every adapter metric in `registry` and returns a [`Metrics`]
    /// that holds a handle to each one.
    ///
    /// NOTE(review): every call registers the same metric names again, so this
    /// presumably should run only once per registry. Confirm how
    /// `MetricsRegistry::register` treats duplicate names before calling it twice.
    pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
        Self {
            query_total: registry.register(metric!(
                name: "mz_query_total",
                help: "The total number of queries issued of the given type since process start.",
                var_labels: ["session_type", "statement_type"],
            )),
            active_sessions: registry.register(metric!(
                name: "mz_active_sessions",
                help: "The number of active coordinator sessions.",
                var_labels: ["session_type"],
            )),
            active_subscribes: registry.register(metric!(
                name: "mz_active_subscribes",
                help: "The number of active SUBSCRIBE queries.",
                var_labels: ["session_type"],
            )),
            active_copy_tos: registry.register(metric!(
                name: "mz_active_copy_tos",
                help: "The number of active COPY TO queries.",
                var_labels: ["session_type"],
            )),
            queue_busy_seconds: registry.register(metric!(
                name: "mz_coord_queue_busy_seconds",
                help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
                buckets: histogram_seconds_buckets(0.000_128, 32.0)
            )),
            determine_timestamp: registry.register(metric!(
                name: "mz_determine_timestamp",
                help: "The total number of calls to determine_timestamp.",
                var_labels:["respond_immediately", "isolation_level", "compute_instance", "determination_method"],
            )),
            // Recorded in milliseconds, so this uses the millisecond bucket helper.
            timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
                name: "mz_timestamp_difference_for_strict_serializable_ms",
                help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
                var_labels:["compute_instance", "determination_method"],
                buckets: histogram_milliseconds_buckets(1., 8000.),
            )),
            commands: registry.register(metric!(
                name: "mz_adapter_commands",
                help: "The total number of adapter commands issued of the given type since process start.",
                var_labels: ["command_type", "status", "application_name"],
            )),
            storage_usage_collection_time_seconds: registry.register(metric!(
                name: "mz_storage_usage_collection_time_seconds",
                help: "The number of seconds the coord spends collecting usage metrics from storage.",
                buckets: histogram_seconds_buckets(0.000_128, 8.0)
            )),
            subscribe_outputs: registry.register(metric!(
                name: "mz_subscribe_outputs",
                help: "The total number of different subscribe outputs used",
                var_labels: ["session_type", "subscribe_output"],
            )),
            canceled_peeks: registry.register(metric!(
                name: "mz_canceled_peeks_total",
                help: "The total number of canceled peeks since process start.",
            )),
            linearize_message_seconds: registry.register(metric!(
                name: "mz_linearize_message_seconds",
                help: "The number of seconds it takes to linearize strict serializable messages",
                var_labels: ["type", "immediately_handled"],
                buckets: histogram_seconds_buckets(0.000_128, 8.0),
            )),
            time_to_first_row_seconds: registry.register(metric! {
                name: "mz_time_to_first_row_seconds",
                help: "Latency of an execute for a successful query from pgwire's perspective",
                var_labels: ["instance_id", "isolation_level", "strategy"],
                buckets: histogram_seconds_buckets(0.000_128, 32.0)
            }),
            statement_logging_records: registry.register(metric! {
                name: "mz_statement_logging_record_count",
                help: "The total number of SQL statements tagged with whether or not they were recorded.",
                var_labels: ["sample"],
            }),
            statement_logging_unsampled_bytes: registry.register(metric!(
                name: "mz_statement_logging_unsampled_bytes",
                help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
            )),
            statement_logging_actual_bytes: registry.register(metric!(
                name: "mz_statement_logging_actual_bytes",
                help: "The total amount of SQL text that was logged by statement logging.",
            )),
            // Explicit buckets: this histogram observes a batch size (a count),
            // so the time-based bucket helpers used elsewhere don't fit.
            message_batch: registry.register(metric!(
                name: "mz_coordinator_message_batch_size",
                help: "Message batch size handled by the coordinator.",
                buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
            )),
            // The metric name keeps "slow" for legacy reasons (see help text);
            // it records every coordinator message.
            message_handling: registry.register(metric!(
                name: "mz_slow_message_handling",
                help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
                var_labels: ["message_kind"],
                buckets: histogram_seconds_buckets(0.000_128, 512.0),
            )),
            optimization_notices: registry.register(metric!(
                name: "mz_optimization_notices",
                help: "Number of optimization notices per notice type.",
                var_labels: ["notice_type"],
            )),
            append_table_duration_seconds: registry.register(metric!(
                name: "mz_append_table_duration_seconds",
                help: "Latency for appending to any (user or system) table.",
                buckets: histogram_seconds_buckets(0.128, 32.0),
            )),
            webhook_validation_reduce_failures: registry.register(metric!(
                name: "mz_webhook_validation_reduce_failures",
                help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
                var_labels: ["reason"],
            )),
            webhook_get_appender: registry.register(metric!(
                name: "mz_webhook_get_appender_count",
                help: "Count of getting a webhook appender from the Coordinator.",
            )),
            check_scheduling_policies_seconds: registry.register(metric!(
                name: "mz_check_scheduling_policies_seconds",
                help: "The time each policy in `check_scheduling_policies` takes.",
                var_labels: ["policy", "thread"],
                buckets: histogram_seconds_buckets(0.000_128, 8.0),
            )),
            handle_scheduling_decisions_seconds: registry.register(metric!(
                name: "mz_handle_scheduling_decisions_seconds",
                help: "The time `handle_scheduling_decisions` takes.",
                var_labels: ["altered_a_cluster"],
                buckets: histogram_seconds_buckets(0.000_128, 8.0),
            )),
            row_set_finishing_seconds: registry.register(metric!(
                name: "mz_row_set_finishing_seconds",
                help: "The time it takes to run RowSetFinishing::finish.",
                buckets: histogram_seconds_buckets(0.000_128, 16.0),
            )),
            // Lowest bucket bound here (0.000_008) is below the 0.000_128 used
            // by the other fine-grained histograms in this function.
            session_startup_table_writes_seconds: registry.register(metric!(
                name: "mz_session_startup_table_writes_seconds",
                help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
                buckets: histogram_seconds_buckets(0.000_008, 4.0),
            )),
            parse_seconds: registry.register(metric!(
                name: "mz_parse_seconds",
                help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
                buckets: histogram_seconds_buckets(0.001, 8.0),
            )),
            pgwire_message_processing_seconds: registry.register(metric!(
                name: "mz_pgwire_message_processing_seconds",
                help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
                var_labels: ["message_type"],
                buckets: histogram_seconds_buckets(0.001, 512.0),
            )),
            // Very wide upper bound (8192s): the help text notes this can span
            // multiple FETCH statements.
            result_rows_first_to_last_byte_seconds: registry.register(metric!(
                name: "mz_result_rows_first_to_last_byte_seconds",
                help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
                var_labels: ["statement_type"],
                buckets: histogram_seconds_buckets(0.001, 8192.0),
            )),
            pgwire_ensure_transaction_seconds: registry.register(metric!(
                name: "mz_pgwire_ensure_transaction_seconds",
                help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
                var_labels: ["message_type"],
                buckets: histogram_seconds_buckets(0.001, 512.0),
            )),
            catalog_snapshot_seconds: registry.register(metric!(
                name: "mz_catalog_snapshot_seconds",
                help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
                var_labels: ["context"],
                buckets: histogram_seconds_buckets(0.001, 512.0),
            )),
            // Recorded in milliseconds, so this uses the millisecond bucket helper.
            pgwire_recv_scheduling_delay_ms: registry.register(metric!(
                name: "mz_pgwire_recv_scheduling_delay_ms",
                help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
                var_labels: ["message_type"],
                buckets: histogram_milliseconds_buckets(0.128, 512000.),
            )),
            catalog_transact_seconds: registry.register(metric!(
                name: "mz_catalog_transact_seconds",
                help: "The time it takes to run various catalog transact methods.",
                var_labels: ["method"],
                buckets: histogram_seconds_buckets(0.001, 32.0),
            )),
            apply_catalog_implications_seconds: registry.register(metric!(
                name: "mz_apply_catalog_implications_seconds",
                help: "The time it takes to apply catalog implications.",
                buckets: histogram_seconds_buckets(0.001, 32.0),
            )),
            group_commit_confirm_leadership_seconds: registry.register(metric!(
                name: "mz_group_commit_confirm_leadership_seconds",
                help: "The time it takes to confirm leadership during group commit.",
                buckets: histogram_seconds_buckets(0.001, 32.0),
            )),
            group_commit_table_advancement_seconds: registry.register(metric!(
                name: "mz_group_commit_table_advancement_seconds",
                help: "The time it takes to iterate over all catalog entries to find tables during group commit.",
                buckets: histogram_seconds_buckets(0.001, 32.0),
            ))
        }
    }

    /// Returns a clone of the `mz_row_set_finishing_seconds` histogram handle.
    ///
    /// NOTE(review): prometheus handles are presumably cheap, shared clones,
    /// so observations on the clone land in the same metric. Confirm against
    /// the prometheus crate.
    pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
        self.row_set_finishing_seconds.clone()
    }

    /// Builds a [`SessionMetrics`] from clones of the subset of handles it
    /// exposes: row-set finishing, session-startup table writes, query totals,
    /// timestamp determination, optimization notices, and statement logging.
    pub(crate) fn session_metrics(&self) -> SessionMetrics {
        SessionMetrics {
            row_set_finishing_seconds: self.row_set_finishing_seconds(),
            session_startup_table_writes_seconds: self.session_startup_table_writes_seconds.clone(),
            query_total: self.query_total.clone(),
            determine_timestamp: self.determine_timestamp.clone(),
            timestamp_difference_for_strict_serializable_ms: self
                .timestamp_difference_for_strict_serializable_ms
                .clone(),
            optimization_notices: self.optimization_notices.clone(),
            statement_logging_records: self.statement_logging_records.clone(),
            statement_logging_unsampled_bytes: self.statement_logging_unsampled_bytes.clone(),
            statement_logging_actual_bytes: self.statement_logging_actual_bytes.clone(),
        }
    }
}
273
/// A smaller bundle of [`Metrics`] handles, built by [`Metrics::session_metrics`].
///
/// The fields are private and are read through this type's accessor methods.
#[derive(Debug, Clone)]
pub struct SessionMetrics {
    /// `mz_row_set_finishing_seconds`.
    row_set_finishing_seconds: Histogram,
    /// `mz_session_startup_table_writes_seconds`.
    session_startup_table_writes_seconds: Histogram,
    /// `mz_query_total`, labeled by `session_type` and `statement_type`.
    query_total: IntCounterVec,
    /// `mz_determine_timestamp`, labeled by `respond_immediately`,
    /// `isolation_level`, `compute_instance`, and `determination_method`.
    determine_timestamp: IntCounterVec,
    /// `mz_timestamp_difference_for_strict_serializable_ms`, labeled by
    /// `compute_instance` and `determination_method`.
    timestamp_difference_for_strict_serializable_ms: HistogramVec,
    /// `mz_optimization_notices`, labeled by `notice_type`.
    optimization_notices: IntCounterVec,
    /// `mz_statement_logging_record_count`, labeled by `sample`.
    statement_logging_records: IntCounterVec,
    /// `mz_statement_logging_unsampled_bytes`.
    statement_logging_unsampled_bytes: IntCounter,
    /// `mz_statement_logging_actual_bytes`.
    statement_logging_actual_bytes: IntCounter,
}
287
288impl SessionMetrics {
289 pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
290 &self.row_set_finishing_seconds
291 }
292
293 pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
294 &self.session_startup_table_writes_seconds
295 }
296
297 pub(crate) fn query_total(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
298 self.query_total.with_label_values(label_values)
299 }
300
301 pub(crate) fn determine_timestamp(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
302 self.determine_timestamp.with_label_values(label_values)
303 }
304
305 pub(crate) fn timestamp_difference_for_strict_serializable_ms(
306 &self,
307 label_values: &[&str],
308 ) -> Histogram {
309 self.timestamp_difference_for_strict_serializable_ms
310 .with_label_values(label_values)
311 }
312
313 pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
314 self.optimization_notices.with_label_values(label_values)
315 }
316
317 pub(crate) fn statement_logging_records(
318 &self,
319 label_values: &[&str],
320 ) -> GenericCounter<AtomicU64> {
321 self.statement_logging_records
322 .with_label_values(label_values)
323 }
324
325 pub(crate) fn statement_logging_unsampled_bytes(&self) -> &IntCounter {
326 &self.statement_logging_unsampled_bytes
327 }
328
329 pub(crate) fn statement_logging_actual_bytes(&self) -> &IntCounter {
330 &self.statement_logging_actual_bytes
331 }
332}
333
334pub(crate) fn session_type_label_value(user: &User) -> &'static str {
335 match user.is_internal() {
336 true => "system",
337 false => "user",
338 }
339}
340
341pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
342where
343 T: AstInfo,
344{
345 statement_kind_label_value(StatementKind::from(stmt))
346}
347
348pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
349where
350 T: AstInfo,
351{
352 match output {
353 SubscribeOutput::Diffs => "diffs",
354 SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
355 SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
356 SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
357 }
358}