opentelemetry_sdk/metrics/
instrument.rs

1use std::{any::Any, borrow::Cow, collections::HashSet, hash::Hash, marker, sync::Arc};
2
3use opentelemetry::{
4    metrics::{
5        AsyncInstrument, MetricsError, Result, SyncCounter, SyncGauge, SyncHistogram,
6        SyncUpDownCounter,
7    },
8    Key, KeyValue,
9};
10
11use crate::{
12    instrumentation::Scope,
13    metrics::AttributeSet,
14    metrics::{aggregation::Aggregation, internal::Measure},
15};
16
/// Error message reported when an observable instrument has no measures
/// (aggregators) to record into.
pub(crate) const EMPTY_MEASURE_MSG: &str = "no aggregators for observable instrument";
18
/// The identifier of a group of instruments that all perform the same function.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum InstrumentKind {
    /// A group of instruments that record increasing values synchronously
    /// with the code path they are measuring.
    Counter,
    /// A group of instruments that record increasing and decreasing values
    /// synchronously with the code path they are measuring.
    UpDownCounter,
    /// A group of instruments that record a distribution of values synchronously with
    /// the code path they are measuring.
    Histogram,
    /// A group of instruments that record increasing values in an asynchronous
    /// callback.
    ObservableCounter,
    /// A group of instruments that record increasing and decreasing values in an
    /// asynchronous callback.
    ObservableUpDownCounter,

    /// A group of instruments that record the current value synchronously with
    /// the code path they are measuring.
    Gauge,
    /// A group of instruments that record current values in an asynchronous
    /// callback.
    ObservableGauge,
}
45
/// Describes properties an instrument is created with, also used for filtering
/// in [View](crate::metrics::View)s.
///
/// When an `Instrument` is used as matching criteria, fields left at their
/// default value (an empty string or `None`) are not compared.
///
/// # Example
///
/// Instruments can be used as criteria for views.
///
/// ```
/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
///
/// let criteria = Instrument::new().name("counter_*");
/// let mask = Stream::new().aggregation(Aggregation::Sum);
///
/// let view = new_view(criteria, mask);
/// # drop(view);
/// ```
#[derive(Clone, Default, Debug, PartialEq)]
#[non_exhaustive]
pub struct Instrument {
    /// The human-readable identifier of the instrument.
    pub name: Cow<'static, str>,
    /// Describes the purpose of the instrument.
    pub description: Cow<'static, str>,
    /// The functional group of the instrument, or `None` if unspecified.
    pub kind: Option<InstrumentKind>,
    /// The unit of measurement recorded by the instrument.
    pub unit: Cow<'static, str>,
    /// The instrumentation scope that created the instrument.
    pub scope: Scope,
}
76
77impl Instrument {
78    /// Create a new instrument with default values
79    pub fn new() -> Self {
80        Instrument::default()
81    }
82
83    /// Set the instrument name.
84    pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
85        self.name = name.into();
86        self
87    }
88
89    /// Set the instrument description.
90    pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
91        self.description = description.into();
92        self
93    }
94
95    /// Set the instrument unit.
96    pub fn unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
97        self.unit = unit.into();
98        self
99    }
100
101    /// Set the instrument scope.
102    pub fn scope(mut self, scope: Scope) -> Self {
103        self.scope = scope;
104        self
105    }
106
107    /// empty returns if all fields of i are their default-value.
108    pub(crate) fn is_empty(&self) -> bool {
109        self.name == ""
110            && self.description == ""
111            && self.kind.is_none()
112            && self.unit == ""
113            && self.scope == Scope::default()
114    }
115
116    pub(crate) fn matches(&self, other: &Instrument) -> bool {
117        self.matches_name(other)
118            && self.matches_description(other)
119            && self.matches_kind(other)
120            && self.matches_unit(other)
121            && self.matches_scope(other)
122    }
123
124    pub(crate) fn matches_name(&self, other: &Instrument) -> bool {
125        self.name.is_empty() || self.name.as_ref() == other.name.as_ref()
126    }
127
128    pub(crate) fn matches_description(&self, other: &Instrument) -> bool {
129        self.description.is_empty() || self.description.as_ref() == other.description.as_ref()
130    }
131
132    pub(crate) fn matches_kind(&self, other: &Instrument) -> bool {
133        self.kind.is_none() || self.kind == other.kind
134    }
135
136    pub(crate) fn matches_unit(&self, other: &Instrument) -> bool {
137        self.unit.is_empty() || self.unit.as_ref() == other.unit.as_ref()
138    }
139
140    pub(crate) fn matches_scope(&self, other: &Instrument) -> bool {
141        (self.scope.name.is_empty() || self.scope.name.as_ref() == other.scope.name.as_ref())
142            && (self.scope.version.is_none()
143                || self.scope.version.as_ref().map(AsRef::as_ref)
144                    == other.scope.version.as_ref().map(AsRef::as_ref))
145            && (self.scope.schema_url.is_none()
146                || self.scope.schema_url.as_ref().map(AsRef::as_ref)
147                    == other.scope.schema_url.as_ref().map(AsRef::as_ref))
148    }
149}
150
/// Describes the stream of data an instrument produces.
///
/// # Example
///
/// Streams can be used as masks in views.
///
/// ```
/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
///
/// let criteria = Instrument::new().name("counter_*");
/// let mask = Stream::new().aggregation(Aggregation::Sum);
///
/// let view = new_view(criteria, mask);
/// # drop(view);
/// ```
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct Stream {
    /// The human-readable identifier of the stream.
    pub name: Cow<'static, str>,
    /// Describes the purpose of the data.
    pub description: Cow<'static, str>,
    /// The unit of measurement recorded.
    pub unit: Cow<'static, str>,
    /// The aggregation the stream uses for an instrument, or `None` if unset.
    pub aggregation: Option<Aggregation>,
    /// An allow-list of attribute keys that will be preserved for the stream.
    ///
    /// Any attribute recorded for the stream with a key not in this set will be
    /// dropped. If the set is empty, all attributes will be dropped; if `None`,
    /// all attributes will be kept.
    pub allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
}
184
185impl Stream {
186    /// Create a new stream with empty values.
187    pub fn new() -> Self {
188        Stream::default()
189    }
190
191    /// Set the stream name.
192    pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
193        self.name = name.into();
194        self
195    }
196
197    /// Set the stream description.
198    pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
199        self.description = description.into();
200        self
201    }
202
203    /// Set the stream unit.
204    pub fn unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
205        self.unit = unit.into();
206        self
207    }
208
209    /// Set the stream aggregation.
210    pub fn aggregation(mut self, aggregation: Aggregation) -> Self {
211        self.aggregation = Some(aggregation);
212        self
213    }
214
215    /// Set the stream allowed attribute keys.
216    ///
217    /// Any attribute recorded for the stream with a key not in this set will be
218    /// dropped. If this set is empty all attributes will be dropped.
219    pub fn allowed_attribute_keys(mut self, attribute_keys: impl IntoIterator<Item = Key>) -> Self {
220        self.allowed_attribute_keys = Some(Arc::new(attribute_keys.into_iter().collect()));
221
222        self
223    }
224}
225
/// The identifying properties of an instrument.
///
/// Equality and hashing are derived, so every field below takes part in both.
#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct InstrumentId {
    /// The human-readable identifier of the instrument.
    pub(crate) name: Cow<'static, str>,
    /// Describes the purpose of the data.
    pub(crate) description: Cow<'static, str>,
    /// Defines the functional group of the instrument.
    pub(crate) kind: InstrumentKind,
    /// The unit of measurement recorded.
    pub(crate) unit: Cow<'static, str>,
    /// The underlying data type of the instrument, stored as a string.
    pub(crate) number: Cow<'static, str>,
}
240
241impl InstrumentId {
242    /// Instrument names are considered case-insensitive ASCII.
243    ///
244    /// Standardize the instrument name to always be lowercase so it can be compared
245    /// via hash.
246    ///
247    /// See [naming syntax] for full requirements.
248    ///
249    /// [naming syntax]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/api.md#instrument-name-syntax
250    pub(crate) fn normalize(&mut self) {
251        if self.name.chars().any(|c| c.is_ascii_uppercase()) {
252            self.name = self.name.to_ascii_lowercase().into();
253        }
254    }
255}
256
/// The measures a synchronous instrument records into.
///
/// Each value recorded through one of the `Sync*` trait implementations on
/// this type is forwarded to every entry in `measures`.
pub(crate) struct ResolvedMeasures<T> {
    /// The measure functions invoked for every recorded value.
    pub(crate) measures: Vec<Arc<dyn Measure<T>>>,
}
260
261impl<T: Copy + 'static> SyncCounter<T> for ResolvedMeasures<T> {
262    fn add(&self, val: T, attrs: &[KeyValue]) {
263        for measure in &self.measures {
264            measure.call(val, AttributeSet::from(attrs))
265        }
266    }
267}
268
269impl<T: Copy + 'static> SyncUpDownCounter<T> for ResolvedMeasures<T> {
270    fn add(&self, val: T, attrs: &[KeyValue]) {
271        for measure in &self.measures {
272            measure.call(val, AttributeSet::from(attrs))
273        }
274    }
275}
276
277impl<T: Copy + 'static> SyncGauge<T> for ResolvedMeasures<T> {
278    fn record(&self, val: T, attrs: &[KeyValue]) {
279        for measure in &self.measures {
280            measure.call(val, AttributeSet::from(attrs))
281        }
282    }
283}
284
285impl<T: Copy + 'static> SyncHistogram<T> for ResolvedMeasures<T> {
286    fn record(&self, val: T, attrs: &[KeyValue]) {
287        for measure in &self.measures {
288            measure.call(val, AttributeSet::from(attrs))
289        }
290    }
291}
292
/// A comparable unique identifier of an observable.
///
/// `T` is carried only as a phantom type parameter; the identifying data
/// lives in `inner`.
#[derive(Clone, Debug)]
pub(crate) struct ObservableId<T> {
    /// The identifying properties of the observable.
    pub(crate) inner: IdInner,
    /// Ties the identifier to the measurement type `T` without storing a value.
    _marker: marker::PhantomData<T>,
}
299
/// The type-independent identifying properties of an observable instrument.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub(crate) struct IdInner {
    /// The human-readable identifier of the instrument.
    pub(crate) name: Cow<'static, str>,
    /// Describes the purpose of the instrument.
    pub(crate) description: Cow<'static, str>,
    /// The functional group of the instrument.
    kind: InstrumentKind,
    /// The unit of measurement recorded by the instrument.
    pub(crate) unit: Cow<'static, str>,
    /// The instrumentation scope that created the instrument.
    scope: Scope,
}
313
314impl<T> Hash for ObservableId<T> {
315    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
316        self.inner.hash(state)
317    }
318}
319
320impl<T> PartialEq for ObservableId<T> {
321    fn eq(&self, other: &Self) -> bool {
322        self.inner == other.inner
323    }
324}
325
// Equality delegates to `IdInner`, which derives `Eq`, so the comparison is a
// full equivalence relation regardless of `T`.
impl<T> Eq for ObservableId<T> {}
327
/// An observable (asynchronous) instrument: an identifier plus the measures
/// that observed values are forwarded to.
#[derive(Clone)]
pub(crate) struct Observable<T> {
    /// The identifier of this observable.
    pub(crate) id: ObservableId<T>,
    /// The measure functions invoked for every observed value.
    measures: Vec<Arc<dyn Measure<T>>>,
}
333
334impl<T> Observable<T> {
335    pub(crate) fn new(
336        scope: Scope,
337        kind: InstrumentKind,
338        name: Cow<'static, str>,
339        description: Cow<'static, str>,
340        unit: Cow<'static, str>,
341        measures: Vec<Arc<dyn Measure<T>>>,
342    ) -> Self {
343        Self {
344            id: ObservableId {
345                inner: IdInner {
346                    name,
347                    description,
348                    kind,
349                    unit,
350                    scope,
351                },
352                _marker: marker::PhantomData,
353            },
354            measures,
355        }
356    }
357
358    /// Returns `Err` if the observable should not be registered, and `Ok` if it
359    /// should.
360    ///
361    /// An error is returned if this observable is effectively a no-op because it does not have
362    /// any aggregators. Also, an error is returned if scope defines a Meter other
363    /// than the observable it was created by.
364    pub(crate) fn registerable(&self, scope: &Scope) -> Result<()> {
365        if self.measures.is_empty() {
366            return Err(MetricsError::Other(EMPTY_MEASURE_MSG.into()));
367        }
368        if &self.id.inner.scope != scope {
369            return Err(MetricsError::Other(format!(
370                "invalid registration: observable {} from Meter {:?}, registered with Meter {}",
371                self.id.inner.name, self.id.inner.scope, scope.name,
372            )));
373        }
374
375        Ok(())
376    }
377}
378
379impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
380    fn observe(&self, measurement: T, attrs: &[KeyValue]) {
381        for measure in &self.measures {
382            measure.call(measurement, AttributeSet::from(attrs))
383        }
384    }
385
386    fn as_any(&self) -> Arc<dyn Any> {
387        Arc::new(self.clone())
388    }
389}