// mz_compute/logging/prometheus.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflow for Prometheus metrics gathered from the metrics registry.
11
12use std::collections::BTreeMap;
13use std::rc::Rc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::dyncfgs::COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL;
17use mz_dyncfg::ConfigSet;
18use mz_ore::cast::{CastFrom, CastLossy};
19use mz_ore::collections::CollectionExt;
20use mz_ore::metrics::MetricsRegistry;
21use mz_ore::soft_panic_or_log;
22use mz_repr::{Datum, Diff, Timestamp};
23use mz_timely_util::columnar::builder::ColumnBuilder;
24use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
25use prometheus::proto::MetricType;
26use timely::dataflow::Scope;
27use timely::dataflow::channels::pact::ExchangeCore;
28use timely::dataflow::operators::generic::OutputBuilder;
29use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
30
31use crate::extensions::arrange::MzArrangeCore;
32use crate::logging::{ComputeLog, LogCollection, LogVariant, PermutedRowPacker};
33use crate::row_spine::RowRowBuilder;
34use crate::typedefs::RowRowSpine;
35
/// The return type of [`construct`].
pub(super) struct Return {
    /// Collections to export, keyed by log variant. Empty when the
    /// `PrometheusMetrics` variant was not requested in the logging config.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
41
/// Key type for the snapshot: (metric_name, sorted_label_pairs).
///
/// Label pairs are sorted (see `insert_row`) so the label order reported by
/// the metrics registry does not affect key equality across scrapes.
type SnapshotKey = (String, Vec<(String, String)>);
/// Value type for the snapshot: (value, metric_type, help).
type SnapshotValue = (f64, &'static str, String);
46
/// Constructs the logging dataflow fragment for Prometheus metrics.
///
/// Builds a source operator that periodically gathers all metrics from
/// `metrics_registry`, diffs each snapshot against the previous one, and
/// emits the differences as insertions/retractions that are arranged into
/// the `ComputeLog::PrometheusMetrics` introspection collection.
///
/// Metrics are per-process, so only one worker per process (worker index
/// divisible by `workers_per_process`) actually scrapes; all other workers
/// drop their capability immediately. `now` and `start_offset` anchor the
/// mapping from wall-clock time to logical timestamps, and `worker_config`
/// supplies the dynamic scrape interval.
///
/// Returns an empty collection map, and builds no operator, when
/// `config.index_logs` does not request the `PrometheusMetrics` variant.
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    scope: G,
    config: &mz_compute_client::logging::LoggingConfig,
    metrics_registry: MetricsRegistry,
    now: Instant,
    start_offset: Duration,
    worker_config: Rc<ConfigSet>,
    workers_per_process: usize,
) -> Return {
    let variant = LogVariant::Compute(ComputeLog::PrometheusMetrics);
    let mut collections = BTreeMap::new();
    let interval = config.interval;
    // Clamp to at least 1ms so the interval arithmetic below never divides
    // by zero for sub-millisecond logging intervals.
    let interval_ms = std::cmp::max(1, interval.as_millis());

    if !config.index_logs.contains_key(&variant) {
        return Return { collections };
    }

    let process_id = scope.index() / workers_per_process;
    let enable = scope.index() % workers_per_process == 0;

    // Build a source operator that periodically gathers Prometheus metrics
    // and packs them directly into Row pairs.
    let mut builder = OperatorBuilder::new("PrometheusMetrics".to_string(), scope.clone());
    let (output, stream) = builder.new_output();
    let mut output = OutputBuilder::<_, ColumnBuilder<_>>::from(output);

    let operator_info = builder.operator_info();
    builder.build(move |capabilities| {
        // Metrics are per-process, so only one worker per process needs to
        // scrape. Drop the capability for disabled workers so the frontier
        // can advance without this operator holding it back.
        let mut cap = enable.then_some(capabilities.into_element());
        let activator = scope.activator_for(operator_info.address);

        // Snapshot from the previous scrape, used to compute retractions.
        let mut prev_snapshot: BTreeMap<SnapshotKey, SnapshotValue> = BTreeMap::new();
        let mut next_scrape = Instant::now();
        let mut packer = PermutedRowPacker::new(ComputeLog::PrometheusMetrics);

        move |_frontiers| {
            // Disabled workers dropped their capability above and never emit.
            let Some(cap) = &mut cap else { return };

            // Advance the capability to the next logging interval boundary.
            // This keeps the output frontier progressing at the logging
            // rate, even when scrapes happen less frequently. Note that
            // advancing the frontier implies the data is up-to-date, but
            // the metrics snapshot may be stale by up to the scrape
            // interval when it exceeds the logging interval.
            let elapsed = now.elapsed().as_millis();
            let time_ms: u128 =
                ((elapsed + start_offset.as_millis()) / interval_ms + 1) * interval_ms;
            let ts: Timestamp = time_ms.try_into().expect("must fit");
            cap.downgrade(&ts);

            // Schedule the next activation at the interval boundary
            // to avoid drift from wall-clock elapsed time.
            let next_boundary_ms = time_ms - start_offset.as_millis();
            let next_activation =
                now + Duration::from_millis(next_boundary_ms.try_into().expect("must fit"));
            activator.activate_after(next_activation.saturating_duration_since(Instant::now()));

            // Only scrape when the scrape interval has elapsed.
            // The operator wakes every logging interval to advance the
            // capability, but scrapes less frequently if configured.
            if Instant::now() < next_scrape {
                return;
            }

            // Re-read the dyncfg on every scrape so interval changes take
            // effect without restarting the dataflow.
            let prom_interval =
                COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL.get(&worker_config);
            let effective_interval = prom_interval.max(interval);
            next_scrape = Instant::now() + effective_interval;

            // Gather current metrics and build new snapshot, or an empty
            // snapshot when disabled (which retracts any existing data).
            let new_snapshot = if !prom_interval.is_zero() {
                let metric_families = metrics_registry.gather();
                flatten_metrics(metric_families)
            } else {
                BTreeMap::new()
            };

            // Diff against previous snapshot and emit packed Row pairs.
            let mut output = output.activate();
            let mut session = output.session_with_builder(&cap);

            // Retract entries that were removed or changed.
            for (key, old_val) in &prev_snapshot {
                match new_snapshot.get(key) {
                    Some(new_val) if new_val == old_val => {}
                    _ => {
                        let (row_key, row_val) = pack_row(
                            &mut packer,
                            &key.0,
                            old_val.1,
                            &key.1,
                            old_val.0,
                            &old_val.2,
                            process_id,
                        );
                        session.give(((row_key, row_val), ts, Diff::MINUS_ONE));
                    }
                }
            }

            // Insert entries that are new or changed.
            for (key, new_val) in &new_snapshot {
                match prev_snapshot.get(key) {
                    Some(old_val) if old_val == new_val => {}
                    _ => {
                        let (row_key, row_val) = pack_row(
                            &mut packer,
                            &key.0,
                            new_val.1,
                            &key.1,
                            new_val.0,
                            &new_val.2,
                            process_id,
                        );
                        session.give(((row_key, row_val), ts, Diff::ONE));
                    }
                }
            }

            prev_snapshot = new_snapshot;
        }
    });

    // Arrange into a trace.
    let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
        columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, mz_repr::Diff>,
    );
    let trace = stream
        .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            exchange,
            "Arrange PrometheusMetrics",
        )
        .trace;
    // A trivial token: the unit value holds no resources requiring cleanup.
    let token: Rc<dyn std::any::Any> = Rc::new(());
    let collection = LogCollection { trace, token };
    collections.insert(variant, collection);

    Return { collections }
}
192
193/// Flatten metric families into a snapshot map.
194fn flatten_metrics(
195    families: Vec<prometheus::proto::MetricFamily>,
196) -> BTreeMap<SnapshotKey, SnapshotValue> {
197    let mut snapshot = BTreeMap::new();
198
199    for family in families {
200        let base_name = family.name();
201        let help = family.help();
202        let metric_type = family.get_field_type();
203        let type_str = match metric_type {
204            MetricType::COUNTER => "counter",
205            MetricType::GAUGE => "gauge",
206            MetricType::HISTOGRAM => "histogram",
207            MetricType::SUMMARY => "summary",
208            MetricType::UNTYPED => "untyped",
209        };
210
211        for metric in family.get_metric() {
212            let base_labels: Vec<(String, String)> = metric
213                .get_label()
214                .iter()
215                .map(|l| (l.name().to_string(), l.value().to_string()))
216                .collect();
217
218            match metric_type {
219                MetricType::COUNTER => {
220                    let value = metric.get_counter().get_value();
221                    insert_row(
222                        &mut snapshot,
223                        base_name.to_string(),
224                        base_labels,
225                        value,
226                        type_str,
227                        help,
228                    );
229                }
230                MetricType::GAUGE => {
231                    let value = metric.get_gauge().get_value();
232                    insert_row(
233                        &mut snapshot,
234                        base_name.to_string(),
235                        base_labels,
236                        value,
237                        type_str,
238                        help,
239                    );
240                }
241                MetricType::HISTOGRAM => {
242                    let histogram = metric.get_histogram();
243
244                    // One row per bucket with `le` label.
245                    for bucket in histogram.get_bucket() {
246                        let mut labels = base_labels.clone();
247                        labels.push(("le".to_string(), format_f64(bucket.upper_bound())));
248                        insert_row(
249                            &mut snapshot,
250                            format!("{base_name}_bucket"),
251                            labels,
252                            f64::cast_lossy(bucket.cumulative_count()),
253                            type_str,
254                            help,
255                        );
256                    }
257
258                    // _sum row
259                    insert_row(
260                        &mut snapshot,
261                        format!("{base_name}_sum"),
262                        base_labels.clone(),
263                        histogram.get_sample_sum(),
264                        type_str,
265                        help,
266                    );
267
268                    // _count row
269                    insert_row(
270                        &mut snapshot,
271                        format!("{base_name}_count"),
272                        base_labels,
273                        f64::cast_lossy(histogram.get_sample_count()),
274                        type_str,
275                        help,
276                    );
277                }
278                MetricType::SUMMARY => {
279                    let summary = metric.get_summary();
280
281                    // One row per quantile.
282                    for quantile in summary.get_quantile() {
283                        let mut labels = base_labels.clone();
284                        labels.push(("quantile".to_string(), format_f64(quantile.quantile())));
285                        insert_row(
286                            &mut snapshot,
287                            base_name.to_string(),
288                            labels,
289                            quantile.value(),
290                            type_str,
291                            help,
292                        );
293                    }
294
295                    // _sum row
296                    insert_row(
297                        &mut snapshot,
298                        format!("{base_name}_sum"),
299                        base_labels.clone(),
300                        summary.sample_sum(),
301                        type_str,
302                        help,
303                    );
304
305                    // _count row
306                    insert_row(
307                        &mut snapshot,
308                        format!("{base_name}_count"),
309                        base_labels,
310                        f64::cast_lossy(summary.sample_count()),
311                        type_str,
312                        help,
313                    );
314                }
315                MetricType::UNTYPED => {
316                    soft_panic_or_log!("unexpected untyped metric: {base_name}");
317                }
318            }
319        }
320    }
321
322    snapshot
323}
324
/// Format an f64 for use as a label value, matching Prometheus conventions.
///
/// Infinities render as `+Inf`/`-Inf`; every other value (including NaN)
/// uses the standard `Display` rendering.
fn format_f64(v: f64) -> String {
    if v.is_infinite() {
        let rendered = if v.is_sign_positive() { "+Inf" } else { "-Inf" };
        rendered.to_string()
    } else {
        v.to_string()
    }
}
335
336/// Insert a single metric row into the snapshot.
337fn insert_row(
338    snapshot: &mut BTreeMap<SnapshotKey, SnapshotValue>,
339    name: String,
340    mut labels: Vec<(String, String)>,
341    value: f64,
342    metric_type: &'static str,
343    help: &str,
344) {
345    labels.sort();
346
347    snapshot.insert((name, labels), (value, metric_type, help.to_string()));
348}
349
/// Pack a metric row into key/value row pairs.
///
/// The returned `RowRef`s borrow from `packer`'s internal buffers (note the
/// shared `'a` lifetime), so they are only valid until the next call on the
/// same packer.
fn pack_row<'a>(
    packer: &'a mut PermutedRowPacker,
    metric_name: &str,
    metric_type: &str,
    labels: &[(String, String)],
    value: f64,
    help: &str,
    process_id: usize,
) -> (&'a mz_repr::RowRef, &'a mz_repr::RowRef) {
    // Datums are pushed by column index; NOTE(review): `PermutedRowPacker`
    // presumably routes each index to the key or value row per the log's
    // column permutation — confirm against its definition.
    packer.pack_by_index(|row_packer, index| match index {
        // process_id
        0 => row_packer.push(Datum::UInt64(u64::cast_from(process_id))),
        // metric_name
        1 => row_packer.push(Datum::String(metric_name)),
        // metric_type
        2 => row_packer.push(Datum::String(metric_type)),
        // labels (Map)
        3 => {
            row_packer.push_dict(labels.iter().map(|(k, v)| (k.as_str(), Datum::String(v))));
        }
        // value
        4 => row_packer.push(Datum::Float64(value.into())),
        // help
        5 => row_packer.push(Datum::String(help)),
        // `pack_by_index` only visits declared columns; anything else is a bug.
        _ => unreachable!("unexpected column index {index}"),
    })
377}