Skip to main content

mz_adapter/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::metric;
11use mz_ore::metrics::MetricsRegistry;
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::core::{AtomicU64, GenericCounter};
17use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
18
/// Metrics reported by the adapter.
///
/// Every field is created and registered by `Metrics::register_into`; each field's doc
/// names the exported metric, its variable labels (if any), and paraphrases its help text.
#[derive(Debug, Clone)]
pub struct Metrics {
    /// `mz_query_total`: queries issued since process start, by `session_type` and
    /// `statement_type`.
    pub query_total: IntCounterVec,
    /// `mz_active_sessions`: active coordinator sessions, by `session_type`.
    pub active_sessions: IntGaugeVec,
    /// `mz_active_subscribes`: active SUBSCRIBE queries, by `session_type`.
    pub active_subscribes: IntGaugeVec,
    /// `mz_active_copy_tos`: active COPY TO queries, by `session_type`.
    pub active_copy_tos: IntGaugeVec,
    /// `mz_coord_queue_busy_seconds`: sampled time the coordinator queue spent processing
    /// before it became empty (does not measure full wait/idle times).
    pub queue_busy_seconds: Histogram,
    /// `mz_determine_timestamp`: calls to `determine_timestamp`, by `respond_immediately`,
    /// `isolation_level` and `compute_instance`.
    pub determine_timestamp: IntCounterVec,
    /// `mz_timestamp_difference_for_strict_serializable_ms`: timestamp difference in
    /// milliseconds between strict serializable and serializable isolation, by
    /// `compute_instance`.
    pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
    /// `mz_adapter_commands`: adapter commands issued since process start, by
    /// `command_type`, `status` and `application_name`.
    pub commands: IntCounterVec,
    /// `mz_storage_usage_collection_time_seconds`: time the coordinator spends collecting
    /// usage metrics from storage.
    pub storage_usage_collection_time_seconds: Histogram,
    /// `mz_arrangement_sizes_collection_time_seconds`: time to read
    /// `mz_object_arrangement_sizes` and prepare history-table updates for one snapshot.
    pub arrangement_sizes_collection_time_seconds: Histogram,
    /// `mz_arrangement_sizes_rows_written_total`: rows appended to
    /// `mz_object_arrangement_size_history` since process start.
    pub arrangement_sizes_rows_written: IntCounter,
    /// `mz_subscribe_outputs`: SUBSCRIBE output kinds used, by `session_type` and
    /// `subscribe_output`.
    pub subscribe_outputs: IntCounterVec,
    /// `mz_canceled_peeks_total`: canceled peeks since process start.
    pub canceled_peeks: IntCounter,
    /// `mz_linearize_message_seconds`: time to linearize strict serializable messages, by
    /// `type` and `immediately_handled`.
    pub linearize_message_seconds: HistogramVec,
    /// `mz_time_to_first_row_seconds`: latency of an execute for a successful query as seen
    /// by pgwire, by `instance_id`, `isolation_level` and `strategy`.
    pub time_to_first_row_seconds: HistogramVec,
    /// `mz_statement_logging_record_count`: SQL statements, labeled by `sample` according
    /// to whether or not they were recorded.
    pub statement_logging_records: IntCounterVec,
    /// `mz_statement_logging_unsampled_bytes`: SQL text that would have been logged if
    /// statement logging were unsampled.
    pub statement_logging_unsampled_bytes: IntCounter,
    /// `mz_statement_logging_actual_bytes`: SQL text actually logged by statement logging.
    pub statement_logging_actual_bytes: IntCounter,
    /// `mz_coordinator_message_batch_size`: sizes of message batches handled by the
    /// coordinator.
    pub message_batch: Histogram,
    /// `mz_slow_message_handling`: latency of all coordinator messages, by `message_kind`
    /// ("slow" in the metric name is a legacy misnomer).
    pub message_handling: HistogramVec,
    /// `mz_optimization_notices`: optimization notices, by `notice_type`.
    pub optimization_notices: IntCounterVec,
    /// `mz_append_table_duration_seconds`: latency of appending to any user or system table.
    pub append_table_duration_seconds: Histogram,
    /// `mz_webhook_validation_reduce_failures`: failures to reduce a webhook source's CHECK
    /// statement, by `reason`.
    pub webhook_validation_reduce_failures: IntCounterVec,
    /// `mz_webhook_get_appender_count`: webhook appenders obtained from the coordinator.
    pub webhook_get_appender: IntCounter,
    /// `mz_check_scheduling_policies_seconds`: time taken by each policy in
    /// `check_scheduling_policies`, by `policy` and `thread`.
    pub check_scheduling_policies_seconds: HistogramVec,
    /// `mz_handle_scheduling_decisions_seconds`: time taken by
    /// `handle_scheduling_decisions`, by `altered_a_cluster`.
    pub handle_scheduling_decisions_seconds: HistogramVec,
    /// `mz_row_set_finishing_seconds`: time taken by `RowSetFinishing::finish`.
    pub row_set_finishing_seconds: Histogram,
    /// `mz_session_startup_table_writes_seconds`: how long a query waited for builtin table
    /// writes, when it had to wait before being processed.
    pub session_startup_table_writes_seconds: Histogram,
    /// `mz_parse_seconds`: time to parse a SQL statement (Simple Query and Extended Query
    /// protocol alike).
    pub parse_seconds: Histogram,
    /// `mz_pgwire_message_processing_seconds`: processing time per pgwire message type, as
    /// measured in the adapter frontend, by `message_type`.
    pub pgwire_message_processing_seconds: HistogramVec,
    /// `mz_result_rows_first_to_last_byte_seconds`: time from just before sending the first
    /// result row until the final response after the last row is flushed, by
    /// `statement_type`; may span several FETCHes and is never observed for unbounded
    /// SUBSCRIBEs.
    pub result_rows_first_to_last_byte_seconds: HistogramVec,
    /// `mz_pgwire_ensure_transaction_seconds`: time spent in `ensure_transactions` while
    /// processing pgwire messages, by `message_type`.
    pub pgwire_ensure_transaction_seconds: HistogramVec,
    /// `mz_catalog_snapshot_seconds`: time spent in `catalog_snapshot`, by `context`.
    pub catalog_snapshot_seconds: HistogramVec,
    /// `mz_pgwire_recv_scheduling_delay_ms`: delay in milliseconds between a pgwire
    /// receiver task being woken by incoming data and being polled, by `message_type`.
    pub pgwire_recv_scheduling_delay_ms: HistogramVec,
    /// `mz_catalog_transact_seconds`: time spent in catalog transact methods, by `method`.
    pub catalog_transact_seconds: HistogramVec,
    /// `mz_apply_catalog_implications_seconds`: time to apply catalog implications.
    pub apply_catalog_implications_seconds: Histogram,
    /// `mz_group_commit_catalog_upper_seconds`: time to advance the catalog shard upper
    /// during group commit.
    pub group_commit_catalog_upper_seconds: Histogram,
    /// `mz_group_commit_table_advancement_seconds`: time to iterate over all catalog
    /// entries to find tables during group commit.
    pub group_commit_table_advancement_seconds: Histogram,
}
60
61impl Metrics {
62    pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
63        Self {
64            query_total: registry.register(metric!(
65                name: "mz_query_total",
66                help: "The total number of queries issued of the given type since process start.",
67                var_labels: ["session_type", "statement_type"],
68            )),
69            active_sessions: registry.register(metric!(
70                name: "mz_active_sessions",
71                help: "The number of active coordinator sessions.",
72                var_labels: ["session_type"],
73            )),
74            active_subscribes: registry.register(metric!(
75                name: "mz_active_subscribes",
76                help: "The number of active SUBSCRIBE queries.",
77                var_labels: ["session_type"],
78            )),
79            active_copy_tos: registry.register(metric!(
80                name: "mz_active_copy_tos",
81                help: "The number of active COPY TO queries.",
82                var_labels: ["session_type"],
83            )),
84            queue_busy_seconds: registry.register(metric!(
85                name: "mz_coord_queue_busy_seconds",
86                help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
87                buckets: histogram_seconds_buckets(0.000_128, 32.0)
88            )),
89            determine_timestamp: registry.register(metric!(
90                name: "mz_determine_timestamp",
91                help: "The total number of calls to determine_timestamp.",
92                var_labels:["respond_immediately", "isolation_level", "compute_instance"],
93            )),
94            timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
95                name: "mz_timestamp_difference_for_strict_serializable_ms",
96                help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
97                var_labels:["compute_instance"],
98                buckets: histogram_milliseconds_buckets(1., 8000.),
99            )),
100            commands: registry.register(metric!(
101                name: "mz_adapter_commands",
102                help: "The total number of adapter commands issued of the given type since process start.",
103                var_labels: ["command_type", "status", "application_name"],
104            )),
105            storage_usage_collection_time_seconds: registry.register(metric!(
106                name: "mz_storage_usage_collection_time_seconds",
107                help: "The number of seconds the coord spends collecting usage metrics from storage.",
108                buckets: histogram_seconds_buckets(0.000_128, 8.0)
109            )),
110            arrangement_sizes_collection_time_seconds: registry.register(metric!(
111                name: "mz_arrangement_sizes_collection_time_seconds",
112                help: "Seconds to read mz_object_arrangement_sizes and prepare history-table updates for one snapshot.",
113                buckets: histogram_seconds_buckets(0.000_128, 8.0)
114            )),
115            arrangement_sizes_rows_written: registry.register(metric!(
116                name: "mz_arrangement_sizes_rows_written_total",
117                help: "Total rows appended to mz_object_arrangement_size_history since process start.",
118            )),
119            subscribe_outputs: registry.register(metric!(
120                name: "mz_subscribe_outputs",
121                help: "The total number of different subscribe outputs used",
122                var_labels: ["session_type", "subscribe_output"],
123            )),
124            canceled_peeks: registry.register(metric!(
125                name: "mz_canceled_peeks_total",
126                help: "The total number of canceled peeks since process start.",
127            )),
128            linearize_message_seconds: registry.register(metric!(
129                name: "mz_linearize_message_seconds",
130                help: "The number of seconds it takes to linearize strict serializable messages",
131                var_labels: ["type", "immediately_handled"],
132                buckets: histogram_seconds_buckets(0.000_128, 8.0),
133            )),
134            time_to_first_row_seconds: registry.register(metric! {
135                name: "mz_time_to_first_row_seconds",
136                help: "Latency of an execute for a successful query from pgwire's perspective",
137                var_labels: ["instance_id", "isolation_level", "strategy"],
138                buckets: histogram_seconds_buckets(0.000_128, 32.0)
139            }),
140            statement_logging_records: registry.register(metric! {
141                name: "mz_statement_logging_record_count",
142                help: "The total number of SQL statements tagged with whether or not they were recorded.",
143                var_labels: ["sample"],
144            }),
145            statement_logging_unsampled_bytes: registry.register(metric!(
146                name: "mz_statement_logging_unsampled_bytes",
147                help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
148            )),
149            statement_logging_actual_bytes: registry.register(metric!(
150                name: "mz_statement_logging_actual_bytes",
151                help: "The total amount of SQL text that was logged by statement logging.",
152            )),
153            message_batch: registry.register(metric!(
154                name: "mz_coordinator_message_batch_size",
155                help: "Message batch size handled by the coordinator.",
156                buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
157            )),
158            message_handling: registry.register(metric!(
159                name: "mz_slow_message_handling",
160                help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
161                var_labels: ["message_kind"],
162                buckets: histogram_seconds_buckets(0.000_128, 512.0),
163            )),
164            optimization_notices: registry.register(metric!(
165                name: "mz_optimization_notices",
166                help: "Number of optimization notices per notice type.",
167                var_labels: ["notice_type"],
168            )),
169            append_table_duration_seconds: registry.register(metric!(
170                name: "mz_append_table_duration_seconds",
171                help: "Latency for appending to any (user or system) table.",
172                buckets: histogram_seconds_buckets(0.128, 32.0),
173            )),
174            webhook_validation_reduce_failures: registry.register(metric!(
175                name: "mz_webhook_validation_reduce_failures",
176                help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
177                var_labels: ["reason"],
178            )),
179            webhook_get_appender: registry.register(metric!(
180                name: "mz_webhook_get_appender_count",
181                help: "Count of getting a webhook appender from the Coordinator.",
182            )),
183            check_scheduling_policies_seconds: registry.register(metric!(
184                name: "mz_check_scheduling_policies_seconds",
185                help: "The time each policy in `check_scheduling_policies` takes.",
186                var_labels: ["policy", "thread"],
187                buckets: histogram_seconds_buckets(0.000_128, 8.0),
188            )),
189            handle_scheduling_decisions_seconds: registry.register(metric!(
190                name: "mz_handle_scheduling_decisions_seconds",
191                help: "The time `handle_scheduling_decisions` takes.",
192                var_labels: ["altered_a_cluster"],
193                buckets: histogram_seconds_buckets(0.000_128, 8.0),
194            )),
195            row_set_finishing_seconds: registry.register(metric!(
196                name: "mz_row_set_finishing_seconds",
197                help: "The time it takes to run RowSetFinishing::finish.",
198                buckets: histogram_seconds_buckets(0.000_128, 16.0),
199            )),
200            session_startup_table_writes_seconds: registry.register(metric!(
201                name: "mz_session_startup_table_writes_seconds",
202                help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
203                buckets: histogram_seconds_buckets(0.000_008, 4.0),
204            )),
205            parse_seconds: registry.register(metric!(
206                name: "mz_parse_seconds",
207                help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
208                buckets: histogram_seconds_buckets(0.001, 8.0),
209            )),
210            pgwire_message_processing_seconds: registry.register(metric!(
211                name: "mz_pgwire_message_processing_seconds",
212                help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
213                var_labels: ["message_type"],
214                buckets: histogram_seconds_buckets(0.001, 512.0),
215            )),
216            result_rows_first_to_last_byte_seconds: registry.register(metric!(
217                name: "mz_result_rows_first_to_last_byte_seconds",
218                help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
219                var_labels: ["statement_type"],
220                buckets: histogram_seconds_buckets(0.001, 8192.0),
221            )),
222            pgwire_ensure_transaction_seconds: registry.register(metric!(
223                name: "mz_pgwire_ensure_transaction_seconds",
224                help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
225                var_labels: ["message_type"],
226                buckets: histogram_seconds_buckets(0.001, 512.0),
227            )),
228            catalog_snapshot_seconds: registry.register(metric!(
229                name: "mz_catalog_snapshot_seconds",
230                help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
231                var_labels: ["context"],
232                buckets: histogram_seconds_buckets(0.001, 512.0),
233            )),
234            pgwire_recv_scheduling_delay_ms: registry.register(metric!(
235                name: "mz_pgwire_recv_scheduling_delay_ms",
236                help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
237                var_labels: ["message_type"],
238                buckets: histogram_milliseconds_buckets(0.128, 512000.),
239            )),
240            catalog_transact_seconds: registry.register(metric!(
241                name: "mz_catalog_transact_seconds",
242                help: "The time it takes to run various catalog transact methods.",
243                var_labels: ["method"],
244                buckets: histogram_seconds_buckets(0.001, 32.0),
245            )),
246            apply_catalog_implications_seconds: registry.register(metric!(
247                name: "mz_apply_catalog_implications_seconds",
248                help: "The time it takes to apply catalog implications.",
249                buckets: histogram_seconds_buckets(0.001, 32.0),
250            )),
251            group_commit_catalog_upper_seconds: registry.register(metric!(
252                name: "mz_group_commit_catalog_upper_seconds",
253                help: "The time it takes to advance the catalog shard upper during group commit.",
254                buckets: histogram_seconds_buckets(0.001, 32.0),
255            )),
256            group_commit_table_advancement_seconds: registry.register(metric!(
257                name: "mz_group_commit_table_advancement_seconds",
258                help: "The time it takes to iterate over all catalog entries to find tables during group commit.",
259                buckets: histogram_seconds_buckets(0.001, 32.0),
260            ))
261        }
262    }
263
264    pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
265        self.row_set_finishing_seconds.clone()
266    }
267
268    pub(crate) fn session_metrics(&self) -> SessionMetrics {
269        SessionMetrics {
270            row_set_finishing_seconds: self.row_set_finishing_seconds(),
271            session_startup_table_writes_seconds: self.session_startup_table_writes_seconds.clone(),
272            query_total: self.query_total.clone(),
273            determine_timestamp: self.determine_timestamp.clone(),
274            timestamp_difference_for_strict_serializable_ms: self
275                .timestamp_difference_for_strict_serializable_ms
276                .clone(),
277            optimization_notices: self.optimization_notices.clone(),
278            statement_logging_records: self.statement_logging_records.clone(),
279            statement_logging_unsampled_bytes: self.statement_logging_unsampled_bytes.clone(),
280            statement_logging_actual_bytes: self.statement_logging_actual_bytes.clone(),
281        }
282    }
283}
284
/// Metrics to be accessed from a [`crate::session::Session`].
///
/// Each field holds a clone of the same-named handle in [`Metrics`], copied over by
/// `Metrics::session_metrics`.
#[derive(Debug, Clone)]
pub struct SessionMetrics {
    /// Time taken by `RowSetFinishing::finish` (`mz_row_set_finishing_seconds`).
    row_set_finishing_seconds: Histogram,
    /// How long a query waited for builtin table writes before being processed
    /// (`mz_session_startup_table_writes_seconds`).
    session_startup_table_writes_seconds: Histogram,
    /// Queries issued, by `session_type` and `statement_type` (`mz_query_total`).
    query_total: IntCounterVec,
    /// Calls to `determine_timestamp`, by `respond_immediately`, `isolation_level` and
    /// `compute_instance` (`mz_determine_timestamp`).
    determine_timestamp: IntCounterVec,
    /// Strict-serializable vs serializable timestamp difference in milliseconds, by
    /// `compute_instance` (`mz_timestamp_difference_for_strict_serializable_ms`).
    timestamp_difference_for_strict_serializable_ms: HistogramVec,
    /// Optimization notices, by `notice_type` (`mz_optimization_notices`).
    optimization_notices: IntCounterVec,
    /// SQL statements labeled by `sample`, i.e. whether they were recorded
    /// (`mz_statement_logging_record_count`).
    statement_logging_records: IntCounterVec,
    /// SQL text that would have been logged if statement logging were unsampled
    /// (`mz_statement_logging_unsampled_bytes`).
    statement_logging_unsampled_bytes: IntCounter,
    /// SQL text actually logged by statement logging (`mz_statement_logging_actual_bytes`).
    statement_logging_actual_bytes: IntCounter,
}
298
299impl SessionMetrics {
300    pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
301        &self.row_set_finishing_seconds
302    }
303
304    pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
305        &self.session_startup_table_writes_seconds
306    }
307
308    pub(crate) fn query_total(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
309        self.query_total.with_label_values(label_values)
310    }
311
312    pub(crate) fn determine_timestamp(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
313        self.determine_timestamp.with_label_values(label_values)
314    }
315
316    pub(crate) fn timestamp_difference_for_strict_serializable_ms(
317        &self,
318        label_values: &[&str],
319    ) -> Histogram {
320        self.timestamp_difference_for_strict_serializable_ms
321            .with_label_values(label_values)
322    }
323
324    pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
325        self.optimization_notices.with_label_values(label_values)
326    }
327
328    pub(crate) fn statement_logging_records(
329        &self,
330        label_values: &[&str],
331    ) -> GenericCounter<AtomicU64> {
332        self.statement_logging_records
333            .with_label_values(label_values)
334    }
335
336    pub(crate) fn statement_logging_unsampled_bytes(&self) -> &IntCounter {
337        &self.statement_logging_unsampled_bytes
338    }
339
340    pub(crate) fn statement_logging_actual_bytes(&self) -> &IntCounter {
341        &self.statement_logging_actual_bytes
342    }
343}
344
345pub(crate) fn session_type_label_value(user: &User) -> &'static str {
346    match user.is_internal() {
347        true => "system",
348        false => "user",
349    }
350}
351
352pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
353where
354    T: AstInfo,
355{
356    statement_kind_label_value(StatementKind::from(stmt))
357}
358
359pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
360where
361    T: AstInfo,
362{
363    match output {
364        SubscribeOutput::Diffs => "diffs",
365        SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
366        SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
367        SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
368    }
369}