tracing_opentelemetry/
metrics.rs

1use std::{collections::HashMap, fmt, sync::RwLock};
2use tracing::{field::Visit, Subscriber};
3use tracing_core::{Field, Interest, Metadata};
4
5#[cfg(feature = "metrics_gauge_unstable")]
6use opentelemetry::metrics::Gauge;
7use opentelemetry::{
8    metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter},
9    KeyValue, Value,
10};
11use tracing_subscriber::{
12    filter::Filtered,
13    layer::{Context, Filter},
14    registry::LookupSpan,
15    Layer,
16};
17
18use smallvec::SmallVec;
19
/// Version of this crate, read from Cargo at compile time; passed as the
/// version argument of `versioned_meter` in `MetricsLayer::new`.
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Name passed to `versioned_meter` when `MetricsLayer::new` obtains its meter.
const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry";
22
// Event field-name prefixes that mark a field as a metric measurement. The part
// of the field name after the prefix becomes the metric name.

/// Prefix for increase-only counters.
const METRIC_PREFIX_MONOTONIC_COUNTER: &str = "monotonic_counter.";
/// Prefix for counters that may go up or down.
const METRIC_PREFIX_COUNTER: &str = "counter.";
/// Prefix for histograms.
const METRIC_PREFIX_HISTOGRAM: &str = "histogram.";
/// Prefix for gauges (behind the `metrics_gauge_unstable` feature).
#[cfg(feature = "metrics_gauge_unstable")]
const METRIC_PREFIX_GAUGE: &str = "gauge.";

/// Largest `u64` that still fits in an `i64` without overflowing.
const I64_MAX: u64 = i64::MAX.unsigned_abs();
30
31#[derive(Default)]
32pub(crate) struct Instruments {
33    u64_counter: MetricsMap<Counter<u64>>,
34    f64_counter: MetricsMap<Counter<f64>>,
35    i64_up_down_counter: MetricsMap<UpDownCounter<i64>>,
36    f64_up_down_counter: MetricsMap<UpDownCounter<f64>>,
37    u64_histogram: MetricsMap<Histogram<u64>>,
38    f64_histogram: MetricsMap<Histogram<f64>>,
39    #[cfg(feature = "metrics_gauge_unstable")]
40    u64_gauge: MetricsMap<Gauge<u64>>,
41    #[cfg(feature = "metrics_gauge_unstable")]
42    i64_gauge: MetricsMap<Gauge<i64>>,
43    #[cfg(feature = "metrics_gauge_unstable")]
44    f64_gauge: MetricsMap<Gauge<f64>>,
45}
46
47type MetricsMap<T> = RwLock<HashMap<&'static str, T>>;
48
/// A single measurement taken from an event field, tagged with the kind of
/// instrument it should be recorded on.
#[derive(Copy, Clone, Debug)]
pub(crate) enum InstrumentType {
    /// Increment for a `u64` counter.
    CounterU64(u64),
    /// Increment for an `f64` counter.
    CounterF64(f64),
    /// Signed delta for an `i64` up/down counter.
    UpDownCounterI64(i64),
    /// Signed delta for an `f64` up/down counter.
    UpDownCounterF64(f64),
    /// Sample for a `u64` histogram.
    HistogramU64(u64),
    /// Sample for an `f64` histogram.
    HistogramF64(f64),
    /// Value for a `u64` gauge.
    #[cfg(feature = "metrics_gauge_unstable")]
    GaugeU64(u64),
    /// Value for an `i64` gauge.
    #[cfg(feature = "metrics_gauge_unstable")]
    GaugeI64(i64),
    /// Value for an `f64` gauge.
    #[cfg(feature = "metrics_gauge_unstable")]
    GaugeF64(f64),
}
64
65impl Instruments {
66    pub(crate) fn update_metric(
67        &self,
68        meter: &Meter,
69        instrument_type: InstrumentType,
70        metric_name: &'static str,
71        attributes: &[KeyValue],
72    ) {
73        fn update_or_insert<T>(
74            map: &MetricsMap<T>,
75            name: &'static str,
76            insert: impl FnOnce() -> T,
77            update: impl FnOnce(&T),
78        ) {
79            {
80                let lock = map.read().unwrap();
81                if let Some(metric) = lock.get(name) {
82                    update(metric);
83                    return;
84                }
85            }
86
87            // that metric did not already exist, so we have to acquire a write lock to
88            // create it.
89            let mut lock = map.write().unwrap();
90            // handle the case where the entry was created while we were waiting to
91            // acquire the write lock
92            let metric = lock.entry(name).or_insert_with(insert);
93            update(metric)
94        }
95
96        match instrument_type {
97            InstrumentType::CounterU64(value) => {
98                update_or_insert(
99                    &self.u64_counter,
100                    metric_name,
101                    || meter.u64_counter(metric_name).init(),
102                    |ctr| ctr.add(value, attributes),
103                );
104            }
105            InstrumentType::CounterF64(value) => {
106                update_or_insert(
107                    &self.f64_counter,
108                    metric_name,
109                    || meter.f64_counter(metric_name).init(),
110                    |ctr| ctr.add(value, attributes),
111                );
112            }
113            InstrumentType::UpDownCounterI64(value) => {
114                update_or_insert(
115                    &self.i64_up_down_counter,
116                    metric_name,
117                    || meter.i64_up_down_counter(metric_name).init(),
118                    |ctr| ctr.add(value, attributes),
119                );
120            }
121            InstrumentType::UpDownCounterF64(value) => {
122                update_or_insert(
123                    &self.f64_up_down_counter,
124                    metric_name,
125                    || meter.f64_up_down_counter(metric_name).init(),
126                    |ctr| ctr.add(value, attributes),
127                );
128            }
129            InstrumentType::HistogramU64(value) => {
130                update_or_insert(
131                    &self.u64_histogram,
132                    metric_name,
133                    || meter.u64_histogram(metric_name).init(),
134                    |rec| rec.record(value, attributes),
135                );
136            }
137            InstrumentType::HistogramF64(value) => {
138                update_or_insert(
139                    &self.f64_histogram,
140                    metric_name,
141                    || meter.f64_histogram(metric_name).init(),
142                    |rec| rec.record(value, attributes),
143                );
144            }
145            #[cfg(feature = "metrics_gauge_unstable")]
146            InstrumentType::GaugeU64(value) => {
147                update_or_insert(
148                    &self.u64_gauge,
149                    metric_name,
150                    || meter.u64_gauge(metric_name).init(),
151                    |rec| rec.record(value, attributes),
152                );
153            }
154            #[cfg(feature = "metrics_gauge_unstable")]
155            InstrumentType::GaugeI64(value) => {
156                update_or_insert(
157                    &self.i64_gauge,
158                    metric_name,
159                    || meter.i64_gauge(metric_name).init(),
160                    |rec| rec.record(value, attributes),
161                );
162            }
163            #[cfg(feature = "metrics_gauge_unstable")]
164            InstrumentType::GaugeF64(value) => {
165                update_or_insert(
166                    &self.f64_gauge,
167                    metric_name,
168                    || meter.f64_gauge(metric_name).init(),
169                    |rec| rec.record(value, attributes),
170                );
171            }
172        };
173    }
174}
175
176pub(crate) struct MetricVisitor<'a> {
177    attributes: &'a mut SmallVec<[KeyValue; 8]>,
178    visited_metrics: &'a mut SmallVec<[(&'static str, InstrumentType); 2]>,
179}
180
181impl<'a> Visit for MetricVisitor<'a> {
182    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
183        self.attributes
184            .push(KeyValue::new(field.name(), format!("{value:?}")));
185    }
186
187    fn record_u64(&mut self, field: &Field, value: u64) {
188        #[cfg(feature = "metrics_gauge_unstable")]
189        if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_GAUGE) {
190            self.visited_metrics
191                .push((metric_name, InstrumentType::GaugeU64(value)));
192            return;
193        }
194        if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
195            self.visited_metrics
196                .push((metric_name, InstrumentType::CounterU64(value)));
197        } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
198            if value <= I64_MAX {
199                self.visited_metrics
200                    .push((metric_name, InstrumentType::UpDownCounterI64(value as i64)));
201            } else {
202                eprintln!(
203                    "[tracing-opentelemetry]: Received Counter metric, but \
204                    provided u64: {} is greater than i64::MAX. Ignoring \
205                    this metric.",
206                    value
207                );
208            }
209        } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) {
210            self.visited_metrics
211                .push((metric_name, InstrumentType::HistogramU64(value)));
212        } else if value <= I64_MAX {
213            self.attributes
214                .push(KeyValue::new(field.name(), Value::I64(value as i64)));
215        }
216    }
217
218    fn record_f64(&mut self, field: &Field, value: f64) {
219        #[cfg(feature = "metrics_gauge_unstable")]
220        if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_GAUGE) {
221            self.visited_metrics
222                .push((metric_name, InstrumentType::GaugeF64(value)));
223            return;
224        }
225        if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
226            self.visited_metrics
227                .push((metric_name, InstrumentType::CounterF64(value)));
228        } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
229            self.visited_metrics
230                .push((metric_name, InstrumentType::UpDownCounterF64(value)));
231        } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) {
232            self.visited_metrics
233                .push((metric_name, InstrumentType::HistogramF64(value)));
234        } else {
235            self.attributes
236                .push(KeyValue::new(field.name(), Value::F64(value)));
237        }
238    }
239
240    fn record_i64(&mut self, field: &Field, value: i64) {
241        #[cfg(feature = "metrics_gauge_unstable")]
242        if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_GAUGE) {
243            self.visited_metrics
244                .push((metric_name, InstrumentType::GaugeI64(value)));
245            return;
246        }
247        if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
248            self.visited_metrics
249                .push((metric_name, InstrumentType::CounterU64(value as u64)));
250        } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
251            self.visited_metrics
252                .push((metric_name, InstrumentType::UpDownCounterI64(value)));
253        } else {
254            self.attributes.push(KeyValue::new(field.name(), value));
255        }
256    }
257
258    fn record_str(&mut self, field: &Field, value: &str) {
259        self.attributes
260            .push(KeyValue::new(field.name(), value.to_owned()));
261    }
262
263    fn record_bool(&mut self, field: &Field, value: bool) {
264        self.attributes.push(KeyValue::new(field.name(), value));
265    }
266}
267
268/// A layer that publishes metrics via the OpenTelemetry SDK.
269///
270/// # Usage
271///
272/// No configuration is needed for this Layer, as it's only responsible for
273/// pushing data out to the `opentelemetry` family of crates. For example, when
274/// using `opentelemetry-otlp`, that crate will provide its own set of
275/// configuration options for setting up the duration metrics will be collected
276/// before exporting to the OpenTelemetry Collector, aggregation of data points,
277/// etc.
278///
279/// ```no_run
280/// use tracing_opentelemetry::MetricsLayer;
281/// use tracing_subscriber::layer::SubscriberExt;
282/// use tracing_subscriber::Registry;
283/// # use opentelemetry_sdk::metrics::SdkMeterProvider;
284///
285/// // Constructing a MeterProvider is out-of-scope for the docs here, but there
286/// // are examples in the opentelemetry repository. See:
287/// // https://github.com/open-telemetry/opentelemetry-rust/blob/dfeac078ff7853e7dc814778524b93470dfa5c9c/examples/metrics-basic/src/main.rs#L7
288/// # let meter_provider: SdkMeterProvider = unimplemented!();
289///
290/// let opentelemetry_metrics =  MetricsLayer::new(meter_provider);
291/// let subscriber = Registry::default().with(opentelemetry_metrics);
292/// tracing::subscriber::set_global_default(subscriber).unwrap();
293/// ```
294///
295/// To publish a new metric, add a key-value pair to your `tracing::Event` that
296/// contains following prefixes:
297/// - `monotonic_counter.` (non-negative numbers): Used when the counter should
298///   only ever increase
299/// - `counter.`: Used when the counter can go up or down
300/// - `histogram.`: Used to report arbitrary values that are likely to be statistically meaningful
301///
302/// Examples:
303/// ```
304/// # use tracing::info;
305/// info!(monotonic_counter.foo = 1);
306/// info!(monotonic_counter.bar = 1.1);
307///
308/// info!(counter.baz = 1);
309/// info!(counter.baz = -1);
310/// info!(counter.xyz = 1.1);
311///
312/// info!(histogram.qux = 1);
313/// info!(histogram.abc = -1);
314/// info!(histogram.def = 1.1);
315/// ```
316///
317/// # Mixing data types
318///
319/// ## Floating-point numbers
320///
321/// Do not mix floating point and non-floating point numbers for the same
322/// metric. If a floating point number will be used for a given metric, be sure
323/// to cast any other usages of that metric to a floating point number.
324///
325/// Do this:
326/// ```
327/// # use tracing::info;
328/// info!(monotonic_counter.foo = 1_f64);
329/// info!(monotonic_counter.foo = 1.1);
330/// ```
331///
332/// This is because all data published for a given metric name must be the same
333/// numeric type.
334///
335/// ## Integers
336///
337/// Positive and negative integers can be mixed freely. The instrumentation
338/// provided by `tracing` assumes that all integers are `i64` unless explicitly
339/// cast to something else. In the case that an integer *is* cast to `u64`, this
340/// subscriber will handle the conversion internally.
341///
342/// For example:
343/// ```
344/// # use tracing::info;
345/// // The subscriber receives an i64
346/// info!(counter.baz = 1);
347///
348/// // The subscriber receives an i64
349/// info!(counter.baz = -1);
350///
351/// // The subscriber receives a u64, but casts it to i64 internally
352/// info!(counter.baz = 1_u64);
353///
354/// // The subscriber receives a u64, but cannot cast it to i64 because of
355/// // overflow. An error is printed to stderr, and the metric is dropped.
356/// info!(counter.baz = (i64::MAX as u64) + 1)
357/// ```
358///
359/// # Attributes
360///
361/// When `MetricsLayer` outputs metrics, it converts key-value pairs into [Attributes] and associates them with metrics.
362///
363/// [Attributes]: https://opentelemetry.io/docs/specs/otel/common/#attribute
364///
365/// For example:
366/// ```
367/// # use tracing::info;
368/// // adds attributes bar="baz" and qux=2 to the `foo` counter.
369/// info!(monotonic_counter.foo = 1, bar = "baz", qux = 2);
370/// ```
371///
372/// # Implementation Details
373///
374/// `MetricsLayer` holds a set of maps, with each map corresponding to a
375/// type of metric supported by OpenTelemetry. These maps are populated lazily.
376/// The first time that a metric is emitted by the instrumentation, a `Metric`
377/// instance will be created and added to the corresponding map. This means that
378/// any time a metric is emitted by the instrumentation, one map lookup has to
379/// be performed.
380///
381/// In the future, this can be improved by associating each `Metric` instance to
382/// its callsite, eliminating the need for any maps.
383///
384#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
385pub struct MetricsLayer<S> {
386    inner: Filtered<InstrumentLayer, MetricsFilter, S>,
387}
388
389impl<S> MetricsLayer<S>
390where
391    S: Subscriber + for<'span> LookupSpan<'span>,
392{
393    /// Create a new instance of MetricsLayer.
394    pub fn new<M>(meter_provider: M) -> MetricsLayer<S>
395    where
396        M: MeterProvider,
397    {
398        let meter = meter_provider.versioned_meter(
399            INSTRUMENTATION_LIBRARY_NAME,
400            Some(CARGO_PKG_VERSION),
401            None::<&'static str>,
402            None,
403        );
404
405        let layer = InstrumentLayer {
406            meter,
407            instruments: Default::default(),
408        };
409
410        MetricsLayer {
411            inner: layer.with_filter(MetricsFilter),
412        }
413    }
414}
415
416struct MetricsFilter;
417
418impl MetricsFilter {
419    fn is_metrics_event(&self, meta: &Metadata<'_>) -> bool {
420        meta.is_event()
421            && meta.fields().iter().any(|field| {
422                let name = field.name();
423
424                if name.starts_with(METRIC_PREFIX_COUNTER)
425                    || name.starts_with(METRIC_PREFIX_MONOTONIC_COUNTER)
426                    || name.starts_with(METRIC_PREFIX_HISTOGRAM)
427                {
428                    return true;
429                }
430
431                #[cfg(feature = "metrics_gauge_unstable")]
432                if name.starts_with(METRIC_PREFIX_GAUGE) {
433                    return true;
434                }
435
436                false
437            })
438    }
439}
440
441impl<S> Filter<S> for MetricsFilter {
442    fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool {
443        self.is_metrics_event(meta)
444    }
445
446    fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> Interest {
447        if self.is_metrics_event(meta) {
448            Interest::always()
449        } else {
450            Interest::never()
451        }
452    }
453}
454
455struct InstrumentLayer {
456    meter: Meter,
457    instruments: Instruments,
458}
459
460impl<S> Layer<S> for InstrumentLayer
461where
462    S: Subscriber + for<'span> LookupSpan<'span>,
463{
464    fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
465        let mut attributes = SmallVec::new();
466        let mut visited_metrics = SmallVec::new();
467        let mut metric_visitor = MetricVisitor {
468            attributes: &mut attributes,
469            visited_metrics: &mut visited_metrics,
470        };
471        event.record(&mut metric_visitor);
472
473        // associate attrivutes with visited metrics
474        visited_metrics
475            .into_iter()
476            .for_each(|(metric_name, value)| {
477                self.instruments.update_metric(
478                    &self.meter,
479                    value,
480                    metric_name,
481                    attributes.as_slice(),
482                );
483            })
484    }
485}
486
487impl<S> Layer<S> for MetricsLayer<S>
488where
489    S: Subscriber + for<'span> LookupSpan<'span>,
490{
491    fn on_layer(&mut self, subscriber: &mut S) {
492        self.inner.on_layer(subscriber)
493    }
494
495    fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
496        self.inner.register_callsite(metadata)
497    }
498
499    fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
500        self.inner.enabled(metadata, ctx)
501    }
502
503    fn on_new_span(
504        &self,
505        attrs: &tracing_core::span::Attributes<'_>,
506        id: &tracing_core::span::Id,
507        ctx: Context<'_, S>,
508    ) {
509        self.inner.on_new_span(attrs, id, ctx)
510    }
511
512    fn max_level_hint(&self) -> Option<tracing_core::LevelFilter> {
513        self.inner.max_level_hint()
514    }
515
516    fn on_record(
517        &self,
518        span: &tracing_core::span::Id,
519        values: &tracing_core::span::Record<'_>,
520        ctx: Context<'_, S>,
521    ) {
522        self.inner.on_record(span, values, ctx)
523    }
524
525    fn on_follows_from(
526        &self,
527        span: &tracing_core::span::Id,
528        follows: &tracing_core::span::Id,
529        ctx: Context<'_, S>,
530    ) {
531        self.inner.on_follows_from(span, follows, ctx)
532    }
533
534    fn on_event(&self, event: &tracing_core::Event<'_>, ctx: Context<'_, S>) {
535        self.inner.on_event(event, ctx)
536    }
537
538    fn on_enter(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) {
539        self.inner.on_enter(id, ctx)
540    }
541
542    fn on_exit(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) {
543        self.inner.on_exit(id, ctx)
544    }
545
546    fn on_close(&self, id: tracing_core::span::Id, ctx: Context<'_, S>) {
547        self.inner.on_close(id, ctx)
548    }
549
550    fn on_id_change(
551        &self,
552        old: &tracing_core::span::Id,
553        new: &tracing_core::span::Id,
554        ctx: Context<'_, S>,
555    ) {
556        self.inner.on_id_change(old, new, ctx)
557    }
558}
559
#[cfg(test)]
mod tests {
    use super::*;
    use tracing_subscriber::layer::SubscriberExt;

    /// Layer that fails the test if any event ever reaches it.
    struct PanicLayer;

    impl<S> Layer<S> for PanicLayer
    where
        S: Subscriber + for<'span> LookupSpan<'span>,
    {
        fn on_event(&self, _event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) {
            panic!("panic");
        }
    }

    #[test]
    fn filter_layer_should_filter_non_metrics_event() {
        // The event below has no metric-prefixed field, so `MetricsFilter` must
        // keep it away from `PanicLayer`.
        let subscriber =
            tracing_subscriber::registry().with(PanicLayer.with_filter(MetricsFilter));

        tracing::subscriber::with_default(subscriber, || tracing::info!(key = "val", "foo"));
    }
}