1use std::collections::BTreeMap;
13use std::rc::Rc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::dyncfgs::COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL;
17use mz_dyncfg::ConfigSet;
18use mz_ore::cast::{CastFrom, CastLossy};
19use mz_ore::collections::CollectionExt;
20use mz_ore::metrics::MetricsRegistry;
21use mz_ore::soft_panic_or_log;
22use mz_repr::{Datum, Diff, Timestamp};
23use mz_timely_util::columnar::builder::ColumnBuilder;
24use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
25use prometheus::proto::MetricType;
26use timely::dataflow::Scope;
27use timely::dataflow::channels::pact::ExchangeCore;
28use timely::dataflow::operators::generic::OutputBuilder;
29use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
30
31use crate::extensions::arrange::MzArrangeCore;
32use crate::logging::{ComputeLog, LogCollection, LogVariant, PermutedRowPacker};
33use crate::row_spine::RowRowBuilder;
34use crate::typedefs::RowRowSpine;
35
/// Result of [`construct`]: the log collections built by this module, keyed by log variant.
pub(super) struct Return {
    /// The constructed collections. Empty when the `PrometheusMetrics` log is not indexed;
    /// otherwise it holds exactly that one collection.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Identity of one flattened sample: the (possibly `_bucket`/`_sum`/`_count`-suffixed) metric
/// name and its label pairs, sorted so equal label sets compare equal.
type SnapshotKey = (String, Vec<(String, String)>);
/// Payload of one flattened sample: its value, the metric type name, and the help text.
type SnapshotValue = (f64, &'static str, String);
46
/// Constructs the `PrometheusMetrics` introspection log.
///
/// A source operator periodically gathers `metrics_registry`, flattens it into one row per
/// sample (see `flatten_metrics`), and emits only the differences against the previously
/// emitted snapshot. Only the first worker of each process
/// (`scope.index() % workers_per_process == 0`) produces data. The output stream is exchanged
/// and arranged into a trace that backs the log collection.
///
/// Returns an empty [`Return`] when the log is not among `config.index_logs`.
///
/// # Panics
///
/// Panics (integer division by zero) if `workers_per_process` is 0.
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    scope: G,
    config: &mz_compute_client::logging::LoggingConfig,
    metrics_registry: MetricsRegistry,
    now: Instant,
    start_offset: Duration,
    worker_config: Rc<ConfigSet>,
    workers_per_process: usize,
) -> Return {
    let variant = LogVariant::Compute(ComputeLog::PrometheusMetrics);
    let mut collections = BTreeMap::new();
    let interval = config.interval;
    // Clamp to at least 1ms so the timestamp rounding below never divides by zero.
    let interval_ms = std::cmp::max(1, interval.as_millis());

    // Nothing to build if this log is not indexed.
    if !config.index_logs.contains_key(&variant) {
        return Return { collections };
    }

    // NOTE(review): assumes worker indices are grouped into consecutive runs of
    // `workers_per_process` per process — confirm against how workers are assigned.
    let process_id = scope.index() / workers_per_process;
    // Only one worker per process scrapes, so each process's registry is reported once.
    let enable = scope.index() % workers_per_process == 0;

    let mut builder = OperatorBuilder::new("PrometheusMetrics".to_string(), scope.clone());
    let (output, stream) = builder.new_output();
    let mut output = OutputBuilder::<_, ColumnBuilder<_>>::from(output);

    let operator_info = builder.operator_info();
    builder.build(move |capabilities| {
        // `then_some` evaluates its argument eagerly, so disabled workers take the capability
        // and immediately drop it; they never hold back the output frontier.
        let mut cap = enable.then_some(capabilities.into_element());
        let activator = scope.activator_for(operator_info.address);

        // State carried across activations: the last emitted snapshot, the earliest instant
        // of the next scrape (initially "now", so the first activation scrapes), and a reusable
        // row packer.
        let mut prev_snapshot: BTreeMap<SnapshotKey, SnapshotValue> = BTreeMap::new();
        let mut next_scrape = Instant::now();
        let mut packer = PermutedRowPacker::new(ComputeLog::PrometheusMetrics);

        move |_frontiers| {
            let Some(cap) = &mut cap else { return };

            // Round the offset elapsed time up to the next multiple of the logging interval
            // (strictly greater than the current offset time) and advance the capability to
            // it. Every update emitted in this activation carries that timestamp.
            let elapsed = now.elapsed().as_millis();
            let time_ms: u128 =
                ((elapsed + start_offset.as_millis()) / interval_ms + 1) * interval_ms;
            let ts: Timestamp = time_ms.try_into().expect("must fit");
            cap.downgrade(&ts);

            // Schedule the next activation for the wall-clock instant corresponding to `ts`.
            // `time_ms > start_offset` by construction, so the subtraction cannot underflow.
            let next_boundary_ms = time_ms - start_offset.as_millis();
            let next_activation =
                now + Duration::from_millis(next_boundary_ms.try_into().expect("must fit"));
            activator.activate_after(next_activation.saturating_duration_since(Instant::now()));

            // Scrapes may be less frequent than activations; skip until one is due.
            if Instant::now() < next_scrape {
                return;
            }

            // Never scrape more often than the logging interval, whatever the dyncfg says.
            let prom_interval =
                COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL.get(&worker_config);
            let effective_interval = prom_interval.max(interval);
            next_scrape = Instant::now() + effective_interval;

            // A zero scrape interval disables scraping: the empty snapshot retracts
            // everything emitted so far.
            let new_snapshot = if !prom_interval.is_zero() {
                let metric_families = metrics_registry.gather();
                flatten_metrics(metric_families)
            } else {
                BTreeMap::new()
            };

            let mut output = output.activate();
            let mut session = output.session_with_builder(&cap);

            // Retract every previously emitted sample that is gone or whose
            // (value, type, help) changed. NOTE: values are `f64`, so a NaN sample never
            // compares equal to itself and is retracted and re-inserted on every scrape.
            for (key, old_val) in &prev_snapshot {
                match new_snapshot.get(key) {
                    Some(new_val) if new_val == old_val => {}
                    _ => {
                        let (row_key, row_val) = pack_row(
                            &mut packer,
                            &key.0,
                            old_val.1,
                            &key.1,
                            old_val.0,
                            &old_val.2,
                            process_id,
                        );
                        session.give(((row_key, row_val), ts, Diff::MINUS_ONE));
                    }
                }
            }

            // Insert every sample that is new or whose (value, type, help) changed.
            for (key, new_val) in &new_snapshot {
                match prev_snapshot.get(key) {
                    Some(old_val) if old_val == new_val => {}
                    _ => {
                        let (row_key, row_val) = pack_row(
                            &mut packer,
                            &key.0,
                            new_val.1,
                            &key.1,
                            new_val.0,
                            &new_val.2,
                            process_id,
                        );
                        session.give(((row_key, row_val), ts, Diff::ONE));
                    }
                }
            }

            prev_snapshot = new_snapshot;
        }
    });

    // Route the (key, value) rows between workers with the columnar exchange and arrange
    // them into the trace that backs the log collection.
    let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
        columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, mz_repr::Diff>,
    );
    let trace = stream
        .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            exchange,
            "Arrange PrometheusMetrics",
        )
        .trace;
    // NOTE(review): the token is a bare `Rc<()>`; dropping it does not stop the operator
    // above — confirm that no shutdown signal is intended here.
    let token: Rc<dyn std::any::Any> = Rc::new(());
    let collection = LogCollection { trace, token };
    collections.insert(variant, collection);

    Return { collections }
}
192
193fn flatten_metrics(
195 families: Vec<prometheus::proto::MetricFamily>,
196) -> BTreeMap<SnapshotKey, SnapshotValue> {
197 let mut snapshot = BTreeMap::new();
198
199 for family in families {
200 let base_name = family.name();
201 let help = family.help();
202 let metric_type = family.get_field_type();
203 let type_str = match metric_type {
204 MetricType::COUNTER => "counter",
205 MetricType::GAUGE => "gauge",
206 MetricType::HISTOGRAM => "histogram",
207 MetricType::SUMMARY => "summary",
208 MetricType::UNTYPED => "untyped",
209 };
210
211 for metric in family.get_metric() {
212 let base_labels: Vec<(String, String)> = metric
213 .get_label()
214 .iter()
215 .map(|l| (l.name().to_string(), l.value().to_string()))
216 .collect();
217
218 match metric_type {
219 MetricType::COUNTER => {
220 let value = metric.get_counter().get_value();
221 insert_row(
222 &mut snapshot,
223 base_name.to_string(),
224 base_labels,
225 value,
226 type_str,
227 help,
228 );
229 }
230 MetricType::GAUGE => {
231 let value = metric.get_gauge().get_value();
232 insert_row(
233 &mut snapshot,
234 base_name.to_string(),
235 base_labels,
236 value,
237 type_str,
238 help,
239 );
240 }
241 MetricType::HISTOGRAM => {
242 let histogram = metric.get_histogram();
243
244 for bucket in histogram.get_bucket() {
246 let mut labels = base_labels.clone();
247 labels.push(("le".to_string(), format_f64(bucket.upper_bound())));
248 insert_row(
249 &mut snapshot,
250 format!("{base_name}_bucket"),
251 labels,
252 f64::cast_lossy(bucket.cumulative_count()),
253 type_str,
254 help,
255 );
256 }
257
258 insert_row(
260 &mut snapshot,
261 format!("{base_name}_sum"),
262 base_labels.clone(),
263 histogram.get_sample_sum(),
264 type_str,
265 help,
266 );
267
268 insert_row(
270 &mut snapshot,
271 format!("{base_name}_count"),
272 base_labels,
273 f64::cast_lossy(histogram.get_sample_count()),
274 type_str,
275 help,
276 );
277 }
278 MetricType::SUMMARY => {
279 let summary = metric.get_summary();
280
281 for quantile in summary.get_quantile() {
283 let mut labels = base_labels.clone();
284 labels.push(("quantile".to_string(), format_f64(quantile.quantile())));
285 insert_row(
286 &mut snapshot,
287 base_name.to_string(),
288 labels,
289 quantile.value(),
290 type_str,
291 help,
292 );
293 }
294
295 insert_row(
297 &mut snapshot,
298 format!("{base_name}_sum"),
299 base_labels.clone(),
300 summary.sample_sum(),
301 type_str,
302 help,
303 );
304
305 insert_row(
307 &mut snapshot,
308 format!("{base_name}_count"),
309 base_labels,
310 f64::cast_lossy(summary.sample_count()),
311 type_str,
312 help,
313 );
314 }
315 MetricType::UNTYPED => {
316 soft_panic_or_log!("unexpected untyped metric: {base_name}");
317 }
318 }
319 }
320 }
321
322 snapshot
323}
324
/// Renders a float as a label value string (used for histogram `le` and summary `quantile`
/// labels). Positive and negative infinity are spelled `+Inf` and `-Inf`; every other value,
/// NaN included, uses the standard `Display` formatting.
fn format_f64(v: f64) -> String {
    match (v.is_infinite(), v.is_sign_positive()) {
        (true, true) => "+Inf".to_string(),
        (true, false) => "-Inf".to_string(),
        (false, _) => v.to_string(),
    }
}
335
336fn insert_row(
338 snapshot: &mut BTreeMap<SnapshotKey, SnapshotValue>,
339 name: String,
340 mut labels: Vec<(String, String)>,
341 value: f64,
342 metric_type: &'static str,
343 help: &str,
344) {
345 labels.sort();
346
347 snapshot.insert((name, labels), (value, metric_type, help.to_string()));
348}
349
350fn pack_row<'a>(
352 packer: &'a mut PermutedRowPacker,
353 metric_name: &str,
354 metric_type: &str,
355 labels: &[(String, String)],
356 value: f64,
357 help: &str,
358 process_id: usize,
359) -> (&'a mz_repr::RowRef, &'a mz_repr::RowRef) {
360 packer.pack_by_index(|row_packer, index| match index {
361 0 => row_packer.push(Datum::UInt64(u64::cast_from(process_id))),
363 1 => row_packer.push(Datum::String(metric_name)),
365 2 => row_packer.push(Datum::String(metric_type)),
367 3 => {
369 row_packer.push_dict(labels.iter().map(|(k, v)| (k.as_str(), Datum::String(v))));
370 }
371 4 => row_packer.push(Datum::Float64(value.into())),
373 5 => row_packer.push(Datum::String(help)),
375 _ => unreachable!("unexpected column index {index}"),
376 })
377}