1use mz_ore::metric;
11use mz_ore::metrics::{MetricTag, MetricVisibility, MetricsRegistry};
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::core::{AtomicU64, GenericCounter};
17use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
18
/// Handles to the Prometheus metrics registered by `Metrics::register_into`.
///
/// Each field holds the collector for one metric. The metric name, help text, labels, and
/// histogram buckets are defined where the field is initialized in `register_into`.
#[derive(Debug, Clone)]
pub struct Metrics {
    /// `mz_query_total`: queries issued, labeled by `session_type` and `statement_type`.
    pub query_total: IntCounterVec,
    /// `mz_active_sessions`: active coordinator sessions, labeled by `session_type`.
    pub active_sessions: IntGaugeVec,
    /// `mz_active_subscribes`: active SUBSCRIBE queries, labeled by `session_type`.
    pub active_subscribes: IntGaugeVec,
    /// `mz_active_copy_tos`: active COPY TO queries, labeled by `session_type`.
    pub active_copy_tos: IntGaugeVec,
    /// `mz_coord_queue_busy_seconds`: sampled time the coord queue was processing before it
    /// was empty.
    pub queue_busy_seconds: Histogram,
    /// `mz_determine_timestamp`: calls to determine_timestamp, labeled by
    /// `respond_immediately`, `isolation_level`, and `compute_instance`.
    pub determine_timestamp: IntCounterVec,
    /// `mz_timestamp_difference_for_strict_serializable_ms`: timestamp difference between
    /// strict serializable and serializable isolation, labeled by `compute_instance`.
    pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
    /// `mz_timestamp_difference_for_bounded_staleness_ms`: how much older bounded-staleness
    /// timestamps are than serializable ones, labeled by `compute_instance`.
    pub timestamp_difference_for_bounded_staleness_ms: HistogramVec,
    /// `mz_adapter_commands`: adapter commands issued, labeled by `command_type`, `status`,
    /// and `application_name`.
    pub commands: IntCounterVec,
    /// `mz_storage_usage_collection_time_seconds`: time the coord spends collecting usage
    /// metrics from storage.
    pub storage_usage_collection_time_seconds: Histogram,
    /// `mz_arrangement_sizes_collection_time_seconds`: time to read
    /// mz_object_arrangement_sizes and prepare history-table updates for one snapshot.
    pub arrangement_sizes_collection_time_seconds: Histogram,
    /// `mz_arrangement_sizes_rows_written_total`: rows appended to
    /// mz_object_arrangement_size_history.
    pub arrangement_sizes_rows_written: IntCounter,
    /// `mz_subscribe_outputs`: subscribe outputs used, labeled by `session_type` and
    /// `subscribe_output`.
    pub subscribe_outputs: IntCounterVec,
    /// `mz_canceled_peeks_total`: canceled peeks.
    pub canceled_peeks: IntCounter,
    /// `mz_linearize_message_seconds`: time to linearize strict serializable messages,
    /// labeled by `type` and `immediately_handled`.
    pub linearize_message_seconds: HistogramVec,
    /// `mz_time_to_first_row_seconds`: latency of an execute for a successful query, labeled
    /// by `instance_id`, `isolation_level`, `strategy`, and `application_name`.
    pub time_to_first_row_seconds: HistogramVec,
    /// `mz_statement_logging_record_count`: SQL statements tagged with whether they were
    /// recorded, labeled by `sample`.
    pub statement_logging_records: IntCounterVec,
    /// `mz_statement_logging_unsampled_bytes`: SQL text that would have been logged if
    /// statement logging were unsampled.
    pub statement_logging_unsampled_bytes: IntCounter,
    /// `mz_statement_logging_actual_bytes`: SQL text that was logged by statement logging.
    pub statement_logging_actual_bytes: IntCounter,
    /// `mz_coordinator_message_batch_size`: message batch size handled by the coordinator.
    pub message_batch: Histogram,
    /// `mz_slow_message_handling`: latency for all coordinator messages, labeled by
    /// `message_kind`.
    pub message_handling: HistogramVec,
    /// `mz_optimization_notices`: optimization notices, labeled by `notice_type`.
    pub optimization_notices: IntCounterVec,
    /// `mz_append_table_duration_seconds`: latency for appending to any table.
    pub append_table_duration_seconds: Histogram,
    /// `mz_webhook_validation_reduce_failures`: failures to reduce a webhook source's CHECK
    /// statement, labeled by `reason`.
    pub webhook_validation_reduce_failures: IntCounterVec,
    /// `mz_webhook_get_appender_count`: times a webhook appender was fetched from the
    /// Coordinator.
    pub webhook_get_appender: IntCounter,
    /// `mz_check_scheduling_policies_seconds`: time each policy in
    /// `check_scheduling_policies` takes, labeled by `policy` and `thread`.
    pub check_scheduling_policies_seconds: HistogramVec,
    /// `mz_handle_scheduling_decisions_seconds`: time `handle_scheduling_decisions` takes,
    /// labeled by `altered_a_cluster`.
    pub handle_scheduling_decisions_seconds: HistogramVec,
    /// `mz_row_set_finishing_seconds`: time to run RowSetFinishing::finish.
    pub row_set_finishing_seconds: Histogram,
    /// `mz_session_startup_table_writes_seconds`: time spent waiting for builtin table
    /// writes before processing a query.
    pub session_startup_table_writes_seconds: Histogram,
    /// `mz_parse_seconds`: time to parse a SQL statement.
    pub parse_seconds: Histogram,
    /// `mz_pgwire_message_processing_seconds`: time to process each pgwire message type,
    /// labeled by `message_type`.
    pub pgwire_message_processing_seconds: HistogramVec,
    /// `mz_result_rows_first_to_last_byte_seconds`: time from just before sending the first
    /// result row to sending the final response message, labeled by `statement_type`.
    pub result_rows_first_to_last_byte_seconds: HistogramVec,
    /// `mz_pgwire_ensure_transaction_seconds`: time to run `ensure_transactions` when
    /// processing pgwire messages, labeled by `message_type`.
    pub pgwire_ensure_transaction_seconds: HistogramVec,
    /// `mz_catalog_snapshot_seconds`: time to run `catalog_snapshot`, labeled by `context`.
    pub catalog_snapshot_seconds: HistogramVec,
    /// `mz_pgwire_recv_scheduling_delay_ms`: delay between a pgwire receiver task being woken
    /// by incoming data and getting polled, labeled by `message_type`.
    pub pgwire_recv_scheduling_delay_ms: HistogramVec,
    /// `mz_catalog_transact_seconds`: time to run catalog transact methods, labeled by
    /// `method`.
    pub catalog_transact_seconds: HistogramVec,
    /// `mz_apply_catalog_implications_seconds`: time to apply catalog implications.
    pub apply_catalog_implications_seconds: Histogram,
    /// `mz_group_commit_catalog_upper_seconds`: time to advance the catalog shard upper
    /// during group commit.
    pub group_commit_catalog_upper_seconds: Histogram,
}
60
impl Metrics {
    /// Registers every metric of this struct with `registry` and returns the resulting
    /// handles.
    ///
    /// The field initializers below run in the order they are written, so metrics are
    /// registered with `registry` in that same order.
    pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
        Self {
            query_total: registry.register(metric!(
                name: "mz_query_total",
                help: "The total number of queries issued of the given type since process start.",
                var_labels: ["session_type", "statement_type"],
                visibility: MetricVisibility::Public,
                tags: [MetricTag::Environment],
            )),
            active_sessions: registry.register(metric!(
                name: "mz_active_sessions",
                help: "The number of active coordinator sessions.",
                var_labels: ["session_type"],
                visibility: MetricVisibility::Public,
                tags: [MetricTag::Environment],
            )),
            active_subscribes: registry.register(metric!(
                name: "mz_active_subscribes",
                help: "The number of active SUBSCRIBE queries.",
                var_labels: ["session_type"],
                visibility: MetricVisibility::Public,
                tags: [MetricTag::Environment],
            )),
            active_copy_tos: registry.register(metric!(
                name: "mz_active_copy_tos",
                help: "The number of active COPY TO queries.",
                var_labels: ["session_type"],
            )),
            queue_busy_seconds: registry.register(metric!(
                name: "mz_coord_queue_busy_seconds",
                help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
                buckets: histogram_seconds_buckets(0.000_128, 32.0)
            )),
            determine_timestamp: registry.register(metric!(
                name: "mz_determine_timestamp",
                help: "The total number of calls to determine_timestamp.",
                var_labels:["respond_immediately", "isolation_level", "compute_instance"],
            )),
            timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
                name: "mz_timestamp_difference_for_strict_serializable_ms",
                help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
                var_labels:["compute_instance"],
                buckets: histogram_milliseconds_buckets(1., 8000.),
            )),
            timestamp_difference_for_bounded_staleness_ms: registry.register(metric!(
                name: "mz_timestamp_difference_for_bounded_staleness_ms",
                help: "How much older bounded-staleness timestamps are compared to serializable, in milliseconds. Measures the actual staleness incurred.",
                var_labels:["compute_instance"],
                buckets: histogram_milliseconds_buckets(1., 8000.),
            )),
            commands: registry.register(metric!(
                name: "mz_adapter_commands",
                help: "The total number of adapter commands issued of the given type since process start.",
                var_labels: ["command_type", "status", "application_name"],
                visibility: MetricVisibility::Public,
                tags: [MetricTag::Environment],
            )),
            storage_usage_collection_time_seconds: registry.register(metric!(
                name: "mz_storage_usage_collection_time_seconds",
                help: "The number of seconds the coord spends collecting usage metrics from storage.",
                buckets: histogram_seconds_buckets(0.000_128, 8.0)
            )),
            arrangement_sizes_collection_time_seconds: registry.register(metric!(
                name: "mz_arrangement_sizes_collection_time_seconds",
                help: "Seconds to read mz_object_arrangement_sizes and prepare history-table updates for one snapshot.",
                buckets: histogram_seconds_buckets(0.000_128, 8.0)
            )),
            arrangement_sizes_rows_written: registry.register(metric!(
                name: "mz_arrangement_sizes_rows_written_total",
                help: "Total rows appended to mz_object_arrangement_size_history since process start.",
            )),
            subscribe_outputs: registry.register(metric!(
                name: "mz_subscribe_outputs",
                help: "The total number of different subscribe outputs used",
                var_labels: ["session_type", "subscribe_output"],
            )),
            canceled_peeks: registry.register(metric!(
                name: "mz_canceled_peeks_total",
                help: "The total number of canceled peeks since process start.",
            )),
            linearize_message_seconds: registry.register(metric!(
                name: "mz_linearize_message_seconds",
                help: "The number of seconds it takes to linearize strict serializable messages",
                var_labels: ["type", "immediately_handled"],
                buckets: histogram_seconds_buckets(0.000_128, 8.0),
            )),
            time_to_first_row_seconds: registry.register(metric! {
                name: "mz_time_to_first_row_seconds",
                help: "Latency of an execute for a successful query from pgwire's perspective",
                var_labels: ["instance_id", "isolation_level", "strategy", "application_name"],
                buckets: histogram_seconds_buckets(0.000_128, 32.0)
            }),
            statement_logging_records: registry.register(metric! {
                name: "mz_statement_logging_record_count",
                help: "The total number of SQL statements tagged with whether or not they were recorded.",
                var_labels: ["sample"],
            }),
            statement_logging_unsampled_bytes: registry.register(metric!(
                name: "mz_statement_logging_unsampled_bytes",
                help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
            )),
            statement_logging_actual_bytes: registry.register(metric!(
                name: "mz_statement_logging_actual_bytes",
                help: "The total amount of SQL text that was logged by statement logging.",
            )),
            // The only histogram here with hand-written bucket boundaries instead of a
            // `histogram_*_buckets` helper call.
            message_batch: registry.register(metric!(
                name: "mz_coordinator_message_batch_size",
                help: "Message batch size handled by the coordinator.",
                buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
            )),
            message_handling: registry.register(metric!(
                name: "mz_slow_message_handling",
                help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
                var_labels: ["message_kind"],
                buckets: histogram_seconds_buckets(0.000_128, 512.0),
            )),
            optimization_notices: registry.register(metric!(
                name: "mz_optimization_notices",
                help: "Number of optimization notices per notice type.",
                var_labels: ["notice_type"],
            )),
            append_table_duration_seconds: registry.register(metric!(
                name: "mz_append_table_duration_seconds",
                help: "Latency for appending to any (user or system) table.",
                buckets: histogram_seconds_buckets(0.128, 32.0),
            )),
            webhook_validation_reduce_failures: registry.register(metric!(
                name: "mz_webhook_validation_reduce_failures",
                help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
                var_labels: ["reason"],
            )),
            webhook_get_appender: registry.register(metric!(
                name: "mz_webhook_get_appender_count",
                help: "Count of getting a webhook appender from the Coordinator.",
            )),
            check_scheduling_policies_seconds: registry.register(metric!(
                name: "mz_check_scheduling_policies_seconds",
                help: "The time each policy in `check_scheduling_policies` takes.",
                var_labels: ["policy", "thread"],
                buckets: histogram_seconds_buckets(0.000_128, 8.0),
            )),
            handle_scheduling_decisions_seconds: registry.register(metric!(
                name: "mz_handle_scheduling_decisions_seconds",
                help: "The time `handle_scheduling_decisions` takes.",
                var_labels: ["altered_a_cluster"],
                buckets: histogram_seconds_buckets(0.000_128, 8.0),
            )),
            row_set_finishing_seconds: registry.register(metric!(
                name: "mz_row_set_finishing_seconds",
                help: "The time it takes to run RowSetFinishing::finish.",
                buckets: histogram_seconds_buckets(0.000_128, 16.0),
            )),
            session_startup_table_writes_seconds: registry.register(metric!(
                name: "mz_session_startup_table_writes_seconds",
                help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
                buckets: histogram_seconds_buckets(0.000_008, 4.0),
            )),
            parse_seconds: registry.register(metric!(
                name: "mz_parse_seconds",
                help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
                buckets: histogram_seconds_buckets(0.001, 8.0),
            )),
            pgwire_message_processing_seconds: registry.register(metric!(
                name: "mz_pgwire_message_processing_seconds",
                help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
                var_labels: ["message_type"],
                buckets: histogram_seconds_buckets(0.001, 512.0),
            )),
            result_rows_first_to_last_byte_seconds: registry.register(metric!(
                name: "mz_result_rows_first_to_last_byte_seconds",
                help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
                var_labels: ["statement_type"],
                buckets: histogram_seconds_buckets(0.001, 8192.0),
            )),
            pgwire_ensure_transaction_seconds: registry.register(metric!(
                name: "mz_pgwire_ensure_transaction_seconds",
                help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
                var_labels: ["message_type"],
                buckets: histogram_seconds_buckets(0.001, 512.0),
            )),
            catalog_snapshot_seconds: registry.register(metric!(
                name: "mz_catalog_snapshot_seconds",
                help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
                var_labels: ["context"],
                buckets: histogram_seconds_buckets(0.001, 512.0),
            )),
            pgwire_recv_scheduling_delay_ms: registry.register(metric!(
                name: "mz_pgwire_recv_scheduling_delay_ms",
                help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
                var_labels: ["message_type"],
                buckets: histogram_milliseconds_buckets(0.128, 512000.),
            )),
            catalog_transact_seconds: registry.register(metric!(
                name: "mz_catalog_transact_seconds",
                help: "The time it takes to run various catalog transact methods.",
                var_labels: ["method"],
                buckets: histogram_seconds_buckets(0.001, 32.0),
            )),
            apply_catalog_implications_seconds: registry.register(metric!(
                name: "mz_apply_catalog_implications_seconds",
                help: "The time it takes to apply catalog implications.",
                buckets: histogram_seconds_buckets(0.001, 32.0),
            )),
            group_commit_catalog_upper_seconds: registry.register(metric!(
                name: "mz_group_commit_catalog_upper_seconds",
                help: "The time it takes to advance the catalog shard upper during group commit.",
                buckets: histogram_seconds_buckets(0.001, 32.0),
            )),
        }
    }

    /// Returns a clone of the `row_set_finishing_seconds` histogram handle.
    pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
        self.row_set_finishing_seconds.clone()
    }

    /// Builds a `SessionMetrics` by cloning the handles of the metrics it carries.
    ///
    /// Only the fields listed below are copied; every other metric stays reachable through
    /// `Metrics` alone.
    pub(crate) fn session_metrics(&self) -> SessionMetrics {
        SessionMetrics {
            row_set_finishing_seconds: self.row_set_finishing_seconds(),
            session_startup_table_writes_seconds: self.session_startup_table_writes_seconds.clone(),
            query_total: self.query_total.clone(),
            determine_timestamp: self.determine_timestamp.clone(),
            timestamp_difference_for_strict_serializable_ms: self
                .timestamp_difference_for_strict_serializable_ms
                .clone(),
            timestamp_difference_for_bounded_staleness_ms: self
                .timestamp_difference_for_bounded_staleness_ms
                .clone(),
            optimization_notices: self.optimization_notices.clone(),
            statement_logging_records: self.statement_logging_records.clone(),
            statement_logging_unsampled_bytes: self.statement_logging_unsampled_bytes.clone(),
            statement_logging_actual_bytes: self.statement_logging_actual_bytes.clone(),
        }
    }
}
296
/// The subset of `Metrics` handles produced by `Metrics::session_metrics`.
///
/// All fields are private; they are read through the accessor methods on this type.
#[derive(Debug, Clone)]
pub struct SessionMetrics {
    /// Clone of `Metrics::row_set_finishing_seconds`.
    row_set_finishing_seconds: Histogram,
    /// Clone of `Metrics::session_startup_table_writes_seconds`.
    session_startup_table_writes_seconds: Histogram,
    /// Clone of `Metrics::query_total`.
    query_total: IntCounterVec,
    /// Clone of `Metrics::determine_timestamp`.
    determine_timestamp: IntCounterVec,
    /// Clone of `Metrics::timestamp_difference_for_strict_serializable_ms`.
    timestamp_difference_for_strict_serializable_ms: HistogramVec,
    /// Clone of `Metrics::timestamp_difference_for_bounded_staleness_ms`.
    timestamp_difference_for_bounded_staleness_ms: HistogramVec,
    /// Clone of `Metrics::optimization_notices`.
    optimization_notices: IntCounterVec,
    /// Clone of `Metrics::statement_logging_records`.
    statement_logging_records: IntCounterVec,
    /// Clone of `Metrics::statement_logging_unsampled_bytes`.
    statement_logging_unsampled_bytes: IntCounter,
    /// Clone of `Metrics::statement_logging_actual_bytes`.
    statement_logging_actual_bytes: IntCounter,
}
311
312impl SessionMetrics {
313 pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
314 &self.row_set_finishing_seconds
315 }
316
317 pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
318 &self.session_startup_table_writes_seconds
319 }
320
321 pub(crate) fn query_total(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
322 self.query_total.with_label_values(label_values)
323 }
324
325 pub(crate) fn determine_timestamp(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
326 self.determine_timestamp.with_label_values(label_values)
327 }
328
329 pub(crate) fn timestamp_difference_for_strict_serializable_ms(
330 &self,
331 label_values: &[&str],
332 ) -> Histogram {
333 self.timestamp_difference_for_strict_serializable_ms
334 .with_label_values(label_values)
335 }
336
337 pub(crate) fn timestamp_difference_for_bounded_staleness_ms(
338 &self,
339 label_values: &[&str],
340 ) -> Histogram {
341 self.timestamp_difference_for_bounded_staleness_ms
342 .with_label_values(label_values)
343 }
344
345 pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
346 self.optimization_notices.with_label_values(label_values)
347 }
348
349 pub(crate) fn statement_logging_records(
350 &self,
351 label_values: &[&str],
352 ) -> GenericCounter<AtomicU64> {
353 self.statement_logging_records
354 .with_label_values(label_values)
355 }
356
357 pub(crate) fn statement_logging_unsampled_bytes(&self) -> &IntCounter {
358 &self.statement_logging_unsampled_bytes
359 }
360
361 pub(crate) fn statement_logging_actual_bytes(&self) -> &IntCounter {
362 &self.statement_logging_actual_bytes
363 }
364}
365
366pub(crate) fn session_type_label_value(user: &User) -> &'static str {
367 match user.is_internal() {
368 true => "system",
369 false => "user",
370 }
371}
372
373pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
374where
375 T: AstInfo,
376{
377 statement_kind_label_value(StatementKind::from(stmt))
378}
379
380pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
381where
382 T: AstInfo,
383{
384 match output {
385 SubscribeOutput::Diffs => "diffs",
386 SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
387 SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
388 SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
389 }
390}