opentelemetry_sdk/metrics/pipeline.rs

use core::fmt;
use std::{
    borrow::Cow,
    collections::{HashMap, HashSet},
    sync::{Arc, Mutex},
};

use opentelemetry::{
    global,
    metrics::{CallbackRegistration, MetricsError, Result},
    KeyValue,
};

use crate::{
    instrumentation::Scope,
    metrics::{
        aggregation,
        data::{Metric, ResourceMetrics, ScopeMetrics},
        instrument::{Instrument, InstrumentId, InstrumentKind, Stream},
        internal,
        internal::AggregateBuilder,
        internal::Number,
        reader::{AggregationSelector, DefaultAggregationSelector, MetricReader, SdkProducer},
        view::View,
    },
    Resource,
};

/// Connects all of the instruments created by a meter provider to a [MetricReader].
///
/// This is the object that will be registered when a meter provider is
/// created.
///
/// As instruments are created, each one should be checked against the views of
/// the reader, and if a view matches, the corresponding aggregate function
/// should be added to the pipeline.
#[doc(hidden)]
pub struct Pipeline {
    pub(crate) resource: Resource,
    reader: Box<dyn MetricReader>,
    views: Vec<Arc<dyn View>>,
    inner: Box<Mutex<PipelineInner>>,
}

impl fmt::Debug for Pipeline {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Pipeline")
    }
}

/// Single or multi-instrument callbacks
type GenericCallback = Arc<dyn Fn() + Send + Sync>;

#[derive(Default)]
struct PipelineInner {
    aggregations: HashMap<Scope, Vec<InstrumentSync>>,
    callbacks: Vec<GenericCallback>,
    multi_callbacks: Vec<Option<GenericCallback>>,
}

impl fmt::Debug for PipelineInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PipelineInner")
            .field("aggregations", &self.aggregations)
            .field("callbacks", &self.callbacks.len())
            .finish()
    }
}

impl Pipeline {
    /// Adds the [InstrumentSync] to the pipeline with the given scope.
    ///
    /// This method is not idempotent. Duplicate calls will result in duplicate
    /// additions; it is the caller's responsibility to ensure this is called with
    /// unique values.
    fn add_sync(&self, scope: Scope, i_sync: InstrumentSync) {
        let _ = self.inner.lock().map(|mut inner| {
            inner.aggregations.entry(scope).or_default().push(i_sync);
        });
    }

    /// Registers a single instrument callback to be run when `produce` is called.
    fn add_callback(&self, callback: GenericCallback) {
        let _ = self
            .inner
            .lock()
            .map(|mut inner| inner.callbacks.push(callback));
    }

    /// Registers a multi-instrument callback to be run when `produce` is called.
    fn add_multi_callback(
        &self,
        callback: GenericCallback,
    ) -> Result<impl FnOnce(&Pipeline) -> Result<()>> {
        let mut inner = self.inner.lock()?;
        inner.multi_callbacks.push(Some(callback));
        let idx = inner.multi_callbacks.len() - 1;

        Ok(move |this: &Pipeline| {
            let mut inner = this.inner.lock()?;
            // Trait objects can't be compared, so drop the callback by index and
            // leave a tombstone (`None`) in its place.
            inner.multi_callbacks[idx] = None;
            Ok(())
        })
    }

    /// Send accumulated telemetry
    fn force_flush(&self) -> Result<()> {
        self.reader.force_flush()
    }

    /// Shut down pipeline
    fn shutdown(&self) -> Result<()> {
        self.reader.shutdown()
    }
}

impl SdkProducer for Pipeline {
    /// Returns aggregated metrics from a single collection.
    fn produce(&self, rm: &mut ResourceMetrics) -> Result<()> {
        let inner = self.inner.lock()?;
        for cb in &inner.callbacks {
            // TODO consider parallel callbacks.
            cb();
        }

        for mcb in inner.multi_callbacks.iter().flatten() {
            // TODO consider parallel multi callbacks.
            mcb();
        }

        rm.resource = self.resource.clone();
        if inner.aggregations.len() > rm.scope_metrics.len() {
            rm.scope_metrics
                .reserve(inner.aggregations.len() - rm.scope_metrics.len());
        }

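        // Reuse the `ScopeMetrics` and `Metric` entries already allocated in `rm`
        // where possible, overwriting them in place; `i` and `j` track how many
        // entries were actually (re)filled so stale entries can be truncated below.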
        let mut i = 0;
        for (scope, instruments) in inner.aggregations.iter() {
            let sm = match rm.scope_metrics.get_mut(i) {
                Some(sm) => sm,
                None => {
                    rm.scope_metrics.push(ScopeMetrics::default());
                    rm.scope_metrics.last_mut().unwrap()
                }
            };
            if instruments.len() > sm.metrics.len() {
                sm.metrics.reserve(instruments.len() - sm.metrics.len());
            }

            let mut j = 0;
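            // For each instrument, `comp_agg.call` reports the number of data
            // points it collected and, when the passed-in `Metric` data could not
            // be reused, a freshly built aggregation to store instead.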
            for inst in instruments {
                let mut m = sm.metrics.get_mut(j);
                match (inst.comp_agg.call(m.as_mut().map(|m| m.data.as_mut())), m) {
                    // No metric to re-use, expect agg to create new metric data
                    ((len, Some(initial_agg)), None) if len > 0 => sm.metrics.push(Metric {
                        name: inst.name.clone(),
                        description: inst.description.clone(),
                        unit: inst.unit.clone(),
                        data: initial_agg,
                    }),
                    // Existing metric can be re-used, update its values
                    ((len, data), Some(prev_agg)) if len > 0 => {
                        if let Some(data) = data {
                            // previous aggregation was of a different type
                            prev_agg.data = data;
                        }
                        prev_agg.name.clone_from(&inst.name);
                        prev_agg.description.clone_from(&inst.description);
                        prev_agg.unit.clone_from(&inst.unit);
                    }
                    _ => continue,
                }

                j += 1;
            }

            sm.metrics.truncate(j);
            if !sm.metrics.is_empty() {
                sm.scope = scope.clone();
                i += 1;
            }
        }

        rm.scope_metrics.truncate(i);

        Ok(())
    }
}

/// A synchronization point between a [Pipeline] and an instrument's aggregate function.
struct InstrumentSync {
    name: Cow<'static, str>,
    description: Cow<'static, str>,
    unit: Cow<'static, str>,
    comp_agg: Box<dyn internal::ComputeAggregation>,
}

impl fmt::Debug for InstrumentSync {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("InstrumentSync")
            .field("name", &self.name)
            .field("description", &self.description)
            .field("unit", &self.unit)
            .finish()
    }
}

type Cache<T> = Mutex<HashMap<InstrumentId, Result<Option<Arc<dyn internal::Measure<T>>>>>>;

/// Facilitates inserting new instruments from a single scope into a pipeline.
struct Inserter<T> {
    /// A cache that holds aggregate function inputs whose
    /// outputs have been inserted into the underlying reader pipeline.
    ///
    /// This cache ensures no duplicate aggregate functions are inserted into
    /// the reader pipeline, and that if a new request during instrument creation
    /// asks for the same aggregate function input, the same instance is
    /// returned.
    aggregators: Cache<T>,

    /// A cache that holds instrument identifiers for all the instruments a [Meter] has
    /// created.
    ///
    /// It is provided by the `Meter` that owns this inserter. This cache ensures
    /// that when instruments are created with the same name but different
    /// options (e.g. description, unit), a warning message is logged.
    views: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,

    pipeline: Arc<Pipeline>,
}

impl<T> Inserter<T>
where
    T: Number<T>,
{
    fn new(p: Arc<Pipeline>, vc: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>) -> Self {
        Inserter {
            aggregators: Default::default(),
            views: vc,
            pipeline: Arc::clone(&p),
        }
    }

    /// Inserts the provided instrument into a pipeline.
    ///
    /// The instrument is matched against all views the pipeline contains, and any
    /// matching view that creates a unique [Aggregator] will be inserted into the
    /// pipeline and included in the returned list.
    ///
    /// The returned aggregate functions are guaranteed to be deduplicated and
    /// unique. If another view in a pipeline cached by this inserter has already
    /// inserted the same aggregate function for the same instrument, that
    /// function's instance is returned.
    ///
    /// If another instrument has already been inserted by this inserter, or any
    /// other using the same cache, and it conflicts with the instrument being
    /// inserted in this call, an aggregate function matching the arguments will
    /// still be returned, but a warning will also be logged to the OTel global
    /// logger.
    ///
    /// If the passed instrument would result in an incompatible aggregate function,
    /// an error is returned and that aggregate function is not inserted or
    /// returned.
    ///
    /// If an instrument is determined to use [aggregation::Aggregation::Drop],
    /// that instrument is neither inserted nor returned.
    fn instrument(&self, inst: Instrument) -> Result<Vec<Arc<dyn internal::Measure<T>>>> {
        let mut matched = false;
        let mut measures = vec![];
        let mut errs = vec![];
        let kind = match inst.kind {
            Some(kind) => kind,
            None => return Err(MetricsError::Other("instrument must have a kind".into())),
        };

        // The cache will return the same Aggregator instance. Use stream ids to deduplicate.
        let mut seen = HashSet::new();
        for v in &self.pipeline.views {
            let stream = match v.match_inst(&inst) {
                Some(stream) => stream,
                None => continue,
            };
            matched = true;

            let id = self.inst_id(kind, &stream);
            if seen.contains(&id) {
                continue; // This aggregator has already been added
            }

            let agg = match self.cached_aggregator(&inst.scope, kind, stream) {
                Ok(Some(agg)) => agg,
                Ok(None) => continue, // Drop aggregator.
                Err(err) => {
                    errs.push(err);
                    continue;
                }
            };
            seen.insert(id);
            measures.push(agg);
        }

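        // If at least one explicit view matched, return the collected measures
        // (or the accumulated errors); otherwise fall through to the implicit
        // default view below.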
        if matched {
            if errs.is_empty() {
                return Ok(measures);
            } else {
                return Err(MetricsError::Other(format!("{errs:?}")));
            }
        }

        // Apply the implicit default view if no explicit view matched.
        let stream = Stream {
            name: inst.name,
            description: inst.description,
            unit: inst.unit,
            aggregation: None,
            allowed_attribute_keys: None,
        };

        match self.cached_aggregator(&inst.scope, kind, stream) {
            Ok(agg) => {
                if errs.is_empty() {
                    if let Some(agg) = agg {
                        measures.push(agg);
                    }
                    Ok(measures)
                } else {
                    Err(MetricsError::Other(format!("{errs:?}")))
                }
            }
            Err(err) => {
                errs.push(err);
                Err(MetricsError::Other(format!("{errs:?}")))
            }
        }
    }

    /// Returns the appropriate aggregate functions for an instrument configuration.
    ///
    /// If the exact instrument has been created within the [Scope], that
    /// aggregate function instance will be returned. Otherwise, a new computed
    /// aggregate function will be cached and returned.
    ///
    /// If the instrument configuration conflicts with an instrument that has
    /// already been created (e.g. description, unit, data type) a warning will be
    /// logged with the global OTel logger. A valid new aggregate function for the
    /// instrument configuration will still be returned without an error.
    ///
    /// If the instrument defines an unknown or incompatible aggregation, an error
    /// is returned.
    fn cached_aggregator(
        &self,
        scope: &Scope,
        kind: InstrumentKind,
        mut stream: Stream,
    ) -> Result<Option<Arc<dyn internal::Measure<T>>>> {
        let mut agg = stream
            .aggregation
            .take()
            .unwrap_or_else(|| self.pipeline.reader.aggregation(kind));

        // Apply default if stream or reader aggregation returns default
        if matches!(agg, aggregation::Aggregation::Default) {
            agg = DefaultAggregationSelector::new().aggregation(kind);
        }

        if let Err(err) = is_aggregator_compatible(&kind, &agg) {
            return Err(MetricsError::Other(format!(
                "creating aggregator with instrumentKind: {:?}, aggregation {:?}: {:?}",
                kind, stream.aggregation, err,
            )));
        }

        let mut id = self.inst_id(kind, &stream);
        // If there is a conflict, the specification says the view should
        // still be applied and a warning should be logged.
        self.log_conflict(&id);

        // If there are requests for the same instrument with different name
        // casing, the first-seen needs to be returned. Use a normalized ID for the
        // cache lookup to ensure the correct comparison.
        id.normalize();

        let mut cache = self.aggregators.lock()?;

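        // A single cache entry holds both halves of the aggregate function: the
        // measure half is returned to the caller for recording, while the compute
        // half is registered with the pipeline (via `add_sync`) for collection.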
        let cached = cache.entry(id).or_insert_with(|| {
            let filter = stream
                .allowed_attribute_keys
                .clone()
                .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);

            let b = AggregateBuilder::new(Some(self.pipeline.reader.temporality(kind)), filter);
            let (m, ca) = match aggregate_fn(b, &agg, kind) {
                Ok(Some((m, ca))) => (m, ca),
                other => return other.map(|fs| fs.map(|(m, _)| m)), // Drop aggregator or error
            };

            self.pipeline.add_sync(
                scope.clone(),
                InstrumentSync {
                    name: stream.name,
                    description: stream.description,
                    unit: stream.unit,
                    comp_agg: ca,
                },
            );

            Ok(Some(m))
        });

        match cached {
            Ok(opt) => Ok(opt.clone()),
            Err(err) => Err(MetricsError::Other(err.to_string())),
        }
    }

    /// Validates if an instrument with the same name as id has already been created.
    ///
    /// If that instrument conflicts with id, a warning is logged.
    fn log_conflict(&self, id: &InstrumentId) {
        if let Ok(views) = self.views.lock() {
            if let Some(existing) = views.get(id.name.to_lowercase().as_str()) {
                if existing == id {
                    return;
                }

                global::handle_error(MetricsError::Other(format!(
                    "duplicate metric stream definitions, names: ({} and {}), descriptions: ({} and {}), kinds: ({:?} and {:?}), units: ({:?} and {:?}), and numbers: ({} and {})",
                    existing.name, id.name,
                    existing.description, id.description,
                    existing.kind, id.kind,
                    existing.unit, id.unit,
                    existing.number, id.number,
                )))
            }
        }
    }

    fn inst_id(&self, kind: InstrumentKind, stream: &Stream) -> InstrumentId {
        InstrumentId {
            name: stream.name.clone(),
            description: stream.description.clone(),
            kind,
            unit: stream.unit.clone(),
            number: Cow::Borrowed(std::any::type_name::<T>()),
        }
    }
}

type AggregateFns<T> = (
    Arc<dyn internal::Measure<T>>,
    Box<dyn internal::ComputeAggregation>,
);

/// Returns new aggregate functions for the given params.
///
/// If the aggregation is unknown or temporality is invalid, an error is returned.
fn aggregate_fn<T: Number<T>>(
    b: AggregateBuilder<T>,
    agg: &aggregation::Aggregation,
    kind: InstrumentKind,
) -> Result<Option<AggregateFns<T>>> {
    use aggregation::Aggregation;
    fn box_val<T>(
        (m, ca): (impl internal::Measure<T>, impl internal::ComputeAggregation),
    ) -> (
        Arc<dyn internal::Measure<T>>,
        Box<dyn internal::ComputeAggregation>,
    ) {
        (Arc::new(m), Box::new(ca))
    }

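    // Dispatch on the requested aggregation. `Default` resolves to the SDK's
    // default aggregation for this instrument kind and recurses once with the
    // concrete choice.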
    match agg {
        Aggregation::Default => aggregate_fn(
            b,
            &DefaultAggregationSelector::new().aggregation(kind),
            kind,
        ),
        Aggregation::Drop => Ok(None),
        Aggregation::LastValue => Ok(Some(box_val(b.last_value()))),
        Aggregation::Sum => {
            let fns = match kind {
                InstrumentKind::ObservableCounter => box_val(b.precomputed_sum(true)),
                InstrumentKind::ObservableUpDownCounter => box_val(b.precomputed_sum(false)),
                InstrumentKind::Counter | InstrumentKind::Histogram => box_val(b.sum(true)),
                _ => box_val(b.sum(false)),
            };
            Ok(Some(fns))
        }
        Aggregation::ExplicitBucketHistogram {
            boundaries,
            record_min_max,
        } => {
            let record_sum = !matches!(
                kind,
                InstrumentKind::UpDownCounter
                    | InstrumentKind::ObservableUpDownCounter
                    | InstrumentKind::ObservableGauge
            );
            Ok(Some(box_val(b.explicit_bucket_histogram(
                boundaries.to_vec(),
                *record_min_max,
                record_sum,
            ))))
        }
        Aggregation::Base2ExponentialHistogram {
            max_size,
            max_scale,
            record_min_max,
        } => {
            let record_sum = !matches!(
                kind,
                InstrumentKind::UpDownCounter
                    | InstrumentKind::ObservableUpDownCounter
                    | InstrumentKind::ObservableGauge
            );
            Ok(Some(box_val(b.exponential_bucket_histogram(
                *max_size,
                *max_scale,
                *record_min_max,
                record_sum,
            ))))
        }
    }
}

/// Checks if the aggregation can be used by the instrument.
///
/// Current compatibility:
///
/// | Instrument Kind          | Drop | LastValue | Sum | Histogram | Exponential Histogram |
/// |--------------------------|------|-----------|-----|-----------|-----------------------|
/// | Counter                  | ✓    |           | ✓   | ✓         | ✓                     |
/// | UpDownCounter            | ✓    |           | ✓   | ✓         | ✓                     |
/// | Histogram                | ✓    |           | ✓   | ✓         | ✓                     |
/// | Observable Counter       | ✓    |           | ✓   | ✓         | ✓                     |
/// | Observable UpDownCounter | ✓    |           | ✓   | ✓         | ✓                     |
/// | Gauge                    | ✓    | ✓         |     | ✓         | ✓                     |
/// | Observable Gauge         | ✓    | ✓         |     | ✓         | ✓                     |
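///
/// For example, per the table above, a counter cannot use last-value aggregation
/// while a gauge can (illustrative sketch only; this private helper is not
/// exercised as a doctest):
///
/// ```ignore
/// assert!(is_aggregator_compatible(
///     &InstrumentKind::Counter,
///     &aggregation::Aggregation::LastValue,
/// )
/// .is_err());
/// assert!(is_aggregator_compatible(
///     &InstrumentKind::Gauge,
///     &aggregation::Aggregation::LastValue,
/// )
/// .is_ok());
/// ```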
fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregation) -> Result<()> {
    use aggregation::Aggregation;
    match agg {
        Aggregation::Default => Ok(()),
        Aggregation::ExplicitBucketHistogram { .. }
        | Aggregation::Base2ExponentialHistogram { .. } => {
            if matches!(
                kind,
                InstrumentKind::Counter
                    | InstrumentKind::UpDownCounter
                    | InstrumentKind::Gauge
                    | InstrumentKind::Histogram
                    | InstrumentKind::ObservableCounter
                    | InstrumentKind::ObservableUpDownCounter
                    | InstrumentKind::ObservableGauge
            ) {
                return Ok(());
            }
            Err(MetricsError::Other("incompatible aggregation".into()))
        }
        Aggregation::Sum => {
            match kind {
                InstrumentKind::ObservableCounter
                | InstrumentKind::ObservableUpDownCounter
                | InstrumentKind::Counter
                | InstrumentKind::Histogram
                | InstrumentKind::UpDownCounter => Ok(()),
                _ => {
                    // TODO: review need for aggregation check after
                    // https://github.com/open-telemetry/opentelemetry-specification/issues/2710
                    Err(MetricsError::Other("incompatible aggregation".into()))
                }
            }
        }
        Aggregation::LastValue => {
            match kind {
                InstrumentKind::Gauge | InstrumentKind::ObservableGauge => Ok(()),
                _ => {
                    // TODO: review need for aggregation check after
                    // https://github.com/open-telemetry/opentelemetry-specification/issues/2710
                    Err(MetricsError::Other("incompatible aggregation".into()))
                }
            }
        }
        Aggregation::Drop => Ok(()),
    }
}

/// The group of pipelines connecting Readers with instrument measurement.
#[derive(Clone, Debug)]
pub(crate) struct Pipelines(pub(crate) Vec<Arc<Pipeline>>);

impl Pipelines {
    pub(crate) fn new(
        res: Resource,
        readers: Vec<Box<dyn MetricReader>>,
        views: Vec<Arc<dyn View>>,
    ) -> Self {
        let mut pipes = Vec::with_capacity(readers.len());
        for r in readers {
            let p = Arc::new(Pipeline {
                resource: res.clone(),
                reader: r,
                views: views.clone(),
                inner: Default::default(),
            });
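            // Hand the reader a weak reference to its pipeline so collection can
            // be driven from the reader without creating an `Arc` cycle.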
            p.reader.register_pipeline(Arc::downgrade(&p));
            pipes.push(p);
        }

        Pipelines(pipes)
    }

    pub(crate) fn register_callback<F>(&self, callback: F)
    where
        F: Fn() + Send + Sync + 'static,
    {
        let cb = Arc::new(callback);
        for pipe in &self.0 {
            pipe.add_callback(cb.clone())
        }
    }

    /// Registers a multi-instrument callback to be run when `produce` is called.
    pub(crate) fn register_multi_callback<F>(&self, f: F) -> Result<Box<dyn CallbackRegistration>>
    where
        F: Fn() + Send + Sync + 'static,
    {
        let cb = Arc::new(f);

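        // Register the callback with every pipeline and collect the per-pipeline
        // unregister closures into a single `CallbackRegistration`.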
        let fns = self
            .0
            .iter()
            .map(|pipe| {
                let pipe = Arc::clone(pipe);
                let unreg = pipe.add_multi_callback(cb.clone())?;
                Ok(Box::new(move || unreg(pipe.as_ref())) as _)
            })
            .collect::<Result<_>>()?;

        Ok(Box::new(Unregister(fns)))
    }

    /// Force flush all pipelines
    pub(crate) fn force_flush(&self) -> Result<()> {
        let mut errs = vec![];
        for pipeline in &self.0 {
            if let Err(err) = pipeline.force_flush() {
                errs.push(err);
            }
        }

        if errs.is_empty() {
            Ok(())
        } else {
            Err(MetricsError::Other(format!("{errs:?}")))
        }
    }

    /// Shut down all pipelines
    pub(crate) fn shutdown(&self) -> Result<()> {
        let mut errs = vec![];
        for pipeline in &self.0 {
            if let Err(err) = pipeline.shutdown() {
                errs.push(err);
            }
        }

        if errs.is_empty() {
            Ok(())
        } else {
            Err(MetricsError::Other(format!("{errs:?}")))
        }
    }
}

struct Unregister(Vec<Box<dyn FnOnce() -> Result<()> + Send + Sync>>);

impl CallbackRegistration for Unregister {
    fn unregister(&mut self) -> Result<()> {
        let mut errs = vec![];
        while let Some(unreg) = self.0.pop() {
            if let Err(err) = unreg() {
                errs.push(err);
            }
        }

        if errs.is_empty() {
            Ok(())
        } else {
            Err(MetricsError::Other(format!("{errs:?}")))
        }
    }
}

/// A `Resolver` facilitates resolving the aggregate functions an instrument uses
/// to aggregate measurements, while updating all pipelines that need to pull from
/// those aggregations.
pub(crate) struct Resolver<T> {
    inserters: Vec<Inserter<T>>,
}

impl<T> Resolver<T>
where
    T: Number<T>,
{
    pub(crate) fn new(
        pipelines: Arc<Pipelines>,
        view_cache: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,
    ) -> Self {
        let inserters = pipelines
            .0
            .iter()
            .map(|pipe| Inserter::new(Arc::clone(pipe), Arc::clone(&view_cache)))
            .collect();

        Resolver { inserters }
    }

    /// The measures that must be updated by the instrument defined by `id`.
    pub(crate) fn measures(&self, id: Instrument) -> Result<Vec<Arc<dyn internal::Measure<T>>>> {
        let (mut measures, mut errs) = (vec![], vec![]);

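        // Resolve the instrument against every pipeline; each inserter may
        // contribute zero or more measures that the instrument must update.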
        for inserter in &self.inserters {
            match inserter.instrument(id.clone()) {
                Ok(ms) => measures.extend(ms),
                Err(err) => errs.push(err),
            }
        }

        if errs.is_empty() {
            Ok(measures)
        } else {
            Err(MetricsError::Other(format!("{errs:?}")))
        }
    }
}