Skip to main content

mz_compute/logging/
prometheus.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflow for Prometheus metrics gathered from the metrics registry.
11
12use std::collections::BTreeMap;
13use std::rc::Rc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::dyncfgs::COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL;
17use mz_dyncfg::ConfigSet;
18use mz_ore::cast::{CastFrom, CastLossy};
19use mz_ore::collections::CollectionExt;
20use mz_ore::metrics::MetricsRegistry;
21use mz_ore::soft_panic_or_log;
22use mz_repr::{Datum, Diff, Timestamp};
23use mz_timely_util::columnar::batcher;
24use mz_timely_util::columnar::builder::ColumnBuilder;
25use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
26use prometheus::proto::MetricType;
27use timely::dataflow::Scope;
28use timely::dataflow::channels::pact::ExchangeCore;
29use timely::dataflow::operators::generic::OutputBuilder;
30use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
31
32use crate::extensions::arrange::MzArrangeCore;
33use crate::logging::{ComputeLog, LogCollection, LogVariant, PermutedRowPacker};
34use crate::typedefs::RowRowSpine;
35use mz_row_spine::RowRowBuilder;
36
37/// The return type of [`construct`].
38pub(super) struct Return {
39    /// Collections to export.
40    pub collections: BTreeMap<LogVariant, LogCollection>,
41}
42
/// Snapshot key: the (possibly suffixed) metric name together with its label
/// pairs, which are stored in sorted order.
type SnapshotKey = (String, Vec<(String, String)>);
/// Snapshot value: the sample value, the metric type name, and the help text.
type SnapshotValue = (f64, &'static str, String);
47
48/// Constructs the logging dataflow fragment for Prometheus metrics.
49pub(super) fn construct(
50    scope: Scope<'_, Timestamp>,
51    config: &mz_compute_client::logging::LoggingConfig,
52    metrics_registry: MetricsRegistry,
53    now: Instant,
54    start_offset: Duration,
55    worker_config: Rc<ConfigSet>,
56    workers_per_process: usize,
57) -> Return {
58    let variant = LogVariant::Compute(ComputeLog::PrometheusMetrics);
59    let mut collections = BTreeMap::new();
60    let interval = config.interval;
61    let interval_ms = std::cmp::max(1, interval.as_millis());
62
63    if !config.index_logs.contains_key(&variant) {
64        return Return { collections };
65    }
66
67    let process_id = scope.index() / workers_per_process;
68    let enable = scope.index() % workers_per_process == 0;
69
70    // Build a source operator that periodically gathers Prometheus metrics
71    // and packs them directly into Row pairs.
72    let mut builder = OperatorBuilder::new("PrometheusMetrics".to_string(), scope.clone());
73    let (output, stream) = builder.new_output();
74    let mut output = OutputBuilder::<_, ColumnBuilder<_>>::from(output);
75
76    let operator_info = builder.operator_info();
77    builder.build(move |capabilities| {
78        // Metrics are per-process, so only one worker per process needs to
79        // scrape. Drop the capability for disabled workers so the frontier
80        // can advance without this operator holding it back.
81        let mut cap = enable.then_some(capabilities.into_element());
82        let activator = scope.activator_for(operator_info.address);
83
84        let mut prev_snapshot: BTreeMap<SnapshotKey, SnapshotValue> = BTreeMap::new();
85        let mut next_scrape = Instant::now();
86        let mut packer = PermutedRowPacker::new(ComputeLog::PrometheusMetrics);
87
88        move |_frontiers| {
89            let Some(cap) = &mut cap else { return };
90
91            // Advance the capability to the next logging interval boundary.
92            // This keeps the output frontier progressing at the logging
93            // rate, even when scrapes happen less frequently. Note that
94            // advancing the frontier implies the data is up-to-date, but
95            // the metrics snapshot may be stale by up to the scrape
96            // interval when it exceeds the logging interval.
97            let elapsed = now.elapsed().as_millis();
98            let time_ms: u128 =
99                ((elapsed + start_offset.as_millis()) / interval_ms + 1) * interval_ms;
100            let ts: Timestamp = time_ms.try_into().expect("must fit");
101            cap.downgrade(&ts);
102
103            // Schedule the next activation at the interval boundary
104            // to avoid drift from wall-clock elapsed time.
105            let next_boundary_ms = time_ms - start_offset.as_millis();
106            let next_activation =
107                now + Duration::from_millis(next_boundary_ms.try_into().expect("must fit"));
108            activator.activate_after(next_activation.saturating_duration_since(Instant::now()));
109
110            // Only scrape when the scrape interval has elapsed.
111            // The operator wakes every logging interval to advance the
112            // capability, but scrapes less frequently if configured.
113            if Instant::now() < next_scrape {
114                return;
115            }
116
117            let prom_interval =
118                COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL.get(&worker_config);
119            let effective_interval = prom_interval.max(interval);
120            next_scrape = Instant::now() + effective_interval;
121
122            // Gather current metrics and build new snapshot, or an empty
123            // snapshot when disabled (which retracts any existing data).
124            let new_snapshot = if !prom_interval.is_zero() {
125                let metric_families = metrics_registry.gather();
126                flatten_metrics(metric_families)
127            } else {
128                BTreeMap::new()
129            };
130
131            // Diff against previous snapshot and emit packed Row pairs.
132            let mut output = output.activate();
133            let mut session = output.session_with_builder(&cap);
134
135            // Retract entries that were removed or changed.
136            for (key, old_val) in &prev_snapshot {
137                match new_snapshot.get(key) {
138                    Some(new_val) if new_val == old_val => {}
139                    _ => {
140                        let (row_key, row_val) = pack_row(
141                            &mut packer,
142                            &key.0,
143                            old_val.1,
144                            &key.1,
145                            old_val.0,
146                            &old_val.2,
147                            process_id,
148                        );
149                        session.give(((row_key, row_val), ts, Diff::MINUS_ONE));
150                    }
151                }
152            }
153
154            // Insert entries that are new or changed.
155            for (key, new_val) in &new_snapshot {
156                match prev_snapshot.get(key) {
157                    Some(old_val) if old_val == new_val => {}
158                    _ => {
159                        let (row_key, row_val) = pack_row(
160                            &mut packer,
161                            &key.0,
162                            new_val.1,
163                            &key.1,
164                            new_val.0,
165                            &new_val.2,
166                            process_id,
167                        );
168                        session.give(((row_key, row_val), ts, Diff::ONE));
169                    }
170                }
171            }
172
173            prev_snapshot = new_snapshot;
174        }
175    });
176
177    // Arrange into a trace.
178    let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
179        columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, mz_repr::Diff>,
180    );
181    let trace = stream
182        .mz_arrange_core::<
183            _,
184            batcher::Chunker<_>,
185            Col2ValBatcher<_, _, _, _>,
186            RowRowBuilder<_, _>,
187            RowRowSpine<_, _>,
188        >(exchange, "Arrange PrometheusMetrics")
189        .trace;
190    let token: Rc<dyn std::any::Any> = Rc::new(());
191    let collection = LogCollection { trace, token };
192    collections.insert(variant, collection);
193
194    Return { collections }
195}
196
197/// Flatten metric families into a snapshot map.
198fn flatten_metrics(
199    families: Vec<prometheus::proto::MetricFamily>,
200) -> BTreeMap<SnapshotKey, SnapshotValue> {
201    let mut snapshot = BTreeMap::new();
202
203    for family in families {
204        let base_name = family.name();
205        let help = family.help();
206        let metric_type = family.get_field_type();
207        let type_str = match metric_type {
208            MetricType::COUNTER => "counter",
209            MetricType::GAUGE => "gauge",
210            MetricType::HISTOGRAM => "histogram",
211            MetricType::SUMMARY => "summary",
212            MetricType::UNTYPED => "untyped",
213        };
214
215        for metric in family.get_metric() {
216            let base_labels: Vec<(String, String)> = metric
217                .get_label()
218                .iter()
219                .map(|l| (l.name().to_string(), l.value().to_string()))
220                .collect();
221
222            match metric_type {
223                MetricType::COUNTER => {
224                    let value = metric.get_counter().value();
225                    insert_row(
226                        &mut snapshot,
227                        base_name.to_string(),
228                        base_labels,
229                        value,
230                        type_str,
231                        help,
232                    );
233                }
234                MetricType::GAUGE => {
235                    let value = metric.get_gauge().value();
236                    insert_row(
237                        &mut snapshot,
238                        base_name.to_string(),
239                        base_labels,
240                        value,
241                        type_str,
242                        help,
243                    );
244                }
245                MetricType::HISTOGRAM => {
246                    let histogram = metric.get_histogram();
247
248                    // One row per bucket with `le` label.
249                    for bucket in histogram.get_bucket() {
250                        let mut labels = base_labels.clone();
251                        labels.push(("le".to_string(), format_f64(bucket.upper_bound())));
252                        insert_row(
253                            &mut snapshot,
254                            format!("{base_name}_bucket"),
255                            labels,
256                            f64::cast_lossy(bucket.cumulative_count()),
257                            type_str,
258                            help,
259                        );
260                    }
261
262                    // _sum row
263                    insert_row(
264                        &mut snapshot,
265                        format!("{base_name}_sum"),
266                        base_labels.clone(),
267                        histogram.get_sample_sum(),
268                        type_str,
269                        help,
270                    );
271
272                    // _count row
273                    insert_row(
274                        &mut snapshot,
275                        format!("{base_name}_count"),
276                        base_labels,
277                        f64::cast_lossy(histogram.get_sample_count()),
278                        type_str,
279                        help,
280                    );
281                }
282                MetricType::SUMMARY => {
283                    let summary = metric.get_summary();
284
285                    // One row per quantile.
286                    for quantile in summary.get_quantile() {
287                        let mut labels = base_labels.clone();
288                        labels.push(("quantile".to_string(), format_f64(quantile.quantile())));
289                        insert_row(
290                            &mut snapshot,
291                            base_name.to_string(),
292                            labels,
293                            quantile.value(),
294                            type_str,
295                            help,
296                        );
297                    }
298
299                    // _sum row
300                    insert_row(
301                        &mut snapshot,
302                        format!("{base_name}_sum"),
303                        base_labels.clone(),
304                        summary.sample_sum(),
305                        type_str,
306                        help,
307                    );
308
309                    // _count row
310                    insert_row(
311                        &mut snapshot,
312                        format!("{base_name}_count"),
313                        base_labels,
314                        f64::cast_lossy(summary.sample_count()),
315                        type_str,
316                        help,
317                    );
318                }
319                MetricType::UNTYPED => {
320                    soft_panic_or_log!("unexpected untyped metric: {base_name}");
321                }
322            }
323        }
324    }
325
326    snapshot
327}
328
/// Render an `f64` as a Prometheus label value.
///
/// Positive and negative infinity become `+Inf` and `-Inf`, the spellings
/// Prometheus uses. Every other value, including `NaN`, uses Rust's `Display`
/// formatting.
fn format_f64(v: f64) -> String {
    if !v.is_infinite() {
        return v.to_string();
    }
    let sign = if v.is_sign_positive() { "+" } else { "-" };
    format!("{sign}Inf")
}
339
340/// Insert a single metric row into the snapshot.
341fn insert_row(
342    snapshot: &mut BTreeMap<SnapshotKey, SnapshotValue>,
343    name: String,
344    mut labels: Vec<(String, String)>,
345    value: f64,
346    metric_type: &'static str,
347    help: &str,
348) {
349    labels.sort();
350
351    snapshot.insert((name, labels), (value, metric_type, help.to_string()));
352}
353
354/// Pack a metric row into key/value row pairs.
355fn pack_row<'a>(
356    packer: &'a mut PermutedRowPacker,
357    metric_name: &str,
358    metric_type: &str,
359    labels: &[(String, String)],
360    value: f64,
361    help: &str,
362    process_id: usize,
363) -> (&'a mz_repr::RowRef, &'a mz_repr::RowRef) {
364    packer.pack_by_index(|row_packer, index| match index {
365        // process_id
366        0 => row_packer.push(Datum::UInt64(u64::cast_from(process_id))),
367        // metric_name
368        1 => row_packer.push(Datum::String(metric_name)),
369        // metric_type
370        2 => row_packer.push(Datum::String(metric_type)),
371        // labels (Map)
372        3 => {
373            row_packer.push_dict(labels.iter().map(|(k, v)| (k.as_str(), Datum::String(v))));
374        }
375        // value
376        4 => row_packer.push(Datum::Float64(value.into())),
377        // help
378        5 => row_packer.push(Datum::String(help)),
379        _ => unreachable!("unexpected column index {index}"),
380    })
381}