opentelemetry_otlp/exporter/tonic/
trace.rs

1use core::fmt;
2
3use futures_core::future::BoxFuture;
4use opentelemetry::trace::TraceError;
5use opentelemetry_proto::tonic::collector::trace::v1::{
6    trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
7};
8use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
9use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
10
11use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
12
13use super::BoxInterceptor;
14
15pub(crate) struct TonicTracesClient {
16    inner: Option<ClientInner>,
17    #[allow(dead_code)]
18    // <allow dead> would be removed once we support set_resource for metrics.
19    resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
20}
21
22struct ClientInner {
23    client: TraceServiceClient<Channel>,
24    interceptor: BoxInterceptor,
25}
26
27impl fmt::Debug for TonicTracesClient {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        f.write_str("TonicTracesClient")
30    }
31}
32
33impl TonicTracesClient {
34    pub(super) fn new(
35        channel: Channel,
36        interceptor: BoxInterceptor,
37        compression: Option<CompressionEncoding>,
38    ) -> Self {
39        let mut client = TraceServiceClient::new(channel);
40        if let Some(compression) = compression {
41            client = client
42                .send_compressed(compression)
43                .accept_compressed(compression);
44        }
45
46        TonicTracesClient {
47            inner: Some(ClientInner {
48                client,
49                interceptor,
50            }),
51            resource: Default::default(),
52        }
53    }
54}
55
56impl SpanExporter for TonicTracesClient {
57    fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
58        let (mut client, metadata, extensions) = match &mut self.inner {
59            Some(inner) => {
60                let (m, e, _) = match inner.interceptor.call(Request::new(())) {
61                    Ok(res) => res.into_parts(),
62                    Err(e) => {
63                        return Box::pin(std::future::ready(Err(TraceError::Other(Box::new(e)))))
64                    }
65                };
66                (inner.client.clone(), m, e)
67            }
68            None => {
69                return Box::pin(std::future::ready(Err(TraceError::Other(
70                    "exporter is already shut down".into(),
71                ))))
72            }
73        };
74
75        let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);
76
77        Box::pin(async move {
78            client
79                .export(Request::from_parts(
80                    metadata,
81                    extensions,
82                    ExportTraceServiceRequest { resource_spans },
83                ))
84                .await
85                .map_err(crate::Error::from)?;
86
87            Ok(())
88        })
89    }
90
91    fn shutdown(&mut self) {
92        let _ = self.inner.take();
93    }
94
95    fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
96        self.resource = resource.into();
97    }
98}