1use mz_ore::metric;
11use mz_ore::metrics::MetricsRegistry;
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::core::{AtomicU64, GenericCounter};
17use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
18
19#[derive(Debug, Clone)]
20pub struct Metrics {
21 pub query_total: IntCounterVec,
22 pub active_sessions: IntGaugeVec,
23 pub active_subscribes: IntGaugeVec,
24 pub active_copy_tos: IntGaugeVec,
25 pub queue_busy_seconds: Histogram,
26 pub determine_timestamp: IntCounterVec,
27 pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
28 pub commands: IntCounterVec,
29 pub storage_usage_collection_time_seconds: Histogram,
30 pub subscribe_outputs: IntCounterVec,
31 pub canceled_peeks: IntCounter,
32 pub linearize_message_seconds: HistogramVec,
33 pub time_to_first_row_seconds: HistogramVec,
34 pub statement_logging_records: IntCounterVec,
35 pub statement_logging_unsampled_bytes: IntCounter,
36 pub statement_logging_actual_bytes: IntCounter,
37 pub message_batch: Histogram,
38 pub message_handling: HistogramVec,
39 pub optimization_notices: IntCounterVec,
40 pub append_table_duration_seconds: Histogram,
41 pub webhook_validation_reduce_failures: IntCounterVec,
42 pub webhook_get_appender: IntCounter,
43 pub check_scheduling_policies_seconds: HistogramVec,
44 pub handle_scheduling_decisions_seconds: HistogramVec,
45 pub row_set_finishing_seconds: Histogram,
46 pub session_startup_table_writes_seconds: Histogram,
47 pub parse_seconds: Histogram,
48 pub pgwire_message_processing_seconds: HistogramVec,
49 pub result_rows_first_to_last_byte_seconds: HistogramVec,
50 pub pgwire_ensure_transaction_seconds: HistogramVec,
51 pub catalog_snapshot_seconds: HistogramVec,
52 pub pgwire_recv_scheduling_delay_ms: HistogramVec,
53 pub catalog_transact_seconds: HistogramVec,
54 pub apply_catalog_implications_seconds: Histogram,
55}
56
57impl Metrics {
58 pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
59 Self {
60 query_total: registry.register(metric!(
61 name: "mz_query_total",
62 help: "The total number of queries issued of the given type since process start.",
63 var_labels: ["session_type", "statement_type"],
64 )),
65 active_sessions: registry.register(metric!(
66 name: "mz_active_sessions",
67 help: "The number of active coordinator sessions.",
68 var_labels: ["session_type"],
69 )),
70 active_subscribes: registry.register(metric!(
71 name: "mz_active_subscribes",
72 help: "The number of active SUBSCRIBE queries.",
73 var_labels: ["session_type"],
74 )),
75 active_copy_tos: registry.register(metric!(
76 name: "mz_active_copy_tos",
77 help: "The number of active COPY TO queries.",
78 var_labels: ["session_type"],
79 )),
80 queue_busy_seconds: registry.register(metric!(
81 name: "mz_coord_queue_busy_seconds",
82 help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
83 buckets: histogram_seconds_buckets(0.000_128, 32.0)
84 )),
85 determine_timestamp: registry.register(metric!(
86 name: "mz_determine_timestamp",
87 help: "The total number of calls to determine_timestamp.",
88 var_labels:["respond_immediately", "isolation_level", "compute_instance", "determination_method"],
89 )),
90 timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
91 name: "mz_timestamp_difference_for_strict_serializable_ms",
92 help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
93 var_labels:["compute_instance", "determination_method"],
94 buckets: histogram_milliseconds_buckets(1., 8000.),
95 )),
96 commands: registry.register(metric!(
97 name: "mz_adapter_commands",
98 help: "The total number of adapter commands issued of the given type since process start.",
99 var_labels: ["command_type", "status", "application_name"],
100 )),
101 storage_usage_collection_time_seconds: registry.register(metric!(
102 name: "mz_storage_usage_collection_time_seconds",
103 help: "The number of seconds the coord spends collecting usage metrics from storage.",
104 buckets: histogram_seconds_buckets(0.000_128, 8.0)
105 )),
106 subscribe_outputs: registry.register(metric!(
107 name: "mz_subscribe_outputs",
108 help: "The total number of different subscribe outputs used",
109 var_labels: ["session_type", "subscribe_output"],
110 )),
111 canceled_peeks: registry.register(metric!(
112 name: "mz_canceled_peeks_total",
113 help: "The total number of canceled peeks since process start.",
114 )),
115 linearize_message_seconds: registry.register(metric!(
116 name: "mz_linearize_message_seconds",
117 help: "The number of seconds it takes to linearize strict serializable messages",
118 var_labels: ["type", "immediately_handled"],
119 buckets: histogram_seconds_buckets(0.000_128, 8.0),
120 )),
121 time_to_first_row_seconds: registry.register(metric! {
122 name: "mz_time_to_first_row_seconds",
123 help: "Latency of an execute for a successful query from pgwire's perspective",
124 var_labels: ["instance_id", "isolation_level", "strategy"],
125 buckets: histogram_seconds_buckets(0.000_128, 32.0)
126 }),
127 statement_logging_records: registry.register(metric! {
128 name: "mz_statement_logging_record_count",
129 help: "The total number of SQL statements tagged with whether or not they were recorded.",
130 var_labels: ["sample"],
131 }),
132 statement_logging_unsampled_bytes: registry.register(metric!(
133 name: "mz_statement_logging_unsampled_bytes",
134 help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
135 )),
136 statement_logging_actual_bytes: registry.register(metric!(
137 name: "mz_statement_logging_actual_bytes",
138 help: "The total amount of SQL text that was logged by statement logging.",
139 )),
140 message_batch: registry.register(metric!(
141 name: "mz_coordinator_message_batch_size",
142 help: "Message batch size handled by the coordinator.",
143 buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
144 )),
145 message_handling: registry.register(metric!(
146 name: "mz_slow_message_handling",
147 help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
148 var_labels: ["message_kind"],
149 buckets: histogram_seconds_buckets(0.000_128, 512.0),
150 )),
151 optimization_notices: registry.register(metric!(
152 name: "mz_optimization_notices",
153 help: "Number of optimization notices per notice type.",
154 var_labels: ["notice_type"],
155 )),
156 append_table_duration_seconds: registry.register(metric!(
157 name: "mz_append_table_duration_seconds",
158 help: "Latency for appending to any (user or system) table.",
159 buckets: histogram_seconds_buckets(0.128, 32.0),
160 )),
161 webhook_validation_reduce_failures: registry.register(metric!(
162 name: "mz_webhook_validation_reduce_failures",
163 help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
164 var_labels: ["reason"],
165 )),
166 webhook_get_appender: registry.register(metric!(
167 name: "mz_webhook_get_appender_count",
168 help: "Count of getting a webhook appender from the Coordinator.",
169 )),
170 check_scheduling_policies_seconds: registry.register(metric!(
171 name: "mz_check_scheduling_policies_seconds",
172 help: "The time each policy in `check_scheduling_policies` takes.",
173 var_labels: ["policy", "thread"],
174 buckets: histogram_seconds_buckets(0.000_128, 8.0),
175 )),
176 handle_scheduling_decisions_seconds: registry.register(metric!(
177 name: "mz_handle_scheduling_decisions_seconds",
178 help: "The time `handle_scheduling_decisions` takes.",
179 var_labels: ["altered_a_cluster"],
180 buckets: histogram_seconds_buckets(0.000_128, 8.0),
181 )),
182 row_set_finishing_seconds: registry.register(metric!(
183 name: "mz_row_set_finishing_seconds",
184 help: "The time it takes to run RowSetFinishing::finish.",
185 buckets: histogram_seconds_buckets(0.000_128, 16.0),
186 )),
187 session_startup_table_writes_seconds: registry.register(metric!(
188 name: "mz_session_startup_table_writes_seconds",
189 help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
190 buckets: histogram_seconds_buckets(0.000_008, 4.0),
191 )),
192 parse_seconds: registry.register(metric!(
193 name: "mz_parse_seconds",
194 help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
195 buckets: histogram_seconds_buckets(0.001, 8.0),
196 )),
197 pgwire_message_processing_seconds: registry.register(metric!(
198 name: "mz_pgwire_message_processing_seconds",
199 help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
200 var_labels: ["message_type"],
201 buckets: histogram_seconds_buckets(0.001, 512.0),
202 )),
203 result_rows_first_to_last_byte_seconds: registry.register(metric!(
204 name: "mz_result_rows_first_to_last_byte_seconds",
205 help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
206 var_labels: ["statement_type"],
207 buckets: histogram_seconds_buckets(0.001, 8192.0),
208 )),
209 pgwire_ensure_transaction_seconds: registry.register(metric!(
210 name: "mz_pgwire_ensure_transaction_seconds",
211 help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
212 var_labels: ["message_type"],
213 buckets: histogram_seconds_buckets(0.001, 512.0),
214 )),
215 catalog_snapshot_seconds: registry.register(metric!(
216 name: "mz_catalog_snapshot_seconds",
217 help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
218 var_labels: ["context"],
219 buckets: histogram_seconds_buckets(0.001, 512.0),
220 )),
221 pgwire_recv_scheduling_delay_ms: registry.register(metric!(
222 name: "mz_pgwire_recv_scheduling_delay_ms",
223 help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
224 var_labels: ["message_type"],
225 buckets: histogram_milliseconds_buckets(0.128, 512000.),
226 )),
227 catalog_transact_seconds: registry.register(metric!(
228 name: "mz_catalog_transact_seconds",
229 help: "The time it takes to run various catalog transact methods.",
230 var_labels: ["method"],
231 buckets: histogram_seconds_buckets(0.001, 32.0),
232 )),
233 apply_catalog_implications_seconds: registry.register(metric!(
234 name: "mz_apply_catalog_implications_seconds",
235 help: "The time it takes to apply catalog implications.",
236 buckets: histogram_seconds_buckets(0.001, 32.0),
237 ))
238 }
239 }
240
241 pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
242 self.row_set_finishing_seconds.clone()
243 }
244
245 pub(crate) fn session_metrics(&self) -> SessionMetrics {
246 SessionMetrics {
247 row_set_finishing_seconds: self.row_set_finishing_seconds(),
248 session_startup_table_writes_seconds: self.session_startup_table_writes_seconds.clone(),
249 query_total: self.query_total.clone(),
250 determine_timestamp: self.determine_timestamp.clone(),
251 timestamp_difference_for_strict_serializable_ms: self
252 .timestamp_difference_for_strict_serializable_ms
253 .clone(),
254 optimization_notices: self.optimization_notices.clone(),
255 }
256 }
257}
258
259#[derive(Debug, Clone)]
261pub struct SessionMetrics {
262 row_set_finishing_seconds: Histogram,
263 session_startup_table_writes_seconds: Histogram,
264 query_total: IntCounterVec,
265 determine_timestamp: IntCounterVec,
266 timestamp_difference_for_strict_serializable_ms: HistogramVec,
267 optimization_notices: IntCounterVec,
268}
269
270impl SessionMetrics {
271 pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
272 &self.row_set_finishing_seconds
273 }
274
275 pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
276 &self.session_startup_table_writes_seconds
277 }
278
279 pub(crate) fn query_total(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
280 self.query_total.with_label_values(label_values)
281 }
282
283 pub(crate) fn determine_timestamp(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
284 self.determine_timestamp.with_label_values(label_values)
285 }
286
287 pub(crate) fn timestamp_difference_for_strict_serializable_ms(
288 &self,
289 label_values: &[&str],
290 ) -> Histogram {
291 self.timestamp_difference_for_strict_serializable_ms
292 .with_label_values(label_values)
293 }
294
295 pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
296 self.optimization_notices.with_label_values(label_values)
297 }
298}
299
300pub(crate) fn session_type_label_value(user: &User) -> &'static str {
301 match user.is_internal() {
302 true => "system",
303 false => "user",
304 }
305}
306
307pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
308where
309 T: AstInfo,
310{
311 statement_kind_label_value(StatementKind::from(stmt))
312}
313
314pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
315where
316 T: AstInfo,
317{
318 match output {
319 SubscribeOutput::Diffs => "diffs",
320 SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
321 SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
322 SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
323 }
324}