use core::fmt;
use std::{
    borrow::Cow,
    collections::{HashMap, HashSet},
    sync::{Arc, Mutex},
};

use opentelemetry::{
    global,
    metrics::{CallbackRegistration, MetricsError, Result},
    KeyValue,
};

use crate::{
    instrumentation::Scope,
    metrics::{
        aggregation,
        data::{Metric, ResourceMetrics, ScopeMetrics},
        instrument::{Instrument, InstrumentId, InstrumentKind, Stream},
        internal,
        internal::AggregateBuilder,
        internal::Number,
        reader::{AggregationSelector, DefaultAggregationSelector, MetricReader, SdkProducer},
        view::View,
    },
    Resource,
};

/// Connects all of the instruments created by a meter provider to a
/// [MetricReader].
///
/// As instruments are created they are checked against the reader's registered
/// views, and each matching aggregate function is added to the pipeline.
#[doc(hidden)]
pub struct Pipeline {
    pub(crate) resource: Resource,
    reader: Box<dyn MetricReader>,
    views: Vec<Arc<dyn View>>,
    inner: Box<Mutex<PipelineInner>>,
}

impl fmt::Debug for Pipeline {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Pipeline")
    }
}

/// A registered observable callback that is invoked on every collection.
type GenericCallback = Arc<dyn Fn() + Send + Sync>;
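
// Illustrative sketch (not part of the API surface): any `Fn()` closure can be
// stored as a `GenericCallback`, e.g.
//
//     let cb: GenericCallback = Arc::new(|| { /* observe instrument values */ });
//
// The same `Arc` is then cloned into every pipeline that needs to invoke it.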

/// Per-pipeline state guarded by a mutex: the aggregations grouped by scope and
/// the registered observable callbacks.
#[derive(Default)]
struct PipelineInner {
    aggregations: HashMap<Scope, Vec<InstrumentSync>>,
    callbacks: Vec<GenericCallback>,
    multi_callbacks: Vec<Option<GenericCallback>>,
}

impl fmt::Debug for PipelineInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PipelineInner")
            .field("aggregations", &self.aggregations)
            .field("callbacks", &self.callbacks.len())
            .finish()
    }
}

impl Pipeline {
    /// Adds the [InstrumentSync] to the pipeline under the given scope.
    ///
    /// This method is not idempotent: duplicate calls result in duplicate
    /// additions, so callers must ensure the values are unique.
    fn add_sync(&self, scope: Scope, i_sync: InstrumentSync) {
        let _ = self.inner.lock().map(|mut inner| {
            inner.aggregations.entry(scope).or_default().push(i_sync);
        });
    }

    /// Registers a single-instrument observable callback with the pipeline.
    fn add_callback(&self, callback: GenericCallback) {
        let _ = self
            .inner
            .lock()
            .map(|mut inner| inner.callbacks.push(callback));
    }

    /// Registers a multi-instrument callback and returns a closure that can be
    /// used to unregister it from this pipeline.
    fn add_multi_callback(
        &self,
        callback: GenericCallback,
    ) -> Result<impl FnOnce(&Pipeline) -> Result<()>> {
        let mut inner = self.inner.lock()?;
        inner.multi_callbacks.push(Some(callback));
        let idx = inner.multi_callbacks.len() - 1;

        Ok(move |this: &Pipeline| {
            let mut inner = this.inner.lock()?;
            // Unregistering only clears the slot so other indices stay valid.
            inner.multi_callbacks[idx] = None;
            Ok(())
        })
    }

    /// Flushes the metric reader attached to this pipeline.
    fn force_flush(&self) -> Result<()> {
        self.reader.force_flush()
    }

    /// Shuts down the metric reader attached to this pipeline.
    fn shutdown(&self) -> Result<()> {
        self.reader.shutdown()
    }
}
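
// Illustrative sketch, assuming a `pipeline: Arc<Pipeline>` built by
// `Pipelines::new` below:
//
//     let unregister = pipeline.add_multi_callback(Arc::new(|| { /* observe */ }))?;
//     // ...later, remove the callback from this pipeline again:
//     unregister(pipeline.as_ref())?;
//
// `Pipelines::register_multi_callback` wraps this pattern for every reader's
// pipeline at once.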

impl SdkProducer for Pipeline {
    /// Returns aggregated metrics from a single collection.
    fn produce(&self, rm: &mut ResourceMetrics) -> Result<()> {
        let inner = self.inner.lock()?;
        // Observable callbacks run first so their measurements are included in
        // this collection.
        for cb in &inner.callbacks {
            cb();
        }

        for mcb in inner.multi_callbacks.iter().flatten() {
            mcb();
        }

        rm.resource = self.resource.clone();
        if inner.aggregations.len() > rm.scope_metrics.len() {
            rm.scope_metrics
                .reserve(inner.aggregations.len() - rm.scope_metrics.len());
        }

        let mut i = 0;
        for (scope, instruments) in inner.aggregations.iter() {
            // Reuse the caller's ScopeMetrics entries where possible to avoid
            // reallocating on every collection.
            let sm = match rm.scope_metrics.get_mut(i) {
                Some(sm) => sm,
                None => {
                    rm.scope_metrics.push(ScopeMetrics::default());
                    rm.scope_metrics.last_mut().unwrap()
                }
            };
            if instruments.len() > sm.metrics.len() {
                sm.metrics.reserve(instruments.len() - sm.metrics.len());
            }

            let mut j = 0;
            for inst in instruments {
                let mut m = sm.metrics.get_mut(j);
                match (inst.comp_agg.call(m.as_mut().map(|m| m.data.as_mut())), m) {
                    // No existing metric to reuse: push a freshly produced one.
                    ((len, Some(initial_agg)), None) if len > 0 => sm.metrics.push(Metric {
                        name: inst.name.clone(),
                        description: inst.description.clone(),
                        unit: inst.unit.clone(),
                        data: initial_agg,
                    }),
                    // Existing metric: update it in place.
                    ((len, data), Some(prev_agg)) if len > 0 => {
                        if let Some(data) = data {
                            prev_agg.data = data;
                        }
                        prev_agg.name.clone_from(&inst.name);
                        prev_agg.description.clone_from(&inst.description);
                        prev_agg.unit.clone_from(&inst.unit);
                    }
                    // Nothing was recorded for this instrument; skip it.
                    _ => continue,
                }

                j += 1;
            }

            sm.metrics.truncate(j);
            if !sm.metrics.is_empty() {
                sm.scope = scope.clone();
                i += 1;
            }
        }

        rm.scope_metrics.truncate(i);

        Ok(())
    }
}
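
// Illustrative sketch of how a reader drives a collection (hypothetical
// `pipeline: Arc<Pipeline>` value; `resource` and `scope_metrics` are the
// public fields of `ResourceMetrics`):
//
//     let mut rm = ResourceMetrics {
//         resource: Resource::default(),
//         scope_metrics: vec![],
//     };
//     pipeline.produce(&mut rm)?;
//     // rm.scope_metrics now holds one ScopeMetrics per scope that recorded data.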

/// A synchronization point between a [Pipeline] and an instrument's aggregate function.
struct InstrumentSync {
    name: Cow<'static, str>,
    description: Cow<'static, str>,
    unit: Cow<'static, str>,
    comp_agg: Box<dyn internal::ComputeAggregation>,
}

impl fmt::Debug for InstrumentSync {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("InstrumentSync")
            .field("name", &self.name)
            .field("description", &self.description)
            .field("unit", &self.unit)
            .finish()
    }
}

type Cache<T> = Mutex<HashMap<InstrumentId, Result<Option<Arc<dyn internal::Measure<T>>>>>>;

/// Inserts new instruments from a single scope into a pipeline.
struct Inserter<T> {
    /// A cache of the aggregate function inputs whose outputs have already
    /// been registered with the reader pipeline, keyed by instrument id.
    aggregators: Cache<T>,

    /// A cache of instrument ids keyed by lower-cased stream name, shared
    /// across inserters and used to detect duplicate stream definitions.
    views: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,

    pipeline: Arc<Pipeline>,
}

impl<T> Inserter<T>
where
    T: Number<T>,
{
    fn new(p: Arc<Pipeline>, vc: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>) -> Self {
        Inserter {
            aggregators: Default::default(),
            views: vc,
            pipeline: Arc::clone(&p),
        }
    }

    /// Inserts the instrument into the pipeline and returns the list of
    /// measures that must be updated when it records a value.
    ///
    /// If any view matches the instrument, only the matching streams are
    /// created; otherwise a default stream derived from the instrument itself
    /// is used.
    fn instrument(&self, inst: Instrument) -> Result<Vec<Arc<dyn internal::Measure<T>>>> {
        let mut matched = false;
        let mut measures = vec![];
        let mut errs = vec![];
        let kind = match inst.kind {
            Some(kind) => kind,
            None => return Err(MetricsError::Other("instrument must have a kind".into())),
        };

        // Track the stream ids already handled so duplicate view matches do
        // not register the same aggregator twice.
        let mut seen = HashSet::new();
        for v in &self.pipeline.views {
            let stream = match v.match_inst(&inst) {
                Some(stream) => stream,
                None => continue,
            };
            matched = true;

            let id = self.inst_id(kind, &stream);
            if seen.contains(&id) {
                continue;
            }

            let agg = match self.cached_aggregator(&inst.scope, kind, stream) {
                Ok(Some(agg)) => agg,
                Ok(None) => continue, // Drop aggregation: nothing to record.
                Err(err) => {
                    errs.push(err);
                    continue;
                }
            };
            seen.insert(id);
            measures.push(agg);
        }

        if matched {
            if errs.is_empty() {
                return Ok(measures);
            } else {
                return Err(MetricsError::Other(format!("{errs:?}")));
            }
        }

        // No views matched: fall back to the default stream for the instrument.
        let stream = Stream {
            name: inst.name,
            description: inst.description,
            unit: inst.unit,
            aggregation: None,
            allowed_attribute_keys: None,
        };

        match self.cached_aggregator(&inst.scope, kind, stream) {
            Ok(agg) => {
                if errs.is_empty() {
                    if let Some(agg) = agg {
                        measures.push(agg);
                    }
                    Ok(measures)
                } else {
                    Err(MetricsError::Other(format!("{errs:?}")))
                }
            }
            Err(err) => {
                errs.push(err);
                Err(MetricsError::Other(format!("{errs:?}")))
            }
        }
    }

    /// Returns the measure for the given stream, creating the aggregate
    /// functions and registering their output with the pipeline if they are
    /// not already cached.
    fn cached_aggregator(
        &self,
        scope: &Scope,
        kind: InstrumentKind,
        mut stream: Stream,
    ) -> Result<Option<Arc<dyn internal::Measure<T>>>> {
        let mut agg = stream
            .aggregation
            .take()
            .unwrap_or_else(|| self.pipeline.reader.aggregation(kind));

        // Resolve the Default aggregation here so the cache key below refers
        // to the concrete aggregation that is actually used.
        if matches!(agg, aggregation::Aggregation::Default) {
            agg = DefaultAggregationSelector::new().aggregation(kind);
        }

        if let Err(err) = is_aggregator_compatible(&kind, &agg) {
            return Err(MetricsError::Other(format!(
                "creating aggregator with instrumentKind: {:?}, aggregation {:?}: {:?}",
                kind, stream.aggregation, err,
            )));
        }

        let mut id = self.inst_id(kind, &stream);
        // Report any duplicate-stream conflict before the id is normalized for
        // the cache lookup.
        self.log_conflict(&id);

        id.normalize();

        let mut cache = self.aggregators.lock()?;

        let cached = cache.entry(id).or_insert_with(|| {
            let filter = stream
                .allowed_attribute_keys
                .clone()
                .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);

            let b = AggregateBuilder::new(Some(self.pipeline.reader.temporality(kind)), filter);
            let (m, ca) = match aggregate_fn(b, &agg, kind) {
                Ok(Some((m, ca))) => (m, ca),
                // Drop aggregations (Ok(None)) and errors are cached as-is.
                other => return other.map(|fs| fs.map(|(m, _)| m)),
            };

            self.pipeline.add_sync(
                scope.clone(),
                InstrumentSync {
                    name: stream.name,
                    description: stream.description,
                    unit: stream.unit,
                    comp_agg: ca,
                },
            );

            Ok(Some(m))
        });

        match cached {
            Ok(opt) => Ok(opt.clone()),
            Err(err) => Err(MetricsError::Other(err.to_string())),
        }
    }

    /// Reports a duplicate-stream error if an instrument with the same
    /// lower-cased name but a different definition has already been created.
    fn log_conflict(&self, id: &InstrumentId) {
        if let Ok(views) = self.views.lock() {
            if let Some(existing) = views.get(id.name.to_lowercase().as_str()) {
                if existing == id {
                    return;
                }

                global::handle_error(MetricsError::Other(format!(
                    "duplicate metric stream definitions, names: ({} and {}), descriptions: ({} and {}), kinds: ({:?} and {:?}), units: ({:?} and {:?}), and numbers: ({} and {})",
                    existing.name, id.name,
                    existing.description, id.description,
                    existing.kind, id.kind,
                    existing.unit, id.unit,
                    existing.number, id.number,
                )))
            }
        }
    }

    /// Builds the [InstrumentId] for a stream, recording the Rust number type
    /// `T` so integer and floating-point streams never share an aggregator.
    fn inst_id(&self, kind: InstrumentKind, stream: &Stream) -> InstrumentId {
        InstrumentId {
            name: stream.name.clone(),
            description: stream.description.clone(),
            kind,
            unit: stream.unit.clone(),
            number: Cow::Borrowed(std::any::type_name::<T>()),
        }
    }
}
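
// Illustrative sketch: resolving the same stream through differently typed
// inserters yields distinct cache keys, because `InstrumentId::number` records
// the Rust number type via `std::any::type_name::<T>()`:
//
//     let id_u64 = u64_inserter.inst_id(kind, &stream); // number == "u64"
//     let id_f64 = f64_inserter.inst_id(kind, &stream); // number == "f64"
//
// (`u64_inserter` and `f64_inserter` stand for hypothetical `Inserter<u64>` and
// `Inserter<f64>` values; the two ids differ only in `number`.)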

type AggregateFns<T> = (
    Arc<dyn internal::Measure<T>>,
    Box<dyn internal::ComputeAggregation>,
);

/// Returns new aggregate functions for the given aggregation and instrument
/// kind, or `None` for the `Drop` aggregation.
fn aggregate_fn<T: Number<T>>(
    b: AggregateBuilder<T>,
    agg: &aggregation::Aggregation,
    kind: InstrumentKind,
) -> Result<Option<AggregateFns<T>>> {
    use aggregation::Aggregation;
    fn box_val<T>(
        (m, ca): (impl internal::Measure<T>, impl internal::ComputeAggregation),
    ) -> (
        Arc<dyn internal::Measure<T>>,
        Box<dyn internal::ComputeAggregation>,
    ) {
        (Arc::new(m), Box::new(ca))
    }

    match agg {
        Aggregation::Default => aggregate_fn(
            b,
            &DefaultAggregationSelector::new().aggregation(kind),
            kind,
        ),
        Aggregation::Drop => Ok(None),
        Aggregation::LastValue => Ok(Some(box_val(b.last_value()))),
        Aggregation::Sum => {
            let fns = match kind {
                InstrumentKind::ObservableCounter => box_val(b.precomputed_sum(true)),
                InstrumentKind::ObservableUpDownCounter => box_val(b.precomputed_sum(false)),
                InstrumentKind::Counter | InstrumentKind::Histogram => box_val(b.sum(true)),
                // Up-down counters and gauges are not monotonic.
                _ => box_val(b.sum(false)),
            };
            Ok(Some(fns))
        }
        Aggregation::ExplicitBucketHistogram {
            boundaries,
            record_min_max,
        } => {
            // Up-down and gauge style instruments do not record a sum.
            let record_sum = !matches!(
                kind,
                InstrumentKind::UpDownCounter
                    | InstrumentKind::ObservableUpDownCounter
                    | InstrumentKind::ObservableGauge
            );
            Ok(Some(box_val(b.explicit_bucket_histogram(
                boundaries.to_vec(),
                *record_min_max,
                record_sum,
            ))))
        }
        Aggregation::Base2ExponentialHistogram {
            max_size,
            max_scale,
            record_min_max,
        } => {
            let record_sum = !matches!(
                kind,
                InstrumentKind::UpDownCounter
                    | InstrumentKind::ObservableUpDownCounter
                    | InstrumentKind::ObservableGauge
            );
            Ok(Some(box_val(b.exponential_bucket_histogram(
                *max_size,
                *max_scale,
                *record_min_max,
                record_sum,
            ))))
        }
    }
}
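
// Illustrative sketch: with the default selector a `Counter` resolves to the
// monotonic sum branch, while `LastValue` pairs with gauges:
//
//     let agg = DefaultAggregationSelector::new().aggregation(InstrumentKind::Counter);
//     // aggregate_fn(builder, &agg, InstrumentKind::Counter) takes `b.sum(true)`;
//     // aggregate_fn(builder, &aggregation::Aggregation::LastValue,
//     //     InstrumentKind::ObservableGauge) takes `b.last_value()`.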

/// Checks if the aggregation can be used by the given instrument kind.
fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregation) -> Result<()> {
    use aggregation::Aggregation;
    match agg {
        Aggregation::Default => Ok(()),
        Aggregation::ExplicitBucketHistogram { .. }
        | Aggregation::Base2ExponentialHistogram { .. } => {
            if matches!(
                kind,
                InstrumentKind::Counter
                    | InstrumentKind::UpDownCounter
                    | InstrumentKind::Gauge
                    | InstrumentKind::Histogram
                    | InstrumentKind::ObservableCounter
                    | InstrumentKind::ObservableUpDownCounter
                    | InstrumentKind::ObservableGauge
            ) {
                return Ok(());
            }
            Err(MetricsError::Other("incompatible aggregation".into()))
        }
        Aggregation::Sum => match kind {
            InstrumentKind::ObservableCounter
            | InstrumentKind::ObservableUpDownCounter
            | InstrumentKind::Counter
            | InstrumentKind::Histogram
            | InstrumentKind::UpDownCounter => Ok(()),
            _ => Err(MetricsError::Other("incompatible aggregation".into())),
        },
        Aggregation::LastValue => match kind {
            InstrumentKind::Gauge | InstrumentKind::ObservableGauge => Ok(()),
            _ => Err(MetricsError::Other("incompatible aggregation".into())),
        },
        Aggregation::Drop => Ok(()),
    }
}
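
// Illustrative sketch: `LastValue` is only compatible with gauges, so
//
//     is_aggregator_compatible(&InstrumentKind::Counter, &aggregation::Aggregation::LastValue)
//
// returns an error, while pairing `InstrumentKind::Counter` with `Sum` or
// either histogram aggregation is accepted.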

/// The group of pipelines connecting readers with instrument measurements.
#[derive(Clone, Debug)]
pub(crate) struct Pipelines(pub(crate) Vec<Arc<Pipeline>>);

impl Pipelines {
    pub(crate) fn new(
        res: Resource,
        readers: Vec<Box<dyn MetricReader>>,
        views: Vec<Arc<dyn View>>,
    ) -> Self {
        let mut pipes = Vec::with_capacity(readers.len());
        for r in readers {
            let p = Arc::new(Pipeline {
                resource: res.clone(),
                reader: r,
                views: views.clone(),
                inner: Default::default(),
            });
            p.reader.register_pipeline(Arc::downgrade(&p));
            pipes.push(p);
        }

        Pipelines(pipes)
    }

    /// Registers a single-instrument observable callback on every pipeline.
    pub(crate) fn register_callback<F>(&self, callback: F)
    where
        F: Fn() + Send + Sync + 'static,
    {
        let cb = Arc::new(callback);
        for pipe in &self.0 {
            pipe.add_callback(cb.clone())
        }
    }

    /// Registers a multi-instrument callback on every pipeline and returns a
    /// registration that removes it from all of them.
    pub(crate) fn register_multi_callback<F>(&self, f: F) -> Result<Box<dyn CallbackRegistration>>
    where
        F: Fn() + Send + Sync + 'static,
    {
        let cb = Arc::new(f);

        let fns = self
            .0
            .iter()
            .map(|pipe| {
                let pipe = Arc::clone(pipe);
                let unreg = pipe.add_multi_callback(cb.clone())?;
                Ok(Box::new(move || unreg(pipe.as_ref())) as _)
            })
            .collect::<Result<_>>()?;

        Ok(Box::new(Unregister(fns)))
    }

    /// Flushes all pipelines, returning any errors encountered.
    pub(crate) fn force_flush(&self) -> Result<()> {
        let mut errs = vec![];
        for pipeline in &self.0 {
            if let Err(err) = pipeline.force_flush() {
                errs.push(err);
            }
        }

        if errs.is_empty() {
            Ok(())
        } else {
            Err(MetricsError::Other(format!("{errs:?}")))
        }
    }

    /// Shuts down all pipelines, returning any errors encountered.
    pub(crate) fn shutdown(&self) -> Result<()> {
        let mut errs = vec![];
        for pipeline in &self.0 {
            if let Err(err) = pipeline.shutdown() {
                errs.push(err);
            }
        }

        if errs.is_empty() {
            Ok(())
        } else {
            Err(MetricsError::Other(format!("{errs:?}")))
        }
    }
}
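
// Illustrative sketch (hypothetical `resource`, `readers`, and `views` values):
//
//     let pipelines = Arc::new(Pipelines::new(resource, readers, views));
//     pipelines.register_callback(|| { /* runs on every collection, per pipeline */ });
//     let mut registration = pipelines.register_multi_callback(|| { /* observe */ })?;
//     // ...
//     registration.unregister()?;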

/// Unregisters a multi-instrument callback from every pipeline it was added to.
struct Unregister(Vec<Box<dyn FnOnce() -> Result<()> + Send + Sync>>);

impl CallbackRegistration for Unregister {
    fn unregister(&mut self) -> Result<()> {
        let mut errs = vec![];
        while let Some(unreg) = self.0.pop() {
            if let Err(err) = unreg() {
                errs.push(err);
            }
        }

        if errs.is_empty() {
            Ok(())
        } else {
            Err(MetricsError::Other(format!("{errs:?}")))
        }
    }
}

/// Resolves the aggregate functions an instrument needs to update across all
/// pipelines.
pub(crate) struct Resolver<T> {
    inserters: Vec<Inserter<T>>,
}

impl<T> Resolver<T>
where
    T: Number<T>,
{
    pub(crate) fn new(
        pipelines: Arc<Pipelines>,
        view_cache: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,
    ) -> Self {
        let inserters = pipelines
            .0
            .iter()
            .map(|pipe| Inserter::new(Arc::clone(pipe), Arc::clone(&view_cache)))
            .collect();

        Resolver { inserters }
    }

    /// The measures that must be updated by the instrument defined by `id`.
    pub(crate) fn measures(&self, id: Instrument) -> Result<Vec<Arc<dyn internal::Measure<T>>>> {
        let (mut measures, mut errs) = (vec![], vec![]);

        for inserter in &self.inserters {
            match inserter.instrument(id.clone()) {
                Ok(ms) => measures.extend(ms),
                Err(err) => errs.push(err),
            }
        }

        if errs.is_empty() {
            Ok(measures)
        } else {
            Err(MetricsError::Other(format!("{errs:?}")))
        }
    }
}
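
// Illustrative sketch (hypothetical `pipelines`, `view_cache`, and `instrument`
// values): a `u64` instrument resolves to one measure per pipeline whose views,
// or default stream, match it.
//
//     let resolver: Resolver<u64> = Resolver::new(pipelines, view_cache);
//     let measures = resolver.measures(instrument)?;
//     // Every recorded value must then be forwarded to each returned measure.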