mz_adapter/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::metric;
11use mz_ore::metrics::MetricsRegistry;
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
17
18#[derive(Debug, Clone)]
19pub struct Metrics {
20    pub query_total: IntCounterVec,
21    pub active_sessions: IntGaugeVec,
22    pub active_subscribes: IntGaugeVec,
23    pub active_copy_tos: IntGaugeVec,
24    pub queue_busy_seconds: HistogramVec,
25    pub determine_timestamp: IntCounterVec,
26    pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
27    pub commands: IntCounterVec,
28    pub storage_usage_collection_time_seconds: HistogramVec,
29    pub subscribe_outputs: IntCounterVec,
30    pub canceled_peeks: IntCounterVec,
31    pub linearize_message_seconds: HistogramVec,
32    pub time_to_first_row_seconds: HistogramVec,
33    pub statement_logging_records: IntCounterVec,
34    pub statement_logging_unsampled_bytes: IntCounterVec,
35    pub statement_logging_actual_bytes: IntCounterVec,
36    pub message_batch: HistogramVec,
37    pub message_handling: HistogramVec,
38    pub optimization_notices: IntCounterVec,
39    pub append_table_duration_seconds: HistogramVec,
40    pub webhook_validation_reduce_failures: IntCounterVec,
41    pub webhook_get_appender: IntCounter,
42    pub check_scheduling_policies_seconds: HistogramVec,
43    pub handle_scheduling_decisions_seconds: HistogramVec,
44    pub row_set_finishing_seconds: HistogramVec,
45    pub session_startup_table_writes_seconds: HistogramVec,
46}
47
48impl Metrics {
49    pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
50        Self {
51            query_total: registry.register(metric!(
52                name: "mz_query_total",
53                help: "The total number of queries issued of the given type since process start.",
54                var_labels: ["session_type", "statement_type"],
55            )),
56            active_sessions: registry.register(metric!(
57                name: "mz_active_sessions",
58                help: "The number of active coordinator sessions.",
59                var_labels: ["session_type"],
60            )),
61            active_subscribes: registry.register(metric!(
62                name: "mz_active_subscribes",
63                help: "The number of active SUBSCRIBE queries.",
64                var_labels: ["session_type"],
65            )),
66            active_copy_tos: registry.register(metric!(
67                name: "mz_active_copy_tos",
68                help: "The number of active COPY TO queries.",
69                var_labels: ["session_type"],
70            )),
71            queue_busy_seconds: registry.register(metric!(
72                name: "mz_coord_queue_busy_seconds",
73                help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
74                buckets: histogram_seconds_buckets(0.000_128, 32.0)
75            )),
76            determine_timestamp: registry.register(metric!(
77                name: "mz_determine_timestamp",
78                help: "The total number of calls to determine_timestamp.",
79                var_labels:["respond_immediately", "isolation_level", "compute_instance", "determination_method"],
80            )),
81            timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
82                name: "mz_timestamp_difference_for_strict_serializable_ms",
83                help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
84                var_labels:["compute_instance", "determination_method"],
85                buckets: histogram_milliseconds_buckets(1., 8000.),
86            )),
87            commands: registry.register(metric!(
88                name: "mz_adapter_commands",
89                help: "The total number of adapter commands issued of the given type since process start.",
90                var_labels: ["command_type", "status", "application_name"],
91            )),
92            storage_usage_collection_time_seconds: registry.register(metric!(
93                name: "mz_storage_usage_collection_time_seconds",
94                help: "The number of seconds the coord spends collecting usage metrics from storage.",
95                buckets: histogram_seconds_buckets(0.000_128, 8.0)
96            )),
97            subscribe_outputs: registry.register(metric!(
98                name: "mz_subscribe_outputs",
99                help: "The total number of different subscribe outputs used",
100                var_labels: ["session_type", "subscribe_output"],
101            )),
102            canceled_peeks: registry.register(metric!(
103                name: "mz_canceled_peeks_total",
104                help: "The total number of canceled peeks since process start.",
105            )),
106            linearize_message_seconds: registry.register(metric!(
107                name: "mz_linearize_message_seconds",
108                help: "The number of seconds it takes to linearize strict serializable messages",
109                var_labels: ["type", "immediately_handled"],
110                buckets: histogram_seconds_buckets(0.000_128, 8.0),
111            )),
112            time_to_first_row_seconds: registry.register(metric! {
113                name: "mz_time_to_first_row_seconds",
114                help: "Latency of an execute for a successful query from pgwire's perspective",
115                var_labels: ["instance_id", "isolation_level", "strategy"],
116                buckets: histogram_seconds_buckets(0.000_128, 32.0)
117            }),
118            statement_logging_records: registry.register(metric! {
119                name: "mz_statement_logging_record_count",
120                help: "The total number of SQL statements tagged with whether or not they were recorded.",
121                var_labels: ["sample"],
122            }),
123            statement_logging_unsampled_bytes: registry.register(metric!(
124                name: "mz_statement_logging_unsampled_bytes",
125                help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
126            )),
127            statement_logging_actual_bytes: registry.register(metric!(
128                name: "mz_statement_logging_actual_bytes",
129                help: "The total amount of SQL text that was logged by statement logging.",
130            )),
131            message_batch: registry.register(metric!(
132                name: "mz_coordinator_message_batch_size",
133                help: "Message batch size handled by the coordinator.",
134                buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
135            )),
136            message_handling: registry.register(metric!(
137                name: "mz_slow_message_handling",
138                help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
139                var_labels: ["message_kind"],
140                buckets: histogram_seconds_buckets(0.000_128, 32.0),
141            )),
142            optimization_notices: registry.register(metric!(
143                name: "mz_optimization_notices",
144                help: "Number of optimization notices per notice type.",
145                var_labels: ["notice_type"],
146            )),
147            append_table_duration_seconds: registry.register(metric!(
148                name: "mz_append_table_duration_seconds",
149                help: "Latency for appending to any (user or system) table.",
150                buckets: histogram_seconds_buckets(0.128, 32.0),
151            )),
152            webhook_validation_reduce_failures: registry.register(metric!(
153                name: "mz_webhook_validation_reduce_failures",
154                help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
155                var_labels: ["reason"],
156            )),
157            webhook_get_appender: registry.register(metric!(
158                name: "mz_webhook_get_appender_count",
159                help: "Count of getting a webhook appender from the Coordinator.",
160            )),
161            check_scheduling_policies_seconds: registry.register(metric!(
162                name: "mz_check_scheduling_policies_seconds",
163                help: "The time each policy in `check_scheduling_policies` takes.",
164                var_labels: ["policy", "thread"],
165                buckets: histogram_seconds_buckets(0.000_128, 8.0),
166            )),
167            handle_scheduling_decisions_seconds: registry.register(metric!(
168                name: "mz_handle_scheduling_decisions_seconds",
169                help: "The time `handle_scheduling_decisions` takes.",
170                var_labels: ["altered_a_cluster"],
171                buckets: histogram_seconds_buckets(0.000_128, 8.0),
172            )),
173            row_set_finishing_seconds: registry.register(metric!(
174                name: "mz_row_set_finishing_seconds",
175                help: "The time it takes to run RowSetFinishing::finish.",
176                buckets: histogram_seconds_buckets(0.000_128, 16.0),
177            )),
178            session_startup_table_writes_seconds: registry.register(metric!(
179                name: "mz_session_startup_table_writes_seconds",
180                help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
181                buckets: histogram_seconds_buckets(0.000_008, 4.0),
182            )),
183        }
184    }
185
186    pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
187        self.row_set_finishing_seconds.with_label_values(&[])
188    }
189
190    pub(crate) fn session_metrics(&self) -> SessionMetrics {
191        SessionMetrics {
192            row_set_finishing_seconds: self.row_set_finishing_seconds(),
193            session_startup_table_writes_seconds: self
194                .session_startup_table_writes_seconds
195                .with_label_values(&[]),
196        }
197    }
198}
199
200/// Metrics associated with a [`crate::session::Session`].
201#[derive(Debug, Clone)]
202pub struct SessionMetrics {
203    row_set_finishing_seconds: Histogram,
204    session_startup_table_writes_seconds: Histogram,
205}
206
207impl SessionMetrics {
208    pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
209        &self.row_set_finishing_seconds
210    }
211
212    pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
213        &self.session_startup_table_writes_seconds
214    }
215}
216
217pub(crate) fn session_type_label_value(user: &User) -> &'static str {
218    match user.is_internal() {
219        true => "system",
220        false => "user",
221    }
222}
223
224pub(crate) fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
225where
226    T: AstInfo,
227{
228    statement_kind_label_value(StatementKind::from(stmt))
229}
230
231pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
232where
233    T: AstInfo,
234{
235    match output {
236        SubscribeOutput::Diffs => "diffs",
237        SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
238        SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
239        SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
240    }
241}