opentelemetry_otlp/exporter/tonic/
trace.rs
1use core::fmt;
2
3use futures_core::future::BoxFuture;
4use opentelemetry::trace::TraceError;
5use opentelemetry_proto::tonic::collector::trace::v1::{
6 trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
7};
8use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
9use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
10
11use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
12
13use super::BoxInterceptor;
14
15pub(crate) struct TonicTracesClient {
16 inner: Option<ClientInner>,
17 #[allow(dead_code)]
18 resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
20}
21
22struct ClientInner {
23 client: TraceServiceClient<Channel>,
24 interceptor: BoxInterceptor,
25}
26
27impl fmt::Debug for TonicTracesClient {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 f.write_str("TonicTracesClient")
30 }
31}
32
33impl TonicTracesClient {
34 pub(super) fn new(
35 channel: Channel,
36 interceptor: BoxInterceptor,
37 compression: Option<CompressionEncoding>,
38 ) -> Self {
39 let mut client = TraceServiceClient::new(channel);
40 if let Some(compression) = compression {
41 client = client
42 .send_compressed(compression)
43 .accept_compressed(compression);
44 }
45
46 TonicTracesClient {
47 inner: Some(ClientInner {
48 client,
49 interceptor,
50 }),
51 resource: Default::default(),
52 }
53 }
54}
55
56impl SpanExporter for TonicTracesClient {
57 fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
58 let (mut client, metadata, extensions) = match &mut self.inner {
59 Some(inner) => {
60 let (m, e, _) = match inner.interceptor.call(Request::new(())) {
61 Ok(res) => res.into_parts(),
62 Err(e) => {
63 return Box::pin(std::future::ready(Err(TraceError::Other(Box::new(e)))))
64 }
65 };
66 (inner.client.clone(), m, e)
67 }
68 None => {
69 return Box::pin(std::future::ready(Err(TraceError::Other(
70 "exporter is already shut down".into(),
71 ))))
72 }
73 };
74
75 let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);
76
77 Box::pin(async move {
78 client
79 .export(Request::from_parts(
80 metadata,
81 extensions,
82 ExportTraceServiceRequest { resource_spans },
83 ))
84 .await
85 .map_err(crate::Error::from)?;
86
87 Ok(())
88 })
89 }
90
91 fn shutdown(&mut self) {
92 let _ = self.inner.take();
93 }
94
95 fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
96 self.resource = resource.into();
97 }
98}