opentelemetry_otlp/exporter/tonic/
metrics.rs

1use core::fmt;
2use std::sync::Mutex;
3
4use async_trait::async_trait;
5use opentelemetry::metrics::{MetricsError, Result};
6use opentelemetry_proto::tonic::collector::metrics::v1::{
7    metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
8};
9use opentelemetry_sdk::metrics::data::ResourceMetrics;
10use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
11
12use super::BoxInterceptor;
13use crate::metric::MetricsClient;
14
15pub(crate) struct TonicMetricsClient {
16    inner: Mutex<Option<ClientInner>>,
17}
18
19struct ClientInner {
20    client: MetricsServiceClient<Channel>,
21    interceptor: BoxInterceptor,
22}
23
24impl fmt::Debug for TonicMetricsClient {
25    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26        f.write_str("TonicMetricsClient")
27    }
28}
29
30impl TonicMetricsClient {
31    pub(super) fn new(
32        channel: Channel,
33        interceptor: BoxInterceptor,
34        compression: Option<CompressionEncoding>,
35    ) -> Self {
36        let mut client = MetricsServiceClient::new(channel);
37        if let Some(compression) = compression {
38            client = client
39                .send_compressed(compression)
40                .accept_compressed(compression);
41        }
42
43        TonicMetricsClient {
44            inner: Mutex::new(Some(ClientInner {
45                client,
46                interceptor,
47            })),
48        }
49    }
50}
51
52#[async_trait]
53impl MetricsClient for TonicMetricsClient {
54    async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
55        let (mut client, metadata, extensions) =
56            self.inner
57                .lock()
58                .map_err(Into::into)
59                .and_then(|mut inner| match &mut *inner {
60                    Some(inner) => {
61                        let (m, e, _) = inner
62                            .interceptor
63                            .call(Request::new(()))
64                            .map_err(|e| {
65                                MetricsError::Other(format!(
66                                    "unexpected status while exporting {e:?}"
67                                ))
68                            })?
69                            .into_parts();
70                        Ok((inner.client.clone(), m, e))
71                    }
72                    None => Err(MetricsError::Other("exporter is already shut down".into())),
73                })?;
74
75        client
76            .export(Request::from_parts(
77                metadata,
78                extensions,
79                ExportMetricsServiceRequest::from(&*metrics),
80            ))
81            .await
82            .map_err(crate::Error::from)?;
83
84        Ok(())
85    }
86
87    fn shutdown(&self) -> Result<()> {
88        let _ = self.inner.lock()?.take();
89
90        Ok(())
91    }
92}