mz_adapter/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::metric;
11use mz_ore::metrics::MetricsRegistry;
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
17
18#[derive(Debug, Clone)]
19pub struct Metrics {
20    pub query_total: IntCounterVec,
21    pub active_sessions: IntGaugeVec,
22    pub active_subscribes: IntGaugeVec,
23    pub active_copy_tos: IntGaugeVec,
24    pub queue_busy_seconds: HistogramVec,
25    pub determine_timestamp: IntCounterVec,
26    pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
27    pub commands: IntCounterVec,
28    pub storage_usage_collection_time_seconds: HistogramVec,
29    pub subscribe_outputs: IntCounterVec,
30    pub canceled_peeks: IntCounterVec,
31    pub linearize_message_seconds: HistogramVec,
32    pub time_to_first_row_seconds: HistogramVec,
33    pub statement_logging_records: IntCounterVec,
34    pub statement_logging_unsampled_bytes: IntCounterVec,
35    pub statement_logging_actual_bytes: IntCounterVec,
36    pub message_batch: HistogramVec,
37    pub message_handling: HistogramVec,
38    pub optimization_notices: IntCounterVec,
39    pub append_table_duration_seconds: HistogramVec,
40    pub webhook_validation_reduce_failures: IntCounterVec,
41    pub webhook_get_appender: IntCounter,
42    pub check_scheduling_policies_seconds: HistogramVec,
43    pub handle_scheduling_decisions_seconds: HistogramVec,
44    pub row_set_finishing_seconds: HistogramVec,
45    pub session_startup_table_writes_seconds: HistogramVec,
46    pub parse_seconds: HistogramVec,
47    pub pgwire_message_processing_seconds: HistogramVec,
48    pub result_rows_first_to_last_byte_seconds: HistogramVec,
49    pub pgwire_ensure_transaction_seconds: HistogramVec,
50    pub catalog_snapshot_seconds: HistogramVec,
51    pub pgwire_recv_scheduling_delay_ms: HistogramVec,
52}
53
54impl Metrics {
55    pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
56        Self {
57            query_total: registry.register(metric!(
58                name: "mz_query_total",
59                help: "The total number of queries issued of the given type since process start.",
60                var_labels: ["session_type", "statement_type"],
61            )),
62            active_sessions: registry.register(metric!(
63                name: "mz_active_sessions",
64                help: "The number of active coordinator sessions.",
65                var_labels: ["session_type"],
66            )),
67            active_subscribes: registry.register(metric!(
68                name: "mz_active_subscribes",
69                help: "The number of active SUBSCRIBE queries.",
70                var_labels: ["session_type"],
71            )),
72            active_copy_tos: registry.register(metric!(
73                name: "mz_active_copy_tos",
74                help: "The number of active COPY TO queries.",
75                var_labels: ["session_type"],
76            )),
77            queue_busy_seconds: registry.register(metric!(
78                name: "mz_coord_queue_busy_seconds",
79                help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
80                buckets: histogram_seconds_buckets(0.000_128, 32.0)
81            )),
82            determine_timestamp: registry.register(metric!(
83                name: "mz_determine_timestamp",
84                help: "The total number of calls to determine_timestamp.",
85                var_labels:["respond_immediately", "isolation_level", "compute_instance", "determination_method"],
86            )),
87            timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
88                name: "mz_timestamp_difference_for_strict_serializable_ms",
89                help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
90                var_labels:["compute_instance", "determination_method"],
91                buckets: histogram_milliseconds_buckets(1., 8000.),
92            )),
93            commands: registry.register(metric!(
94                name: "mz_adapter_commands",
95                help: "The total number of adapter commands issued of the given type since process start.",
96                var_labels: ["command_type", "status", "application_name"],
97            )),
98            storage_usage_collection_time_seconds: registry.register(metric!(
99                name: "mz_storage_usage_collection_time_seconds",
100                help: "The number of seconds the coord spends collecting usage metrics from storage.",
101                buckets: histogram_seconds_buckets(0.000_128, 8.0)
102            )),
103            subscribe_outputs: registry.register(metric!(
104                name: "mz_subscribe_outputs",
105                help: "The total number of different subscribe outputs used",
106                var_labels: ["session_type", "subscribe_output"],
107            )),
108            canceled_peeks: registry.register(metric!(
109                name: "mz_canceled_peeks_total",
110                help: "The total number of canceled peeks since process start.",
111            )),
112            linearize_message_seconds: registry.register(metric!(
113                name: "mz_linearize_message_seconds",
114                help: "The number of seconds it takes to linearize strict serializable messages",
115                var_labels: ["type", "immediately_handled"],
116                buckets: histogram_seconds_buckets(0.000_128, 8.0),
117            )),
118            time_to_first_row_seconds: registry.register(metric! {
119                name: "mz_time_to_first_row_seconds",
120                help: "Latency of an execute for a successful query from pgwire's perspective",
121                var_labels: ["instance_id", "isolation_level", "strategy"],
122                buckets: histogram_seconds_buckets(0.000_128, 32.0)
123            }),
124            statement_logging_records: registry.register(metric! {
125                name: "mz_statement_logging_record_count",
126                help: "The total number of SQL statements tagged with whether or not they were recorded.",
127                var_labels: ["sample"],
128            }),
129            statement_logging_unsampled_bytes: registry.register(metric!(
130                name: "mz_statement_logging_unsampled_bytes",
131                help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
132            )),
133            statement_logging_actual_bytes: registry.register(metric!(
134                name: "mz_statement_logging_actual_bytes",
135                help: "The total amount of SQL text that was logged by statement logging.",
136            )),
137            message_batch: registry.register(metric!(
138                name: "mz_coordinator_message_batch_size",
139                help: "Message batch size handled by the coordinator.",
140                buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
141            )),
142            message_handling: registry.register(metric!(
143                name: "mz_slow_message_handling",
144                help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
145                var_labels: ["message_kind"],
146                buckets: histogram_seconds_buckets(0.000_128, 512.0),
147            )),
148            optimization_notices: registry.register(metric!(
149                name: "mz_optimization_notices",
150                help: "Number of optimization notices per notice type.",
151                var_labels: ["notice_type"],
152            )),
153            append_table_duration_seconds: registry.register(metric!(
154                name: "mz_append_table_duration_seconds",
155                help: "Latency for appending to any (user or system) table.",
156                buckets: histogram_seconds_buckets(0.128, 32.0),
157            )),
158            webhook_validation_reduce_failures: registry.register(metric!(
159                name: "mz_webhook_validation_reduce_failures",
160                help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
161                var_labels: ["reason"],
162            )),
163            webhook_get_appender: registry.register(metric!(
164                name: "mz_webhook_get_appender_count",
165                help: "Count of getting a webhook appender from the Coordinator.",
166            )),
167            check_scheduling_policies_seconds: registry.register(metric!(
168                name: "mz_check_scheduling_policies_seconds",
169                help: "The time each policy in `check_scheduling_policies` takes.",
170                var_labels: ["policy", "thread"],
171                buckets: histogram_seconds_buckets(0.000_128, 8.0),
172            )),
173            handle_scheduling_decisions_seconds: registry.register(metric!(
174                name: "mz_handle_scheduling_decisions_seconds",
175                help: "The time `handle_scheduling_decisions` takes.",
176                var_labels: ["altered_a_cluster"],
177                buckets: histogram_seconds_buckets(0.000_128, 8.0),
178            )),
179            row_set_finishing_seconds: registry.register(metric!(
180                name: "mz_row_set_finishing_seconds",
181                help: "The time it takes to run RowSetFinishing::finish.",
182                buckets: histogram_seconds_buckets(0.000_128, 16.0),
183            )),
184            session_startup_table_writes_seconds: registry.register(metric!(
185                name: "mz_session_startup_table_writes_seconds",
186                help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
187                buckets: histogram_seconds_buckets(0.000_008, 4.0),
188            )),
189            parse_seconds: registry.register(metric!(
190                name: "mz_parse_seconds",
191                help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
192                buckets: histogram_seconds_buckets(0.001, 8.0),
193            )),
194            pgwire_message_processing_seconds: registry.register(metric!(
195                name: "mz_pgwire_message_processing_seconds",
196                help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
197                var_labels: ["message_type"],
198                buckets: histogram_seconds_buckets(0.001, 512.0),
199            )),
200            result_rows_first_to_last_byte_seconds: registry.register(metric!(
201                name: "mz_result_rows_first_to_last_byte_seconds",
202                help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
203                var_labels: ["statement_type"],
204                buckets: histogram_seconds_buckets(0.001, 8192.0),
205            )),
206            pgwire_ensure_transaction_seconds: registry.register(metric!(
207                name: "mz_pgwire_ensure_transaction_seconds",
208                help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
209                var_labels: ["message_type"],
210                buckets: histogram_seconds_buckets(0.001, 512.0),
211            )),
212            catalog_snapshot_seconds: registry.register(metric!(
213                name: "mz_catalog_snapshot_seconds",
214                help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
215                var_labels: ["context"],
216                buckets: histogram_seconds_buckets(0.001, 512.0),
217            )),
218            pgwire_recv_scheduling_delay_ms: registry.register(metric!(
219                name: "mz_pgwire_recv_scheduling_delay_ms",
220                help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
221                var_labels: ["message_type"],
222                buckets: histogram_milliseconds_buckets(0.128, 512000.),
223            ))
224        }
225    }
226
227    pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
228        self.row_set_finishing_seconds.with_label_values(&[])
229    }
230
231    pub(crate) fn session_metrics(&self) -> SessionMetrics {
232        SessionMetrics {
233            row_set_finishing_seconds: self.row_set_finishing_seconds(),
234            session_startup_table_writes_seconds: self
235                .session_startup_table_writes_seconds
236                .with_label_values(&[]),
237        }
238    }
239}
240
241/// Metrics associated with a [`crate::session::Session`].
242#[derive(Debug, Clone)]
243pub struct SessionMetrics {
244    row_set_finishing_seconds: Histogram,
245    session_startup_table_writes_seconds: Histogram,
246}
247
248impl SessionMetrics {
249    pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
250        &self.row_set_finishing_seconds
251    }
252
253    pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
254        &self.session_startup_table_writes_seconds
255    }
256}
257
258pub(crate) fn session_type_label_value(user: &User) -> &'static str {
259    match user.is_internal() {
260        true => "system",
261        false => "user",
262    }
263}
264
265pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
266where
267    T: AstInfo,
268{
269    statement_kind_label_value(StatementKind::from(stmt))
270}
271
272pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
273where
274    T: AstInfo,
275{
276    match output {
277        SubscribeOutput::Diffs => "diffs",
278        SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
279        SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
280        SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
281    }
282}