opentelemetry_otlp/exporter/tonic/
mod.rs
1use std::env;
2use std::fmt::{Debug, Formatter};
3use std::str::FromStr;
4use std::time::Duration;
5
6use http::{HeaderMap, HeaderName, HeaderValue};
7use tonic::codec::CompressionEncoding;
8use tonic::metadata::{KeyAndValueRef, MetadataMap};
9use tonic::service::Interceptor;
10use tonic::transport::Channel;
11#[cfg(feature = "tls")]
12use tonic::transport::ClientTlsConfig;
13
14use super::{default_headers, parse_header_string, OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT};
15use crate::exporter::Compression;
16use crate::{
17 ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT,
18 OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TIMEOUT,
19};
20
21#[cfg(feature = "logs")]
22mod logs;
23
24#[cfg(feature = "metrics")]
25mod metrics;
26
27#[cfg(feature = "trace")]
28mod trace;
29
30#[derive(Debug, Default)]
34#[non_exhaustive]
35pub struct TonicConfig {
36 pub metadata: Option<MetadataMap>,
38
39 #[cfg(feature = "tls")]
41 pub tls_config: Option<ClientTlsConfig>,
42
43 pub compression: Option<Compression>,
45}
46
47impl TryFrom<Compression> for tonic::codec::CompressionEncoding {
48 type Error = crate::Error;
49
50 fn try_from(value: Compression) -> Result<Self, Self::Error> {
51 match value {
52 #[cfg(feature = "gzip-tonic")]
53 Compression::Gzip => Ok(tonic::codec::CompressionEncoding::Gzip),
54 #[cfg(not(feature = "gzip-tonic"))]
55 Compression::Gzip => Err(crate::Error::UnsupportedCompressionAlgorithm(
56 value.to_string(),
57 )),
58 }
59 }
60}
61
62fn resolve_compression(
63 tonic_config: &TonicConfig,
64 env_override: &str,
65) -> Result<Option<CompressionEncoding>, crate::Error> {
66 if let Some(compression) = tonic_config.compression {
67 Ok(Some(compression.try_into()?))
68 } else if let Ok(compression) = env::var(env_override) {
69 Ok(Some(compression.parse::<Compression>()?.try_into()?))
70 } else if let Ok(compression) = env::var(OTEL_EXPORTER_OTLP_COMPRESSION) {
71 Ok(Some(compression.parse::<Compression>()?.try_into()?))
72 } else {
73 Ok(None)
74 }
75}
76
77#[derive(Debug)]
116pub struct TonicExporterBuilder {
117 pub(crate) exporter_config: ExportConfig,
118 pub(crate) tonic_config: TonicConfig,
119 pub(crate) channel: Option<tonic::transport::Channel>,
120 pub(crate) interceptor: Option<BoxInterceptor>,
121}
122
123pub(crate) struct BoxInterceptor(Box<dyn Interceptor + Send + Sync>);
124impl tonic::service::Interceptor for BoxInterceptor {
125 fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
126 self.0.call(request)
127 }
128}
129
130impl Debug for BoxInterceptor {
131 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
132 write!(f, "BoxInterceptor(..)")
133 }
134}
135
136impl Default for TonicExporterBuilder {
137 fn default() -> Self {
138 let tonic_config = TonicConfig {
139 metadata: Some(MetadataMap::from_headers(
140 (&default_headers())
141 .try_into()
142 .expect("Invalid tonic headers"),
143 )),
144 #[cfg(feature = "tls")]
145 tls_config: None,
146 compression: None,
147 };
148
149 TonicExporterBuilder {
150 exporter_config: ExportConfig {
151 protocol: crate::Protocol::Grpc,
152 ..Default::default()
153 },
154 tonic_config,
155 channel: Option::default(),
156 interceptor: Option::default(),
157 }
158 }
159}
160
161impl TonicExporterBuilder {
162 #[cfg(feature = "tls")]
164 pub fn with_tls_config(mut self, tls_config: ClientTlsConfig) -> Self {
165 self.tonic_config.tls_config = Some(tls_config);
166 self
167 }
168
169 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
171 let incoming_headers = metadata.into_headers();
173 let mut existing_headers = self
174 .tonic_config
175 .metadata
176 .unwrap_or_default()
177 .into_headers();
178 existing_headers.extend(incoming_headers);
179
180 self.tonic_config.metadata = Some(MetadataMap::from_headers(existing_headers));
181 self
182 }
183
184 pub fn with_compression(mut self, compression: Compression) -> Self {
186 self.tonic_config.compression = Some(compression);
187 self
188 }
189
190 pub fn with_channel(mut self, channel: tonic::transport::Channel) -> Self {
197 self.channel = Some(channel);
198 self
199 }
200
201 pub fn with_interceptor<I>(mut self, interceptor: I) -> Self
205 where
206 I: tonic::service::Interceptor + Clone + Send + Sync + 'static,
207 {
208 self.interceptor = Some(BoxInterceptor(Box::new(interceptor)));
209 self
210 }
211
212 fn build_channel(
213 self,
214 signal_endpoint_var: &str,
215 signal_timeout_var: &str,
216 signal_compression_var: &str,
217 signal_headers_var: &str,
218 ) -> Result<(Channel, BoxInterceptor, Option<CompressionEncoding>), crate::Error> {
219 let tonic_config = self.tonic_config;
220 let compression = resolve_compression(&tonic_config, signal_compression_var)?;
221
222 let headers_from_env = parse_headers_from_env(signal_headers_var);
223 let metadata = merge_metadata_with_headers_from_env(
224 tonic_config.metadata.unwrap_or_default(),
225 headers_from_env,
226 );
227
228 let add_metadata = move |mut req: tonic::Request<()>| {
229 for key_and_value in metadata.iter() {
230 match key_and_value {
231 KeyAndValueRef::Ascii(key, value) => {
232 req.metadata_mut().append(key, value.to_owned())
233 }
234 KeyAndValueRef::Binary(key, value) => {
235 req.metadata_mut().append_bin(key, value.to_owned())
236 }
237 };
238 }
239
240 Ok(req)
241 };
242
243 let interceptor = match self.interceptor {
244 Some(mut interceptor) => {
245 BoxInterceptor(Box::new(move |req| interceptor.call(add_metadata(req)?)))
246 }
247 None => BoxInterceptor(Box::new(add_metadata)),
248 };
249
250 if let Some(channel) = self.channel {
252 return Ok((channel, interceptor, compression));
253 }
254
255 let config = self.exporter_config;
256
257 let endpoint = match env::var(signal_endpoint_var)
263 .ok()
264 .or(env::var(OTEL_EXPORTER_OTLP_ENDPOINT).ok())
265 {
266 Some(val) => val,
267 None => {
268 if config.endpoint.is_empty() {
269 OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT.to_string()
270 } else {
271 config.endpoint
272 }
273 }
274 };
275
276 let endpoint = Channel::from_shared(endpoint).map_err(crate::Error::from)?;
277 let timeout = match env::var(signal_timeout_var)
278 .ok()
279 .or(env::var(OTEL_EXPORTER_OTLP_TIMEOUT).ok())
280 {
281 Some(val) => match val.parse() {
282 Ok(seconds) => Duration::from_secs(seconds),
283 Err(_) => config.timeout,
284 },
285 None => config.timeout,
286 };
287
288 #[cfg(feature = "tls")]
289 let channel = match tonic_config.tls_config {
290 Some(tls_config) => endpoint
291 .tls_config(tls_config)
292 .map_err(crate::Error::from)?,
293 None => endpoint,
294 }
295 .timeout(timeout)
296 .connect_lazy();
297
298 #[cfg(not(feature = "tls"))]
299 let channel = endpoint.timeout(timeout).connect_lazy();
300
301 Ok((channel, interceptor, compression))
302 }
303
304 #[cfg(feature = "logs")]
306 pub fn build_log_exporter(
307 self,
308 ) -> Result<crate::logs::LogExporter, opentelemetry::logs::LogError> {
309 use crate::exporter::tonic::logs::TonicLogsClient;
310
311 let (channel, interceptor, compression) = self.build_channel(
312 crate::logs::OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
313 crate::logs::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
314 crate::logs::OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
315 crate::logs::OTEL_EXPORTER_OTLP_LOGS_HEADERS,
316 )?;
317
318 let client = TonicLogsClient::new(channel, interceptor, compression);
319
320 Ok(crate::logs::LogExporter::new(client))
321 }
322
323 #[cfg(feature = "metrics")]
325 pub fn build_metrics_exporter(
326 self,
327 aggregation_selector: Box<dyn opentelemetry_sdk::metrics::reader::AggregationSelector>,
328 temporality_selector: Box<dyn opentelemetry_sdk::metrics::reader::TemporalitySelector>,
329 ) -> opentelemetry::metrics::Result<crate::MetricsExporter> {
330 use crate::MetricsExporter;
331 use metrics::TonicMetricsClient;
332
333 let (channel, interceptor, compression) = self.build_channel(
334 crate::metric::OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
335 crate::metric::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
336 crate::metric::OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
337 crate::metric::OTEL_EXPORTER_OTLP_METRICS_HEADERS,
338 )?;
339
340 let client = TonicMetricsClient::new(channel, interceptor, compression);
341
342 Ok(MetricsExporter::new(
343 client,
344 temporality_selector,
345 aggregation_selector,
346 ))
347 }
348
349 #[cfg(feature = "trace")]
351 pub fn build_span_exporter(
352 self,
353 ) -> Result<crate::SpanExporter, opentelemetry::trace::TraceError> {
354 use crate::exporter::tonic::trace::TonicTracesClient;
355
356 let (channel, interceptor, compression) = self.build_channel(
357 crate::span::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
358 crate::span::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
359 crate::span::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
360 crate::span::OTEL_EXPORTER_OTLP_TRACES_HEADERS,
361 )?;
362
363 let client = TonicTracesClient::new(channel, interceptor, compression);
364
365 Ok(crate::SpanExporter::new(client))
366 }
367}
368
369fn merge_metadata_with_headers_from_env(
370 metadata: MetadataMap,
371 headers_from_env: HeaderMap,
372) -> MetadataMap {
373 if headers_from_env.is_empty() {
374 metadata
375 } else {
376 let mut existing_headers: HeaderMap = metadata.into_headers();
377 existing_headers.extend(headers_from_env);
378
379 MetadataMap::from_headers(existing_headers)
380 }
381}
382
383fn parse_headers_from_env(signal_headers_var: &str) -> HeaderMap {
384 env::var(signal_headers_var)
385 .or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
386 .map(|input| {
387 parse_header_string(&input)
388 .filter_map(|(key, value)| {
389 Some((
390 HeaderName::from_str(key).ok()?,
391 HeaderValue::from_str(&value).ok()?,
392 ))
393 })
394 .collect::<HeaderMap>()
395 })
396 .unwrap_or_default()
397}
398
399#[cfg(test)]
400mod tests {
401 use crate::exporter::tests::run_env_test;
402 #[cfg(feature = "gzip-tonic")]
403 use crate::exporter::Compression;
404 use crate::TonicExporterBuilder;
405 use crate::{OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TRACES_HEADERS};
406 use http::{HeaderMap, HeaderName, HeaderValue};
407 use tonic::metadata::{MetadataMap, MetadataValue};
408
409 #[test]
410 fn test_with_metadata() {
411 let mut metadata = MetadataMap::new();
413 metadata.insert("foo", "bar".parse().unwrap());
414 let builder = TonicExporterBuilder::default().with_metadata(metadata);
415 let result = builder.tonic_config.metadata.unwrap();
416 let foo = result.get("foo").unwrap();
417 assert_eq!(foo, &MetadataValue::try_from("bar").unwrap());
418 assert!(result.get("User-Agent").is_some());
419
420 let mut metadata = MetadataMap::new();
422 metadata.insert("user-agent", "baz".parse().unwrap());
423 let builder = TonicExporterBuilder::default().with_metadata(metadata);
424 let result = builder.tonic_config.metadata.unwrap();
425 assert_eq!(
426 result.get("User-Agent").unwrap(),
427 &MetadataValue::try_from("baz").unwrap()
428 );
429 assert_eq!(
430 result.len(),
431 TonicExporterBuilder::default()
432 .tonic_config
433 .metadata
434 .unwrap()
435 .len()
436 );
437 }
438
439 #[test]
440 #[cfg(feature = "gzip-tonic")]
441 fn test_with_compression() {
442 let mut metadata = MetadataMap::new();
444 metadata.insert("foo", "bar".parse().unwrap());
445 let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
446 assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
447 }
448
449 #[test]
450 fn test_parse_headers_from_env() {
451 run_env_test(
452 vec![
453 (OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2"),
454 (OTEL_EXPORTER_OTLP_HEADERS, "k3=v3"),
455 ],
456 || {
457 assert_eq!(
458 super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS),
459 HeaderMap::from_iter([
460 (
461 HeaderName::from_static("k1"),
462 HeaderValue::from_static("v1")
463 ),
464 (
465 HeaderName::from_static("k2"),
466 HeaderValue::from_static("v2")
467 ),
468 ])
469 );
470
471 assert_eq!(
472 super::parse_headers_from_env("EMPTY_ENV"),
473 HeaderMap::from_iter([(
474 HeaderName::from_static("k3"),
475 HeaderValue::from_static("v3")
476 )])
477 );
478 },
479 )
480 }
481
482 #[test]
483 fn test_merge_metadata_with_headers_from_env() {
484 run_env_test(
485 vec![(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2")],
486 || {
487 let headers_from_env =
488 super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS);
489
490 let mut metadata = MetadataMap::new();
491 metadata.insert("foo", "bar".parse().unwrap());
492 metadata.insert("k1", "v0".parse().unwrap());
493
494 let result =
495 super::merge_metadata_with_headers_from_env(metadata, headers_from_env);
496
497 assert_eq!(
498 result.get("foo").unwrap(),
499 MetadataValue::from_static("bar")
500 );
501 assert_eq!(result.get("k1").unwrap(), MetadataValue::from_static("v1"));
502 assert_eq!(result.get("k2").unwrap(), MetadataValue::from_static("v2"));
503 },
504 );
505 }
506}