1use std::{any::Any, borrow::Cow, collections::HashSet, hash::Hash, marker, sync::Arc};
23use opentelemetry::{
4 metrics::{
5 AsyncInstrument, MetricsError, Result, SyncCounter, SyncGauge, SyncHistogram,
6 SyncUpDownCounter,
7 },
8 Key, KeyValue,
9};
1011use crate::{
12 instrumentation::Scope,
13 metrics::AttributeSet,
14 metrics::{aggregation::Aggregation, internal::Measure},
15};
/// Error text reported when an observable instrument has no aggregators
/// (used by `Observable::registerable` when its measure list is empty).
pub(crate) const EMPTY_MEASURE_MSG: &str = "no aggregators for observable instrument";
/// The category of an instrument: every instrument of a given kind performs the
/// same function.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum InstrumentKind {
    /// Instruments that record increasing values synchronously with the code
    /// path they are measuring.
    Counter,
    /// Instruments that record increasing and decreasing values synchronously
    /// with the code path they are measuring.
    UpDownCounter,
    /// Instruments that record a distribution of values synchronously with the
    /// code path they are measuring.
    Histogram,
    /// Instruments that record increasing values in an asynchronous callback.
    ObservableCounter,
    /// Instruments that record increasing and decreasing values in an
    /// asynchronous callback.
    ObservableUpDownCounter,
    /// Instruments that record the current value synchronously with the code
    /// path they are measuring.
    Gauge,
    /// Instruments that record current values in an asynchronous callback.
    ObservableGauge,
}
4546/// Describes properties an instrument is created with, also used for filtering
47/// in [View](crate::metrics::View)s.
48///
49/// # Example
50///
51/// Instruments can be used as criteria for views.
52///
53/// ```
54/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
55///
56/// let criteria = Instrument::new().name("counter_*");
57/// let mask = Stream::new().aggregation(Aggregation::Sum);
58///
59/// let view = new_view(criteria, mask);
60/// # drop(view);
61/// ```
62#[derive(Clone, Default, Debug, PartialEq)]
63#[non_exhaustive]
64pub struct Instrument {
65/// The human-readable identifier of the instrument.
66pub name: Cow<'static, str>,
67/// describes the purpose of the instrument.
68pub description: Cow<'static, str>,
69/// The functional group of the instrument.
70pub kind: Option<InstrumentKind>,
71/// Unit is the unit of measurement recorded by the instrument.
72pub unit: Cow<'static, str>,
73/// The instrumentation that created the instrument.
74pub scope: Scope,
75}
7677impl Instrument {
78/// Create a new instrument with default values
79pub fn new() -> Self {
80 Instrument::default()
81 }
8283/// Set the instrument name.
84pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
85self.name = name.into();
86self
87}
8889/// Set the instrument description.
90pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
91self.description = description.into();
92self
93}
9495/// Set the instrument unit.
96pub fn unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
97self.unit = unit.into();
98self
99}
100101/// Set the instrument scope.
102pub fn scope(mut self, scope: Scope) -> Self {
103self.scope = scope;
104self
105}
106107/// empty returns if all fields of i are their default-value.
108pub(crate) fn is_empty(&self) -> bool {
109self.name == ""
110&& self.description == ""
111&& self.kind.is_none()
112 && self.unit == ""
113&& self.scope == Scope::default()
114 }
115116pub(crate) fn matches(&self, other: &Instrument) -> bool {
117self.matches_name(other)
118 && self.matches_description(other)
119 && self.matches_kind(other)
120 && self.matches_unit(other)
121 && self.matches_scope(other)
122 }
123124pub(crate) fn matches_name(&self, other: &Instrument) -> bool {
125self.name.is_empty() || self.name.as_ref() == other.name.as_ref()
126 }
127128pub(crate) fn matches_description(&self, other: &Instrument) -> bool {
129self.description.is_empty() || self.description.as_ref() == other.description.as_ref()
130 }
131132pub(crate) fn matches_kind(&self, other: &Instrument) -> bool {
133self.kind.is_none() || self.kind == other.kind
134 }
135136pub(crate) fn matches_unit(&self, other: &Instrument) -> bool {
137self.unit.is_empty() || self.unit.as_ref() == other.unit.as_ref()
138 }
139140pub(crate) fn matches_scope(&self, other: &Instrument) -> bool {
141 (self.scope.name.is_empty() || self.scope.name.as_ref() == other.scope.name.as_ref())
142 && (self.scope.version.is_none()
143 || self.scope.version.as_ref().map(AsRef::as_ref)
144 == other.scope.version.as_ref().map(AsRef::as_ref))
145 && (self.scope.schema_url.is_none()
146 || self.scope.schema_url.as_ref().map(AsRef::as_ref)
147 == other.scope.schema_url.as_ref().map(AsRef::as_ref))
148 }
149}
150151/// Describes the stream of data an instrument produces.
152///
153/// # Example
154///
155/// Streams can be used as masks in views.
156///
157/// ```
158/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
159///
160/// let criteria = Instrument::new().name("counter_*");
161/// let mask = Stream::new().aggregation(Aggregation::Sum);
162///
163/// let view = new_view(criteria, mask);
164/// # drop(view);
165/// ```
166#[derive(Default, Debug)]
167#[non_exhaustive]
168pub struct Stream {
169/// The human-readable identifier of the stream.
170pub name: Cow<'static, str>,
171/// Describes the purpose of the data.
172pub description: Cow<'static, str>,
173/// the unit of measurement recorded.
174pub unit: Cow<'static, str>,
175/// Aggregation the stream uses for an instrument.
176pub aggregation: Option<Aggregation>,
177/// An allow-list of attribute keys that will be preserved for the stream.
178 ///
179 /// Any attribute recorded for the stream with a key not in this set will be
180 /// dropped. If the set is empty, all attributes will be dropped, if `None` all
181 /// attributes will be kept.
182pub allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
183}
184185impl Stream {
186/// Create a new stream with empty values.
187pub fn new() -> Self {
188 Stream::default()
189 }
190191/// Set the stream name.
192pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
193self.name = name.into();
194self
195}
196197/// Set the stream description.
198pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
199self.description = description.into();
200self
201}
202203/// Set the stream unit.
204pub fn unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
205self.unit = unit.into();
206self
207}
208209/// Set the stream aggregation.
210pub fn aggregation(mut self, aggregation: Aggregation) -> Self {
211self.aggregation = Some(aggregation);
212self
213}
214215/// Set the stream allowed attribute keys.
216 ///
217 /// Any attribute recorded for the stream with a key not in this set will be
218 /// dropped. If this set is empty all attributes will be dropped.
219pub fn allowed_attribute_keys(mut self, attribute_keys: impl IntoIterator<Item = Key>) -> Self {
220self.allowed_attribute_keys = Some(Arc::new(attribute_keys.into_iter().collect()));
221222self
223}
224}
225226/// The identifying properties of an instrument.
227#[derive(Debug, PartialEq, Eq, Hash)]
228pub(crate) struct InstrumentId {
229/// The human-readable identifier of the instrument.
230pub(crate) name: Cow<'static, str>,
231/// Describes the purpose of the data.
232pub(crate) description: Cow<'static, str>,
233/// Defines the functional group of the instrument.
234pub(crate) kind: InstrumentKind,
235/// The unit of measurement recorded.
236pub(crate) unit: Cow<'static, str>,
237/// Number is the underlying data type of the instrument.
238pub(crate) number: Cow<'static, str>,
239}
240241impl InstrumentId {
242/// Instrument names are considered case-insensitive ASCII.
243 ///
244 /// Standardize the instrument name to always be lowercase so it can be compared
245 /// via hash.
246 ///
247 /// See [naming syntax] for full requirements.
248 ///
249 /// [naming syntax]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/api.md#instrument-name-syntax
250pub(crate) fn normalize(&mut self) {
251if self.name.chars().any(|c| c.is_ascii_uppercase()) {
252self.name = self.name.to_ascii_lowercase().into();
253 }
254 }
255}
256257pub(crate) struct ResolvedMeasures<T> {
258pub(crate) measures: Vec<Arc<dyn Measure<T>>>,
259}
260261impl<T: Copy + 'static> SyncCounter<T> for ResolvedMeasures<T> {
262fn add(&self, val: T, attrs: &[KeyValue]) {
263for measure in &self.measures {
264 measure.call(val, AttributeSet::from(attrs))
265 }
266 }
267}
268269impl<T: Copy + 'static> SyncUpDownCounter<T> for ResolvedMeasures<T> {
270fn add(&self, val: T, attrs: &[KeyValue]) {
271for measure in &self.measures {
272 measure.call(val, AttributeSet::from(attrs))
273 }
274 }
275}
276277impl<T: Copy + 'static> SyncGauge<T> for ResolvedMeasures<T> {
278fn record(&self, val: T, attrs: &[KeyValue]) {
279for measure in &self.measures {
280 measure.call(val, AttributeSet::from(attrs))
281 }
282 }
283}
284285impl<T: Copy + 'static> SyncHistogram<T> for ResolvedMeasures<T> {
286fn record(&self, val: T, attrs: &[KeyValue]) {
287for measure in &self.measures {
288 measure.call(val, AttributeSet::from(attrs))
289 }
290 }
291}
292293/// A comparable unique identifier of an observable.
294#[derive(Clone, Debug)]
295pub(crate) struct ObservableId<T> {
296pub(crate) inner: IdInner,
297 _marker: marker::PhantomData<T>,
298}
299300#[derive(Clone, Debug, Hash, PartialEq, Eq)]
301pub(crate) struct IdInner {
302/// The human-readable identifier of the instrument.
303pub(crate) name: Cow<'static, str>,
304/// describes the purpose of the instrument.
305pub(crate) description: Cow<'static, str>,
306/// The functional group of the instrument.
307kind: InstrumentKind,
308/// The unit of measurement recorded by the instrument.
309pub(crate) unit: Cow<'static, str>,
310/// The instrumentation that created the instrument.
311scope: Scope,
312}
313314impl<T> Hash for ObservableId<T> {
315fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
316self.inner.hash(state)
317 }
318}
319320impl<T> PartialEq for ObservableId<T> {
321fn eq(&self, other: &Self) -> bool {
322self.inner == other.inner
323 }
324}
325326impl<T> Eq for ObservableId<T> {}
327328#[derive(Clone)]
329pub(crate) struct Observable<T> {
330pub(crate) id: ObservableId<T>,
331 measures: Vec<Arc<dyn Measure<T>>>,
332}
333334impl<T> Observable<T> {
335pub(crate) fn new(
336 scope: Scope,
337 kind: InstrumentKind,
338 name: Cow<'static, str>,
339 description: Cow<'static, str>,
340 unit: Cow<'static, str>,
341 measures: Vec<Arc<dyn Measure<T>>>,
342 ) -> Self {
343Self {
344 id: ObservableId {
345 inner: IdInner {
346 name,
347 description,
348 kind,
349 unit,
350 scope,
351 },
352 _marker: marker::PhantomData,
353 },
354 measures,
355 }
356 }
357358/// Returns `Err` if the observable should not be registered, and `Ok` if it
359 /// should.
360 ///
361 /// An error is returned if this observable is effectively a no-op because it does not have
362 /// any aggregators. Also, an error is returned if scope defines a Meter other
363 /// than the observable it was created by.
364pub(crate) fn registerable(&self, scope: &Scope) -> Result<()> {
365if self.measures.is_empty() {
366return Err(MetricsError::Other(EMPTY_MEASURE_MSG.into()));
367 }
368if &self.id.inner.scope != scope {
369return Err(MetricsError::Other(format!(
370"invalid registration: observable {} from Meter {:?}, registered with Meter {}",
371self.id.inner.name, self.id.inner.scope, scope.name,
372 )));
373 }
374375Ok(())
376 }
377}
378379impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
380fn observe(&self, measurement: T, attrs: &[KeyValue]) {
381for measure in &self.measures {
382 measure.call(measurement, AttributeSet::from(attrs))
383 }
384 }
385386fn as_any(&self) -> Arc<dyn Any> {
387 Arc::new(self.clone())
388 }
389}