1use std::collections::BTreeMap;
13use std::rc::Rc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::dyncfgs::COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL;
17use mz_dyncfg::ConfigSet;
18use mz_ore::cast::{CastFrom, CastLossy};
19use mz_ore::collections::CollectionExt;
20use mz_ore::metrics::MetricsRegistry;
21use mz_ore::soft_panic_or_log;
22use mz_repr::{Datum, Diff, Timestamp};
23use mz_timely_util::columnar::batcher;
24use mz_timely_util::columnar::builder::ColumnBuilder;
25use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
26use prometheus::proto::MetricType;
27use timely::dataflow::Scope;
28use timely::dataflow::channels::pact::ExchangeCore;
29use timely::dataflow::operators::generic::OutputBuilder;
30use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
31
32use crate::extensions::arrange::MzArrangeCore;
33use crate::logging::{ComputeLog, LogCollection, LogVariant, PermutedRowPacker};
34use crate::typedefs::RowRowSpine;
35use mz_row_spine::RowRowBuilder;
36
/// Output of [`construct`]: the introspection collections built by this module.
pub(super) struct Return {
    /// Collections keyed by log variant. [`construct`] inserts at most the
    /// `ComputeLog::PrometheusMetrics` entry, and leaves the map empty when that log is not
    /// requested in the logging config.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Identity of one exported sample: the sample name (the family name, or the family name
/// with a `_bucket` / `_sum` / `_count` suffix) and its `(label name, label value)` pairs.
/// `insert_row` sorts the labels, so equal label sets produce equal keys.
type SnapshotKey = (String, Vec<(String, String)>);
/// Payload of one exported sample: its value, the metric type name (e.g. `"counter"`), and
/// the family's help text.
type SnapshotValue = (f64, &'static str, String);
47
/// Builds the dataflow fragment that exposes this process's Prometheus metrics as the
/// `ComputeLog::PrometheusMetrics` introspection log.
///
/// Returns an empty [`Return`] if that log variant is not listed in `config.index_logs`.
/// Otherwise a source operator is built on every worker. Only the first worker of each process
/// (`scope.index() % workers_per_process == 0`) keeps its capability and produces data.
///
/// The producing operator:
/// * advances its capability to the next multiple of `config.interval` (in milliseconds,
///   clamped to at least 1) after `now.elapsed() + start_offset`, and reschedules itself for
///   that boundary;
/// * scrapes `metrics_registry` at most once per
///   `max(COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL, config.interval)`, reading the
///   dyncfg from `worker_config` on every scrape;
/// * emits the difference to the previous scrape as `-1` retractions and `+1` insertions at the
///   current capability time. A zero dyncfg interval yields an empty snapshot, which retracts
///   every previously emitted row.
///
/// The resulting stream is exchanged with `columnar_exchange` and arranged into a
/// `RowRowSpine`, whose trace is returned under the `PrometheusMetrics` variant.
pub(super) fn construct(
    scope: Scope<'_, Timestamp>,
    config: &mz_compute_client::logging::LoggingConfig,
    metrics_registry: MetricsRegistry,
    now: Instant,
    start_offset: Duration,
    worker_config: Rc<ConfigSet>,
    workers_per_process: usize,
) -> Return {
    let variant = LogVariant::Compute(ComputeLog::PrometheusMetrics);
    let mut collections = BTreeMap::new();
    let interval = config.interval;
    // Clamp to at least 1ms so the divisions below can never divide by zero.
    let interval_ms = std::cmp::max(1, interval.as_millis());

    // Nothing to build if this log has not been requested.
    if !config.index_logs.contains_key(&variant) {
        return Return { collections };
    }

    // One designated worker per process scrapes; `process_id` tags the rows it emits.
    // NOTE(review): this presumes the registry is shared by all workers of a process — confirm.
    let process_id = scope.index() / workers_per_process;
    let enable = scope.index() % workers_per_process == 0;

    let mut builder = OperatorBuilder::new("PrometheusMetrics".to_string(), scope.clone());
    let (output, stream) = builder.new_output();
    let mut output = OutputBuilder::<_, ColumnBuilder<_>>::from(output);

    let operator_info = builder.operator_info();
    builder.build(move |capabilities| {
        // `then_some` evaluates its argument eagerly, so non-designated workers take the
        // capability and drop it immediately; they never produce output.
        let mut cap = enable.then_some(capabilities.into_element());
        let activator = scope.activator_for(operator_info.address);

        // Rows emitted by the previous scrape; the next scrape is diffed against these.
        let mut prev_snapshot: BTreeMap<SnapshotKey, SnapshotValue> = BTreeMap::new();
        // Wall-clock instant before which no new scrape happens (initially: scrape right away).
        let mut next_scrape = Instant::now();
        let mut packer = PermutedRowPacker::new(ComputeLog::PrometheusMetrics);

        move |_frontiers| {
            let Some(cap) = &mut cap else { return };

            // Advance to the next multiple of `interval_ms` strictly after the offset time, so
            // emitted rows land on logging-interval boundaries.
            let elapsed = now.elapsed().as_millis();
            let time_ms: u128 =
                ((elapsed + start_offset.as_millis()) / interval_ms + 1) * interval_ms;
            let ts: Timestamp = time_ms.try_into().expect("must fit");
            cap.downgrade(&ts);

            // Wake up again at the wall-clock instant corresponding to `ts`. Because
            // `time_ms > elapsed + start_offset`, this subtraction cannot underflow.
            let next_boundary_ms = time_ms - start_offset.as_millis();
            let next_activation =
                now + Duration::from_millis(next_boundary_ms.try_into().expect("must fit"));
            activator.activate_after(next_activation.saturating_duration_since(Instant::now()));

            // Scrapes are rate-limited separately from the capability/activation cadence.
            if Instant::now() < next_scrape {
                return;
            }

            // Never scrape more often than the logging interval, even if the dyncfg is smaller.
            let prom_interval =
                COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL.get(&worker_config);
            let effective_interval = prom_interval.max(interval);
            next_scrape = Instant::now() + effective_interval;

            // A zero dyncfg interval disables scraping; the empty snapshot makes the loops
            // below retract every previously emitted row.
            let new_snapshot = if !prom_interval.is_zero() {
                let metric_families = metrics_registry.gather();
                flatten_metrics(metric_families)
            } else {
                BTreeMap::new()
            };

            let mut output = output.activate();
            let mut session = output.session_with_builder(&cap);

            // Retract rows that disappeared or whose (value, type, help) changed.
            // NOTE(review): the tuple comparison uses `f64 ==`, so a NaN value never compares
            // equal and such a row is retracted and re-inserted on every scrape.
            for (key, old_val) in &prev_snapshot {
                match new_snapshot.get(key) {
                    Some(new_val) if new_val == old_val => {}
                    _ => {
                        let (row_key, row_val) = pack_row(
                            &mut packer,
                            &key.0,
                            old_val.1,
                            &key.1,
                            old_val.0,
                            &old_val.2,
                            process_id,
                        );
                        session.give(((row_key, row_val), ts, Diff::MINUS_ONE));
                    }
                }
            }

            // Insert rows that are new or whose (value, type, help) changed.
            for (key, new_val) in &new_snapshot {
                match prev_snapshot.get(key) {
                    Some(old_val) if old_val == new_val => {}
                    _ => {
                        let (row_key, row_val) = pack_row(
                            &mut packer,
                            &key.0,
                            new_val.1,
                            &key.1,
                            new_val.0,
                            &new_val.2,
                            process_id,
                        );
                        session.give(((row_key, row_val), ts, Diff::ONE));
                    }
                }
            }

            prev_snapshot = new_snapshot;
        }
    });

    // Route updates with `columnar_exchange`, then arrange them into a row/row trace.
    let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
        columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, mz_repr::Diff>,
    );
    let trace = stream
        .mz_arrange_core::<
            _,
            batcher::Chunker<_>,
            Col2ValBatcher<_, _, _, _>,
            RowRowBuilder<_, _>,
            RowRowSpine<_, _>,
        >(exchange, "Arrange PrometheusMetrics")
        .trace;
    // NOTE(review): this token is not referenced by the operator above, so dropping it
    // presumably does not shut the operator down — confirm that is intended.
    let token: Rc<dyn std::any::Any> = Rc::new(());
    let collection = LogCollection { trace, token };
    collections.insert(variant, collection);

    Return { collections }
}
196
197fn flatten_metrics(
199 families: Vec<prometheus::proto::MetricFamily>,
200) -> BTreeMap<SnapshotKey, SnapshotValue> {
201 let mut snapshot = BTreeMap::new();
202
203 for family in families {
204 let base_name = family.name();
205 let help = family.help();
206 let metric_type = family.get_field_type();
207 let type_str = match metric_type {
208 MetricType::COUNTER => "counter",
209 MetricType::GAUGE => "gauge",
210 MetricType::HISTOGRAM => "histogram",
211 MetricType::SUMMARY => "summary",
212 MetricType::UNTYPED => "untyped",
213 };
214
215 for metric in family.get_metric() {
216 let base_labels: Vec<(String, String)> = metric
217 .get_label()
218 .iter()
219 .map(|l| (l.name().to_string(), l.value().to_string()))
220 .collect();
221
222 match metric_type {
223 MetricType::COUNTER => {
224 let value = metric.get_counter().value();
225 insert_row(
226 &mut snapshot,
227 base_name.to_string(),
228 base_labels,
229 value,
230 type_str,
231 help,
232 );
233 }
234 MetricType::GAUGE => {
235 let value = metric.get_gauge().value();
236 insert_row(
237 &mut snapshot,
238 base_name.to_string(),
239 base_labels,
240 value,
241 type_str,
242 help,
243 );
244 }
245 MetricType::HISTOGRAM => {
246 let histogram = metric.get_histogram();
247
248 for bucket in histogram.get_bucket() {
250 let mut labels = base_labels.clone();
251 labels.push(("le".to_string(), format_f64(bucket.upper_bound())));
252 insert_row(
253 &mut snapshot,
254 format!("{base_name}_bucket"),
255 labels,
256 f64::cast_lossy(bucket.cumulative_count()),
257 type_str,
258 help,
259 );
260 }
261
262 insert_row(
264 &mut snapshot,
265 format!("{base_name}_sum"),
266 base_labels.clone(),
267 histogram.get_sample_sum(),
268 type_str,
269 help,
270 );
271
272 insert_row(
274 &mut snapshot,
275 format!("{base_name}_count"),
276 base_labels,
277 f64::cast_lossy(histogram.get_sample_count()),
278 type_str,
279 help,
280 );
281 }
282 MetricType::SUMMARY => {
283 let summary = metric.get_summary();
284
285 for quantile in summary.get_quantile() {
287 let mut labels = base_labels.clone();
288 labels.push(("quantile".to_string(), format_f64(quantile.quantile())));
289 insert_row(
290 &mut snapshot,
291 base_name.to_string(),
292 labels,
293 quantile.value(),
294 type_str,
295 help,
296 );
297 }
298
299 insert_row(
301 &mut snapshot,
302 format!("{base_name}_sum"),
303 base_labels.clone(),
304 summary.sample_sum(),
305 type_str,
306 help,
307 );
308
309 insert_row(
311 &mut snapshot,
312 format!("{base_name}_count"),
313 base_labels,
314 f64::cast_lossy(summary.sample_count()),
315 type_str,
316 help,
317 );
318 }
319 MetricType::UNTYPED => {
320 soft_panic_or_log!("unexpected untyped metric: {base_name}");
321 }
322 }
323 }
324 }
325
326 snapshot
327}
328
/// Renders a float the way Prometheus label values spell it: infinities become `+Inf` /
/// `-Inf`, and every other value (including NaN) uses Rust's `Display` formatting.
fn format_f64(v: f64) -> String {
    match (v.is_infinite(), v.is_sign_negative()) {
        (true, false) => String::from("+Inf"),
        (true, true) => String::from("-Inf"),
        (false, _) => format!("{v}"),
    }
}
339
340fn insert_row(
342 snapshot: &mut BTreeMap<SnapshotKey, SnapshotValue>,
343 name: String,
344 mut labels: Vec<(String, String)>,
345 value: f64,
346 metric_type: &'static str,
347 help: &str,
348) {
349 labels.sort();
350
351 snapshot.insert((name, labels), (value, metric_type, help.to_string()));
352}
353
354fn pack_row<'a>(
356 packer: &'a mut PermutedRowPacker,
357 metric_name: &str,
358 metric_type: &str,
359 labels: &[(String, String)],
360 value: f64,
361 help: &str,
362 process_id: usize,
363) -> (&'a mz_repr::RowRef, &'a mz_repr::RowRef) {
364 packer.pack_by_index(|row_packer, index| match index {
365 0 => row_packer.push(Datum::UInt64(u64::cast_from(process_id))),
367 1 => row_packer.push(Datum::String(metric_name)),
369 2 => row_packer.push(Datum::String(metric_type)),
371 3 => {
373 row_packer.push_dict(labels.iter().map(|(k, v)| (k.as_str(), Datum::String(v))));
374 }
375 4 => row_packer.push(Datum::Float64(value.into())),
377 5 => row_packer.push(Datum::String(help)),
379 _ => unreachable!("unexpected column index {index}"),
380 })
381}