mz_adapter/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::metric;
11use mz_ore::metrics::MetricsRegistry;
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::core::{AtomicU64, GenericCounter};
17use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
18
/// Handles to the adapter's Prometheus metrics.
///
/// Every field is created and registered by [`Metrics::register_into`]; the
/// per-field notes below restate the metric name, help text and labels that
/// are passed at registration time.
#[derive(Debug, Clone)]
pub struct Metrics {
    /// `mz_query_total`: queries issued of a given type since process start.
    /// Labels: `session_type`, `statement_type`.
    pub query_total: IntCounterVec,
    /// `mz_active_sessions`: number of active coordinator sessions.
    /// Labels: `session_type`.
    pub active_sessions: IntGaugeVec,
    /// `mz_active_subscribes`: number of active SUBSCRIBE queries.
    /// Labels: `session_type`.
    pub active_subscribes: IntGaugeVec,
    /// `mz_active_copy_tos`: number of active COPY TO queries.
    /// Labels: `session_type`.
    pub active_copy_tos: IntGaugeVec,
    /// `mz_coord_queue_busy_seconds`: seconds the coord queue was processing
    /// before it was empty (a sampled metric).
    pub queue_busy_seconds: Histogram,
    /// `mz_determine_timestamp`: number of calls to `determine_timestamp`.
    /// Labels: `respond_immediately`, `isolation_level`, `compute_instance`,
    /// `determination_method`.
    pub determine_timestamp: IntCounterVec,
    /// `mz_timestamp_difference_for_strict_serializable_ms`: timestamp
    /// difference, in milliseconds, between strict serializable and
    /// serializable isolation. Labels: `compute_instance`,
    /// `determination_method`.
    pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
    /// `mz_adapter_commands`: adapter commands issued of a given type since
    /// process start. Labels: `command_type`, `status`, `application_name`.
    pub commands: IntCounterVec,
    /// `mz_storage_usage_collection_time_seconds`: seconds the coord spends
    /// collecting usage metrics from storage.
    pub storage_usage_collection_time_seconds: Histogram,
    /// `mz_subscribe_outputs`: number of different subscribe outputs used.
    /// Labels: `session_type`, `subscribe_output`.
    pub subscribe_outputs: IntCounterVec,
    /// `mz_canceled_peeks_total`: canceled peeks since process start.
    pub canceled_peeks: IntCounter,
    /// `mz_linearize_message_seconds`: seconds taken to linearize strict
    /// serializable messages. Labels: `type`, `immediately_handled`.
    pub linearize_message_seconds: HistogramVec,
    /// `mz_time_to_first_row_seconds`: latency of an execute for a successful
    /// query from pgwire's perspective. Labels: `instance_id`,
    /// `isolation_level`, `strategy`.
    pub time_to_first_row_seconds: HistogramVec,
    /// `mz_statement_logging_record_count`: SQL statements, tagged with
    /// whether or not they were recorded. Labels: `sample`.
    pub statement_logging_records: IntCounterVec,
    /// `mz_statement_logging_unsampled_bytes`: amount of SQL text that would
    /// have been logged if statement logging were unsampled.
    pub statement_logging_unsampled_bytes: IntCounter,
    /// `mz_statement_logging_actual_bytes`: amount of SQL text that was
    /// logged by statement logging.
    pub statement_logging_actual_bytes: IntCounter,
    /// `mz_coordinator_message_batch_size`: message batch size handled by the
    /// coordinator.
    pub message_batch: Histogram,
    /// `mz_slow_message_handling`: latency for all coordinator messages (the
    /// "slow" in the name is legacy). Labels: `message_kind`.
    pub message_handling: HistogramVec,
    /// `mz_optimization_notices`: optimization notices per notice type.
    /// Labels: `notice_type`.
    pub optimization_notices: IntCounterVec,
    /// `mz_append_table_duration_seconds`: latency for appending to any (user
    /// or system) table.
    pub append_table_duration_seconds: Histogram,
    /// `mz_webhook_validation_reduce_failures`: failures to reduce a webhook
    /// source's CHECK statement. Labels: `reason`.
    pub webhook_validation_reduce_failures: IntCounterVec,
    /// `mz_webhook_get_appender_count`: times a webhook appender was fetched
    /// from the Coordinator.
    pub webhook_get_appender: IntCounter,
    /// `mz_check_scheduling_policies_seconds`: time each policy in
    /// `check_scheduling_policies` takes. Labels: `policy`, `thread`.
    pub check_scheduling_policies_seconds: HistogramVec,
    /// `mz_handle_scheduling_decisions_seconds`: time
    /// `handle_scheduling_decisions` takes. Labels: `altered_a_cluster`.
    pub handle_scheduling_decisions_seconds: HistogramVec,
    /// `mz_row_set_finishing_seconds`: time taken to run
    /// `RowSetFinishing::finish`.
    pub row_set_finishing_seconds: Histogram,
    /// `mz_session_startup_table_writes_seconds`: time spent waiting for
    /// builtin table writes before processing a query.
    pub session_startup_table_writes_seconds: Histogram,
    /// `mz_parse_seconds`: time taken to parse a SQL statement.
    pub parse_seconds: Histogram,
    /// `mz_pgwire_message_processing_seconds`: time taken to process each
    /// pgwire message type, measured in the adapter frontend.
    /// Labels: `message_type`.
    pub pgwire_message_processing_seconds: HistogramVec,
    /// `mz_result_rows_first_to_last_byte_seconds`: time from just before
    /// sending the first result row to sending the final response message.
    /// Labels: `statement_type`.
    pub result_rows_first_to_last_byte_seconds: HistogramVec,
    /// `mz_pgwire_ensure_transaction_seconds`: time spent ensuring a
    /// transaction while processing pgwire messages. Labels: `message_type`.
    pub pgwire_ensure_transaction_seconds: HistogramVec,
    /// `mz_catalog_snapshot_seconds`: time taken to run `catalog_snapshot`
    /// when fetching the catalog. Labels: `context`.
    pub catalog_snapshot_seconds: HistogramVec,
    /// `mz_pgwire_recv_scheduling_delay_ms`: time between a pgwire
    /// connection's receiver task being woken by incoming data and getting
    /// polled. Labels: `message_type`.
    pub pgwire_recv_scheduling_delay_ms: HistogramVec,
    /// `mz_catalog_transact_seconds`: time taken to run various catalog
    /// transact methods. Labels: `method`.
    pub catalog_transact_seconds: HistogramVec,
    /// `mz_apply_catalog_implications_seconds`: time taken to apply catalog
    /// implications.
    pub apply_catalog_implications_seconds: Histogram,
    /// `mz_group_commit_confirm_leadership_seconds`: time taken to confirm
    /// leadership during group commit.
    pub group_commit_confirm_leadership_seconds: Histogram,
    /// `mz_group_commit_table_advancement_seconds`: time taken to iterate
    /// over all catalog entries to find tables during group commit.
    pub group_commit_table_advancement_seconds: Histogram,
}
58
59impl Metrics {
60    pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
61        Self {
62            query_total: registry.register(metric!(
63                name: "mz_query_total",
64                help: "The total number of queries issued of the given type since process start.",
65                var_labels: ["session_type", "statement_type"],
66            )),
67            active_sessions: registry.register(metric!(
68                name: "mz_active_sessions",
69                help: "The number of active coordinator sessions.",
70                var_labels: ["session_type"],
71            )),
72            active_subscribes: registry.register(metric!(
73                name: "mz_active_subscribes",
74                help: "The number of active SUBSCRIBE queries.",
75                var_labels: ["session_type"],
76            )),
77            active_copy_tos: registry.register(metric!(
78                name: "mz_active_copy_tos",
79                help: "The number of active COPY TO queries.",
80                var_labels: ["session_type"],
81            )),
82            queue_busy_seconds: registry.register(metric!(
83                name: "mz_coord_queue_busy_seconds",
84                help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
85                buckets: histogram_seconds_buckets(0.000_128, 32.0)
86            )),
87            determine_timestamp: registry.register(metric!(
88                name: "mz_determine_timestamp",
89                help: "The total number of calls to determine_timestamp.",
90                var_labels:["respond_immediately", "isolation_level", "compute_instance", "determination_method"],
91            )),
92            timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
93                name: "mz_timestamp_difference_for_strict_serializable_ms",
94                help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
95                var_labels:["compute_instance", "determination_method"],
96                buckets: histogram_milliseconds_buckets(1., 8000.),
97            )),
98            commands: registry.register(metric!(
99                name: "mz_adapter_commands",
100                help: "The total number of adapter commands issued of the given type since process start.",
101                var_labels: ["command_type", "status", "application_name"],
102            )),
103            storage_usage_collection_time_seconds: registry.register(metric!(
104                name: "mz_storage_usage_collection_time_seconds",
105                help: "The number of seconds the coord spends collecting usage metrics from storage.",
106                buckets: histogram_seconds_buckets(0.000_128, 8.0)
107            )),
108            subscribe_outputs: registry.register(metric!(
109                name: "mz_subscribe_outputs",
110                help: "The total number of different subscribe outputs used",
111                var_labels: ["session_type", "subscribe_output"],
112            )),
113            canceled_peeks: registry.register(metric!(
114                name: "mz_canceled_peeks_total",
115                help: "The total number of canceled peeks since process start.",
116            )),
117            linearize_message_seconds: registry.register(metric!(
118                name: "mz_linearize_message_seconds",
119                help: "The number of seconds it takes to linearize strict serializable messages",
120                var_labels: ["type", "immediately_handled"],
121                buckets: histogram_seconds_buckets(0.000_128, 8.0),
122            )),
123            time_to_first_row_seconds: registry.register(metric! {
124                name: "mz_time_to_first_row_seconds",
125                help: "Latency of an execute for a successful query from pgwire's perspective",
126                var_labels: ["instance_id", "isolation_level", "strategy"],
127                buckets: histogram_seconds_buckets(0.000_128, 32.0)
128            }),
129            statement_logging_records: registry.register(metric! {
130                name: "mz_statement_logging_record_count",
131                help: "The total number of SQL statements tagged with whether or not they were recorded.",
132                var_labels: ["sample"],
133            }),
134            statement_logging_unsampled_bytes: registry.register(metric!(
135                name: "mz_statement_logging_unsampled_bytes",
136                help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
137            )),
138            statement_logging_actual_bytes: registry.register(metric!(
139                name: "mz_statement_logging_actual_bytes",
140                help: "The total amount of SQL text that was logged by statement logging.",
141            )),
142            message_batch: registry.register(metric!(
143                name: "mz_coordinator_message_batch_size",
144                help: "Message batch size handled by the coordinator.",
145                buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
146            )),
147            message_handling: registry.register(metric!(
148                name: "mz_slow_message_handling",
149                help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
150                var_labels: ["message_kind"],
151                buckets: histogram_seconds_buckets(0.000_128, 512.0),
152            )),
153            optimization_notices: registry.register(metric!(
154                name: "mz_optimization_notices",
155                help: "Number of optimization notices per notice type.",
156                var_labels: ["notice_type"],
157            )),
158            append_table_duration_seconds: registry.register(metric!(
159                name: "mz_append_table_duration_seconds",
160                help: "Latency for appending to any (user or system) table.",
161                buckets: histogram_seconds_buckets(0.128, 32.0),
162            )),
163            webhook_validation_reduce_failures: registry.register(metric!(
164                name: "mz_webhook_validation_reduce_failures",
165                help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
166                var_labels: ["reason"],
167            )),
168            webhook_get_appender: registry.register(metric!(
169                name: "mz_webhook_get_appender_count",
170                help: "Count of getting a webhook appender from the Coordinator.",
171            )),
172            check_scheduling_policies_seconds: registry.register(metric!(
173                name: "mz_check_scheduling_policies_seconds",
174                help: "The time each policy in `check_scheduling_policies` takes.",
175                var_labels: ["policy", "thread"],
176                buckets: histogram_seconds_buckets(0.000_128, 8.0),
177            )),
178            handle_scheduling_decisions_seconds: registry.register(metric!(
179                name: "mz_handle_scheduling_decisions_seconds",
180                help: "The time `handle_scheduling_decisions` takes.",
181                var_labels: ["altered_a_cluster"],
182                buckets: histogram_seconds_buckets(0.000_128, 8.0),
183            )),
184            row_set_finishing_seconds: registry.register(metric!(
185                name: "mz_row_set_finishing_seconds",
186                help: "The time it takes to run RowSetFinishing::finish.",
187                buckets: histogram_seconds_buckets(0.000_128, 16.0),
188            )),
189            session_startup_table_writes_seconds: registry.register(metric!(
190                name: "mz_session_startup_table_writes_seconds",
191                help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
192                buckets: histogram_seconds_buckets(0.000_008, 4.0),
193            )),
194            parse_seconds: registry.register(metric!(
195                name: "mz_parse_seconds",
196                help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
197                buckets: histogram_seconds_buckets(0.001, 8.0),
198            )),
199            pgwire_message_processing_seconds: registry.register(metric!(
200                name: "mz_pgwire_message_processing_seconds",
201                help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
202                var_labels: ["message_type"],
203                buckets: histogram_seconds_buckets(0.001, 512.0),
204            )),
205            result_rows_first_to_last_byte_seconds: registry.register(metric!(
206                name: "mz_result_rows_first_to_last_byte_seconds",
207                help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
208                var_labels: ["statement_type"],
209                buckets: histogram_seconds_buckets(0.001, 8192.0),
210            )),
211            pgwire_ensure_transaction_seconds: registry.register(metric!(
212                name: "mz_pgwire_ensure_transaction_seconds",
213                help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
214                var_labels: ["message_type"],
215                buckets: histogram_seconds_buckets(0.001, 512.0),
216            )),
217            catalog_snapshot_seconds: registry.register(metric!(
218                name: "mz_catalog_snapshot_seconds",
219                help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
220                var_labels: ["context"],
221                buckets: histogram_seconds_buckets(0.001, 512.0),
222            )),
223            pgwire_recv_scheduling_delay_ms: registry.register(metric!(
224                name: "mz_pgwire_recv_scheduling_delay_ms",
225                help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
226                var_labels: ["message_type"],
227                buckets: histogram_milliseconds_buckets(0.128, 512000.),
228            )),
229            catalog_transact_seconds: registry.register(metric!(
230                name: "mz_catalog_transact_seconds",
231                help: "The time it takes to run various catalog transact methods.",
232                var_labels: ["method"],
233                buckets: histogram_seconds_buckets(0.001, 32.0),
234            )),
235            apply_catalog_implications_seconds: registry.register(metric!(
236                name: "mz_apply_catalog_implications_seconds",
237                help: "The time it takes to apply catalog implications.",
238                buckets: histogram_seconds_buckets(0.001, 32.0),
239            )),
240            group_commit_confirm_leadership_seconds: registry.register(metric!(
241                name: "mz_group_commit_confirm_leadership_seconds",
242                help: "The time it takes to confirm leadership during group commit.",
243                buckets: histogram_seconds_buckets(0.001, 32.0),
244            )),
245            group_commit_table_advancement_seconds: registry.register(metric!(
246                name: "mz_group_commit_table_advancement_seconds",
247                help: "The time it takes to iterate over all catalog entries to find tables during group commit.",
248                buckets: histogram_seconds_buckets(0.001, 32.0),
249            ))
250        }
251    }
252
253    pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
254        self.row_set_finishing_seconds.clone()
255    }
256
257    pub(crate) fn session_metrics(&self) -> SessionMetrics {
258        SessionMetrics {
259            row_set_finishing_seconds: self.row_set_finishing_seconds(),
260            session_startup_table_writes_seconds: self.session_startup_table_writes_seconds.clone(),
261            query_total: self.query_total.clone(),
262            determine_timestamp: self.determine_timestamp.clone(),
263            timestamp_difference_for_strict_serializable_ms: self
264                .timestamp_difference_for_strict_serializable_ms
265                .clone(),
266            optimization_notices: self.optimization_notices.clone(),
267            statement_logging_records: self.statement_logging_records.clone(),
268            statement_logging_unsampled_bytes: self.statement_logging_unsampled_bytes.clone(),
269            statement_logging_actual_bytes: self.statement_logging_actual_bytes.clone(),
270        }
271    }
272}
273
/// Metrics to be accessed from a [`crate::session::Session`].
///
/// Each field is a clone of the identically named handle on [`Metrics`], as
/// produced by [`Metrics::session_metrics`]. The fields are private; callers
/// reach them through the accessor methods on this type.
#[derive(Debug, Clone)]
pub struct SessionMetrics {
    // Clone of `Metrics::row_set_finishing_seconds`.
    row_set_finishing_seconds: Histogram,
    // Clone of `Metrics::session_startup_table_writes_seconds`.
    session_startup_table_writes_seconds: Histogram,
    // Clone of `Metrics::query_total`.
    query_total: IntCounterVec,
    // Clone of `Metrics::determine_timestamp`.
    determine_timestamp: IntCounterVec,
    // Clone of `Metrics::timestamp_difference_for_strict_serializable_ms`.
    timestamp_difference_for_strict_serializable_ms: HistogramVec,
    // Clone of `Metrics::optimization_notices`.
    optimization_notices: IntCounterVec,
    // Clone of `Metrics::statement_logging_records`.
    statement_logging_records: IntCounterVec,
    // Clone of `Metrics::statement_logging_unsampled_bytes`.
    statement_logging_unsampled_bytes: IntCounter,
    // Clone of `Metrics::statement_logging_actual_bytes`.
    statement_logging_actual_bytes: IntCounter,
}
287
288impl SessionMetrics {
289    pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
290        &self.row_set_finishing_seconds
291    }
292
293    pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
294        &self.session_startup_table_writes_seconds
295    }
296
297    pub(crate) fn query_total(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
298        self.query_total.with_label_values(label_values)
299    }
300
301    pub(crate) fn determine_timestamp(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
302        self.determine_timestamp.with_label_values(label_values)
303    }
304
305    pub(crate) fn timestamp_difference_for_strict_serializable_ms(
306        &self,
307        label_values: &[&str],
308    ) -> Histogram {
309        self.timestamp_difference_for_strict_serializable_ms
310            .with_label_values(label_values)
311    }
312
313    pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
314        self.optimization_notices.with_label_values(label_values)
315    }
316
317    pub(crate) fn statement_logging_records(
318        &self,
319        label_values: &[&str],
320    ) -> GenericCounter<AtomicU64> {
321        self.statement_logging_records
322            .with_label_values(label_values)
323    }
324
325    pub(crate) fn statement_logging_unsampled_bytes(&self) -> &IntCounter {
326        &self.statement_logging_unsampled_bytes
327    }
328
329    pub(crate) fn statement_logging_actual_bytes(&self) -> &IntCounter {
330        &self.statement_logging_actual_bytes
331    }
332}
333
334pub(crate) fn session_type_label_value(user: &User) -> &'static str {
335    match user.is_internal() {
336        true => "system",
337        false => "user",
338    }
339}
340
341pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
342where
343    T: AstInfo,
344{
345    statement_kind_label_value(StatementKind::from(stmt))
346}
347
348pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
349where
350    T: AstInfo,
351{
352    match output {
353        SubscribeOutput::Diffs => "diffs",
354        SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
355        SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
356        SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
357    }
358}