// opentelemetry_sdk/metrics/instrument.rs
use std::{any::Any, borrow::Cow, collections::HashSet, hash::Hash, marker, sync::Arc};

use opentelemetry::{
    metrics::{
        AsyncInstrument, MetricsError, Result, SyncCounter, SyncGauge, SyncHistogram,
        SyncUpDownCounter,
    },
    Key, KeyValue,
};

use crate::{
    instrumentation::Scope,
    metrics::AttributeSet,
    metrics::{aggregation::Aggregation, internal::Measure},
};

/// Error text reported by `Observable::registerable` when an observable
/// instrument holds no measures.
pub(crate) const EMPTY_MEASURE_MSG: &str = "no aggregators for observable instrument";

/// Identifies the kind of instrument a metric belongs to.
///
/// The type is `Copy`, comparable and hashable, which lets it take part in
/// the instrument identity keys defined in this module.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum InstrumentKind {
    /// Counter instruments.
    Counter,
    /// Up-down counter instruments.
    UpDownCounter,
    /// Histogram instruments.
    Histogram,
    /// Observable counter instruments.
    ObservableCounter,
    /// Observable up-down counter instruments.
    ObservableUpDownCounter,

    /// Gauge instruments.
    Gauge,
    /// Observable gauge instruments.
    ObservableGauge,
}

46#[derive(Clone, Default, Debug, PartialEq)]
63#[non_exhaustive]
64pub struct Instrument {
65 pub name: Cow<'static, str>,
67 pub description: Cow<'static, str>,
69 pub kind: Option<InstrumentKind>,
71 pub unit: Cow<'static, str>,
73 pub scope: Scope,
75}
76
77impl Instrument {
78 pub fn new() -> Self {
80 Instrument::default()
81 }
82
83 pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
85 self.name = name.into();
86 self
87 }
88
89 pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
91 self.description = description.into();
92 self
93 }
94
95 pub fn unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
97 self.unit = unit.into();
98 self
99 }
100
101 pub fn scope(mut self, scope: Scope) -> Self {
103 self.scope = scope;
104 self
105 }
106
107 pub(crate) fn is_empty(&self) -> bool {
109 self.name == ""
110 && self.description == ""
111 && self.kind.is_none()
112 && self.unit == ""
113 && self.scope == Scope::default()
114 }
115
116 pub(crate) fn matches(&self, other: &Instrument) -> bool {
117 self.matches_name(other)
118 && self.matches_description(other)
119 && self.matches_kind(other)
120 && self.matches_unit(other)
121 && self.matches_scope(other)
122 }
123
124 pub(crate) fn matches_name(&self, other: &Instrument) -> bool {
125 self.name.is_empty() || self.name.as_ref() == other.name.as_ref()
126 }
127
128 pub(crate) fn matches_description(&self, other: &Instrument) -> bool {
129 self.description.is_empty() || self.description.as_ref() == other.description.as_ref()
130 }
131
132 pub(crate) fn matches_kind(&self, other: &Instrument) -> bool {
133 self.kind.is_none() || self.kind == other.kind
134 }
135
136 pub(crate) fn matches_unit(&self, other: &Instrument) -> bool {
137 self.unit.is_empty() || self.unit.as_ref() == other.unit.as_ref()
138 }
139
140 pub(crate) fn matches_scope(&self, other: &Instrument) -> bool {
141 (self.scope.name.is_empty() || self.scope.name.as_ref() == other.scope.name.as_ref())
142 && (self.scope.version.is_none()
143 || self.scope.version.as_ref().map(AsRef::as_ref)
144 == other.scope.version.as_ref().map(AsRef::as_ref))
145 && (self.scope.schema_url.is_none()
146 || self.scope.schema_url.as_ref().map(AsRef::as_ref)
147 == other.scope.schema_url.as_ref().map(AsRef::as_ref))
148 }
149}
150
151#[derive(Default, Debug)]
167#[non_exhaustive]
168pub struct Stream {
169 pub name: Cow<'static, str>,
171 pub description: Cow<'static, str>,
173 pub unit: Cow<'static, str>,
175 pub aggregation: Option<Aggregation>,
177 pub allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
183}
184
185impl Stream {
186 pub fn new() -> Self {
188 Stream::default()
189 }
190
191 pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
193 self.name = name.into();
194 self
195 }
196
197 pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
199 self.description = description.into();
200 self
201 }
202
203 pub fn unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
205 self.unit = unit.into();
206 self
207 }
208
209 pub fn aggregation(mut self, aggregation: Aggregation) -> Self {
211 self.aggregation = Some(aggregation);
212 self
213 }
214
215 pub fn allowed_attribute_keys(mut self, attribute_keys: impl IntoIterator<Item = Key>) -> Self {
220 self.allowed_attribute_keys = Some(Arc::new(attribute_keys.into_iter().collect()));
221
222 self
223 }
224}
225
226#[derive(Debug, PartialEq, Eq, Hash)]
228pub(crate) struct InstrumentId {
229 pub(crate) name: Cow<'static, str>,
231 pub(crate) description: Cow<'static, str>,
233 pub(crate) kind: InstrumentKind,
235 pub(crate) unit: Cow<'static, str>,
237 pub(crate) number: Cow<'static, str>,
239}
240
241impl InstrumentId {
242 pub(crate) fn normalize(&mut self) {
251 if self.name.chars().any(|c| c.is_ascii_uppercase()) {
252 self.name = self.name.to_ascii_lowercase().into();
253 }
254 }
255}
256
257pub(crate) struct ResolvedMeasures<T> {
258 pub(crate) measures: Vec<Arc<dyn Measure<T>>>,
259}
260
261impl<T: Copy + 'static> SyncCounter<T> for ResolvedMeasures<T> {
262 fn add(&self, val: T, attrs: &[KeyValue]) {
263 for measure in &self.measures {
264 measure.call(val, AttributeSet::from(attrs))
265 }
266 }
267}
268
269impl<T: Copy + 'static> SyncUpDownCounter<T> for ResolvedMeasures<T> {
270 fn add(&self, val: T, attrs: &[KeyValue]) {
271 for measure in &self.measures {
272 measure.call(val, AttributeSet::from(attrs))
273 }
274 }
275}
276
277impl<T: Copy + 'static> SyncGauge<T> for ResolvedMeasures<T> {
278 fn record(&self, val: T, attrs: &[KeyValue]) {
279 for measure in &self.measures {
280 measure.call(val, AttributeSet::from(attrs))
281 }
282 }
283}
284
285impl<T: Copy + 'static> SyncHistogram<T> for ResolvedMeasures<T> {
286 fn record(&self, val: T, attrs: &[KeyValue]) {
287 for measure in &self.measures {
288 measure.call(val, AttributeSet::from(attrs))
289 }
290 }
291}
292
293#[derive(Clone, Debug)]
295pub(crate) struct ObservableId<T> {
296 pub(crate) inner: IdInner,
297 _marker: marker::PhantomData<T>,
298}
299
300#[derive(Clone, Debug, Hash, PartialEq, Eq)]
301pub(crate) struct IdInner {
302 pub(crate) name: Cow<'static, str>,
304 pub(crate) description: Cow<'static, str>,
306 kind: InstrumentKind,
308 pub(crate) unit: Cow<'static, str>,
310 scope: Scope,
312}
313
314impl<T> Hash for ObservableId<T> {
315 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
316 self.inner.hash(state)
317 }
318}
319
320impl<T> PartialEq for ObservableId<T> {
321 fn eq(&self, other: &Self) -> bool {
322 self.inner == other.inner
323 }
324}
325
326impl<T> Eq for ObservableId<T> {}
327
328#[derive(Clone)]
329pub(crate) struct Observable<T> {
330 pub(crate) id: ObservableId<T>,
331 measures: Vec<Arc<dyn Measure<T>>>,
332}
333
334impl<T> Observable<T> {
335 pub(crate) fn new(
336 scope: Scope,
337 kind: InstrumentKind,
338 name: Cow<'static, str>,
339 description: Cow<'static, str>,
340 unit: Cow<'static, str>,
341 measures: Vec<Arc<dyn Measure<T>>>,
342 ) -> Self {
343 Self {
344 id: ObservableId {
345 inner: IdInner {
346 name,
347 description,
348 kind,
349 unit,
350 scope,
351 },
352 _marker: marker::PhantomData,
353 },
354 measures,
355 }
356 }
357
358 pub(crate) fn registerable(&self, scope: &Scope) -> Result<()> {
365 if self.measures.is_empty() {
366 return Err(MetricsError::Other(EMPTY_MEASURE_MSG.into()));
367 }
368 if &self.id.inner.scope != scope {
369 return Err(MetricsError::Other(format!(
370 "invalid registration: observable {} from Meter {:?}, registered with Meter {}",
371 self.id.inner.name, self.id.inner.scope, scope.name,
372 )));
373 }
374
375 Ok(())
376 }
377}
378
379impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
380 fn observe(&self, measurement: T, attrs: &[KeyValue]) {
381 for measure in &self.measures {
382 measure.call(measurement, AttributeSet::from(attrs))
383 }
384 }
385
386 fn as_any(&self) -> Arc<dyn Any> {
387 Arc::new(self.clone())
388 }
389}