1use mz_ore::metric;
11use mz_ore::metrics::MetricsRegistry;
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::core::{AtomicU64, GenericCounter};
17use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
18
/// Prometheus metrics for the adapter layer.
///
/// Every field is created and registered by [`Metrics::register_into`]; the
/// Prometheus metric name each field is registered under is noted on the field.
#[derive(Debug, Clone)]
pub struct Metrics {
    /// `mz_query_total`: queries issued, labeled by `session_type` and `statement_type`.
    pub query_total: IntCounterVec,
    /// `mz_active_sessions`: active coordinator sessions, labeled by `session_type`.
    pub active_sessions: IntGaugeVec,
    /// `mz_active_subscribes`: active `SUBSCRIBE` queries, labeled by `session_type`.
    pub active_subscribes: IntGaugeVec,
    /// `mz_active_copy_tos`: active `COPY TO` queries, labeled by `session_type`.
    pub active_copy_tos: IntGaugeVec,
    /// `mz_coord_queue_busy_seconds`: sampled time the coordinator queue was busy
    /// before it became empty.
    pub queue_busy_seconds: Histogram,
    /// `mz_determine_timestamp`: calls to `determine_timestamp`, labeled by
    /// `respond_immediately`, `isolation_level`, and `compute_instance`.
    pub determine_timestamp: IntCounterVec,
    /// `mz_timestamp_difference_for_strict_serializable_ms`: timestamp difference
    /// (milliseconds) between strict serializable and serializable, per `compute_instance`.
    pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
    /// `mz_adapter_commands`: adapter commands, labeled by `command_type`, `status`,
    /// and `application_name`.
    pub commands: IntCounterVec,
    /// `mz_storage_usage_collection_time_seconds`: time spent collecting storage usage.
    pub storage_usage_collection_time_seconds: Histogram,
    /// `mz_arrangement_sizes_collection_time_seconds`: time to read arrangement sizes and
    /// prepare history-table updates for one snapshot.
    pub arrangement_sizes_collection_time_seconds: Histogram,
    /// `mz_arrangement_sizes_rows_written_total`: rows appended to
    /// `mz_object_arrangement_size_history`.
    pub arrangement_sizes_rows_written: IntCounter,
    /// `mz_subscribe_outputs`: subscribe outputs used, labeled by `session_type` and
    /// `subscribe_output`.
    pub subscribe_outputs: IntCounterVec,
    /// `mz_canceled_peeks_total`: canceled peeks.
    pub canceled_peeks: IntCounter,
    /// `mz_linearize_message_seconds`: time to linearize strict serializable messages,
    /// labeled by `type` and `immediately_handled`.
    pub linearize_message_seconds: HistogramVec,
    /// `mz_time_to_first_row_seconds`: execute latency of successful queries as seen by
    /// pgwire, labeled by `instance_id`, `isolation_level`, and `strategy`.
    pub time_to_first_row_seconds: HistogramVec,
    /// `mz_statement_logging_record_count`: statements, labeled by whether they were
    /// recorded (`sample`).
    pub statement_logging_records: IntCounterVec,
    /// `mz_statement_logging_unsampled_bytes`: SQL text that would be logged without sampling.
    pub statement_logging_unsampled_bytes: IntCounter,
    /// `mz_statement_logging_actual_bytes`: SQL text actually logged.
    pub statement_logging_actual_bytes: IntCounter,
    /// `mz_coordinator_message_batch_size`: size of message batches handled by the coordinator.
    pub message_batch: Histogram,
    /// `mz_slow_message_handling`: latency of all coordinator messages, per `message_kind`.
    pub message_handling: HistogramVec,
    /// `mz_optimization_notices`: optimization notices, per `notice_type`.
    pub optimization_notices: IntCounterVec,
    /// `mz_append_table_duration_seconds`: latency of appending to any table.
    pub append_table_duration_seconds: Histogram,
    /// `mz_webhook_validation_reduce_failures`: failures to reduce a webhook source's
    /// CHECK statement, per `reason`.
    pub webhook_validation_reduce_failures: IntCounterVec,
    /// `mz_webhook_get_appender_count`: webhook appender fetches from the Coordinator.
    pub webhook_get_appender: IntCounter,
    /// `mz_check_scheduling_policies_seconds`: time per scheduling policy, labeled by
    /// `policy` and `thread`.
    pub check_scheduling_policies_seconds: HistogramVec,
    /// `mz_handle_scheduling_decisions_seconds`: time of `handle_scheduling_decisions`,
    /// labeled by `altered_a_cluster`.
    pub handle_scheduling_decisions_seconds: HistogramVec,
    /// `mz_row_set_finishing_seconds`: time spent in `RowSetFinishing::finish`.
    pub row_set_finishing_seconds: Histogram,
    /// `mz_session_startup_table_writes_seconds`: wait for builtin table writes before a query.
    pub session_startup_table_writes_seconds: Histogram,
    /// `mz_parse_seconds`: time to parse a SQL statement.
    pub parse_seconds: Histogram,
    /// `mz_pgwire_message_processing_seconds`: per-`message_type` pgwire processing time.
    pub pgwire_message_processing_seconds: HistogramVec,
    /// `mz_result_rows_first_to_last_byte_seconds`: time from first result row to final
    /// response, per `statement_type`.
    pub result_rows_first_to_last_byte_seconds: HistogramVec,
    /// `mz_pgwire_ensure_transaction_seconds`: time in `ensure_transactions`, per `message_type`.
    pub pgwire_ensure_transaction_seconds: HistogramVec,
    /// `mz_catalog_snapshot_seconds`: time in `catalog_snapshot`, per `context`.
    pub catalog_snapshot_seconds: HistogramVec,
    /// `mz_pgwire_recv_scheduling_delay_ms`: delay (milliseconds) between a pgwire receiver
    /// task being woken and polled, per `message_type`.
    pub pgwire_recv_scheduling_delay_ms: HistogramVec,
    /// `mz_catalog_transact_seconds`: time of catalog transact methods, per `method`.
    pub catalog_transact_seconds: HistogramVec,
    /// `mz_apply_catalog_implications_seconds`: time to apply catalog implications.
    pub apply_catalog_implications_seconds: Histogram,
    /// `mz_group_commit_catalog_upper_seconds`: time to advance the catalog shard upper
    /// during group commit.
    pub group_commit_catalog_upper_seconds: Histogram,
}
59
impl Metrics {
    /// Creates every adapter metric and registers it with `registry`.
    ///
    /// Each field of the returned [`Metrics`] is the handle returned by
    /// `registry.register(metric!(...))`. The `metric!` invocation fixes the
    /// Prometheus name, help text, label names (`var_labels`), and, for
    /// histograms, the bucket boundaries.
    ///
    /// NOTE(review): presumably this is meant to be called once per registry,
    /// since registering the same metric name twice is usually rejected —
    /// confirm against `MetricsRegistry::register`.
    pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
        Self {
            query_total: registry.register(metric!(
                name: "mz_query_total",
                help: "The total number of queries issued of the given type since process start.",
                var_labels: ["session_type", "statement_type"],
            )),
            active_sessions: registry.register(metric!(
                name: "mz_active_sessions",
                help: "The number of active coordinator sessions.",
                var_labels: ["session_type"],
            )),
            active_subscribes: registry.register(metric!(
                name: "mz_active_subscribes",
                help: "The number of active SUBSCRIBE queries.",
                var_labels: ["session_type"],
            )),
            active_copy_tos: registry.register(metric!(
                name: "mz_active_copy_tos",
                help: "The number of active COPY TO queries.",
                var_labels: ["session_type"],
            )),
            queue_busy_seconds: registry.register(metric!(
                name: "mz_coord_queue_busy_seconds",
                help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
                buckets: histogram_seconds_buckets(0.000_128, 32.0)
            )),
            determine_timestamp: registry.register(metric!(
                name: "mz_determine_timestamp",
                help: "The total number of calls to determine_timestamp.",
                var_labels:["respond_immediately", "isolation_level", "compute_instance"],
            )),
            // Measured in milliseconds (note the `_ms` suffix and millisecond buckets),
            // unlike the `_seconds` histograms around it.
            timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
                name: "mz_timestamp_difference_for_strict_serializable_ms",
                help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
                var_labels:["compute_instance"],
                buckets: histogram_milliseconds_buckets(1., 8000.),
            )),
            commands: registry.register(metric!(
                name: "mz_adapter_commands",
                help: "The total number of adapter commands issued of the given type since process start.",
                var_labels: ["command_type", "status", "application_name"],
            )),
            storage_usage_collection_time_seconds: registry.register(metric!(
                name: "mz_storage_usage_collection_time_seconds",
                help: "The number of seconds the coord spends collecting usage metrics from storage.",
                buckets: histogram_seconds_buckets(0.000_128, 8.0)
            )),
            arrangement_sizes_collection_time_seconds: registry.register(metric!(
                name: "mz_arrangement_sizes_collection_time_seconds",
                help: "Seconds to read mz_object_arrangement_sizes and prepare history-table updates for one snapshot.",
                buckets: histogram_seconds_buckets(0.000_128, 8.0)
            )),
            arrangement_sizes_rows_written: registry.register(metric!(
                name: "mz_arrangement_sizes_rows_written_total",
                help: "Total rows appended to mz_object_arrangement_size_history since process start.",
            )),
            subscribe_outputs: registry.register(metric!(
                name: "mz_subscribe_outputs",
                help: "The total number of different subscribe outputs used",
                var_labels: ["session_type", "subscribe_output"],
            )),
            canceled_peeks: registry.register(metric!(
                name: "mz_canceled_peeks_total",
                help: "The total number of canceled peeks since process start.",
            )),
            linearize_message_seconds: registry.register(metric!(
                name: "mz_linearize_message_seconds",
                help: "The number of seconds it takes to linearize strict serializable messages",
                var_labels: ["type", "immediately_handled"],
                buckets: histogram_seconds_buckets(0.000_128, 8.0),
            )),
            time_to_first_row_seconds: registry.register(metric! {
                name: "mz_time_to_first_row_seconds",
                help: "Latency of an execute for a successful query from pgwire's perspective",
                var_labels: ["instance_id", "isolation_level", "strategy"],
                buckets: histogram_seconds_buckets(0.000_128, 32.0)
            }),
            statement_logging_records: registry.register(metric! {
                name: "mz_statement_logging_record_count",
                help: "The total number of SQL statements tagged with whether or not they were recorded.",
                var_labels: ["sample"],
            }),
            statement_logging_unsampled_bytes: registry.register(metric!(
                name: "mz_statement_logging_unsampled_bytes",
                help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
            )),
            statement_logging_actual_bytes: registry.register(metric!(
                name: "mz_statement_logging_actual_bytes",
                help: "The total amount of SQL text that was logged by statement logging.",
            )),
            // Batch sizes are counts, not durations, so the bucket boundaries are
            // spelled out explicitly instead of using a seconds helper.
            message_batch: registry.register(metric!(
                name: "mz_coordinator_message_batch_size",
                help: "Message batch size handled by the coordinator.",
                buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
            )),
            message_handling: registry.register(metric!(
                name: "mz_slow_message_handling",
                help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
                var_labels: ["message_kind"],
                buckets: histogram_seconds_buckets(0.000_128, 512.0),
            )),
            optimization_notices: registry.register(metric!(
                name: "mz_optimization_notices",
                help: "Number of optimization notices per notice type.",
                var_labels: ["notice_type"],
            )),
            // NOTE(review): the lowest bucket here is 0.128s, while most other latency
            // histograms in this function start at 0.000_128s; confirm this is intentional.
            append_table_duration_seconds: registry.register(metric!(
                name: "mz_append_table_duration_seconds",
                help: "Latency for appending to any (user or system) table.",
                buckets: histogram_seconds_buckets(0.128, 32.0),
            )),
            webhook_validation_reduce_failures: registry.register(metric!(
                name: "mz_webhook_validation_reduce_failures",
                help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
                var_labels: ["reason"],
            )),
            webhook_get_appender: registry.register(metric!(
                name: "mz_webhook_get_appender_count",
                help: "Count of getting a webhook appender from the Coordinator.",
            )),
            check_scheduling_policies_seconds: registry.register(metric!(
                name: "mz_check_scheduling_policies_seconds",
                help: "The time each policy in `check_scheduling_policies` takes.",
                var_labels: ["policy", "thread"],
                buckets: histogram_seconds_buckets(0.000_128, 8.0),
            )),
            handle_scheduling_decisions_seconds: registry.register(metric!(
                name: "mz_handle_scheduling_decisions_seconds",
                help: "The time `handle_scheduling_decisions` takes.",
                var_labels: ["altered_a_cluster"],
                buckets: histogram_seconds_buckets(0.000_128, 8.0),
            )),
            row_set_finishing_seconds: registry.register(metric!(
                name: "mz_row_set_finishing_seconds",
                help: "The time it takes to run RowSetFinishing::finish.",
                buckets: histogram_seconds_buckets(0.000_128, 16.0),
            )),
            session_startup_table_writes_seconds: registry.register(metric!(
                name: "mz_session_startup_table_writes_seconds",
                help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
                buckets: histogram_seconds_buckets(0.000_008, 4.0),
            )),
            parse_seconds: registry.register(metric!(
                name: "mz_parse_seconds",
                help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
                buckets: histogram_seconds_buckets(0.001, 8.0),
            )),
            pgwire_message_processing_seconds: registry.register(metric!(
                name: "mz_pgwire_message_processing_seconds",
                help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
                var_labels: ["message_type"],
                buckets: histogram_seconds_buckets(0.001, 512.0),
            )),
            result_rows_first_to_last_byte_seconds: registry.register(metric!(
                name: "mz_result_rows_first_to_last_byte_seconds",
                help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
                var_labels: ["statement_type"],
                buckets: histogram_seconds_buckets(0.001, 8192.0),
            )),
            pgwire_ensure_transaction_seconds: registry.register(metric!(
                name: "mz_pgwire_ensure_transaction_seconds",
                help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
                var_labels: ["message_type"],
                buckets: histogram_seconds_buckets(0.001, 512.0),
            )),
            catalog_snapshot_seconds: registry.register(metric!(
                name: "mz_catalog_snapshot_seconds",
                help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
                var_labels: ["context"],
                buckets: histogram_seconds_buckets(0.001, 512.0),
            )),
            // Millisecond-valued, like `timestamp_difference_for_strict_serializable_ms`.
            pgwire_recv_scheduling_delay_ms: registry.register(metric!(
                name: "mz_pgwire_recv_scheduling_delay_ms",
                help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
                var_labels: ["message_type"],
                buckets: histogram_milliseconds_buckets(0.128, 512000.),
            )),
            catalog_transact_seconds: registry.register(metric!(
                name: "mz_catalog_transact_seconds",
                help: "The time it takes to run various catalog transact methods.",
                var_labels: ["method"],
                buckets: histogram_seconds_buckets(0.001, 32.0),
            )),
            apply_catalog_implications_seconds: registry.register(metric!(
                name: "mz_apply_catalog_implications_seconds",
                help: "The time it takes to apply catalog implications.",
                buckets: histogram_seconds_buckets(0.001, 32.0),
            )),
            group_commit_catalog_upper_seconds: registry.register(metric!(
                name: "mz_group_commit_catalog_upper_seconds",
                help: "The time it takes to advance the catalog shard upper during group commit.",
                buckets: histogram_seconds_buckets(0.001, 32.0),
            )),
        }
    }

    /// Returns a clone of the `mz_row_set_finishing_seconds` histogram handle.
    pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
        self.row_set_finishing_seconds.clone()
    }

    /// Bundles clones of the metric handles that sessions use into a [`SessionMetrics`].
    ///
    /// NOTE(review): this relies on prometheus handles sharing their underlying
    /// values across clones, so session updates land in the same series that
    /// `Metrics` exposes — confirm against the `prometheus` crate docs.
    pub(crate) fn session_metrics(&self) -> SessionMetrics {
        SessionMetrics {
            row_set_finishing_seconds: self.row_set_finishing_seconds(),
            session_startup_table_writes_seconds: self.session_startup_table_writes_seconds.clone(),
            query_total: self.query_total.clone(),
            determine_timestamp: self.determine_timestamp.clone(),
            timestamp_difference_for_strict_serializable_ms: self
                .timestamp_difference_for_strict_serializable_ms
                .clone(),
            optimization_notices: self.optimization_notices.clone(),
            statement_logging_records: self.statement_logging_records.clone(),
            statement_logging_unsampled_bytes: self.statement_logging_unsampled_bytes.clone(),
            statement_logging_actual_bytes: self.statement_logging_actual_bytes.clone(),
        }
    }
}
278
/// Handles to the subset of [`Metrics`] that sessions use.
///
/// Built by [`Metrics::session_metrics`]. The fields are private; callers go
/// through the accessor methods, which also resolve label values for the
/// vector-typed metrics.
#[derive(Debug, Clone)]
pub struct SessionMetrics {
    /// `mz_row_set_finishing_seconds`.
    row_set_finishing_seconds: Histogram,
    /// `mz_session_startup_table_writes_seconds`.
    session_startup_table_writes_seconds: Histogram,
    /// `mz_query_total`, labeled by `session_type` and `statement_type`.
    query_total: IntCounterVec,
    /// `mz_determine_timestamp`.
    determine_timestamp: IntCounterVec,
    /// `mz_timestamp_difference_for_strict_serializable_ms`.
    timestamp_difference_for_strict_serializable_ms: HistogramVec,
    /// `mz_optimization_notices`, labeled by `notice_type`.
    optimization_notices: IntCounterVec,
    /// `mz_statement_logging_record_count`, labeled by `sample`.
    statement_logging_records: IntCounterVec,
    /// `mz_statement_logging_unsampled_bytes`.
    statement_logging_unsampled_bytes: IntCounter,
    /// `mz_statement_logging_actual_bytes`.
    statement_logging_actual_bytes: IntCounter,
}
292
293impl SessionMetrics {
294 pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
295 &self.row_set_finishing_seconds
296 }
297
298 pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
299 &self.session_startup_table_writes_seconds
300 }
301
302 pub(crate) fn query_total(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
303 self.query_total.with_label_values(label_values)
304 }
305
306 pub(crate) fn determine_timestamp(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
307 self.determine_timestamp.with_label_values(label_values)
308 }
309
310 pub(crate) fn timestamp_difference_for_strict_serializable_ms(
311 &self,
312 label_values: &[&str],
313 ) -> Histogram {
314 self.timestamp_difference_for_strict_serializable_ms
315 .with_label_values(label_values)
316 }
317
318 pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
319 self.optimization_notices.with_label_values(label_values)
320 }
321
322 pub(crate) fn statement_logging_records(
323 &self,
324 label_values: &[&str],
325 ) -> GenericCounter<AtomicU64> {
326 self.statement_logging_records
327 .with_label_values(label_values)
328 }
329
330 pub(crate) fn statement_logging_unsampled_bytes(&self) -> &IntCounter {
331 &self.statement_logging_unsampled_bytes
332 }
333
334 pub(crate) fn statement_logging_actual_bytes(&self) -> &IntCounter {
335 &self.statement_logging_actual_bytes
336 }
337}
338
339pub(crate) fn session_type_label_value(user: &User) -> &'static str {
340 match user.is_internal() {
341 true => "system",
342 false => "user",
343 }
344}
345
346pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
347where
348 T: AstInfo,
349{
350 statement_kind_label_value(StatementKind::from(stmt))
351}
352
353pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
354where
355 T: AstInfo,
356{
357 match output {
358 SubscribeOutput::Diffs => "diffs",
359 SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
360 SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
361 SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
362 }
363}