Skip to main content

mz_prometheus_protobuf/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Decode helper for the Prometheus protobuf scrape format.
11//!
12//! We encapsulate this in its own crate since we've banned direct use of the `protobuf`
13//! crate due to it being unmaintained. However, we use it out of convenience since
14//! `prometheus::proto::MetricFamily` is already generated by `protobuf`.
15use protobuf::{CodedInputStream, Message};
16
17pub use prometheus::proto;
18
19/// Decodes the wire format produced by [`prometheus::ProtobufEncoder`]
20/// (and what callers receive from a Prometheus `/metrics` endpoint with
21/// `Accept: application/vnd.google.protobuf`).
22pub fn decode_length_delimited(bytes: &[u8]) -> Result<Vec<proto::MetricFamily>, String> {
23    let mut families = Vec::new();
24    let mut input = CodedInputStream::from_bytes(bytes);
25
26    loop {
27        match input.eof() {
28            Ok(true) => break,
29            Ok(false) => {}
30            Err(e) => return Err(format!("checking eof: {e}")),
31        }
32        // Read the length of the next message and push it as a limit to know
33        // what to read upto. Also keep track of the previous limit to restore later.
34        let len = input
35            .read_raw_varint32()
36            .map_err(|e| format!("reading length prefix: {e}"))?;
37        let previous_limit = input
38            .push_limit(len.into())
39            .map_err(|e| format!("pushing limit: {e}"))?;
40
41        let mut family = proto::MetricFamily::new();
42        family
43            .merge_from(&mut input)
44            .map_err(|e| format!("decoding MetricFamily: {e}"))?;
45        families.push(family);
46
47        // Restore the previous limit.
48        input.pop_limit(previous_limit);
49    }
50
51    Ok(families)
52}
53
#[cfg(test)]
mod tests {
    use prometheus::Encoder;

    use super::*;

    /// Renders `families` through [`prometheus::TextEncoder`] so that two sets
    /// of metric families can be compared as plain strings.
    fn render_text(families: &[proto::MetricFamily]) -> String {
        let mut out = Vec::new();
        prometheus::TextEncoder::new()
            .encode(families, &mut out)
            .expect("text encode");
        String::from_utf8(out).expect("utf-8")
    }

    /// Encodes a registry with [`prometheus::ProtobufEncoder`], decodes the
    /// bytes with `decode_length_delimited`, and checks that the text
    /// rendering of the result matches that of the gathered originals.
    #[mz_ore::test]
    fn roundtrip_through_protobuf_encoder() {
        // Build a registry holding a single counter with a non-zero value.
        let counter =
            prometheus::IntCounter::new("test_counter", "a counter").expect("create counter");
        counter.inc_by(7);
        let registry = prometheus::Registry::new();
        registry
            .register(Box::new(counter))
            .expect("register counter");

        // Serialize the gathered families using the protobuf encoder.
        let expected = registry.gather();
        let mut wire = Vec::new();
        prometheus::ProtobufEncoder::new()
            .encode(&expected, &mut wire)
            .expect("encode");

        let actual = decode_length_delimited(&wire).expect("decode");

        assert_eq!(render_text(&expected), render_text(&actual));
    }
}
87}