1use std::{collections::HashMap, fmt, sync::RwLock};
2use tracing::{field::Visit, Subscriber};
3use tracing_core::{Field, Interest, Metadata};
4
5#[cfg(feature = "metrics_gauge_unstable")]
6use opentelemetry::metrics::Gauge;
7use opentelemetry::{
8 metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter},
9 KeyValue, Value,
10};
11use tracing_subscriber::{
12 filter::Filtered,
13 layer::{Context, Filter},
14 registry::LookupSpan,
15 Layer,
16};
17
18use smallvec::SmallVec;
19
20const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
21const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry";
22
23const METRIC_PREFIX_MONOTONIC_COUNTER: &str = "monotonic_counter.";
24const METRIC_PREFIX_COUNTER: &str = "counter.";
25const METRIC_PREFIX_HISTOGRAM: &str = "histogram.";
26#[cfg(feature = "metrics_gauge_unstable")]
27const METRIC_PREFIX_GAUGE: &str = "gauge.";
28
29const I64_MAX: u64 = i64::MAX as u64;
30
31#[derive(Default)]
32pub(crate) struct Instruments {
33 u64_counter: MetricsMap<Counter<u64>>,
34 f64_counter: MetricsMap<Counter<f64>>,
35 i64_up_down_counter: MetricsMap<UpDownCounter<i64>>,
36 f64_up_down_counter: MetricsMap<UpDownCounter<f64>>,
37 u64_histogram: MetricsMap<Histogram<u64>>,
38 f64_histogram: MetricsMap<Histogram<f64>>,
39 #[cfg(feature = "metrics_gauge_unstable")]
40 u64_gauge: MetricsMap<Gauge<u64>>,
41 #[cfg(feature = "metrics_gauge_unstable")]
42 i64_gauge: MetricsMap<Gauge<i64>>,
43 #[cfg(feature = "metrics_gauge_unstable")]
44 f64_gauge: MetricsMap<Gauge<f64>>,
45}
46
47type MetricsMap<T> = RwLock<HashMap<&'static str, T>>;
48
49#[derive(Copy, Clone, Debug)]
50pub(crate) enum InstrumentType {
51 CounterU64(u64),
52 CounterF64(f64),
53 UpDownCounterI64(i64),
54 UpDownCounterF64(f64),
55 HistogramU64(u64),
56 HistogramF64(f64),
57 #[cfg(feature = "metrics_gauge_unstable")]
58 GaugeU64(u64),
59 #[cfg(feature = "metrics_gauge_unstable")]
60 GaugeI64(i64),
61 #[cfg(feature = "metrics_gauge_unstable")]
62 GaugeF64(f64),
63}
64
65impl Instruments {
66 pub(crate) fn update_metric(
67 &self,
68 meter: &Meter,
69 instrument_type: InstrumentType,
70 metric_name: &'static str,
71 attributes: &[KeyValue],
72 ) {
73 fn update_or_insert<T>(
74 map: &MetricsMap<T>,
75 name: &'static str,
76 insert: impl FnOnce() -> T,
77 update: impl FnOnce(&T),
78 ) {
79 {
80 let lock = map.read().unwrap();
81 if let Some(metric) = lock.get(name) {
82 update(metric);
83 return;
84 }
85 }
86
87 let mut lock = map.write().unwrap();
90 let metric = lock.entry(name).or_insert_with(insert);
93 update(metric)
94 }
95
96 match instrument_type {
97 InstrumentType::CounterU64(value) => {
98 update_or_insert(
99 &self.u64_counter,
100 metric_name,
101 || meter.u64_counter(metric_name).init(),
102 |ctr| ctr.add(value, attributes),
103 );
104 }
105 InstrumentType::CounterF64(value) => {
106 update_or_insert(
107 &self.f64_counter,
108 metric_name,
109 || meter.f64_counter(metric_name).init(),
110 |ctr| ctr.add(value, attributes),
111 );
112 }
113 InstrumentType::UpDownCounterI64(value) => {
114 update_or_insert(
115 &self.i64_up_down_counter,
116 metric_name,
117 || meter.i64_up_down_counter(metric_name).init(),
118 |ctr| ctr.add(value, attributes),
119 );
120 }
121 InstrumentType::UpDownCounterF64(value) => {
122 update_or_insert(
123 &self.f64_up_down_counter,
124 metric_name,
125 || meter.f64_up_down_counter(metric_name).init(),
126 |ctr| ctr.add(value, attributes),
127 );
128 }
129 InstrumentType::HistogramU64(value) => {
130 update_or_insert(
131 &self.u64_histogram,
132 metric_name,
133 || meter.u64_histogram(metric_name).init(),
134 |rec| rec.record(value, attributes),
135 );
136 }
137 InstrumentType::HistogramF64(value) => {
138 update_or_insert(
139 &self.f64_histogram,
140 metric_name,
141 || meter.f64_histogram(metric_name).init(),
142 |rec| rec.record(value, attributes),
143 );
144 }
145 #[cfg(feature = "metrics_gauge_unstable")]
146 InstrumentType::GaugeU64(value) => {
147 update_or_insert(
148 &self.u64_gauge,
149 metric_name,
150 || meter.u64_gauge(metric_name).init(),
151 |rec| rec.record(value, attributes),
152 );
153 }
154 #[cfg(feature = "metrics_gauge_unstable")]
155 InstrumentType::GaugeI64(value) => {
156 update_or_insert(
157 &self.i64_gauge,
158 metric_name,
159 || meter.i64_gauge(metric_name).init(),
160 |rec| rec.record(value, attributes),
161 );
162 }
163 #[cfg(feature = "metrics_gauge_unstable")]
164 InstrumentType::GaugeF64(value) => {
165 update_or_insert(
166 &self.f64_gauge,
167 metric_name,
168 || meter.f64_gauge(metric_name).init(),
169 |rec| rec.record(value, attributes),
170 );
171 }
172 };
173 }
174}
175
176pub(crate) struct MetricVisitor<'a> {
177 attributes: &'a mut SmallVec<[KeyValue; 8]>,
178 visited_metrics: &'a mut SmallVec<[(&'static str, InstrumentType); 2]>,
179}
180
181impl<'a> Visit for MetricVisitor<'a> {
182 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
183 self.attributes
184 .push(KeyValue::new(field.name(), format!("{value:?}")));
185 }
186
187 fn record_u64(&mut self, field: &Field, value: u64) {
188 #[cfg(feature = "metrics_gauge_unstable")]
189 if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_GAUGE) {
190 self.visited_metrics
191 .push((metric_name, InstrumentType::GaugeU64(value)));
192 return;
193 }
194 if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
195 self.visited_metrics
196 .push((metric_name, InstrumentType::CounterU64(value)));
197 } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
198 if value <= I64_MAX {
199 self.visited_metrics
200 .push((metric_name, InstrumentType::UpDownCounterI64(value as i64)));
201 } else {
202 eprintln!(
203 "[tracing-opentelemetry]: Received Counter metric, but \
204 provided u64: {} is greater than i64::MAX. Ignoring \
205 this metric.",
206 value
207 );
208 }
209 } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) {
210 self.visited_metrics
211 .push((metric_name, InstrumentType::HistogramU64(value)));
212 } else if value <= I64_MAX {
213 self.attributes
214 .push(KeyValue::new(field.name(), Value::I64(value as i64)));
215 }
216 }
217
218 fn record_f64(&mut self, field: &Field, value: f64) {
219 #[cfg(feature = "metrics_gauge_unstable")]
220 if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_GAUGE) {
221 self.visited_metrics
222 .push((metric_name, InstrumentType::GaugeF64(value)));
223 return;
224 }
225 if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
226 self.visited_metrics
227 .push((metric_name, InstrumentType::CounterF64(value)));
228 } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
229 self.visited_metrics
230 .push((metric_name, InstrumentType::UpDownCounterF64(value)));
231 } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) {
232 self.visited_metrics
233 .push((metric_name, InstrumentType::HistogramF64(value)));
234 } else {
235 self.attributes
236 .push(KeyValue::new(field.name(), Value::F64(value)));
237 }
238 }
239
240 fn record_i64(&mut self, field: &Field, value: i64) {
241 #[cfg(feature = "metrics_gauge_unstable")]
242 if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_GAUGE) {
243 self.visited_metrics
244 .push((metric_name, InstrumentType::GaugeI64(value)));
245 return;
246 }
247 if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) {
248 self.visited_metrics
249 .push((metric_name, InstrumentType::CounterU64(value as u64)));
250 } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) {
251 self.visited_metrics
252 .push((metric_name, InstrumentType::UpDownCounterI64(value)));
253 } else {
254 self.attributes.push(KeyValue::new(field.name(), value));
255 }
256 }
257
258 fn record_str(&mut self, field: &Field, value: &str) {
259 self.attributes
260 .push(KeyValue::new(field.name(), value.to_owned()));
261 }
262
263 fn record_bool(&mut self, field: &Field, value: bool) {
264 self.attributes.push(KeyValue::new(field.name(), value));
265 }
266}
267
268#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
385pub struct MetricsLayer<S> {
386 inner: Filtered<InstrumentLayer, MetricsFilter, S>,
387}
388
389impl<S> MetricsLayer<S>
390where
391 S: Subscriber + for<'span> LookupSpan<'span>,
392{
393 pub fn new<M>(meter_provider: M) -> MetricsLayer<S>
395 where
396 M: MeterProvider,
397 {
398 let meter = meter_provider.versioned_meter(
399 INSTRUMENTATION_LIBRARY_NAME,
400 Some(CARGO_PKG_VERSION),
401 None::<&'static str>,
402 None,
403 );
404
405 let layer = InstrumentLayer {
406 meter,
407 instruments: Default::default(),
408 };
409
410 MetricsLayer {
411 inner: layer.with_filter(MetricsFilter),
412 }
413 }
414}
415
416struct MetricsFilter;
417
418impl MetricsFilter {
419 fn is_metrics_event(&self, meta: &Metadata<'_>) -> bool {
420 meta.is_event()
421 && meta.fields().iter().any(|field| {
422 let name = field.name();
423
424 if name.starts_with(METRIC_PREFIX_COUNTER)
425 || name.starts_with(METRIC_PREFIX_MONOTONIC_COUNTER)
426 || name.starts_with(METRIC_PREFIX_HISTOGRAM)
427 {
428 return true;
429 }
430
431 #[cfg(feature = "metrics_gauge_unstable")]
432 if name.starts_with(METRIC_PREFIX_GAUGE) {
433 return true;
434 }
435
436 false
437 })
438 }
439}
440
441impl<S> Filter<S> for MetricsFilter {
442 fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool {
443 self.is_metrics_event(meta)
444 }
445
446 fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> Interest {
447 if self.is_metrics_event(meta) {
448 Interest::always()
449 } else {
450 Interest::never()
451 }
452 }
453}
454
455struct InstrumentLayer {
456 meter: Meter,
457 instruments: Instruments,
458}
459
460impl<S> Layer<S> for InstrumentLayer
461where
462 S: Subscriber + for<'span> LookupSpan<'span>,
463{
464 fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
465 let mut attributes = SmallVec::new();
466 let mut visited_metrics = SmallVec::new();
467 let mut metric_visitor = MetricVisitor {
468 attributes: &mut attributes,
469 visited_metrics: &mut visited_metrics,
470 };
471 event.record(&mut metric_visitor);
472
473 visited_metrics
475 .into_iter()
476 .for_each(|(metric_name, value)| {
477 self.instruments.update_metric(
478 &self.meter,
479 value,
480 metric_name,
481 attributes.as_slice(),
482 );
483 })
484 }
485}
486
487impl<S> Layer<S> for MetricsLayer<S>
488where
489 S: Subscriber + for<'span> LookupSpan<'span>,
490{
491 fn on_layer(&mut self, subscriber: &mut S) {
492 self.inner.on_layer(subscriber)
493 }
494
495 fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
496 self.inner.register_callsite(metadata)
497 }
498
499 fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
500 self.inner.enabled(metadata, ctx)
501 }
502
503 fn on_new_span(
504 &self,
505 attrs: &tracing_core::span::Attributes<'_>,
506 id: &tracing_core::span::Id,
507 ctx: Context<'_, S>,
508 ) {
509 self.inner.on_new_span(attrs, id, ctx)
510 }
511
512 fn max_level_hint(&self) -> Option<tracing_core::LevelFilter> {
513 self.inner.max_level_hint()
514 }
515
516 fn on_record(
517 &self,
518 span: &tracing_core::span::Id,
519 values: &tracing_core::span::Record<'_>,
520 ctx: Context<'_, S>,
521 ) {
522 self.inner.on_record(span, values, ctx)
523 }
524
525 fn on_follows_from(
526 &self,
527 span: &tracing_core::span::Id,
528 follows: &tracing_core::span::Id,
529 ctx: Context<'_, S>,
530 ) {
531 self.inner.on_follows_from(span, follows, ctx)
532 }
533
534 fn on_event(&self, event: &tracing_core::Event<'_>, ctx: Context<'_, S>) {
535 self.inner.on_event(event, ctx)
536 }
537
538 fn on_enter(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) {
539 self.inner.on_enter(id, ctx)
540 }
541
542 fn on_exit(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) {
543 self.inner.on_exit(id, ctx)
544 }
545
546 fn on_close(&self, id: tracing_core::span::Id, ctx: Context<'_, S>) {
547 self.inner.on_close(id, ctx)
548 }
549
550 fn on_id_change(
551 &self,
552 old: &tracing_core::span::Id,
553 new: &tracing_core::span::Id,
554 ctx: Context<'_, S>,
555 ) {
556 self.inner.on_id_change(old, new, ctx)
557 }
558}
559
560#[cfg(test)]
561mod tests {
562 use super::*;
563 use tracing_subscriber::layer::SubscriberExt;
564
565 struct PanicLayer;
566 impl<S> Layer<S> for PanicLayer
567 where
568 S: Subscriber + for<'span> LookupSpan<'span>,
569 {
570 fn on_event(&self, _event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) {
571 panic!("panic");
572 }
573 }
574
575 #[test]
576 fn filter_layer_should_filter_non_metrics_event() {
577 let layer = PanicLayer.with_filter(MetricsFilter);
578 let subscriber = tracing_subscriber::registry().with(layer);
579
580 tracing::subscriber::with_default(subscriber, || {
581 tracing::info!(key = "val", "foo");
582 });
583 }
584}