mz_adapter/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::metric;
11use mz_ore::metrics::MetricsRegistry;
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::core::{AtomicU64, GenericCounter};
17use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
18
/// Handles to the Prometheus metrics created by [`Metrics::register_into`].
///
/// Each field holds the collector registered under the `mz_*` name given in
/// `register_into`; the per-field notes below paraphrase the help text that
/// is registered alongside it.
#[derive(Debug, Clone)]
pub struct Metrics {
    /// Total number of queries issued since process start, labeled by
    /// `session_type` and `statement_type`.
    pub query_total: IntCounterVec,
    /// Number of active coordinator sessions, labeled by `session_type`.
    pub active_sessions: IntGaugeVec,
    /// Number of active SUBSCRIBE queries, labeled by `session_type`.
    pub active_subscribes: IntGaugeVec,
    /// Number of active COPY TO queries, labeled by `session_type`.
    pub active_copy_tos: IntGaugeVec,
    /// Seconds the coord queue was processing before it was empty (sampled;
    /// does not cover the full queue wait/idle times).
    pub queue_busy_seconds: Histogram,
    /// Total number of calls to `determine_timestamp`, labeled by
    /// `respond_immediately`, `isolation_level`, `compute_instance` and
    /// `determination_method`.
    pub determine_timestamp: IntCounterVec,
    /// Timestamp difference in milliseconds between running in strict
    /// serializable vs serializable isolation, labeled by `compute_instance`
    /// and `determination_method`.
    pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
    /// Total number of adapter commands issued since process start, labeled
    /// by `command_type`, `status` and `application_name`.
    pub commands: IntCounterVec,
    /// Seconds the coord spends collecting usage metrics from storage.
    pub storage_usage_collection_time_seconds: Histogram,
    /// Total number of different subscribe outputs used, labeled by
    /// `session_type` and `subscribe_output`.
    pub subscribe_outputs: IntCounterVec,
    /// Total number of canceled peeks since process start.
    pub canceled_peeks: IntCounter,
    /// Seconds taken to linearize strict serializable messages, labeled by
    /// `type` and `immediately_handled`.
    pub linearize_message_seconds: HistogramVec,
    /// Latency of an execute for a successful query from pgwire's
    /// perspective, labeled by `instance_id`, `isolation_level` and
    /// `strategy`.
    pub time_to_first_row_seconds: HistogramVec,
    /// Total number of SQL statements, tagged (label `sample`) with whether
    /// or not they were recorded.
    pub statement_logging_records: IntCounterVec,
    /// Total amount of SQL text that would have been logged if statement
    /// logging were unsampled.
    pub statement_logging_unsampled_bytes: IntCounter,
    /// Total amount of SQL text that was logged by statement logging.
    pub statement_logging_actual_bytes: IntCounter,
    /// Message batch size handled by the coordinator.
    pub message_batch: Histogram,
    /// Latency for all coordinator messages, labeled by `message_kind`
    /// (registered as `mz_slow_message_handling` for legacy reasons).
    pub message_handling: HistogramVec,
    /// Number of optimization notices, labeled by `notice_type`.
    pub optimization_notices: IntCounterVec,
    /// Latency for appending to any (user or system) table.
    pub append_table_duration_seconds: Histogram,
    /// Count of failures to reduce a webhook source's CHECK statement,
    /// labeled by `reason`.
    pub webhook_validation_reduce_failures: IntCounterVec,
    /// Count of getting a webhook appender from the Coordinator.
    pub webhook_get_appender: IntCounter,
    /// Time each policy in `check_scheduling_policies` takes, labeled by
    /// `policy` and `thread`.
    pub check_scheduling_policies_seconds: HistogramVec,
    /// Time `handle_scheduling_decisions` takes, labeled by
    /// `altered_a_cluster`.
    pub handle_scheduling_decisions_seconds: HistogramVec,
    /// Time it takes to run `RowSetFinishing::finish`.
    pub row_set_finishing_seconds: Histogram,
    /// How long a query waited for builtin table writes before being
    /// processed, when it had to wait.
    pub session_startup_table_writes_seconds: Histogram,
    /// Time it takes to parse a SQL statement.
    pub parse_seconds: Histogram,
    /// Time to process each pgwire message type, measured in the Adapter
    /// frontend; labeled by `message_type`.
    pub pgwire_message_processing_seconds: HistogramVec,
    /// Time from just before sending the first result row to sending the
    /// final response message after the last row was flushed; labeled by
    /// `statement_type`.
    pub result_rows_first_to_last_byte_seconds: HistogramVec,
    /// Time spent ensuring a transaction while processing pgwire messages,
    /// labeled by `message_type`.
    pub pgwire_ensure_transaction_seconds: HistogramVec,
    /// Time it takes to run `catalog_snapshot`, labeled by `context`.
    pub catalog_snapshot_seconds: HistogramVec,
    /// Delay between a pgwire connection's receiver task being woken by
    /// incoming data and getting polled; labeled by `message_type`.
    pub pgwire_recv_scheduling_delay_ms: HistogramVec,
    /// Time it takes to run various catalog transact methods, labeled by
    /// `method`.
    pub catalog_transact_seconds: HistogramVec,
    /// Time it takes to apply catalog implications.
    pub apply_catalog_implications_seconds: Histogram,
}
56
57impl Metrics {
58    pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
59        Self {
60            query_total: registry.register(metric!(
61                name: "mz_query_total",
62                help: "The total number of queries issued of the given type since process start.",
63                var_labels: ["session_type", "statement_type"],
64            )),
65            active_sessions: registry.register(metric!(
66                name: "mz_active_sessions",
67                help: "The number of active coordinator sessions.",
68                var_labels: ["session_type"],
69            )),
70            active_subscribes: registry.register(metric!(
71                name: "mz_active_subscribes",
72                help: "The number of active SUBSCRIBE queries.",
73                var_labels: ["session_type"],
74            )),
75            active_copy_tos: registry.register(metric!(
76                name: "mz_active_copy_tos",
77                help: "The number of active COPY TO queries.",
78                var_labels: ["session_type"],
79            )),
80            queue_busy_seconds: registry.register(metric!(
81                name: "mz_coord_queue_busy_seconds",
82                help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
83                buckets: histogram_seconds_buckets(0.000_128, 32.0)
84            )),
85            determine_timestamp: registry.register(metric!(
86                name: "mz_determine_timestamp",
87                help: "The total number of calls to determine_timestamp.",
88                var_labels:["respond_immediately", "isolation_level", "compute_instance", "determination_method"],
89            )),
90            timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
91                name: "mz_timestamp_difference_for_strict_serializable_ms",
92                help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
93                var_labels:["compute_instance", "determination_method"],
94                buckets: histogram_milliseconds_buckets(1., 8000.),
95            )),
96            commands: registry.register(metric!(
97                name: "mz_adapter_commands",
98                help: "The total number of adapter commands issued of the given type since process start.",
99                var_labels: ["command_type", "status", "application_name"],
100            )),
101            storage_usage_collection_time_seconds: registry.register(metric!(
102                name: "mz_storage_usage_collection_time_seconds",
103                help: "The number of seconds the coord spends collecting usage metrics from storage.",
104                buckets: histogram_seconds_buckets(0.000_128, 8.0)
105            )),
106            subscribe_outputs: registry.register(metric!(
107                name: "mz_subscribe_outputs",
108                help: "The total number of different subscribe outputs used",
109                var_labels: ["session_type", "subscribe_output"],
110            )),
111            canceled_peeks: registry.register(metric!(
112                name: "mz_canceled_peeks_total",
113                help: "The total number of canceled peeks since process start.",
114            )),
115            linearize_message_seconds: registry.register(metric!(
116                name: "mz_linearize_message_seconds",
117                help: "The number of seconds it takes to linearize strict serializable messages",
118                var_labels: ["type", "immediately_handled"],
119                buckets: histogram_seconds_buckets(0.000_128, 8.0),
120            )),
121            time_to_first_row_seconds: registry.register(metric! {
122                name: "mz_time_to_first_row_seconds",
123                help: "Latency of an execute for a successful query from pgwire's perspective",
124                var_labels: ["instance_id", "isolation_level", "strategy"],
125                buckets: histogram_seconds_buckets(0.000_128, 32.0)
126            }),
127            statement_logging_records: registry.register(metric! {
128                name: "mz_statement_logging_record_count",
129                help: "The total number of SQL statements tagged with whether or not they were recorded.",
130                var_labels: ["sample"],
131            }),
132            statement_logging_unsampled_bytes: registry.register(metric!(
133                name: "mz_statement_logging_unsampled_bytes",
134                help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
135            )),
136            statement_logging_actual_bytes: registry.register(metric!(
137                name: "mz_statement_logging_actual_bytes",
138                help: "The total amount of SQL text that was logged by statement logging.",
139            )),
140            message_batch: registry.register(metric!(
141                name: "mz_coordinator_message_batch_size",
142                help: "Message batch size handled by the coordinator.",
143                buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
144            )),
145            message_handling: registry.register(metric!(
146                name: "mz_slow_message_handling",
147                help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
148                var_labels: ["message_kind"],
149                buckets: histogram_seconds_buckets(0.000_128, 512.0),
150            )),
151            optimization_notices: registry.register(metric!(
152                name: "mz_optimization_notices",
153                help: "Number of optimization notices per notice type.",
154                var_labels: ["notice_type"],
155            )),
156            append_table_duration_seconds: registry.register(metric!(
157                name: "mz_append_table_duration_seconds",
158                help: "Latency for appending to any (user or system) table.",
159                buckets: histogram_seconds_buckets(0.128, 32.0),
160            )),
161            webhook_validation_reduce_failures: registry.register(metric!(
162                name: "mz_webhook_validation_reduce_failures",
163                help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
164                var_labels: ["reason"],
165            )),
166            webhook_get_appender: registry.register(metric!(
167                name: "mz_webhook_get_appender_count",
168                help: "Count of getting a webhook appender from the Coordinator.",
169            )),
170            check_scheduling_policies_seconds: registry.register(metric!(
171                name: "mz_check_scheduling_policies_seconds",
172                help: "The time each policy in `check_scheduling_policies` takes.",
173                var_labels: ["policy", "thread"],
174                buckets: histogram_seconds_buckets(0.000_128, 8.0),
175            )),
176            handle_scheduling_decisions_seconds: registry.register(metric!(
177                name: "mz_handle_scheduling_decisions_seconds",
178                help: "The time `handle_scheduling_decisions` takes.",
179                var_labels: ["altered_a_cluster"],
180                buckets: histogram_seconds_buckets(0.000_128, 8.0),
181            )),
182            row_set_finishing_seconds: registry.register(metric!(
183                name: "mz_row_set_finishing_seconds",
184                help: "The time it takes to run RowSetFinishing::finish.",
185                buckets: histogram_seconds_buckets(0.000_128, 16.0),
186            )),
187            session_startup_table_writes_seconds: registry.register(metric!(
188                name: "mz_session_startup_table_writes_seconds",
189                help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
190                buckets: histogram_seconds_buckets(0.000_008, 4.0),
191            )),
192            parse_seconds: registry.register(metric!(
193                name: "mz_parse_seconds",
194                help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
195                buckets: histogram_seconds_buckets(0.001, 8.0),
196            )),
197            pgwire_message_processing_seconds: registry.register(metric!(
198                name: "mz_pgwire_message_processing_seconds",
199                help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
200                var_labels: ["message_type"],
201                buckets: histogram_seconds_buckets(0.001, 512.0),
202            )),
203            result_rows_first_to_last_byte_seconds: registry.register(metric!(
204                name: "mz_result_rows_first_to_last_byte_seconds",
205                help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
206                var_labels: ["statement_type"],
207                buckets: histogram_seconds_buckets(0.001, 8192.0),
208            )),
209            pgwire_ensure_transaction_seconds: registry.register(metric!(
210                name: "mz_pgwire_ensure_transaction_seconds",
211                help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
212                var_labels: ["message_type"],
213                buckets: histogram_seconds_buckets(0.001, 512.0),
214            )),
215            catalog_snapshot_seconds: registry.register(metric!(
216                name: "mz_catalog_snapshot_seconds",
217                help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
218                var_labels: ["context"],
219                buckets: histogram_seconds_buckets(0.001, 512.0),
220            )),
221            pgwire_recv_scheduling_delay_ms: registry.register(metric!(
222                name: "mz_pgwire_recv_scheduling_delay_ms",
223                help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
224                var_labels: ["message_type"],
225                buckets: histogram_milliseconds_buckets(0.128, 512000.),
226            )),
227            catalog_transact_seconds: registry.register(metric!(
228                name: "mz_catalog_transact_seconds",
229                help: "The time it takes to run various catalog transact methods.",
230                var_labels: ["method"],
231                buckets: histogram_seconds_buckets(0.001, 32.0),
232            )),
233            apply_catalog_implications_seconds: registry.register(metric!(
234                name: "mz_apply_catalog_implications_seconds",
235                help: "The time it takes to apply catalog implications.",
236                buckets: histogram_seconds_buckets(0.001, 32.0),
237            ))
238        }
239    }
240
241    pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
242        self.row_set_finishing_seconds.clone()
243    }
244
245    pub(crate) fn session_metrics(&self) -> SessionMetrics {
246        SessionMetrics {
247            row_set_finishing_seconds: self.row_set_finishing_seconds(),
248            session_startup_table_writes_seconds: self.session_startup_table_writes_seconds.clone(),
249            query_total: self.query_total.clone(),
250            determine_timestamp: self.determine_timestamp.clone(),
251            timestamp_difference_for_strict_serializable_ms: self
252                .timestamp_difference_for_strict_serializable_ms
253                .clone(),
254            optimization_notices: self.optimization_notices.clone(),
255        }
256    }
257}
258
/// Metrics associated with a [`crate::session::Session`].
///
/// Instances are built by `Metrics::session_metrics`, which clones each of
/// the handles below out of the corresponding `Metrics` field. The fields
/// are private; access goes through the same-named methods on this type.
#[derive(Debug, Clone)]
pub struct SessionMetrics {
    // Clone of `Metrics::row_set_finishing_seconds`.
    row_set_finishing_seconds: Histogram,
    // Clone of `Metrics::session_startup_table_writes_seconds`.
    session_startup_table_writes_seconds: Histogram,
    // Clone of `Metrics::query_total`.
    query_total: IntCounterVec,
    // Clone of `Metrics::determine_timestamp`.
    determine_timestamp: IntCounterVec,
    // Clone of `Metrics::timestamp_difference_for_strict_serializable_ms`.
    timestamp_difference_for_strict_serializable_ms: HistogramVec,
    // Clone of `Metrics::optimization_notices`.
    optimization_notices: IntCounterVec,
}
269
270impl SessionMetrics {
271    pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
272        &self.row_set_finishing_seconds
273    }
274
275    pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
276        &self.session_startup_table_writes_seconds
277    }
278
279    pub(crate) fn query_total(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
280        self.query_total.with_label_values(label_values)
281    }
282
283    pub(crate) fn determine_timestamp(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
284        self.determine_timestamp.with_label_values(label_values)
285    }
286
287    pub(crate) fn timestamp_difference_for_strict_serializable_ms(
288        &self,
289        label_values: &[&str],
290    ) -> Histogram {
291        self.timestamp_difference_for_strict_serializable_ms
292            .with_label_values(label_values)
293    }
294
295    pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
296        self.optimization_notices.with_label_values(label_values)
297    }
298}
299
300pub(crate) fn session_type_label_value(user: &User) -> &'static str {
301    match user.is_internal() {
302        true => "system",
303        false => "user",
304    }
305}
306
307pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
308where
309    T: AstInfo,
310{
311    statement_kind_label_value(StatementKind::from(stmt))
312}
313
314pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
315where
316    T: AstInfo,
317{
318    match output {
319        SubscribeOutput::Diffs => "diffs",
320        SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
321        SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
322        SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
323    }
324}