Skip to main content

mz_adapter/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::metric;
11use mz_ore::metrics::{MetricVisibility, MetricsRegistry};
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::core::{AtomicU64, GenericCounter};
17use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
18
/// Handles for the adapter's Prometheus metrics.
///
/// Every field is populated by `Metrics::register_into`, which is also where
/// the exported metric name, help text, label names, and histogram buckets of
/// each metric are defined.
#[derive(Debug, Clone)]
pub struct Metrics {
    /// `mz_query_total`: queries issued since process start, labeled by
    /// `session_type` and `statement_type`.
    pub query_total: IntCounterVec,
    /// `mz_active_sessions`: active coordinator sessions, labeled by `session_type`.
    pub active_sessions: IntGaugeVec,
    /// `mz_active_subscribes`: active SUBSCRIBE queries, labeled by `session_type`.
    pub active_subscribes: IntGaugeVec,
    /// `mz_active_copy_tos`: active COPY TO queries, labeled by `session_type`.
    pub active_copy_tos: IntGaugeVec,
    /// `mz_coord_queue_busy_seconds`: sampled time the coord queue spent
    /// processing before it was empty.
    pub queue_busy_seconds: Histogram,
    /// `mz_determine_timestamp`: number of calls to determine_timestamp, labeled
    /// by `respond_immediately`, `isolation_level`, and `compute_instance`.
    pub determine_timestamp: IntCounterVec,
    /// `mz_timestamp_difference_for_strict_serializable_ms`: timestamp difference
    /// (ms) between strict serializable and serializable, per `compute_instance`.
    pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
    /// `mz_timestamp_difference_for_bounded_staleness_ms`: how much older (ms)
    /// bounded-staleness timestamps are than serializable, per `compute_instance`.
    pub timestamp_difference_for_bounded_staleness_ms: HistogramVec,
    /// `mz_adapter_commands`: adapter commands issued since process start,
    /// labeled by `command_type`, `status`, and `application_name`.
    pub commands: IntCounterVec,
    /// `mz_storage_usage_collection_time_seconds`: time the coord spends
    /// collecting usage metrics from storage.
    pub storage_usage_collection_time_seconds: Histogram,
    /// `mz_arrangement_sizes_collection_time_seconds`: time to read
    /// mz_object_arrangement_sizes and prepare history-table updates for one snapshot.
    pub arrangement_sizes_collection_time_seconds: Histogram,
    /// `mz_arrangement_sizes_rows_written_total`: rows appended to
    /// mz_object_arrangement_size_history since process start.
    pub arrangement_sizes_rows_written: IntCounter,
    /// `mz_subscribe_outputs`: subscribe outputs used, labeled by `session_type`
    /// and `subscribe_output`.
    pub subscribe_outputs: IntCounterVec,
    /// `mz_canceled_peeks_total`: canceled peeks since process start.
    pub canceled_peeks: IntCounter,
    /// `mz_linearize_message_seconds`: time to linearize strict serializable
    /// messages, labeled by `type` and `immediately_handled`.
    pub linearize_message_seconds: HistogramVec,
    /// `mz_time_to_first_row_seconds`: latency of an execute for a successful
    /// query from pgwire's perspective, labeled by `instance_id`,
    /// `isolation_level`, `strategy`, and `application_name`.
    pub time_to_first_row_seconds: HistogramVec,
    /// `mz_statement_logging_record_count`: SQL statements tagged (label
    /// `sample`) with whether or not they were recorded.
    pub statement_logging_records: IntCounterVec,
    /// `mz_statement_logging_unsampled_bytes`: SQL text that would have been
    /// logged if statement logging were unsampled.
    pub statement_logging_unsampled_bytes: IntCounter,
    /// `mz_statement_logging_actual_bytes`: SQL text actually logged by
    /// statement logging.
    pub statement_logging_actual_bytes: IntCounter,
    /// `mz_coordinator_message_batch_size`: message batch size handled by the
    /// coordinator.
    pub message_batch: Histogram,
    /// `mz_slow_message_handling`: latency for all coordinator messages, labeled
    /// by `message_kind`. The exported name says "slow" only for legacy reasons.
    pub message_handling: HistogramVec,
    /// `mz_optimization_notices`: optimization notices per `notice_type`.
    pub optimization_notices: IntCounterVec,
    /// `mz_append_table_duration_seconds`: latency for appending to any (user or
    /// system) table.
    pub append_table_duration_seconds: Histogram,
    /// `mz_webhook_validation_reduce_failures`: failures to reduce a webhook
    /// source's CHECK statement, labeled by `reason`.
    pub webhook_validation_reduce_failures: IntCounterVec,
    /// `mz_webhook_get_appender_count`: times a webhook appender was fetched
    /// from the Coordinator.
    pub webhook_get_appender: IntCounter,
    /// `mz_check_scheduling_policies_seconds`: time each policy in
    /// `check_scheduling_policies` takes, labeled by `policy` and `thread`.
    pub check_scheduling_policies_seconds: HistogramVec,
    /// `mz_handle_scheduling_decisions_seconds`: time `handle_scheduling_decisions`
    /// takes, labeled by `altered_a_cluster`.
    pub handle_scheduling_decisions_seconds: HistogramVec,
    /// `mz_row_set_finishing_seconds`: time to run RowSetFinishing::finish.
    pub row_set_finishing_seconds: Histogram,
    /// `mz_session_startup_table_writes_seconds`: time spent waiting for builtin
    /// table writes before processing a query.
    pub session_startup_table_writes_seconds: Histogram,
    /// `mz_parse_seconds`: time to parse a SQL statement (Simple Queries and the
    /// Extended Query protocol).
    pub parse_seconds: Histogram,
    /// `mz_pgwire_message_processing_seconds`: time to process each pgwire
    /// message type, measured in the Adapter frontend; labeled by `message_type`.
    pub pgwire_message_processing_seconds: HistogramVec,
    /// `mz_result_rows_first_to_last_byte_seconds`: time from just before sending
    /// the first result row to sending the final response after flushing the
    /// last row; labeled by `statement_type`.
    pub result_rows_first_to_last_byte_seconds: HistogramVec,
    /// `mz_pgwire_ensure_transaction_seconds`: time to run `ensure_transactions`
    /// when processing pgwire messages; labeled by `message_type`.
    pub pgwire_ensure_transaction_seconds: HistogramVec,
    /// `mz_catalog_snapshot_seconds`: time to run `catalog_snapshot` when
    /// fetching the catalog; labeled by `context`.
    pub catalog_snapshot_seconds: HistogramVec,
    /// `mz_pgwire_recv_scheduling_delay_ms`: delay between a pgwire connection's
    /// receiver task being woken by incoming data and getting polled; labeled by
    /// `message_type`.
    pub pgwire_recv_scheduling_delay_ms: HistogramVec,
    /// `mz_catalog_transact_seconds`: time to run the various catalog transact
    /// methods; labeled by `method`.
    pub catalog_transact_seconds: HistogramVec,
    /// `mz_apply_catalog_implications_seconds`: time to apply catalog implications.
    pub apply_catalog_implications_seconds: Histogram,
    /// `mz_group_commit_catalog_upper_seconds`: time to advance the catalog shard
    /// upper during group commit.
    pub group_commit_catalog_upper_seconds: Histogram,
}
60
61impl Metrics {
62    pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
63        Self {
64            query_total: registry.register(metric!(
65                name: "mz_query_total",
66                help: "The total number of queries issued of the given type since process start.",
67                var_labels: ["session_type", "statement_type"],
68                visibility: MetricVisibility::Public,
69            )),
70            active_sessions: registry.register(metric!(
71                name: "mz_active_sessions",
72                help: "The number of active coordinator sessions.",
73                var_labels: ["session_type"],
74                visibility: MetricVisibility::Public,
75            )),
76            active_subscribes: registry.register(metric!(
77                name: "mz_active_subscribes",
78                help: "The number of active SUBSCRIBE queries.",
79                var_labels: ["session_type"],
80                visibility: MetricVisibility::Public,
81            )),
82            active_copy_tos: registry.register(metric!(
83                name: "mz_active_copy_tos",
84                help: "The number of active COPY TO queries.",
85                var_labels: ["session_type"],
86            )),
87            queue_busy_seconds: registry.register(metric!(
88                name: "mz_coord_queue_busy_seconds",
89                help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
90                buckets: histogram_seconds_buckets(0.000_128, 32.0)
91            )),
92            determine_timestamp: registry.register(metric!(
93                name: "mz_determine_timestamp",
94                help: "The total number of calls to determine_timestamp.",
95                var_labels:["respond_immediately", "isolation_level", "compute_instance"],
96            )),
97            timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
98                name: "mz_timestamp_difference_for_strict_serializable_ms",
99                help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
100                var_labels:["compute_instance"],
101                buckets: histogram_milliseconds_buckets(1., 8000.),
102            )),
103            timestamp_difference_for_bounded_staleness_ms: registry.register(metric!(
104                name: "mz_timestamp_difference_for_bounded_staleness_ms",
105                help: "How much older bounded-staleness timestamps are compared to serializable, in milliseconds. Measures the actual staleness incurred.",
106                var_labels:["compute_instance"],
107                buckets: histogram_milliseconds_buckets(1., 8000.),
108            )),
109            commands: registry.register(metric!(
110                name: "mz_adapter_commands",
111                help: "The total number of adapter commands issued of the given type since process start.",
112                var_labels: ["command_type", "status", "application_name"],
113                visibility: MetricVisibility::Public,
114            )),
115            storage_usage_collection_time_seconds: registry.register(metric!(
116                name: "mz_storage_usage_collection_time_seconds",
117                help: "The number of seconds the coord spends collecting usage metrics from storage.",
118                buckets: histogram_seconds_buckets(0.000_128, 8.0)
119            )),
120            arrangement_sizes_collection_time_seconds: registry.register(metric!(
121                name: "mz_arrangement_sizes_collection_time_seconds",
122                help: "Seconds to read mz_object_arrangement_sizes and prepare history-table updates for one snapshot.",
123                buckets: histogram_seconds_buckets(0.000_128, 8.0)
124            )),
125            arrangement_sizes_rows_written: registry.register(metric!(
126                name: "mz_arrangement_sizes_rows_written_total",
127                help: "Total rows appended to mz_object_arrangement_size_history since process start.",
128            )),
129            subscribe_outputs: registry.register(metric!(
130                name: "mz_subscribe_outputs",
131                help: "The total number of different subscribe outputs used",
132                var_labels: ["session_type", "subscribe_output"],
133            )),
134            canceled_peeks: registry.register(metric!(
135                name: "mz_canceled_peeks_total",
136                help: "The total number of canceled peeks since process start.",
137            )),
138            linearize_message_seconds: registry.register(metric!(
139                name: "mz_linearize_message_seconds",
140                help: "The number of seconds it takes to linearize strict serializable messages",
141                var_labels: ["type", "immediately_handled"],
142                buckets: histogram_seconds_buckets(0.000_128, 8.0),
143            )),
144            time_to_first_row_seconds: registry.register(metric! {
145                name: "mz_time_to_first_row_seconds",
146                help: "Latency of an execute for a successful query from pgwire's perspective",
147                var_labels: ["instance_id", "isolation_level", "strategy", "application_name"],
148                buckets: histogram_seconds_buckets(0.000_128, 32.0)
149            }),
150            statement_logging_records: registry.register(metric! {
151                name: "mz_statement_logging_record_count",
152                help: "The total number of SQL statements tagged with whether or not they were recorded.",
153                var_labels: ["sample"],
154            }),
155            statement_logging_unsampled_bytes: registry.register(metric!(
156                name: "mz_statement_logging_unsampled_bytes",
157                help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
158            )),
159            statement_logging_actual_bytes: registry.register(metric!(
160                name: "mz_statement_logging_actual_bytes",
161                help: "The total amount of SQL text that was logged by statement logging.",
162            )),
163            message_batch: registry.register(metric!(
164                name: "mz_coordinator_message_batch_size",
165                help: "Message batch size handled by the coordinator.",
166                buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
167            )),
168            message_handling: registry.register(metric!(
169                name: "mz_slow_message_handling",
170                help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
171                var_labels: ["message_kind"],
172                buckets: histogram_seconds_buckets(0.000_128, 512.0),
173            )),
174            optimization_notices: registry.register(metric!(
175                name: "mz_optimization_notices",
176                help: "Number of optimization notices per notice type.",
177                var_labels: ["notice_type"],
178            )),
179            append_table_duration_seconds: registry.register(metric!(
180                name: "mz_append_table_duration_seconds",
181                help: "Latency for appending to any (user or system) table.",
182                buckets: histogram_seconds_buckets(0.128, 32.0),
183            )),
184            webhook_validation_reduce_failures: registry.register(metric!(
185                name: "mz_webhook_validation_reduce_failures",
186                help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
187                var_labels: ["reason"],
188            )),
189            webhook_get_appender: registry.register(metric!(
190                name: "mz_webhook_get_appender_count",
191                help: "Count of getting a webhook appender from the Coordinator.",
192            )),
193            check_scheduling_policies_seconds: registry.register(metric!(
194                name: "mz_check_scheduling_policies_seconds",
195                help: "The time each policy in `check_scheduling_policies` takes.",
196                var_labels: ["policy", "thread"],
197                buckets: histogram_seconds_buckets(0.000_128, 8.0),
198            )),
199            handle_scheduling_decisions_seconds: registry.register(metric!(
200                name: "mz_handle_scheduling_decisions_seconds",
201                help: "The time `handle_scheduling_decisions` takes.",
202                var_labels: ["altered_a_cluster"],
203                buckets: histogram_seconds_buckets(0.000_128, 8.0),
204            )),
205            row_set_finishing_seconds: registry.register(metric!(
206                name: "mz_row_set_finishing_seconds",
207                help: "The time it takes to run RowSetFinishing::finish.",
208                buckets: histogram_seconds_buckets(0.000_128, 16.0),
209            )),
210            session_startup_table_writes_seconds: registry.register(metric!(
211                name: "mz_session_startup_table_writes_seconds",
212                help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
213                buckets: histogram_seconds_buckets(0.000_008, 4.0),
214            )),
215            parse_seconds: registry.register(metric!(
216                name: "mz_parse_seconds",
217                help: "The time it takes to parse a SQL statement. (Works for both Simple Queries and the Extended Query protocol.)",
218                buckets: histogram_seconds_buckets(0.001, 8.0),
219            )),
220            pgwire_message_processing_seconds: registry.register(metric!(
221                name: "mz_pgwire_message_processing_seconds",
222                help: "The time it takes to process each of the pgwire message types, measured in the Adapter frontend",
223                var_labels: ["message_type"],
224                buckets: histogram_seconds_buckets(0.001, 512.0),
225            )),
226            result_rows_first_to_last_byte_seconds: registry.register(metric!(
227                name: "mz_result_rows_first_to_last_byte_seconds",
228                help: "The time from just before sending the first result row to sending a final response message after having successfully flushed the last result row to the connection. (This can span multiple FETCH statements.) (This is never observed for unbounded SUBSCRIBEs, i.e., which have no last result row.)",
229                var_labels: ["statement_type"],
230                buckets: histogram_seconds_buckets(0.001, 8192.0),
231            )),
232            pgwire_ensure_transaction_seconds: registry.register(metric!(
233                name: "mz_pgwire_ensure_transaction_seconds",
234                help: "The time it takes to run `ensure_transactions` when processing pgwire messages.",
235                var_labels: ["message_type"],
236                buckets: histogram_seconds_buckets(0.001, 512.0),
237            )),
238            catalog_snapshot_seconds: registry.register(metric!(
239                name: "mz_catalog_snapshot_seconds",
240                help: "The time it takes to run `catalog_snapshot` when fetching the catalog.",
241                var_labels: ["context"],
242                buckets: histogram_seconds_buckets(0.001, 512.0),
243            )),
244            pgwire_recv_scheduling_delay_ms: registry.register(metric!(
245                name: "mz_pgwire_recv_scheduling_delay_ms",
246                help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
247                var_labels: ["message_type"],
248                buckets: histogram_milliseconds_buckets(0.128, 512000.),
249            )),
250            catalog_transact_seconds: registry.register(metric!(
251                name: "mz_catalog_transact_seconds",
252                help: "The time it takes to run various catalog transact methods.",
253                var_labels: ["method"],
254                buckets: histogram_seconds_buckets(0.001, 32.0),
255            )),
256            apply_catalog_implications_seconds: registry.register(metric!(
257                name: "mz_apply_catalog_implications_seconds",
258                help: "The time it takes to apply catalog implications.",
259                buckets: histogram_seconds_buckets(0.001, 32.0),
260            )),
261            group_commit_catalog_upper_seconds: registry.register(metric!(
262                name: "mz_group_commit_catalog_upper_seconds",
263                help: "The time it takes to advance the catalog shard upper during group commit.",
264                buckets: histogram_seconds_buckets(0.001, 32.0),
265            )),
266        }
267    }
268
269    pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
270        self.row_set_finishing_seconds.clone()
271    }
272
273    pub(crate) fn session_metrics(&self) -> SessionMetrics {
274        SessionMetrics {
275            row_set_finishing_seconds: self.row_set_finishing_seconds(),
276            session_startup_table_writes_seconds: self.session_startup_table_writes_seconds.clone(),
277            query_total: self.query_total.clone(),
278            determine_timestamp: self.determine_timestamp.clone(),
279            timestamp_difference_for_strict_serializable_ms: self
280                .timestamp_difference_for_strict_serializable_ms
281                .clone(),
282            timestamp_difference_for_bounded_staleness_ms: self
283                .timestamp_difference_for_bounded_staleness_ms
284                .clone(),
285            optimization_notices: self.optimization_notices.clone(),
286            statement_logging_records: self.statement_logging_records.clone(),
287            statement_logging_unsampled_bytes: self.statement_logging_unsampled_bytes.clone(),
288            statement_logging_actual_bytes: self.statement_logging_actual_bytes.clone(),
289        }
290    }
291}
292
/// Metrics to be accessed from a [`crate::session::Session`].
///
/// Built by `Metrics::session_metrics`, which clones the corresponding handles
/// out of `Metrics`. All fields are private; use the accessor methods.
#[derive(Debug, Clone)]
pub struct SessionMetrics {
    // Time to run RowSetFinishing::finish.
    row_set_finishing_seconds: Histogram,
    // Time spent waiting for builtin table writes before processing a query.
    session_startup_table_writes_seconds: Histogram,
    // Queries issued, labeled by `session_type` and `statement_type`.
    query_total: IntCounterVec,
    // Calls to determine_timestamp, labeled by `respond_immediately`,
    // `isolation_level`, and `compute_instance`.
    determine_timestamp: IntCounterVec,
    // Timestamp difference (ms), strict serializable vs serializable, per
    // `compute_instance`.
    timestamp_difference_for_strict_serializable_ms: HistogramVec,
    // How much older (ms) bounded-staleness timestamps are than serializable,
    // per `compute_instance`.
    timestamp_difference_for_bounded_staleness_ms: HistogramVec,
    // Optimization notices per `notice_type`.
    optimization_notices: IntCounterVec,
    // SQL statements tagged (label `sample`) with whether they were recorded.
    statement_logging_records: IntCounterVec,
    // SQL text that would have been logged if statement logging were unsampled.
    statement_logging_unsampled_bytes: IntCounter,
    // SQL text actually logged by statement logging.
    statement_logging_actual_bytes: IntCounter,
}
307
308impl SessionMetrics {
309    pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
310        &self.row_set_finishing_seconds
311    }
312
313    pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
314        &self.session_startup_table_writes_seconds
315    }
316
317    pub(crate) fn query_total(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
318        self.query_total.with_label_values(label_values)
319    }
320
321    pub(crate) fn determine_timestamp(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
322        self.determine_timestamp.with_label_values(label_values)
323    }
324
325    pub(crate) fn timestamp_difference_for_strict_serializable_ms(
326        &self,
327        label_values: &[&str],
328    ) -> Histogram {
329        self.timestamp_difference_for_strict_serializable_ms
330            .with_label_values(label_values)
331    }
332
333    pub(crate) fn timestamp_difference_for_bounded_staleness_ms(
334        &self,
335        label_values: &[&str],
336    ) -> Histogram {
337        self.timestamp_difference_for_bounded_staleness_ms
338            .with_label_values(label_values)
339    }
340
341    pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter<AtomicU64> {
342        self.optimization_notices.with_label_values(label_values)
343    }
344
345    pub(crate) fn statement_logging_records(
346        &self,
347        label_values: &[&str],
348    ) -> GenericCounter<AtomicU64> {
349        self.statement_logging_records
350            .with_label_values(label_values)
351    }
352
353    pub(crate) fn statement_logging_unsampled_bytes(&self) -> &IntCounter {
354        &self.statement_logging_unsampled_bytes
355    }
356
357    pub(crate) fn statement_logging_actual_bytes(&self) -> &IntCounter {
358        &self.statement_logging_actual_bytes
359    }
360}
361
362pub(crate) fn session_type_label_value(user: &User) -> &'static str {
363    match user.is_internal() {
364        true => "system",
365        false => "user",
366    }
367}
368
369pub fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
370where
371    T: AstInfo,
372{
373    statement_kind_label_value(StatementKind::from(stmt))
374}
375
376pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
377where
378    T: AstInfo,
379{
380    match output {
381        SubscribeOutput::Diffs => "diffs",
382        SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
383        SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
384        SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
385    }
386}