// opentelemetry_otlp/exporter/tonic/metrics.rs
1use core::fmt;
2use std::sync::Mutex;
3
4use async_trait::async_trait;
5use opentelemetry::metrics::{MetricsError, Result};
6use opentelemetry_proto::tonic::collector::metrics::v1::{
7 metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
8};
9use opentelemetry_sdk::metrics::data::ResourceMetrics;
10use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
11
12use super::BoxInterceptor;
13use crate::metric::MetricsClient;
14
15pub(crate) struct TonicMetricsClient {
16 inner: Mutex<Option<ClientInner>>,
17}
18
19struct ClientInner {
20 client: MetricsServiceClient<Channel>,
21 interceptor: BoxInterceptor,
22}
23
24impl fmt::Debug for TonicMetricsClient {
25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26 f.write_str("TonicMetricsClient")
27 }
28}
29
30impl TonicMetricsClient {
31 pub(super) fn new(
32 channel: Channel,
33 interceptor: BoxInterceptor,
34 compression: Option<CompressionEncoding>,
35 ) -> Self {
36 let mut client = MetricsServiceClient::new(channel);
37 if let Some(compression) = compression {
38 client = client
39 .send_compressed(compression)
40 .accept_compressed(compression);
41 }
42
43 TonicMetricsClient {
44 inner: Mutex::new(Some(ClientInner {
45 client,
46 interceptor,
47 })),
48 }
49 }
50}
51
52#[async_trait]
53impl MetricsClient for TonicMetricsClient {
54 async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
55 let (mut client, metadata, extensions) =
56 self.inner
57 .lock()
58 .map_err(Into::into)
59 .and_then(|mut inner| match &mut *inner {
60 Some(inner) => {
61 let (m, e, _) = inner
62 .interceptor
63 .call(Request::new(()))
64 .map_err(|e| {
65 MetricsError::Other(format!(
66 "unexpected status while exporting {e:?}"
67 ))
68 })?
69 .into_parts();
70 Ok((inner.client.clone(), m, e))
71 }
72 None => Err(MetricsError::Other("exporter is already shut down".into())),
73 })?;
74
75 client
76 .export(Request::from_parts(
77 metadata,
78 extensions,
79 ExportMetricsServiceRequest::from(&*metrics),
80 ))
81 .await
82 .map_err(crate::Error::from)?;
83
84 Ok(())
85 }
86
87 fn shutdown(&self) -> Result<()> {
88 let _ = self.inner.lock()?.take();
89
90 Ok(())
91 }
92}