// mz_adapter/metrics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use mz_ore::metric;
11use mz_ore::metrics::MetricsRegistry;
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::core::{AtomicU64, GenericCounter};
17use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
18
/// Handles to the metrics that `Metrics::register_into` registers for the
/// adapter.
///
/// The per-field notes below summarize the help text and label names each
/// metric is registered with.
#[derive(Debug, Clone)]
pub struct Metrics {
    /// Queries issued since process start, labeled by `session_type` and
    /// `statement_type`.
    pub query_total: IntCounterVec,
    /// Number of active coordinator sessions, labeled by `session_type`.
    pub active_sessions: IntGaugeVec,
    /// Number of active SUBSCRIBE queries, labeled by `session_type`.
    pub active_subscribes: IntGaugeVec,
    /// Number of active COPY TO queries, labeled by `session_type`.
    pub active_copy_tos: IntGaugeVec,
    /// Seconds the coord queue was processing before it was empty (sampled).
    pub queue_busy_seconds: Histogram,
    /// Calls to `determine_timestamp`, labeled by `respond_immediately`,
    /// `isolation_level` and `compute_instance`.
    pub determine_timestamp: IntCounterVec,
    /// Timestamp difference in milliseconds between strict serializable and
    /// serializable isolation, labeled by `compute_instance`.
    pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
    /// Adapter commands issued since process start, labeled by `command_type`,
    /// `status` and `application_name`.
    pub commands: IntCounterVec,
    /// Seconds the coord spends collecting usage metrics from storage.
    pub storage_usage_collection_time_seconds: Histogram,
    /// Seconds to read `mz_object_arrangement_sizes` and prepare history-table
    /// updates for one snapshot.
    pub arrangement_sizes_collection_time_seconds: Histogram,
    /// Rows appended to `mz_object_arrangement_size_history` since process
    /// start.
    pub arrangement_sizes_rows_written: IntCounter,
    /// Subscribe outputs used, labeled by `session_type` and
    /// `subscribe_output`.
    pub subscribe_outputs: IntCounterVec,
    /// Canceled peeks since process start.
    pub canceled_peeks: IntCounter,
    /// Seconds taken to linearize strict serializable messages, labeled by
    /// `type` and `immediately_handled`.
    pub linearize_message_seconds: HistogramVec,
    /// Latency of an execute for a successful query from pgwire's perspective,
    /// labeled by `instance_id`, `isolation_level` and `strategy`.
    pub time_to_first_row_seconds: HistogramVec,
    /// SQL statements tagged with whether they were recorded, labeled by
    /// `sample`.
    pub statement_logging_records: IntCounterVec,
    /// Amount of SQL text that would have been logged if statement logging
    /// were unsampled.
    pub statement_logging_unsampled_bytes: IntCounter,
    /// Amount of SQL text actually logged by statement logging.
    pub statement_logging_actual_bytes: IntCounter,
    /// Message batch size handled by the coordinator.
    pub message_batch: Histogram,
    /// Latency for all coordinator messages, labeled by `message_kind`
    /// (registered under the legacy name `mz_slow_message_handling`).
    pub message_handling: HistogramVec,
    /// Optimization notices, labeled by `notice_type`.
    pub optimization_notices: IntCounterVec,
    /// Latency for appending to any (user or system) table.
    pub append_table_duration_seconds: Histogram,
    /// Failures to reduce a webhook source's CHECK statement, labeled by
    /// `reason`.
    pub webhook_validation_reduce_failures: IntCounterVec,
    /// Count of getting a webhook appender from the Coordinator.
    pub webhook_get_appender: IntCounter,
    /// Time each policy in `check_scheduling_policies` takes, labeled by
    /// `policy` and `thread`.
    pub check_scheduling_policies_seconds: HistogramVec,
    /// Time `handle_scheduling_decisions` takes, labeled by
    /// `altered_a_cluster`.
    pub handle_scheduling_decisions_seconds: HistogramVec,
    /// Time it takes to run `RowSetFinishing::finish`.
    pub row_set_finishing_seconds: Histogram,
    /// Time spent waiting for builtin table writes before processing a query.
    pub session_startup_table_writes_seconds: Histogram,
    /// Time it takes to parse a SQL statement.
    pub parse_seconds: Histogram,
    /// Time to process each pgwire message type, labeled by `message_type`.
    pub pgwire_message_processing_seconds: HistogramVec,
    /// Time from just before sending the first result row to sending the final
    /// response message, labeled by `statement_type`.
    pub result_rows_first_to_last_byte_seconds: HistogramVec,
    /// Time it takes to run `ensure_transactions` when processing pgwire
    /// messages, labeled by `message_type`.
    pub pgwire_ensure_transaction_seconds: HistogramVec,
    /// Time it takes to run `catalog_snapshot`, labeled by `context`.
    pub catalog_snapshot_seconds: HistogramVec,
    /// Delay between a pgwire receiver task being woken by incoming data and
    /// getting polled, labeled by `message_type`.
    pub pgwire_recv_scheduling_delay_ms: HistogramVec,
    /// Time it takes to run catalog transact methods, labeled by `method`.
    pub catalog_transact_seconds: HistogramVec,
    /// Time it takes to apply catalog implications.
    pub apply_catalog_implications_seconds: Histogram,
    /// Time it takes to advance the catalog shard upper during group commit.
    pub group_commit_catalog_upper_seconds: Histogram,
}
59
60impl Metrics {
    /// Registers every adapter metric in `registry` and returns the resulting
    /// handles as a [`Metrics`].
    ///
    /// Each field is produced by one `registry.register(metric!(..))` call
    /// that fixes the metric's exported name, its help text, its label names
    /// (`var_labels`, where present) and, for histograms, its bucket
    /// boundaries.
    pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
        Self {
            query_total: registry.register(metric!(
                name: "mz_query_total",
                help: "The total number of queries issued of the given type since process start.",
                var_labels: ["session_type", "statement_type"],
            )),
            active_sessions: registry.register(metric!(
                name: "mz_active_sessions",
                help: "The number of active coordinator sessions.",
                var_labels: ["session_type"],
            )),
            active_subscribes: registry.register(metric!(
                name: "mz_active_subscribes",
                help: "The number of active SUBSCRIBE queries.",
                var_labels: ["session_type"],
            )),
            active_copy_tos: registry.register(metric!(
                name: "mz_active_copy_tos",
                help: "The number of active COPY TO queries.",
                var_labels: ["session_type"],
            )),
            queue_busy_seconds: registry.register(metric!(
                name: "mz_coord_queue_busy_seconds",
                help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
                buckets: histogram_seconds_buckets(0.000_128, 32.0)
            )),
            determine_timestamp: registry.register(metric!(
                name: "mz_determine_timestamp",
                help: "The total number of calls to determine_timestamp.",
                var_labels:["respond_immediately", "isolation_level", "compute_instance"],
            )),
            timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
                name: "mz_timestamp_difference_for_strict_serializable_ms",
                help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
                var_labels:["compute_instance"],
                buckets: histogram_milliseconds_buckets(1., 8000.),
            )),
            commands: registry.register(metric!(
                name: "mz_adapter_commands",
                help: "The total number of adapter commands issued of the given type since process start.",
                var_labels: ["command_type", "status", "application_name"],
            )),
            storage_usage_collection_time_seconds: registry.register(metric!(
                name: "mz_storage_usage_collection_time_seconds",
                help: "The number of seconds the coord spends collecting usage metrics from storage.",
                buckets: histogram_seconds_buckets(0.000_128, 8.0)
            )),
            arrangement_sizes_collection_time_seconds: registry.register(metric!(
                name: "mz_arrangement_sizes_collection_time_seconds",
                help: "Seconds to read mz_object_arrangement_sizes and prepare history-table updates for one snapshot.",
                buckets: histogram_seconds_buckets(0.000_128, 8.0)
            )),
            arrangement_sizes_rows_written: registry.register(metric!(
                name: "mz_arrangement_sizes_rows_written_total",
                help: "Total rows appended to mz_object_arrangement_size_history since process start.",
            )),
            subscribe_outputs: registry.register(metric!(
                name: "mz_subscribe_outputs",
                help: "The total number of different subscribe outputs used",
                var_labels: ["session_type", "subscribe_output"],
            )),
            canceled_peeks: registry.register(metric!(
                name: "mz_canceled_peeks_total",
                help: "The total number of canceled peeks since process start.",
            )),
            linearize_message_seconds: registry.register(metric!(
                name: "mz_linearize_message_seconds",
                help: "The number of seconds it takes to linearize strict serializable messages",
                var_labels: ["type", "immediately_handled"],
                buckets: histogram_seconds_buckets(0.000_128, 8.0),
            )),
            // This entry and the next invoke `metric!` with braces rather than
            // parentheses; the delimiter is the only difference from the
            // other entries.
            time_to_first_row_seconds: registry.register(metric! {
                name: "mz_time_to_first_row_seconds",
                help: "Latency of an execute for a successful query from pgwire's perspective",
                var_labels: ["instance_id", "isolation_level", "strategy"],
                buckets: histogram_seconds_buckets(0.000_128, 32.0)
            }),
            statement_logging_records: registry.register(metric! {
                name: "mz_statement_logging_record_count",
                help: "The total number of SQL statements tagged with whether or not they were recorded.",
                var_labels: ["sample"],
            }),
            statement_logging_unsampled_bytes: registry.register(metric!(
                name: "mz_statement_logging_unsampled_bytes",
                help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
            )),
            statement_logging_actual_bytes: registry.register(metric!(
                name: "mz_statement_logging_actual_bytes",
                help: "The total amount of SQL text that was logged by statement logging.",
            )),
            // The only histogram here with an explicit list of bucket
            // boundaries instead of a generated series.
            message_batch: registry.register(metric!(
                name: "mz_coordinator_message_batch_size",
                help: "Message batch size handled by the coordinator.",
                buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
            )),
            // The exported name keeps its legacy `slow` wording; per the help
            // text this histogram covers all coordinator messages.
            message_handling: registry.register(metric!(
                name: "mz_slow_message_handling",
                help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
                var_labels: ["message_kind"],
                buckets: histogram_seconds_buckets(0.000_128, 512.0),
            )),
            optimization_notices: registry.register(metric!(
                name: "mz_optimization_notices",
                help: "Number of optimization notices per notice type.",
                var_labels: ["notice_type"],
            )),
            // NOTE(review): the lower bucket bound here is 0.128, whereas many
            // other seconds histograms in this function start at 0.000_128.
            // Confirm this is intentional.
            append_table_duration_seconds: registry.register(metric!(
                name: "mz_append_table_duration_seconds",
                help: "Latency for appending to any (user or system) table.",
                buckets: histogram_seconds_buckets(0.128, 32.0),
            )),
            webhook_validation_reduce_failures: registry.register(metric!(
                name: "mz_webhook_validation_reduce_failures",
                help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
                var_labels: ["reason"],
            )),
            webhook_get_appender: registry.register(metric!(
                name: "mz_webhook_get_appender_count",
                help: "Count of getting a webhook appender from the Coordinator.",
            )),
            check_scheduling_policies_seconds: registry.register(metric!(
                name: "mz_check_scheduling_policies_seconds",
                help: "The time each policy in `check_scheduling_policies` takes.",
                var_labels: ["policy", "thread"],
                buckets: histogram_seconds_buckets(0.000_128, 8.0),
            )),
            handle_scheduling_decisions_seconds: registry.register(metric!(
                name: "mz_handle_scheduling_decisions_seconds",
                help: "The time `handle_scheduling_decisions` takes.",
                var_labels: ["altered_a_cluster"],
                buckets: histogram_seconds_buckets(0.000_128, 8.0),
            )),
            row_set_finishing_seconds: registry.register(metric!(
                name: "mz_row_set_finishing_seconds",
                help: "The time it takes to run RowSetFinishing::finish.",
                buckets: histogram_seconds_buckets(0.000_128, 16.0),
            )),
            session_startup_table_writes_seconds: registry.register(metric!(
                name: "mz_session_startup_table_writes_seconds",
                help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
                buckets: histogram_seconds_buckets(0.000_008, 4.0),
            )),
            parse_seconds: registry.register(metric!(
                name: "mz_parse_seconds",
                help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
                buckets: histogram_seconds_buckets(0.001, 8.0),
            )),
            pgwire_message_processing_seconds: registry.register(metric!(
                name: "mz_pgwire_message_processing_seconds",
                help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
                var_labels: ["message_type"],
                buckets: histogram_seconds_buckets(0.001, 512.0),
            )),
            result_rows_first_to_last_byte_seconds: registry.register(metric!(
                name: "mz_result_rows_first_to_last_byte_seconds",
                help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
                var_labels: ["statement_type"],
                buckets: histogram_seconds_buckets(0.001, 8192.0),
            )),
            pgwire_ensure_transaction_seconds: registry.register(metric!(
                name: "mz_pgwire_ensure_transaction_seconds",
                help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
                var_labels: ["message_type"],
                buckets: histogram_seconds_buckets(0.001, 512.0),
            )),
            catalog_snapshot_seconds: registry.register(metric!(
                name: "mz_catalog_snapshot_seconds",
                help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
                var_labels: ["context"],
                buckets: histogram_seconds_buckets(0.001, 512.0),
            )),
            // Uses the milliseconds bucket helper, matching the `_ms` suffix
            // of the metric name.
            pgwire_recv_scheduling_delay_ms: registry.register(metric!(
                name: "mz_pgwire_recv_scheduling_delay_ms",
                help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
                var_labels: ["message_type"],
                buckets: histogram_milliseconds_buckets(0.128, 512000.),
            )),
            catalog_transact_seconds: registry.register(metric!(
                name: "mz_catalog_transact_seconds",
                help: "The time it takes to run various catalog transact methods.",
                var_labels: ["method"],
                buckets: histogram_seconds_buckets(0.001, 32.0),
            )),
            apply_catalog_implications_seconds: registry.register(metric!(
                name: "mz_apply_catalog_implications_seconds",
                help: "The time it takes to apply catalog implications.",
                buckets: histogram_seconds_buckets(0.001, 32.0),
            )),
            group_commit_catalog_upper_seconds: registry.register(metric!(
                name: "mz_group_commit_catalog_upper_seconds",
                help: "The time it takes to advance the catalog shard upper during group commit.",
                buckets: histogram_seconds_buckets(0.001, 32.0),
            )),
        }
    }
257
258    pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
259        self.row_set_finishing_seconds.clone()
260    }
261
262    pub(crate) fn session_metrics(&self) -> SessionMetrics {
263        SessionMetrics {
264            row_set_finishing_seconds: self.row_set_finishing_seconds(),
265            session_startup_table_writes_seconds: self.session_startup_table_writes_seconds.clone(),
266            query_total: self.query_total.clone(),
267            determine_timestamp: self.determine_timestamp.clone(),
268            timestamp_difference_for_strict_serializable_ms: self
269                .timestamp_difference_for_strict_serializable_ms
270                .clone(),
271            optimization_notices: self.optimization_notices.clone(),
272            statement_logging_records: self.statement_logging_records.clone(),
273            statement_logging_unsampled_bytes: self.statement_logging_unsampled_bytes.clone(),
274            statement_logging_actual_bytes: self.statement_logging_actual_bytes.clone(),
275        }
276    }
277}
278
/// Metrics to be accessed from a [`crate::session::Session`].
///
/// This is the subset of the handles in `Metrics` that
/// `Metrics::session_metrics` clones into this struct. The fields are private
/// and are read through the `pub(crate)` accessor methods of the same names.
#[derive(Debug, Clone)]
pub struct SessionMetrics {
    // Plain histograms and counters, handed out by reference.
    row_set_finishing_seconds: Histogram,
    session_startup_table_writes_seconds: Histogram,
    // Labeled families; the accessors resolve these with caller-supplied
    // label values.
    query_total: IntCounterVec,
    determine_timestamp: IntCounterVec,
    timestamp_difference_for_strict_serializable_ms: HistogramVec,
    optimization_notices: IntCounterVec,
    statement_logging_records: IntCounterVec,
    // Unlabeled byte counters for statement logging.
    statement_logging_unsampled_bytes: IntCounter,
    statement_logging_actual_bytes: IntCounter,
}
292
293impl SessionMetrics {
294    pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
295        &self.row_set_finishing_seconds
296    }
297
298    pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
299        &self.session_startup_table_writes_seconds
300    }
301
302    pub(crate) fn query_total(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
303        self.query_total.with_label_values(label_values)
304    }
305
306    pub(crate) fn determine_timestamp(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
307        self.determine_timestamp.with_label_values(label_values)
308    }
309
310    pub(crate) fn timestamp_difference_for_strict_serializable_ms(
311        &self,
312        label_values: &[&str],
313    ) -> Histogram {
314        self.timestamp_difference_for_strict_serializable_ms
315            .with_label_values(label_values)
316    }
317
318    pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
319        self.optimization_notices.with_label_values(label_values)
320    }
321
322    pub(crate) fn statement_logging_records(
323        &self,
324        label_values: &[&str],
325    ) -> GenericCounter<AtomicU64> {
326        self.statement_logging_records
327            .with_label_values(label_values)
328    }
329
330    pub(crate) fn statement_logging_unsampled_bytes(&self) -> &IntCounter {
331        &self.statement_logging_unsampled_bytes
332    }
333
334    pub(crate) fn statement_logging_actual_bytes(&self) -> &IntCounter {
335        &self.statement_logging_actual_bytes
336    }
337}
338
339pub(crate) fn session_type_label_value(user: &User) -> &'static str {
340    match user.is_internal() {
341        true => "system",
342        false => "user",
343    }
344}
345
346pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
347where
348    T: AstInfo,
349{
350    statement_kind_label_value(StatementKind::from(stmt))
351}
352
353pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
354where
355    T: AstInfo,
356{
357    match output {
358        SubscribeOutput::Diffs => "diffs",
359        SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
360        SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
361        SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
362    }
363}