Skip to main content

mz_adapter/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::metric;
11use mz_ore::metrics::{MetricTag, MetricVisibility, MetricsRegistry};
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::core::{AtomicU64, GenericCounter};
17use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
18
19#[derive(Debug, Clone)]
20pub struct Metrics {
21    pub query_total: IntCounterVec,
22    pub active_sessions: IntGaugeVec,
23    pub active_subscribes: IntGaugeVec,
24    pub active_copy_tos: IntGaugeVec,
25    pub queue_busy_seconds: Histogram,
26    pub determine_timestamp: IntCounterVec,
27    pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
28    pub timestamp_difference_for_bounded_staleness_ms: HistogramVec,
29    pub commands: IntCounterVec,
30    pub storage_usage_collection_time_seconds: Histogram,
31    pub arrangement_sizes_collection_time_seconds: Histogram,
32    pub arrangement_sizes_rows_written: IntCounter,
33    pub subscribe_outputs: IntCounterVec,
34    pub canceled_peeks: IntCounter,
35    pub linearize_message_seconds: HistogramVec,
36    pub time_to_first_row_seconds: HistogramVec,
37    pub statement_logging_records: IntCounterVec,
38    pub statement_logging_unsampled_bytes: IntCounter,
39    pub statement_logging_actual_bytes: IntCounter,
40    pub message_batch: Histogram,
41    pub message_handling: HistogramVec,
42    pub optimization_notices: IntCounterVec,
43    pub append_table_duration_seconds: Histogram,
44    pub webhook_validation_reduce_failures: IntCounterVec,
45    pub webhook_get_appender: IntCounter,
46    pub check_scheduling_policies_seconds: HistogramVec,
47    pub handle_scheduling_decisions_seconds: HistogramVec,
48    pub row_set_finishing_seconds: Histogram,
49    pub session_startup_table_writes_seconds: Histogram,
50    pub parse_seconds: Histogram,
51    pub pgwire_message_processing_seconds: HistogramVec,
52    pub result_rows_first_to_last_byte_seconds: HistogramVec,
53    pub pgwire_ensure_transaction_seconds: HistogramVec,
54    pub catalog_snapshot_seconds: HistogramVec,
55    pub pgwire_recv_scheduling_delay_ms: HistogramVec,
56    pub catalog_transact_seconds: HistogramVec,
57    pub apply_catalog_implications_seconds: Histogram,
58    pub group_commit_catalog_upper_seconds: Histogram,
59}
60
61impl Metrics {
62    pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
63        Self {
64            query_total: registry.register(metric!(
65                name: "mz_query_total",
66                help: "The total number of queries issued of the given type since process start.",
67                var_labels: ["session_type", "statement_type"],
68                visibility: MetricVisibility::Public,
69                tags: [MetricTag::Environment],
70            )),
71            active_sessions: registry.register(metric!(
72                name: "mz_active_sessions",
73                help: "The number of active coordinator sessions.",
74                var_labels: ["session_type"],
75                visibility: MetricVisibility::Public,
76                tags: [MetricTag::Environment],
77            )),
78            active_subscribes: registry.register(metric!(
79                name: "mz_active_subscribes",
80                help: "The number of active SUBSCRIBE queries.",
81                var_labels: ["session_type"],
82                visibility: MetricVisibility::Public,
83                tags: [MetricTag::Environment],
84            )),
85            active_copy_tos: registry.register(metric!(
86                name: "mz_active_copy_tos",
87                help: "The number of active COPY TO queries.",
88                var_labels: ["session_type"],
89            )),
90            queue_busy_seconds: registry.register(metric!(
91                name: "mz_coord_queue_busy_seconds",
92                help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
93                buckets: histogram_seconds_buckets(0.000_128, 32.0)
94            )),
95            determine_timestamp: registry.register(metric!(
96                name: "mz_determine_timestamp",
97                help: "The total number of calls to determine_timestamp.",
98                var_labels:["respond_immediately", "isolation_level", "compute_instance"],
99            )),
100            timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
101                name: "mz_timestamp_difference_for_strict_serializable_ms",
102                help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
103                var_labels:["compute_instance"],
104                buckets: histogram_milliseconds_buckets(1., 8000.),
105            )),
106            timestamp_difference_for_bounded_staleness_ms: registry.register(metric!(
107                name: "mz_timestamp_difference_for_bounded_staleness_ms",
108                help: "How much older bounded-staleness timestamps are compared to serializable, in milliseconds. Measures the actual staleness incurred.",
109                var_labels:["compute_instance"],
110                buckets: histogram_milliseconds_buckets(1., 8000.),
111            )),
112            commands: registry.register(metric!(
113                name: "mz_adapter_commands",
114                help: "The total number of adapter commands issued of the given type since process start.",
115                var_labels: ["command_type", "status", "application_name"],
116                visibility: MetricVisibility::Public,
117                tags: [MetricTag::Environment],
118            )),
119            storage_usage_collection_time_seconds: registry.register(metric!(
120                name: "mz_storage_usage_collection_time_seconds",
121                help: "The number of seconds the coord spends collecting usage metrics from storage.",
122                buckets: histogram_seconds_buckets(0.000_128, 8.0)
123            )),
124            arrangement_sizes_collection_time_seconds: registry.register(metric!(
125                name: "mz_arrangement_sizes_collection_time_seconds",
126                help: "Seconds to read mz_object_arrangement_sizes and prepare history-table updates for one snapshot.",
127                buckets: histogram_seconds_buckets(0.000_128, 8.0)
128            )),
129            arrangement_sizes_rows_written: registry.register(metric!(
130                name: "mz_arrangement_sizes_rows_written_total",
131                help: "Total rows appended to mz_object_arrangement_size_history since process start.",
132            )),
133            subscribe_outputs: registry.register(metric!(
134                name: "mz_subscribe_outputs",
135                help: "The total number of different subscribe outputs used",
136                var_labels: ["session_type", "subscribe_output"],
137            )),
138            canceled_peeks: registry.register(metric!(
139                name: "mz_canceled_peeks_total",
140                help: "The total number of canceled peeks since process start.",
141            )),
142            linearize_message_seconds: registry.register(metric!(
143                name: "mz_linearize_message_seconds",
144                help: "The number of seconds it takes to linearize strict serializable messages",
145                var_labels: ["type", "immediately_handled"],
146                buckets: histogram_seconds_buckets(0.000_128, 8.0),
147            )),
148            time_to_first_row_seconds: registry.register(metric! {
149                name: "mz_time_to_first_row_seconds",
150                help: "Latency of an execute for a successful query from pgwire's perspective",
151                var_labels: ["instance_id", "isolation_level", "strategy", "application_name"],
152                buckets: histogram_seconds_buckets(0.000_128, 32.0)
153            }),
154            statement_logging_records: registry.register(metric! {
155                name: "mz_statement_logging_record_count",
156                help: "The total number of SQL statements tagged with whether or not they were recorded.",
157                var_labels: ["sample"],
158            }),
159            statement_logging_unsampled_bytes: registry.register(metric!(
160                name: "mz_statement_logging_unsampled_bytes",
161                help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
162            )),
163            statement_logging_actual_bytes: registry.register(metric!(
164                name: "mz_statement_logging_actual_bytes",
165                help: "The total amount of SQL text that was logged by statement logging.",
166            )),
167            message_batch: registry.register(metric!(
168                name: "mz_coordinator_message_batch_size",
169                help: "Message batch size handled by the coordinator.",
170                buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
171            )),
172            message_handling: registry.register(metric!(
173                name: "mz_slow_message_handling",
174                help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
175                var_labels: ["message_kind"],
176                buckets: histogram_seconds_buckets(0.000_128, 512.0),
177            )),
178            optimization_notices: registry.register(metric!(
179                name: "mz_optimization_notices",
180                help: "Number of optimization notices per notice type.",
181                var_labels: ["notice_type"],
182            )),
183            append_table_duration_seconds: registry.register(metric!(
184                name: "mz_append_table_duration_seconds",
185                help: "Latency for appending to any (user or system) table.",
186                buckets: histogram_seconds_buckets(0.128, 32.0),
187            )),
188            webhook_validation_reduce_failures: registry.register(metric!(
189                name: "mz_webhook_validation_reduce_failures",
190                help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
191                var_labels: ["reason"],
192            )),
193            webhook_get_appender: registry.register(metric!(
194                name: "mz_webhook_get_appender_count",
195                help: "Count of getting a webhook appender from the Coordinator.",
196            )),
197            check_scheduling_policies_seconds: registry.register(metric!(
198                name: "mz_check_scheduling_policies_seconds",
199                help: "The time each policy in `check_scheduling_policies` takes.",
200                var_labels: ["policy", "thread"],
201                buckets: histogram_seconds_buckets(0.000_128, 8.0),
202            )),
203            handle_scheduling_decisions_seconds: registry.register(metric!(
204                name: "mz_handle_scheduling_decisions_seconds",
205                help: "The time `handle_scheduling_decisions` takes.",
206                var_labels: ["altered_a_cluster"],
207                buckets: histogram_seconds_buckets(0.000_128, 8.0),
208            )),
209            row_set_finishing_seconds: registry.register(metric!(
210                name: "mz_row_set_finishing_seconds",
211                help: "The time it takes to run RowSetFinishing::finish.",
212                buckets: histogram_seconds_buckets(0.000_128, 16.0),
213            )),
214            session_startup_table_writes_seconds: registry.register(metric!(
215                name: "mz_session_startup_table_writes_seconds",
216                help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
217                buckets: histogram_seconds_buckets(0.000_008, 4.0),
218            )),
219            parse_seconds: registry.register(metric!(
220                name: "mz_parse_seconds",
221                help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
222                buckets: histogram_seconds_buckets(0.001, 8.0),
223            )),
224            pgwire_message_processing_seconds: registry.register(metric!(
225                name: "mz_pgwire_message_processing_seconds",
226                help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
227                var_labels: ["message_type"],
228                buckets: histogram_seconds_buckets(0.001, 512.0),
229            )),
230            result_rows_first_to_last_byte_seconds: registry.register(metric!(
231                name: "mz_result_rows_first_to_last_byte_seconds",
232                help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
233                var_labels: ["statement_type"],
234                buckets: histogram_seconds_buckets(0.001, 8192.0),
235            )),
236            pgwire_ensure_transaction_seconds: registry.register(metric!(
237                name: "mz_pgwire_ensure_transaction_seconds",
238                help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
239                var_labels: ["message_type"],
240                buckets: histogram_seconds_buckets(0.001, 512.0),
241            )),
242            catalog_snapshot_seconds: registry.register(metric!(
243                name: "mz_catalog_snapshot_seconds",
244                help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
245                var_labels: ["context"],
246                buckets: histogram_seconds_buckets(0.001, 512.0),
247            )),
248            pgwire_recv_scheduling_delay_ms: registry.register(metric!(
249                name: "mz_pgwire_recv_scheduling_delay_ms",
250                help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
251                var_labels: ["message_type"],
252                buckets: histogram_milliseconds_buckets(0.128, 512000.),
253            )),
254            catalog_transact_seconds: registry.register(metric!(
255                name: "mz_catalog_transact_seconds",
256                help: "The time it takes to run various catalog transact methods.",
257                var_labels: ["method"],
258                buckets: histogram_seconds_buckets(0.001, 32.0),
259            )),
260            apply_catalog_implications_seconds: registry.register(metric!(
261                name: "mz_apply_catalog_implications_seconds",
262                help: "The time it takes to apply catalog implications.",
263                buckets: histogram_seconds_buckets(0.001, 32.0),
264            )),
265            group_commit_catalog_upper_seconds: registry.register(metric!(
266                name: "mz_group_commit_catalog_upper_seconds",
267                help: "The time it takes to advance the catalog shard upper during group commit.",
268                buckets: histogram_seconds_buckets(0.001, 32.0),
269            )),
270        }
271    }
272
273    pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
274        self.row_set_finishing_seconds.clone()
275    }
276
277    pub(crate) fn session_metrics(&self) -> SessionMetrics {
278        SessionMetrics {
279            row_set_finishing_seconds: self.row_set_finishing_seconds(),
280            session_startup_table_writes_seconds: self.session_startup_table_writes_seconds.clone(),
281            query_total: self.query_total.clone(),
282            determine_timestamp: self.determine_timestamp.clone(),
283            timestamp_difference_for_strict_serializable_ms: self
284                .timestamp_difference_for_strict_serializable_ms
285                .clone(),
286            timestamp_difference_for_bounded_staleness_ms: self
287                .timestamp_difference_for_bounded_staleness_ms
288                .clone(),
289            optimization_notices: self.optimization_notices.clone(),
290            statement_logging_records: self.statement_logging_records.clone(),
291            statement_logging_unsampled_bytes: self.statement_logging_unsampled_bytes.clone(),
292            statement_logging_actual_bytes: self.statement_logging_actual_bytes.clone(),
293        }
294    }
295}
296
297/// Metrics to be accessed from a [`crate::session::Session`].
298#[derive(Debug, Clone)]
299pub struct SessionMetrics {
300    row_set_finishing_seconds: Histogram,
301    session_startup_table_writes_seconds: Histogram,
302    query_total: IntCounterVec,
303    determine_timestamp: IntCounterVec,
304    timestamp_difference_for_strict_serializable_ms: HistogramVec,
305    timestamp_difference_for_bounded_staleness_ms: HistogramVec,
306    optimization_notices: IntCounterVec,
307    statement_logging_records: IntCounterVec,
308    statement_logging_unsampled_bytes: IntCounter,
309    statement_logging_actual_bytes: IntCounter,
310}
311
312impl SessionMetrics {
313    pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
314        &self.row_set_finishing_seconds
315    }
316
317    pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
318        &self.session_startup_table_writes_seconds
319    }
320
321    pub(crate) fn query_total(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
322        self.query_total.with_label_values(label_values)
323    }
324
325    pub(crate) fn determine_timestamp(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
326        self.determine_timestamp.with_label_values(label_values)
327    }
328
329    pub(crate) fn timestamp_difference_for_strict_serializable_ms(
330        &self,
331        label_values: &[&str],
332    ) -> Histogram {
333        self.timestamp_difference_for_strict_serializable_ms
334            .with_label_values(label_values)
335    }
336
337    pub(crate) fn timestamp_difference_for_bounded_staleness_ms(
338        &self,
339        label_values: &[&str],
340    ) -> Histogram {
341        self.timestamp_difference_for_bounded_staleness_ms
342            .with_label_values(label_values)
343    }
344
345    pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
346        self.optimization_notices.with_label_values(label_values)
347    }
348
349    pub(crate) fn statement_logging_records(
350        &self,
351        label_values: &[&str],
352    ) -> GenericCounter<AtomicU64> {
353        self.statement_logging_records
354            .with_label_values(label_values)
355    }
356
357    pub(crate) fn statement_logging_unsampled_bytes(&self) -> &IntCounter {
358        &self.statement_logging_unsampled_bytes
359    }
360
361    pub(crate) fn statement_logging_actual_bytes(&self) -> &IntCounter {
362        &self.statement_logging_actual_bytes
363    }
364}
365
366pub(crate) fn session_type_label_value(user: &User) -> &'static str {
367    match user.is_internal() {
368        true => "system",
369        false => "user",
370    }
371}
372
373pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
374where
375    T: AstInfo,
376{
377    statement_kind_label_value(StatementKind::from(stmt))
378}
379
380pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
381where
382    T: AstInfo,
383{
384    match output {
385        SubscribeOutput::Diffs => "diffs",
386        SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
387        SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
388        SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
389    }
390}