1use mz_ore::metric;
11use mz_ore::metrics::{MetricVisibility, MetricsRegistry};
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::core::{AtomicU64, GenericCounter};
17use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
18
19#[derive(Debug, Clone)]
20pub struct Metrics {
21 pub query_total: IntCounterVec,
22 pub active_sessions: IntGaugeVec,
23 pub active_subscribes: IntGaugeVec,
24 pub active_copy_tos: IntGaugeVec,
25 pub queue_busy_seconds: Histogram,
26 pub determine_timestamp: IntCounterVec,
27 pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
28 pub timestamp_difference_for_bounded_staleness_ms: HistogramVec,
29 pub commands: IntCounterVec,
30 pub storage_usage_collection_time_seconds: Histogram,
31 pub arrangement_sizes_collection_time_seconds: Histogram,
32 pub arrangement_sizes_rows_written: IntCounter,
33 pub subscribe_outputs: IntCounterVec,
34 pub canceled_peeks: IntCounter,
35 pub linearize_message_seconds: HistogramVec,
36 pub time_to_first_row_seconds: HistogramVec,
37 pub statement_logging_records: IntCounterVec,
38 pub statement_logging_unsampled_bytes: IntCounter,
39 pub statement_logging_actual_bytes: IntCounter,
40 pub message_batch: Histogram,
41 pub message_handling: HistogramVec,
42 pub optimization_notices: IntCounterVec,
43 pub append_table_duration_seconds: Histogram,
44 pub webhook_validation_reduce_failures: IntCounterVec,
45 pub webhook_get_appender: IntCounter,
46 pub check_scheduling_policies_seconds: HistogramVec,
47 pub handle_scheduling_decisions_seconds: HistogramVec,
48 pub row_set_finishing_seconds: Histogram,
49 pub session_startup_table_writes_seconds: Histogram,
50 pub parse_seconds: Histogram,
51 pub pgwire_message_processing_seconds: HistogramVec,
52 pub result_rows_first_to_last_byte_seconds: HistogramVec,
53 pub pgwire_ensure_transaction_seconds: HistogramVec,
54 pub catalog_snapshot_seconds: HistogramVec,
55 pub pgwire_recv_scheduling_delay_ms: HistogramVec,
56 pub catalog_transact_seconds: HistogramVec,
57 pub apply_catalog_implications_seconds: Histogram,
58 pub group_commit_catalog_upper_seconds: Histogram,
59}
60
61impl Metrics {
62 pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
63 Self {
64 query_total: registry.register(metric!(
65 name: "mz_query_total",
66 help: "The total number of queries issued of the given type since process start.",
67 var_labels: ["session_type", "statement_type"],
68 visibility: MetricVisibility::Public,
69 )),
70 active_sessions: registry.register(metric!(
71 name: "mz_active_sessions",
72 help: "The number of active coordinator sessions.",
73 var_labels: ["session_type"],
74 visibility: MetricVisibility::Public,
75 )),
76 active_subscribes: registry.register(metric!(
77 name: "mz_active_subscribes",
78 help: "The number of active SUBSCRIBE queries.",
79 var_labels: ["session_type"],
80 visibility: MetricVisibility::Public,
81 )),
82 active_copy_tos: registry.register(metric!(
83 name: "mz_active_copy_tos",
84 help: "The number of active COPY TO queries.",
85 var_labels: ["session_type"],
86 )),
87 queue_busy_seconds: registry.register(metric!(
88 name: "mz_coord_queue_busy_seconds",
89 help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
90 buckets: histogram_seconds_buckets(0.000_128, 32.0)
91 )),
92 determine_timestamp: registry.register(metric!(
93 name: "mz_determine_timestamp",
94 help: "The total number of calls to determine_timestamp.",
95 var_labels:["respond_immediately", "isolation_level", "compute_instance"],
96 )),
97 timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
98 name: "mz_timestamp_difference_for_strict_serializable_ms",
99 help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
100 var_labels:["compute_instance"],
101 buckets: histogram_milliseconds_buckets(1., 8000.),
102 )),
103 timestamp_difference_for_bounded_staleness_ms: registry.register(metric!(
104 name: "mz_timestamp_difference_for_bounded_staleness_ms",
105 help: "How much older bounded-staleness timestamps are compared to serializable, in milliseconds. Measures the actual staleness incurred.",
106 var_labels:["compute_instance"],
107 buckets: histogram_milliseconds_buckets(1., 8000.),
108 )),
109 commands: registry.register(metric!(
110 name: "mz_adapter_commands",
111 help: "The total number of adapter commands issued of the given type since process start.",
112 var_labels: ["command_type", "status", "application_name"],
113 visibility: MetricVisibility::Public,
114 )),
115 storage_usage_collection_time_seconds: registry.register(metric!(
116 name: "mz_storage_usage_collection_time_seconds",
117 help: "The number of seconds the coord spends collecting usage metrics from storage.",
118 buckets: histogram_seconds_buckets(0.000_128, 8.0)
119 )),
120 arrangement_sizes_collection_time_seconds: registry.register(metric!(
121 name: "mz_arrangement_sizes_collection_time_seconds",
122 help: "Seconds to read mz_object_arrangement_sizes and prepare history-table updates for one snapshot.",
123 buckets: histogram_seconds_buckets(0.000_128, 8.0)
124 )),
125 arrangement_sizes_rows_written: registry.register(metric!(
126 name: "mz_arrangement_sizes_rows_written_total",
127 help: "Total rows appended to mz_object_arrangement_size_history since process start.",
128 )),
129 subscribe_outputs: registry.register(metric!(
130 name: "mz_subscribe_outputs",
131 help: "The total number of different subscribe outputs used",
132 var_labels: ["session_type", "subscribe_output"],
133 )),
134 canceled_peeks: registry.register(metric!(
135 name: "mz_canceled_peeks_total",
136 help: "The total number of canceled peeks since process start.",
137 )),
138 linearize_message_seconds: registry.register(metric!(
139 name: "mz_linearize_message_seconds",
140 help: "The number of seconds it takes to linearize strict serializable messages",
141 var_labels: ["type", "immediately_handled"],
142 buckets: histogram_seconds_buckets(0.000_128, 8.0),
143 )),
144 time_to_first_row_seconds: registry.register(metric! {
145 name: "mz_time_to_first_row_seconds",
146 help: "Latency of an execute for a successful query from pgwire's perspective",
147 var_labels: ["instance_id", "isolation_level", "strategy", "application_name"],
148 buckets: histogram_seconds_buckets(0.000_128, 32.0)
149 }),
150 statement_logging_records: registry.register(metric! {
151 name: "mz_statement_logging_record_count",
152 help: "The total number of SQL statements tagged with whether or not they were recorded.",
153 var_labels: ["sample"],
154 }),
155 statement_logging_unsampled_bytes: registry.register(metric!(
156 name: "mz_statement_logging_unsampled_bytes",
157 help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
158 )),
159 statement_logging_actual_bytes: registry.register(metric!(
160 name: "mz_statement_logging_actual_bytes",
161 help: "The total amount of SQL text that was logged by statement logging.",
162 )),
163 message_batch: registry.register(metric!(
164 name: "mz_coordinator_message_batch_size",
165 help: "Message batch size handled by the coordinator.",
166 buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
167 )),
168 message_handling: registry.register(metric!(
169 name: "mz_slow_message_handling",
170 help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
171 var_labels: ["message_kind"],
172 buckets: histogram_seconds_buckets(0.000_128, 512.0),
173 )),
174 optimization_notices: registry.register(metric!(
175 name: "mz_optimization_notices",
176 help: "Number of optimization notices per notice type.",
177 var_labels: ["notice_type"],
178 )),
179 append_table_duration_seconds: registry.register(metric!(
180 name: "mz_append_table_duration_seconds",
181 help: "Latency for appending to any (user or system) table.",
182 buckets: histogram_seconds_buckets(0.128, 32.0),
183 )),
184 webhook_validation_reduce_failures: registry.register(metric!(
185 name: "mz_webhook_validation_reduce_failures",
186 help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
187 var_labels: ["reason"],
188 )),
189 webhook_get_appender: registry.register(metric!(
190 name: "mz_webhook_get_appender_count",
191 help: "Count of getting a webhook appender from the Coordinator.",
192 )),
193 check_scheduling_policies_seconds: registry.register(metric!(
194 name: "mz_check_scheduling_policies_seconds",
195 help: "The time each policy in `check_scheduling_policies` takes.",
196 var_labels: ["policy", "thread"],
197 buckets: histogram_seconds_buckets(0.000_128, 8.0),
198 )),
199 handle_scheduling_decisions_seconds: registry.register(metric!(
200 name: "mz_handle_scheduling_decisions_seconds",
201 help: "The time `handle_scheduling_decisions` takes.",
202 var_labels: ["altered_a_cluster"],
203 buckets: histogram_seconds_buckets(0.000_128, 8.0),
204 )),
205 row_set_finishing_seconds: registry.register(metric!(
206 name: "mz_row_set_finishing_seconds",
207 help: "The time it takes to run RowSetFinishing::finish.",
208 buckets: histogram_seconds_buckets(0.000_128, 16.0),
209 )),
210 session_startup_table_writes_seconds: registry.register(metric!(
211 name: "mz_session_startup_table_writes_seconds",
212 help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
213 buckets: histogram_seconds_buckets(0.000_008, 4.0),
214 )),
215 parse_seconds: registry.register(metric!(
216 name: "mz_parse_seconds",
217 help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
218 buckets: histogram_seconds_buckets(0.001, 8.0),
219 )),
220 pgwire_message_processing_seconds: registry.register(metric!(
221 name: "mz_pgwire_message_processing_seconds",
222 help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
223 var_labels: ["message_type"],
224 buckets: histogram_seconds_buckets(0.001, 512.0),
225 )),
226 result_rows_first_to_last_byte_seconds: registry.register(metric!(
227 name: "mz_result_rows_first_to_last_byte_seconds",
228 help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
229 var_labels: ["statement_type"],
230 buckets: histogram_seconds_buckets(0.001, 8192.0),
231 )),
232 pgwire_ensure_transaction_seconds: registry.register(metric!(
233 name: "mz_pgwire_ensure_transaction_seconds",
234 help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
235 var_labels: ["message_type"],
236 buckets: histogram_seconds_buckets(0.001, 512.0),
237 )),
238 catalog_snapshot_seconds: registry.register(metric!(
239 name: "mz_catalog_snapshot_seconds",
240 help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
241 var_labels: ["context"],
242 buckets: histogram_seconds_buckets(0.001, 512.0),
243 )),
244 pgwire_recv_scheduling_delay_ms: registry.register(metric!(
245 name: "mz_pgwire_recv_scheduling_delay_ms",
246 help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
247 var_labels: ["message_type"],
248 buckets: histogram_milliseconds_buckets(0.128, 512000.),
249 )),
250 catalog_transact_seconds: registry.register(metric!(
251 name: "mz_catalog_transact_seconds",
252 help: "The time it takes to run various catalog transact methods.",
253 var_labels: ["method"],
254 buckets: histogram_seconds_buckets(0.001, 32.0),
255 )),
256 apply_catalog_implications_seconds: registry.register(metric!(
257 name: "mz_apply_catalog_implications_seconds",
258 help: "The time it takes to apply catalog implications.",
259 buckets: histogram_seconds_buckets(0.001, 32.0),
260 )),
261 group_commit_catalog_upper_seconds: registry.register(metric!(
262 name: "mz_group_commit_catalog_upper_seconds",
263 help: "The time it takes to advance the catalog shard upper during group commit.",
264 buckets: histogram_seconds_buckets(0.001, 32.0),
265 )),
266 }
267 }
268
269 pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
270 self.row_set_finishing_seconds.clone()
271 }
272
273 pub(crate) fn session_metrics(&self) -> SessionMetrics {
274 SessionMetrics {
275 row_set_finishing_seconds: self.row_set_finishing_seconds(),
276 session_startup_table_writes_seconds: self.session_startup_table_writes_seconds.clone(),
277 query_total: self.query_total.clone(),
278 determine_timestamp: self.determine_timestamp.clone(),
279 timestamp_difference_for_strict_serializable_ms: self
280 .timestamp_difference_for_strict_serializable_ms
281 .clone(),
282 timestamp_difference_for_bounded_staleness_ms: self
283 .timestamp_difference_for_bounded_staleness_ms
284 .clone(),
285 optimization_notices: self.optimization_notices.clone(),
286 statement_logging_records: self.statement_logging_records.clone(),
287 statement_logging_unsampled_bytes: self.statement_logging_unsampled_bytes.clone(),
288 statement_logging_actual_bytes: self.statement_logging_actual_bytes.clone(),
289 }
290 }
291}
292
293#[derive(Debug, Clone)]
295pub struct SessionMetrics {
296 row_set_finishing_seconds: Histogram,
297 session_startup_table_writes_seconds: Histogram,
298 query_total: IntCounterVec,
299 determine_timestamp: IntCounterVec,
300 timestamp_difference_for_strict_serializable_ms: HistogramVec,
301 timestamp_difference_for_bounded_staleness_ms: HistogramVec,
302 optimization_notices: IntCounterVec,
303 statement_logging_records: IntCounterVec,
304 statement_logging_unsampled_bytes: IntCounter,
305 statement_logging_actual_bytes: IntCounter,
306}
307
308impl SessionMetrics {
309 pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
310 &self.row_set_finishing_seconds
311 }
312
313 pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
314 &self.session_startup_table_writes_seconds
315 }
316
317 pub(crate) fn query_total(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
318 self.query_total.with_label_values(label_values)
319 }
320
321 pub(crate) fn determine_timestamp(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
322 self.determine_timestamp.with_label_values(label_values)
323 }
324
325 pub(crate) fn timestamp_difference_for_strict_serializable_ms(
326 &self,
327 label_values: &[&str],
328 ) -> Histogram {
329 self.timestamp_difference_for_strict_serializable_ms
330 .with_label_values(label_values)
331 }
332
333 pub(crate) fn timestamp_difference_for_bounded_staleness_ms(
334 &self,
335 label_values: &[&str],
336 ) -> Histogram {
337 self.timestamp_difference_for_bounded_staleness_ms
338 .with_label_values(label_values)
339 }
340
341 pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
342 self.optimization_notices.with_label_values(label_values)
343 }
344
345 pub(crate) fn statement_logging_records(
346 &self,
347 label_values: &[&str],
348 ) -> GenericCounter<AtomicU64> {
349 self.statement_logging_records
350 .with_label_values(label_values)
351 }
352
353 pub(crate) fn statement_logging_unsampled_bytes(&self) -> &IntCounter {
354 &self.statement_logging_unsampled_bytes
355 }
356
357 pub(crate) fn statement_logging_actual_bytes(&self) -> &IntCounter {
358 &self.statement_logging_actual_bytes
359 }
360}
361
362pub(crate) fn session_type_label_value(user: &User) -> &'static str {
363 match user.is_internal() {
364 true => "system",
365 false => "user",
366 }
367}
368
369pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
370where
371 T: AstInfo,
372{
373 statement_kind_label_value(StatementKind::from(stmt))
374}
375
376pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
377where
378 T: AstInfo,
379{
380 match output {
381 SubscribeOutput::Diffs => "diffs",
382 SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
383 SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
384 SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
385 }
386}