1use mz_ore::metric;
11use mz_ore::metrics::MetricsRegistry;
12use mz_ore::stats::{histogram_milliseconds_buckets, histogram_seconds_buckets};
13use mz_sql::ast::{AstInfo, Statement, StatementKind, SubscribeOutput};
14use mz_sql::session::user::User;
15use mz_sql_parser::ast::statement_kind_label_value;
16use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};
17
18#[derive(Debug, Clone)]
19pub struct Metrics {
20 pub query_total: IntCounterVec,
21 pub active_sessions: IntGaugeVec,
22 pub active_subscribes: IntGaugeVec,
23 pub active_copy_tos: IntGaugeVec,
24 pub queue_busy_seconds: HistogramVec,
25 pub determine_timestamp: IntCounterVec,
26 pub timestamp_difference_for_strict_serializable_ms: HistogramVec,
27 pub commands: IntCounterVec,
28 pub storage_usage_collection_time_seconds: HistogramVec,
29 pub subscribe_outputs: IntCounterVec,
30 pub canceled_peeks: IntCounterVec,
31 pub linearize_message_seconds: HistogramVec,
32 pub time_to_first_row_seconds: HistogramVec,
33 pub statement_logging_records: IntCounterVec,
34 pub statement_logging_unsampled_bytes: IntCounterVec,
35 pub statement_logging_actual_bytes: IntCounterVec,
36 pub message_batch: HistogramVec,
37 pub message_handling: HistogramVec,
38 pub optimization_notices: IntCounterVec,
39 pub append_table_duration_seconds: HistogramVec,
40 pub webhook_validation_reduce_failures: IntCounterVec,
41 pub webhook_get_appender: IntCounter,
42 pub check_scheduling_policies_seconds: HistogramVec,
43 pub handle_scheduling_decisions_seconds: HistogramVec,
44 pub row_set_finishing_seconds: HistogramVec,
45 pub session_startup_table_writes_seconds: HistogramVec,
46}
47
48impl Metrics {
49 pub(crate) fn register_into(registry: &MetricsRegistry) -> Self {
50 Self {
51 query_total: registry.register(metric!(
52 name: "mz_query_total",
53 help: "The total number of queries issued of the given type since process start.",
54 var_labels: ["session_type", "statement_type"],
55 )),
56 active_sessions: registry.register(metric!(
57 name: "mz_active_sessions",
58 help: "The number of active coordinator sessions.",
59 var_labels: ["session_type"],
60 )),
61 active_subscribes: registry.register(metric!(
62 name: "mz_active_subscribes",
63 help: "The number of active SUBSCRIBE queries.",
64 var_labels: ["session_type"],
65 )),
66 active_copy_tos: registry.register(metric!(
67 name: "mz_active_copy_tos",
68 help: "The number of active COPY TO queries.",
69 var_labels: ["session_type"],
70 )),
71 queue_busy_seconds: registry.register(metric!(
72 name: "mz_coord_queue_busy_seconds",
73 help: "The number of seconds the coord queue was processing before it was empty. This is a sampled metric and does not measure the full coord queue wait/idle times.",
74 buckets: histogram_seconds_buckets(0.000_128, 32.0)
75 )),
76 determine_timestamp: registry.register(metric!(
77 name: "mz_determine_timestamp",
78 help: "The total number of calls to determine_timestamp.",
79 var_labels:["respond_immediately", "isolation_level", "compute_instance", "determination_method"],
80 )),
81 timestamp_difference_for_strict_serializable_ms: registry.register(metric!(
82 name: "mz_timestamp_difference_for_strict_serializable_ms",
83 help: "Difference in timestamp in milliseconds for running in strict serializable vs serializable isolation level.",
84 var_labels:["compute_instance", "determination_method"],
85 buckets: histogram_milliseconds_buckets(1., 8000.),
86 )),
87 commands: registry.register(metric!(
88 name: "mz_adapter_commands",
89 help: "The total number of adapter commands issued of the given type since process start.",
90 var_labels: ["command_type", "status", "application_name"],
91 )),
92 storage_usage_collection_time_seconds: registry.register(metric!(
93 name: "mz_storage_usage_collection_time_seconds",
94 help: "The number of seconds the coord spends collecting usage metrics from storage.",
95 buckets: histogram_seconds_buckets(0.000_128, 8.0)
96 )),
97 subscribe_outputs: registry.register(metric!(
98 name: "mz_subscribe_outputs",
99 help: "The total number of different subscribe outputs used",
100 var_labels: ["session_type", "subscribe_output"],
101 )),
102 canceled_peeks: registry.register(metric!(
103 name: "mz_canceled_peeks_total",
104 help: "The total number of canceled peeks since process start.",
105 )),
106 linearize_message_seconds: registry.register(metric!(
107 name: "mz_linearize_message_seconds",
108 help: "The number of seconds it takes to linearize strict serializable messages",
109 var_labels: ["type", "immediately_handled"],
110 buckets: histogram_seconds_buckets(0.000_128, 8.0),
111 )),
112 time_to_first_row_seconds: registry.register(metric! {
113 name: "mz_time_to_first_row_seconds",
114 help: "Latency of an execute for a successful query from pgwire's perspective",
115 var_labels: ["instance_id", "isolation_level", "strategy"],
116 buckets: histogram_seconds_buckets(0.000_128, 32.0)
117 }),
118 statement_logging_records: registry.register(metric! {
119 name: "mz_statement_logging_record_count",
120 help: "The total number of SQL statements tagged with whether or not they were recorded.",
121 var_labels: ["sample"],
122 }),
123 statement_logging_unsampled_bytes: registry.register(metric!(
124 name: "mz_statement_logging_unsampled_bytes",
125 help: "The total amount of SQL text that would have been logged if statement logging were unsampled.",
126 )),
127 statement_logging_actual_bytes: registry.register(metric!(
128 name: "mz_statement_logging_actual_bytes",
129 help: "The total amount of SQL text that was logged by statement logging.",
130 )),
131 message_batch: registry.register(metric!(
132 name: "mz_coordinator_message_batch_size",
133 help: "Message batch size handled by the coordinator.",
134 buckets: vec![0., 1., 2., 3., 4., 6., 8., 12., 16., 24., 32., 48., 64.],
135 )),
136 message_handling: registry.register(metric!(
137 name: "mz_slow_message_handling",
138 help: "Latency for ALL coordinator messages. 'slow' is in the name for legacy reasons, but is not accurate.",
139 var_labels: ["message_kind"],
140 buckets: histogram_seconds_buckets(0.000_128, 32.0),
141 )),
142 optimization_notices: registry.register(metric!(
143 name: "mz_optimization_notices",
144 help: "Number of optimization notices per notice type.",
145 var_labels: ["notice_type"],
146 )),
147 append_table_duration_seconds: registry.register(metric!(
148 name: "mz_append_table_duration_seconds",
149 help: "Latency for appending to any (user or system) table.",
150 buckets: histogram_seconds_buckets(0.128, 32.0),
151 )),
152 webhook_validation_reduce_failures: registry.register(metric!(
153 name: "mz_webhook_validation_reduce_failures",
154 help: "Count of how many times we've failed to reduce a webhook source's CHECK statement.",
155 var_labels: ["reason"],
156 )),
157 webhook_get_appender: registry.register(metric!(
158 name: "mz_webhook_get_appender_count",
159 help: "Count of getting a webhook appender from the Coordinator.",
160 )),
161 check_scheduling_policies_seconds: registry.register(metric!(
162 name: "mz_check_scheduling_policies_seconds",
163 help: "The time each policy in `check_scheduling_policies` takes.",
164 var_labels: ["policy", "thread"],
165 buckets: histogram_seconds_buckets(0.000_128, 8.0),
166 )),
167 handle_scheduling_decisions_seconds: registry.register(metric!(
168 name: "mz_handle_scheduling_decisions_seconds",
169 help: "The time `handle_scheduling_decisions` takes.",
170 var_labels: ["altered_a_cluster"],
171 buckets: histogram_seconds_buckets(0.000_128, 8.0),
172 )),
173 row_set_finishing_seconds: registry.register(metric!(
174 name: "mz_row_set_finishing_seconds",
175 help: "The time it takes to run RowSetFinishing::finish.",
176 buckets: histogram_seconds_buckets(0.000_128, 16.0),
177 )),
178 session_startup_table_writes_seconds: registry.register(metric!(
179 name: "mz_session_startup_table_writes_seconds",
180 help: "If we had to wait for builtin table writes before processing a query, how long did we wait for.",
181 buckets: histogram_seconds_buckets(0.000_008, 4.0),
182 )),
183 }
184 }
185
186 pub(crate) fn row_set_finishing_seconds(&self) -> Histogram {
187 self.row_set_finishing_seconds.with_label_values(&[])
188 }
189
190 pub(crate) fn session_metrics(&self) -> SessionMetrics {
191 SessionMetrics {
192 row_set_finishing_seconds: self.row_set_finishing_seconds(),
193 session_startup_table_writes_seconds: self
194 .session_startup_table_writes_seconds
195 .with_label_values(&[]),
196 }
197 }
198}
199
200#[derive(Debug, Clone)]
202pub struct SessionMetrics {
203 row_set_finishing_seconds: Histogram,
204 session_startup_table_writes_seconds: Histogram,
205}
206
207impl SessionMetrics {
208 pub(crate) fn row_set_finishing_seconds(&self) -> &Histogram {
209 &self.row_set_finishing_seconds
210 }
211
212 pub(crate) fn session_startup_table_writes_seconds(&self) -> &Histogram {
213 &self.session_startup_table_writes_seconds
214 }
215}
216
217pub(crate) fn session_type_label_value(user: &User) -> &'static str {
218 match user.is_internal() {
219 true => "system",
220 false => "user",
221 }
222}
223
224pub(crate) fn statement_type_label_value<T>(stmt: &Statement<T>) -> &'static str
225where
226 T: AstInfo,
227{
228 statement_kind_label_value(StatementKind::from(stmt))
229}
230
231pub(crate) fn subscribe_output_label_value<T>(output: &SubscribeOutput<T>) -> &'static str
232where
233 T: AstInfo,
234{
235 match output {
236 SubscribeOutput::Diffs => "diffs",
237 SubscribeOutput::WithinTimestampOrderBy { .. } => "within_timestamp_order_by",
238 SubscribeOutput::EnvelopeUpsert { .. } => "envelope_upsert",
239 SubscribeOutput::EnvelopeDebezium { .. } => "envelope_debezium",
240 }
241}