1use std::sync::{Arc, Mutex};
11
12use mz_compute_client::metrics::{CommandMetrics, HistoryMetrics};
13use mz_ore::cast::CastFrom;
14use mz_ore::metric;
15use mz_ore::metrics::{MetricsRegistry, UIntGauge, raw};
16use mz_repr::{GlobalId, SharedRow};
17use prometheus::core::{AtomicF64, GenericCounter};
18use prometheus::proto::LabelPair;
19use prometheus::{Histogram, HistogramVec};
20
/// Metric families exported by a compute replica.
///
/// Every field other than `workload_class` is a labeled metric family that is registered in
/// [`ComputeMetrics::register_with`]. Handles bound to a single worker are obtained through
/// [`ComputeMetrics::for_worker`].
#[derive(Clone, Debug)]
pub struct ComputeMetrics {
    /// Workload class shared with the registry postprocessor installed by `register_with`.
    ///
    /// While this is `Some`, the postprocessor appends a `workload_class` label carrying the
    /// value to each metric it is handed; while it is `None`, metrics are left untouched.
    workload_class: Arc<Mutex<Option<String>>>,

    /// `mz_compute_replica_history_command_count`, labeled by `worker_id` and `command_type`.
    history_command_count: raw::UIntGaugeVec,
    /// `mz_compute_replica_history_dataflow_count`, labeled by `worker_id`.
    history_dataflow_count: raw::UIntGaugeVec,

    /// `mz_compute_reconciliation_reused_dataflows_count_total`, labeled by `worker_id`.
    reconciliation_reused_dataflows_count_total: raw::IntCounterVec,
    /// `mz_compute_reconciliation_replaced_dataflows_count_total`, labeled by `worker_id` and
    /// `reason`.
    reconciliation_replaced_dataflows_count_total: raw::IntCounterVec,

    /// `mz_arrangement_maintenance_seconds_total`, labeled by `worker_id`.
    arrangement_maintenance_seconds_total: raw::CounterVec,
    /// `mz_arrangement_maintenance_active_info`, labeled by `worker_id`.
    arrangement_maintenance_active_info: raw::UIntGaugeVec,

    /// `mz_timely_step_duration_seconds` histogram, labeled by `worker_id` (plus the constant
    /// label `cluster = "compute"`).
    timely_step_duration_seconds: HistogramVec,
    /// `mz_persist_peek_seconds` histogram, labeled by `worker_id`.
    persist_peek_seconds: HistogramVec,
    /// `mz_stashed_peek_seconds` histogram, labeled by `worker_id`.
    stashed_peek_seconds: HistogramVec,
    /// `mz_cluster_handle_command_duration_seconds` histogram, labeled by `worker_id` and
    /// `command_type` (plus the constant label `cluster = "compute"`).
    handle_command_duration_seconds: HistogramVec,

    /// `mz_dataflow_shared_row_heap_capacity_bytes`, labeled by `worker_id`.
    shared_row_heap_capacity_bytes: raw::UIntGaugeVec,

    /// `mz_dataflow_replica_expiration_timestamp_seconds`, labeled by `worker_id`.
    replica_expiration_timestamp_seconds: raw::UIntGaugeVec,
    /// `mz_dataflow_replica_expiration_remaining_seconds`, labeled by `worker_id`.
    replica_expiration_remaining_seconds: raw::GaugeVec,

    /// `mz_compute_collection_count`, labeled by `worker_id`, `type`, and `hydrated`.
    collection_count: raw::UIntGaugeVec,
}
72
73impl ComputeMetrics {
74 pub fn register_with(registry: &MetricsRegistry) -> Self {
75 let workload_class = Arc::new(Mutex::new(None));
76
77 registry.register_postprocessor({
80 let workload_class = Arc::clone(&workload_class);
81 move |metrics| {
82 let workload_class: Option<String> =
83 workload_class.lock().expect("lock poisoned").clone();
84 let Some(workload_class) = workload_class else {
85 return;
86 };
87 for metric in metrics {
88 for metric in metric.mut_metric() {
89 let mut label = LabelPair::default();
90 label.set_name("workload_class".into());
91 label.set_value(workload_class.clone());
92
93 let mut labels = metric.take_label();
94 labels.push(label);
95 metric.set_label(labels);
96 }
97 }
98 }
99 });
100
101 Self {
102 workload_class,
103 history_command_count: registry.register(metric!(
104 name: "mz_compute_replica_history_command_count",
105 help: "The number of commands in the replica's command history.",
106 var_labels: ["worker_id", "command_type"],
107 )),
108 history_dataflow_count: registry.register(metric!(
109 name: "mz_compute_replica_history_dataflow_count",
110 help: "The number of dataflows in the replica's command history.",
111 var_labels: ["worker_id"],
112 )),
113 reconciliation_reused_dataflows_count_total: registry.register(metric!(
114 name: "mz_compute_reconciliation_reused_dataflows_count_total",
115 help: "The total number of dataflows that were reused during compute reconciliation.",
116 var_labels: ["worker_id"],
117 )),
118 reconciliation_replaced_dataflows_count_total: registry.register(metric!(
119 name: "mz_compute_reconciliation_replaced_dataflows_count_total",
120 help: "The total number of dataflows that were replaced during compute reconciliation.",
121 var_labels: ["worker_id", "reason"],
122 )),
123 arrangement_maintenance_seconds_total: registry.register(metric!(
124 name: "mz_arrangement_maintenance_seconds_total",
125 help: "The total time spent maintaining arrangements.",
126 var_labels: ["worker_id"],
127 )),
128 arrangement_maintenance_active_info: registry.register(metric!(
129 name: "mz_arrangement_maintenance_active_info",
130 help: "Whether maintenance is currently occuring.",
131 var_labels: ["worker_id"],
132 )),
133 timely_step_duration_seconds: registry.register(metric!(
134 name: "mz_timely_step_duration_seconds",
135 help: "The time spent in each compute step_or_park call",
136 const_labels: {"cluster" => "compute"},
137 var_labels: ["worker_id"],
138 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 32.0),
139 )),
140 shared_row_heap_capacity_bytes: registry.register(metric!(
141 name: "mz_dataflow_shared_row_heap_capacity_bytes",
142 help: "The heap capacity of the shared row.",
143 var_labels: ["worker_id"],
144 )),
145 persist_peek_seconds: registry.register(metric!(
146 name: "mz_persist_peek_seconds",
147 help: "Time spent in (experimental) Persist fast-path peeks.",
148 var_labels: ["worker_id"],
149 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
150 )),
151 stashed_peek_seconds: registry.register(metric!(
152 name: "mz_stashed_peek_seconds",
153 help: "Time spent reading a peek result and stashing it in the peek result stash (aka. persist blob).",
154 var_labels: ["worker_id"],
155 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
156 )),
157 handle_command_duration_seconds: registry.register(metric!(
158 name: "mz_cluster_handle_command_duration_seconds",
159 help: "Time spent in handling commands.",
160 const_labels: {"cluster" => "compute"},
161 var_labels: ["worker_id", "command_type"],
162 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0),
163 )),
164 replica_expiration_timestamp_seconds: registry.register(metric!(
165 name: "mz_dataflow_replica_expiration_timestamp_seconds",
166 help: "The replica expiration timestamp in seconds since epoch.",
167 var_labels: ["worker_id"],
168 )),
169 replica_expiration_remaining_seconds: registry.register(metric!(
170 name: "mz_dataflow_replica_expiration_remaining_seconds",
171 help: "The remaining seconds until replica expiration. Can go negative, can lag behind.",
172 var_labels: ["worker_id"],
173 )),
174 collection_count: registry.register(metric!(
175 name: "mz_compute_collection_count",
176 help: "The number and hydration status of maintained compute collections.",
177 var_labels: ["worker_id", "type", "hydrated"],
178 )),
179 }
180 }
181
182 pub fn set_workload_class(&self, workload_class: Option<String>) {
184 let mut guard = self.workload_class.lock().expect("lock poisoned");
185 *guard = workload_class
186 }
187
188 pub fn for_worker(&self, worker_id: usize) -> WorkerMetrics {
189 let worker = worker_id.to_string();
190 let arrangement_maintenance_seconds_total = self
191 .arrangement_maintenance_seconds_total
192 .with_label_values(&[&worker]);
193 let arrangement_maintenance_active_info = self
194 .arrangement_maintenance_active_info
195 .with_label_values(&[&worker]);
196 let timely_step_duration_seconds = self
197 .timely_step_duration_seconds
198 .with_label_values(&[&worker]);
199 let persist_peek_seconds = self.persist_peek_seconds.with_label_values(&[&worker]);
200 let stashed_peek_seconds = self.stashed_peek_seconds.with_label_values(&[&worker]);
201 let handle_command_duration_seconds = CommandMetrics::build(|typ| {
202 self.handle_command_duration_seconds
203 .with_label_values(&[&worker, typ])
204 });
205 let replica_expiration_timestamp_seconds = self
206 .replica_expiration_timestamp_seconds
207 .with_label_values(&[&worker]);
208 let replica_expiration_remaining_seconds = self
209 .replica_expiration_remaining_seconds
210 .with_label_values(&[&worker]);
211 let shared_row_heap_capacity_bytes = self
212 .shared_row_heap_capacity_bytes
213 .with_label_values(&[&worker]);
214
215 WorkerMetrics {
216 worker_label: worker,
217 metrics: self.clone(),
218 arrangement_maintenance_seconds_total,
219 arrangement_maintenance_active_info,
220 timely_step_duration_seconds,
221 persist_peek_seconds,
222 stashed_peek_seconds,
223 handle_command_duration_seconds,
224 replica_expiration_timestamp_seconds,
225 replica_expiration_remaining_seconds,
226 shared_row_heap_capacity_bytes,
227 }
228 }
229}
230
/// Metric handles bound to a single worker, as produced by `ComputeMetrics::for_worker`.
#[derive(Clone, Debug)]
pub struct WorkerMetrics {
    /// Value used for the `worker_id` label when resolving metrics on demand.
    worker_label: String,
    /// The metric families that on-demand lookups (history, reconciliation, collection count)
    /// are resolved against.
    metrics: ComputeMetrics,

    /// Counter from the `mz_arrangement_maintenance_seconds_total` family for this worker.
    pub(crate) arrangement_maintenance_seconds_total: GenericCounter<AtomicF64>,
    /// Gauge from the `mz_arrangement_maintenance_active_info` family for this worker.
    pub(crate) arrangement_maintenance_active_info: UIntGauge,
    /// Histogram from the `mz_timely_step_duration_seconds` family for this worker.
    pub(crate) timely_step_duration_seconds: Histogram,
    /// Histogram from the `mz_persist_peek_seconds` family for this worker.
    pub(crate) persist_peek_seconds: Histogram,
    /// Histogram from the `mz_stashed_peek_seconds` family for this worker.
    pub(crate) stashed_peek_seconds: Histogram,
    /// Histograms from the `mz_cluster_handle_command_duration_seconds` family for this worker,
    /// organized by `CommandMetrics`.
    pub(crate) handle_command_duration_seconds: CommandMetrics<Histogram>,
    /// Gauge from the `mz_dataflow_replica_expiration_timestamp_seconds` family for this worker.
    pub(crate) replica_expiration_timestamp_seconds: UIntGauge,
    /// Gauge from the `mz_dataflow_replica_expiration_remaining_seconds` family for this worker.
    pub(crate) replica_expiration_remaining_seconds: raw::Gauge,
    /// Gauge from the `mz_dataflow_shared_row_heap_capacity_bytes` family for this worker;
    /// updated by `record_shared_row_metrics`.
    shared_row_heap_capacity_bytes: UIntGauge,
}
260
261impl WorkerMetrics {
262 pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
263 let command_counts = CommandMetrics::build(|typ| {
264 self.metrics
265 .history_command_count
266 .with_label_values(&[&self.worker_label, typ])
267 });
268 let dataflow_count = self
269 .metrics
270 .history_dataflow_count
271 .with_label_values(&[&self.worker_label]);
272
273 HistoryMetrics {
274 command_counts,
275 dataflow_count,
276 }
277 }
278
279 pub fn record_dataflow_reconciliation(
293 &self,
294 compatible: bool,
295 uncompacted: bool,
296 subscribe_free: bool,
297 copy_to_free: bool,
298 dependencies_retained: bool,
299 ) {
300 if !compatible {
301 self.metrics
302 .reconciliation_replaced_dataflows_count_total
303 .with_label_values(&[&self.worker_label, "incompatible"])
304 .inc();
305 } else if !uncompacted {
306 self.metrics
307 .reconciliation_replaced_dataflows_count_total
308 .with_label_values(&[&self.worker_label, "compacted"])
309 .inc();
310 } else if !subscribe_free {
311 self.metrics
312 .reconciliation_replaced_dataflows_count_total
313 .with_label_values(&[&self.worker_label, "subscribe"])
314 .inc();
315 } else if !copy_to_free {
316 self.metrics
317 .reconciliation_replaced_dataflows_count_total
318 .with_label_values(&[&self.worker_label, "copy-to"])
319 .inc();
320 } else if !dependencies_retained {
321 self.metrics
322 .reconciliation_replaced_dataflows_count_total
323 .with_label_values(&[&self.worker_label, "dependency"])
324 .inc();
325 } else {
326 self.metrics
327 .reconciliation_reused_dataflows_count_total
328 .with_label_values(&[&self.worker_label])
329 .inc();
330 }
331 }
332
333 pub fn record_shared_row_metrics(&self) {
335 let binding = SharedRow::get();
336 self.shared_row_heap_capacity_bytes
337 .set(u64::cast_from(binding.byte_capacity()));
338 }
339
340 fn inc_collection_count(&self, collection_type: &str, hydrated: bool) {
342 let hydrated = if hydrated { "1" } else { "0" };
343 self.metrics
344 .collection_count
345 .with_label_values(&[&self.worker_label, collection_type, hydrated])
346 .inc();
347 }
348
349 fn dec_collection_count(&self, collection_type: &str, hydrated: bool) {
351 let hydrated = if hydrated { "1" } else { "0" };
352 self.metrics
353 .collection_count
354 .with_label_values(&[&self.worker_label, collection_type, hydrated])
355 .dec();
356 }
357
358 pub fn set_workload_class(&self, workload_class: Option<String>) {
360 self.metrics.set_workload_class(workload_class);
361 }
362
363 pub fn for_collection(&self, id: GlobalId) -> CollectionMetrics {
364 CollectionMetrics::new(id, self.clone())
365 }
366}
367
/// Per-collection metrics handle that keeps the worker's collection count up to date.
///
/// Creating a value through `CollectionMetrics::new` increments the collection count for its
/// type, and dropping a value decrements it again.
///
/// NOTE(review): this type derives `Clone`, but a derived clone does not increment the count
/// while the `Drop` impl decrements it for every dropped instance. Cloning a value would
/// therefore decrement the count more often than it was incremented — confirm that no caller
/// clones this type.
#[derive(Clone, Debug)]
pub struct CollectionMetrics {
    /// Worker-level metrics used to adjust the collection count.
    metrics: WorkerMetrics,
    /// Value of the `type` label, derived from the collection's `GlobalId` variant.
    collection_type: &'static str,
    /// Whether the collection is currently counted as hydrated.
    collection_hydrated: bool,
}
379
380impl CollectionMetrics {
381 pub fn new(collection_id: GlobalId, metrics: WorkerMetrics) -> Self {
382 let collection_type = match collection_id {
383 GlobalId::System(_) => "system",
384 GlobalId::IntrospectionSourceIndex(_) => "log",
385 GlobalId::User(_) => "user",
386 GlobalId::Transient(_) => "transient",
387 GlobalId::Explain => "explain",
388 };
389 let collection_hydrated = false;
390
391 metrics.inc_collection_count(collection_type, collection_hydrated);
392
393 Self {
394 metrics,
395 collection_type,
396 collection_hydrated,
397 }
398 }
399
400 pub fn record_collection_hydrated(&mut self) {
402 if self.collection_hydrated {
403 return;
404 }
405
406 self.metrics
407 .dec_collection_count(self.collection_type, false);
408 self.metrics
409 .inc_collection_count(self.collection_type, true);
410 self.collection_hydrated = true;
411 }
412}
413
414impl Drop for CollectionMetrics {
415 fn drop(&mut self) {
416 self.metrics
417 .dec_collection_count(self.collection_type, self.collection_hydrated);
418 }
419}