1use mz_ore::metric;
11use mz_ore::metrics::MetricsRegistry;
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::core::{AtomicU64, GenericCounter};
17use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
18
19#[derive(Debug, Clone)]
20pub struct Metrics {
21 pub query_total: IntCounterVec,
22 pub active_sessions: IntGaugeVec,
23 pub active_subscribes: IntGaugeVec,
24 pub active_copy_tos: IntGaugeVec,
25 pub queue_busy_seconds: Histogram,
26 pub determine_timestamp: IntCounterVec,
27 pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
28 pub commands: IntCounterVec,
29 pub storage_usage_collection_time_seconds: Histogram,
30 pub arrangement_sizes_collection_time_seconds: Histogram,
31 pub arrangement_sizes_rows_written: IntCounter,
32 pub subscribe_outputs: IntCounterVec,
33 pub canceled_peeks: IntCounter,
34 pub linearize_message_seconds: HistogramVec,
35 pub time_to_first_row_seconds: HistogramVec,
36 pub statement_logging_records: IntCounterVec,
37 pub statement_logging_unsampled_bytes: IntCounter,
38 pub statement_logging_actual_bytes: IntCounter,
39 pub message_batch: Histogram,
40 pub message_handling: HistogramVec,
41 pub optimization_notices: IntCounterVec,
42 pub append_table_duration_seconds: Histogram,
43 pub webhook_validation_reduce_failures: IntCounterVec,
44 pub webhook_get_appender: IntCounter,
45 pub check_scheduling_policies_seconds: HistogramVec,
46 pub handle_scheduling_decisions_seconds: HistogramVec,
47 pub row_set_finishing_seconds: Histogram,
48 pub session_startup_table_writes_seconds: Histogram,
49 pub parse_seconds: Histogram,
50 pub pgwire_message_processing_seconds: HistogramVec,
51 pub result_rows_first_to_last_byte_seconds: HistogramVec,
52 pub pgwire_ensure_transaction_seconds: HistogramVec,
53 pub catalog_snapshot_seconds: HistogramVec,
54 pub pgwire_recv_scheduling_delay_ms: HistogramVec,
55 pub catalog_transact_seconds: HistogramVec,
56 pub apply_catalog_implications_seconds: Histogram,
57 pub group_commit_catalog_upper_seconds: Histogram,
58 pub group_commit_table_advancement_seconds: Histogram,
59}
60
61impl Metrics {
62 pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
63 Self {
64 query_total: registry.register(metric!(
65 name: "mz_query_total",
66 help: "The total number of queries issued of the given type since process start.",
67 var_labels: ["session_type", "statement_type"],
68 )),
69 active_sessions: registry.register(metric!(
70 name: "mz_active_sessions",
71 help: "The number of active coordinator sessions.",
72 var_labels: ["session_type"],
73 )),
74 active_subscribes: registry.register(metric!(
75 name: "mz_active_subscribes",
76 help: "The number of active SUBSCRIBE queries.",
77 var_labels: ["session_type"],
78 )),
79 active_copy_tos: registry.register(metric!(
80 name: "mz_active_copy_tos",
81 help: "The number of active COPY TO queries.",
82 var_labels: ["session_type"],
83 )),
84 queue_busy_seconds: registry.register(metric!(
85 name: "mz_coord_queue_busy_seconds",
86 help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
87 buckets: histogram_seconds_buckets(0.000_128, 32.0)
88 )),
89 determine_timestamp: registry.register(metric!(
90 name: "mz_determine_timestamp",
91 help: "The total number of calls to determine_timestamp.",
92 var_labels:["respond_immediately", "isolation_level", "compute_instance"],
93 )),
94 timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
95 name: "mz_timestamp_difference_for_strict_serializable_ms",
96 help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
97 var_labels:["compute_instance"],
98 buckets: histogram_milliseconds_buckets(1., 8000.),
99 )),
100 commands: registry.register(metric!(
101 name: "mz_adapter_commands",
102 help: "The total number of adapter commands issued of the given type since process start.",
103 var_labels: ["command_type", "status", "application_name"],
104 )),
105 storage_usage_collection_time_seconds: registry.register(metric!(
106 name: "mz_storage_usage_collection_time_seconds",
107 help: "The number of seconds the coord spends collecting usage metrics from storage.",
108 buckets: histogram_seconds_buckets(0.000_128, 8.0)
109 )),
110 arrangement_sizes_collection_time_seconds: registry.register(metric!(
111 name: "mz_arrangement_sizes_collection_time_seconds",
112 help: "Seconds to read mz_object_arrangement_sizes and prepare history-table updates for one snapshot.",
113 buckets: histogram_seconds_buckets(0.000_128, 8.0)
114 )),
115 arrangement_sizes_rows_written: registry.register(metric!(
116 name: "mz_arrangement_sizes_rows_written_total",
117 help: "Total rows appended to mz_object_arrangement_size_history since process start.",
118 )),
119 subscribe_outputs: registry.register(metric!(
120 name: "mz_subscribe_outputs",
121 help: "The total number of different subscribe outputs used",
122 var_labels: ["session_type", "subscribe_output"],
123 )),
124 canceled_peeks: registry.register(metric!(
125 name: "mz_canceled_peeks_total",
126 help: "The total number of canceled peeks since process start.",
127 )),
128 linearize_message_seconds: registry.register(metric!(
129 name: "mz_linearize_message_seconds",
130 help: "The number of seconds it takes to linearize strict serializable messages",
131 var_labels: ["type", "immediately_handled"],
132 buckets: histogram_seconds_buckets(0.000_128, 8.0),
133 )),
134 time_to_first_row_seconds: registry.register(metric! {
135 name: "mz_time_to_first_row_seconds",
136 help: "Latency of an execute for a successful query from pgwire's perspective",
137 var_labels: ["instance_id", "isolation_level", "strategy"],
138 buckets: histogram_seconds_buckets(0.000_128, 32.0)
139 }),
140 statement_logging_records: registry.register(metric! {
141 name: "mz_statement_logging_record_count",
142 help: "The total number of SQL statements tagged with whether or not they were recorded.",
143 var_labels: ["sample"],
144 }),
145 statement_logging_unsampled_bytes: registry.register(metric!(
146 name: "mz_statement_logging_unsampled_bytes",
147 help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
148 )),
149 statement_logging_actual_bytes: registry.register(metric!(
150 name: "mz_statement_logging_actual_bytes",
151 help: "The total amount of SQL text that was logged by statement logging.",
152 )),
153 message_batch: registry.register(metric!(
154 name: "mz_coordinator_message_batch_size",
155 help: "Message batch size handled by the coordinator.",
156 buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
157 )),
158 message_handling: registry.register(metric!(
159 name: "mz_slow_message_handling",
160 help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
161 var_labels: ["message_kind"],
162 buckets: histogram_seconds_buckets(0.000_128, 512.0),
163 )),
164 optimization_notices: registry.register(metric!(
165 name: "mz_optimization_notices",
166 help: "Number of optimization notices per notice type.",
167 var_labels: ["notice_type"],
168 )),
169 append_table_duration_seconds: registry.register(metric!(
170 name: "mz_append_table_duration_seconds",
171 help: "Latency for appending to any (user or system) table.",
172 buckets: histogram_seconds_buckets(0.128, 32.0),
173 )),
174 webhook_validation_reduce_failures: registry.register(metric!(
175 name: "mz_webhook_validation_reduce_failures",
176 help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
177 var_labels: ["reason"],
178 )),
179 webhook_get_appender: registry.register(metric!(
180 name: "mz_webhook_get_appender_count",
181 help: "Count of getting a webhook appender from the Coordinator.",
182 )),
183 check_scheduling_policies_seconds: registry.register(metric!(
184 name: "mz_check_scheduling_policies_seconds",
185 help: "The time each policy in `check_scheduling_policies` takes.",
186 var_labels: ["policy", "thread"],
187 buckets: histogram_seconds_buckets(0.000_128, 8.0),
188 )),
189 handle_scheduling_decisions_seconds: registry.register(metric!(
190 name: "mz_handle_scheduling_decisions_seconds",
191 help: "The time `handle_scheduling_decisions` takes.",
192 var_labels: ["altered_a_cluster"],
193 buckets: histogram_seconds_buckets(0.000_128, 8.0),
194 )),
195 row_set_finishing_seconds: registry.register(metric!(
196 name: "mz_row_set_finishing_seconds",
197 help: "The time it takes to run RowSetFinishing::finish.",
198 buckets: histogram_seconds_buckets(0.000_128, 16.0),
199 )),
200 session_startup_table_writes_seconds: registry.register(metric!(
201 name: "mz_session_startup_table_writes_seconds",
202 help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
203 buckets: histogram_seconds_buckets(0.000_008, 4.0),
204 )),
205 parse_seconds: registry.register(metric!(
206 name: "mz_parse_seconds",
207 help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
208 buckets: histogram_seconds_buckets(0.001, 8.0),
209 )),
210 pgwire_message_processing_seconds: registry.register(metric!(
211 name: "mz_pgwire_message_processing_seconds",
212 help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
213 var_labels: ["message_type"],
214 buckets: histogram_seconds_buckets(0.001, 512.0),
215 )),
216 result_rows_first_to_last_byte_seconds: registry.register(metric!(
217 name: "mz_result_rows_first_to_last_byte_seconds",
218 help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
219 var_labels: ["statement_type"],
220 buckets: histogram_seconds_buckets(0.001, 8192.0),
221 )),
222 pgwire_ensure_transaction_seconds: registry.register(metric!(
223 name: "mz_pgwire_ensure_transaction_seconds",
224 help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
225 var_labels: ["message_type"],
226 buckets: histogram_seconds_buckets(0.001, 512.0),
227 )),
228 catalog_snapshot_seconds: registry.register(metric!(
229 name: "mz_catalog_snapshot_seconds",
230 help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
231 var_labels: ["context"],
232 buckets: histogram_seconds_buckets(0.001, 512.0),
233 )),
234 pgwire_recv_scheduling_delay_ms: registry.register(metric!(
235 name: "mz_pgwire_recv_scheduling_delay_ms",
236 help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
237 var_labels: ["message_type"],
238 buckets: histogram_milliseconds_buckets(0.128, 512000.),
239 )),
240 catalog_transact_seconds: registry.register(metric!(
241 name: "mz_catalog_transact_seconds",
242 help: "The time it takes to run various catalog transact methods.",
243 var_labels: ["method"],
244 buckets: histogram_seconds_buckets(0.001, 32.0),
245 )),
246 apply_catalog_implications_seconds: registry.register(metric!(
247 name: "mz_apply_catalog_implications_seconds",
248 help: "The time it takes to apply catalog implications.",
249 buckets: histogram_seconds_buckets(0.001, 32.0),
250 )),
251 group_commit_catalog_upper_seconds: registry.register(metric!(
252 name: "mz_group_commit_catalog_upper_seconds",
253 help: "The time it takes to advance the catalog shard upper during group commit.",
254 buckets: histogram_seconds_buckets(0.001, 32.0),
255 )),
256 group_commit_table_advancement_seconds: registry.register(metric!(
257 name: "mz_group_commit_table_advancement_seconds",
258 help: "The time it takes to iterate over all catalog entries to find tables during group commit.",
259 buckets: histogram_seconds_buckets(0.001, 32.0),
260 ))
261 }
262 }
263
264 pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
265 self.row_set_finishing_seconds.clone()
266 }
267
268 pub(crate) fn session_metrics(&self) -> SessionMetrics {
269 SessionMetrics {
270 row_set_finishing_seconds: self.row_set_finishing_seconds(),
271 session_startup_table_writes_seconds: self.session_startup_table_writes_seconds.clone(),
272 query_total: self.query_total.clone(),
273 determine_timestamp: self.determine_timestamp.clone(),
274 timestamp_difference_for_strict_serializable_ms: self
275 .timestamp_difference_for_strict_serializable_ms
276 .clone(),
277 optimization_notices: self.optimization_notices.clone(),
278 statement_logging_records: self.statement_logging_records.clone(),
279 statement_logging_unsampled_bytes: self.statement_logging_unsampled_bytes.clone(),
280 statement_logging_actual_bytes: self.statement_logging_actual_bytes.clone(),
281 }
282 }
283}
284
285#[derive(Debug, Clone)]
287pub struct SessionMetrics {
288 row_set_finishing_seconds: Histogram,
289 session_startup_table_writes_seconds: Histogram,
290 query_total: IntCounterVec,
291 determine_timestamp: IntCounterVec,
292 timestamp_difference_for_strict_serializable_ms: HistogramVec,
293 optimization_notices: IntCounterVec,
294 statement_logging_records: IntCounterVec,
295 statement_logging_unsampled_bytes: IntCounter,
296 statement_logging_actual_bytes: IntCounter,
297}
298
299impl SessionMetrics {
300 pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
301 &self.row_set_finishing_seconds
302 }
303
304 pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
305 &self.session_startup_table_writes_seconds
306 }
307
308 pub(crate) fn query_total(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
309 self.query_total.with_label_values(label_values)
310 }
311
312 pub(crate) fn determine_timestamp(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
313 self.determine_timestamp.with_label_values(label_values)
314 }
315
316 pub(crate) fn timestamp_difference_for_strict_serializable_ms(
317 &self,
318 label_values: &[&str],
319 ) -> Histogram {
320 self.timestamp_difference_for_strict_serializable_ms
321 .with_label_values(label_values)
322 }
323
324 pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
325 self.optimization_notices.with_label_values(label_values)
326 }
327
328 pub(crate) fn statement_logging_records(
329 &self,
330 label_values: &[&str],
331 ) -> GenericCounter<AtomicU64> {
332 self.statement_logging_records
333 .with_label_values(label_values)
334 }
335
336 pub(crate) fn statement_logging_unsampled_bytes(&self) -> &IntCounter {
337 &self.statement_logging_unsampled_bytes
338 }
339
340 pub(crate) fn statement_logging_actual_bytes(&self) -> &IntCounter {
341 &self.statement_logging_actual_bytes
342 }
343}
344
345pub(crate) fn session_type_label_value(user: &User) -> &'static str {
346 match user.is_internal() {
347 true => "system",
348 false => "user",
349 }
350}
351
352pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
353where
354 T: AstInfo,
355{
356 statement_kind_label_value(StatementKind::from(stmt))
357}
358
359pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
360where
361 T: AstInfo,
362{
363 match output {
364 SubscribeOutput::Diffs => "diffs",
365 SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
366 SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
367 SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
368 }
369}