opentelemetry_otlp/exporter/tonic/
mod.rs

1use std::env;
2use std::fmt::{Debug, Formatter};
3use std::str::FromStr;
4use std::time::Duration;
5
6use http::{HeaderMap, HeaderName, HeaderValue};
7use tonic::codec::CompressionEncoding;
8use tonic::metadata::{KeyAndValueRef, MetadataMap};
9use tonic::service::Interceptor;
10use tonic::transport::Channel;
11#[cfg(feature = "tls")]
12use tonic::transport::ClientTlsConfig;
13
14use super::{default_headers, parse_header_string, OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT};
15use crate::exporter::Compression;
16use crate::{
17    ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT,
18    OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TIMEOUT,
19};
20
21#[cfg(feature = "logs")]
22mod logs;
23
24#[cfg(feature = "metrics")]
25mod metrics;
26
27#[cfg(feature = "trace")]
28mod trace;
29
30/// Configuration for [tonic]
31///
32/// [tonic]: https://github.com/hyperium/tonic
33#[derive(Debug, Default)]
34#[non_exhaustive]
35pub struct TonicConfig {
36    /// Custom metadata entries to send to the collector.
37    pub metadata: Option<MetadataMap>,
38
39    /// TLS settings for the collector endpoint.
40    #[cfg(feature = "tls")]
41    pub tls_config: Option<ClientTlsConfig>,
42
43    /// The compression algorithm to use when communicating with the collector.
44    pub compression: Option<Compression>,
45}
46
47impl TryFrom<Compression> for tonic::codec::CompressionEncoding {
48    type Error = crate::Error;
49
50    fn try_from(value: Compression) -> Result<Self, Self::Error> {
51        match value {
52            #[cfg(feature = "gzip-tonic")]
53            Compression::Gzip => Ok(tonic::codec::CompressionEncoding::Gzip),
54            #[cfg(not(feature = "gzip-tonic"))]
55            Compression::Gzip => Err(crate::Error::UnsupportedCompressionAlgorithm(
56                value.to_string(),
57            )),
58        }
59    }
60}
61
62fn resolve_compression(
63    tonic_config: &TonicConfig,
64    env_override: &str,
65) -> Result<Option<CompressionEncoding>, crate::Error> {
66    if let Some(compression) = tonic_config.compression {
67        Ok(Some(compression.try_into()?))
68    } else if let Ok(compression) = env::var(env_override) {
69        Ok(Some(compression.parse::<Compression>()?.try_into()?))
70    } else if let Ok(compression) = env::var(OTEL_EXPORTER_OTLP_COMPRESSION) {
71        Ok(Some(compression.parse::<Compression>()?.try_into()?))
72    } else {
73        Ok(None)
74    }
75}
76
77/// Configuration for the [tonic] OTLP GRPC exporter.
78///
79/// It allows you to
80/// - add additional metadata
81/// - set tls config (via the  `tls` feature)
82/// - specify custom [channel]s
83///
84/// [tonic]: <https://github.com/hyperium/tonic>
85/// [channel]: tonic::transport::Channel
86///
87/// ## Examples
88///
89/// ```no_run
90/// # #[cfg(feature="metrics")]
91/// use opentelemetry_sdk::metrics::reader::{
92///     DefaultAggregationSelector, DefaultTemporalitySelector,
93/// };
94///
95/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
96/// // Create a span exporter you can use to when configuring tracer providers
97/// # #[cfg(feature="trace")]
98/// let span_exporter = opentelemetry_otlp::new_exporter().tonic().build_span_exporter()?;
99///
100/// // Create a metrics exporter you can use when configuring meter providers
101/// # #[cfg(feature="metrics")]
102/// let metrics_exporter = opentelemetry_otlp::new_exporter()
103///     .tonic()
104///     .build_metrics_exporter(
105///         Box::new(DefaultAggregationSelector::new()),
106///         Box::new(DefaultTemporalitySelector::new()),
107///     )?;
108///
109/// // Create a log exporter you can use when configuring logger providers
110/// # #[cfg(feature="logs")]
111/// let log_exporter = opentelemetry_otlp::new_exporter().tonic().build_log_exporter()?;
112/// # Ok(())
113/// # }
114/// ```
115#[derive(Debug)]
116pub struct TonicExporterBuilder {
117    pub(crate) exporter_config: ExportConfig,
118    pub(crate) tonic_config: TonicConfig,
119    pub(crate) channel: Option<tonic::transport::Channel>,
120    pub(crate) interceptor: Option<BoxInterceptor>,
121}
122
123pub(crate) struct BoxInterceptor(Box<dyn Interceptor + Send + Sync>);
124impl tonic::service::Interceptor for BoxInterceptor {
125    fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
126        self.0.call(request)
127    }
128}
129
130impl Debug for BoxInterceptor {
131    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
132        write!(f, "BoxInterceptor(..)")
133    }
134}
135
136impl Default for TonicExporterBuilder {
137    fn default() -> Self {
138        let tonic_config = TonicConfig {
139            metadata: Some(MetadataMap::from_headers(
140                (&default_headers())
141                    .try_into()
142                    .expect("Invalid tonic headers"),
143            )),
144            #[cfg(feature = "tls")]
145            tls_config: None,
146            compression: None,
147        };
148
149        TonicExporterBuilder {
150            exporter_config: ExportConfig {
151                protocol: crate::Protocol::Grpc,
152                ..Default::default()
153            },
154            tonic_config,
155            channel: Option::default(),
156            interceptor: Option::default(),
157        }
158    }
159}
160
161impl TonicExporterBuilder {
162    /// Set the TLS settings for the collector endpoint.
163    #[cfg(feature = "tls")]
164    pub fn with_tls_config(mut self, tls_config: ClientTlsConfig) -> Self {
165        self.tonic_config.tls_config = Some(tls_config);
166        self
167    }
168
169    /// Set custom metadata entries to send to the collector.
170    pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
171        // extending metadata maps is harder than just casting back/forth
172        let incoming_headers = metadata.into_headers();
173        let mut existing_headers = self
174            .tonic_config
175            .metadata
176            .unwrap_or_default()
177            .into_headers();
178        existing_headers.extend(incoming_headers);
179
180        self.tonic_config.metadata = Some(MetadataMap::from_headers(existing_headers));
181        self
182    }
183
184    /// Set the compression algorithm to use when communicating with the collector.
185    pub fn with_compression(mut self, compression: Compression) -> Self {
186        self.tonic_config.compression = Some(compression);
187        self
188    }
189
190    /// Use `channel` as tonic's transport channel.
191    /// this will override tls config and should only be used
192    /// when working with non-HTTP transports.
193    ///
194    /// Users MUST make sure the [`ExportConfig::timeout`] is
195    /// the same as the channel's timeout.
196    pub fn with_channel(mut self, channel: tonic::transport::Channel) -> Self {
197        self.channel = Some(channel);
198        self
199    }
200
201    /// Use a custom `interceptor` to modify each outbound request.
202    /// this can be used to modify the grpc metadata, for example
203    /// to inject auth tokens.
204    pub fn with_interceptor<I>(mut self, interceptor: I) -> Self
205    where
206        I: tonic::service::Interceptor + Clone + Send + Sync + 'static,
207    {
208        self.interceptor = Some(BoxInterceptor(Box::new(interceptor)));
209        self
210    }
211
212    fn build_channel(
213        self,
214        signal_endpoint_var: &str,
215        signal_timeout_var: &str,
216        signal_compression_var: &str,
217        signal_headers_var: &str,
218    ) -> Result<(Channel, BoxInterceptor, Option<CompressionEncoding>), crate::Error> {
219        let tonic_config = self.tonic_config;
220        let compression = resolve_compression(&tonic_config, signal_compression_var)?;
221
222        let headers_from_env = parse_headers_from_env(signal_headers_var);
223        let metadata = merge_metadata_with_headers_from_env(
224            tonic_config.metadata.unwrap_or_default(),
225            headers_from_env,
226        );
227
228        let add_metadata = move |mut req: tonic::Request<()>| {
229            for key_and_value in metadata.iter() {
230                match key_and_value {
231                    KeyAndValueRef::Ascii(key, value) => {
232                        req.metadata_mut().append(key, value.to_owned())
233                    }
234                    KeyAndValueRef::Binary(key, value) => {
235                        req.metadata_mut().append_bin(key, value.to_owned())
236                    }
237                };
238            }
239
240            Ok(req)
241        };
242
243        let interceptor = match self.interceptor {
244            Some(mut interceptor) => {
245                BoxInterceptor(Box::new(move |req| interceptor.call(add_metadata(req)?)))
246            }
247            None => BoxInterceptor(Box::new(add_metadata)),
248        };
249
250        // If a custom channel was provided, use that channel instead of creating one
251        if let Some(channel) = self.channel {
252            return Ok((channel, interceptor, compression));
253        }
254
255        let config = self.exporter_config;
256
257        // resolving endpoint string
258        // grpc doesn't have a "path" like http(See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md)
259        // the path of grpc calls are based on the protobuf service definition
260        // so we won't append one for default grpc endpoints
261        // If users for some reason want to use a custom path, they can use env var or builder to pass it
262        let endpoint = match env::var(signal_endpoint_var)
263            .ok()
264            .or(env::var(OTEL_EXPORTER_OTLP_ENDPOINT).ok())
265        {
266            Some(val) => val,
267            None => {
268                if config.endpoint.is_empty() {
269                    OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT.to_string()
270                } else {
271                    config.endpoint
272                }
273            }
274        };
275
276        let endpoint = Channel::from_shared(endpoint).map_err(crate::Error::from)?;
277        let timeout = match env::var(signal_timeout_var)
278            .ok()
279            .or(env::var(OTEL_EXPORTER_OTLP_TIMEOUT).ok())
280        {
281            Some(val) => match val.parse() {
282                Ok(seconds) => Duration::from_secs(seconds),
283                Err(_) => config.timeout,
284            },
285            None => config.timeout,
286        };
287
288        #[cfg(feature = "tls")]
289        let channel = match tonic_config.tls_config {
290            Some(tls_config) => endpoint
291                .tls_config(tls_config)
292                .map_err(crate::Error::from)?,
293            None => endpoint,
294        }
295        .timeout(timeout)
296        .connect_lazy();
297
298        #[cfg(not(feature = "tls"))]
299        let channel = endpoint.timeout(timeout).connect_lazy();
300
301        Ok((channel, interceptor, compression))
302    }
303
304    /// Build a new tonic log exporter
305    #[cfg(feature = "logs")]
306    pub fn build_log_exporter(
307        self,
308    ) -> Result<crate::logs::LogExporter, opentelemetry::logs::LogError> {
309        use crate::exporter::tonic::logs::TonicLogsClient;
310
311        let (channel, interceptor, compression) = self.build_channel(
312            crate::logs::OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
313            crate::logs::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
314            crate::logs::OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
315            crate::logs::OTEL_EXPORTER_OTLP_LOGS_HEADERS,
316        )?;
317
318        let client = TonicLogsClient::new(channel, interceptor, compression);
319
320        Ok(crate::logs::LogExporter::new(client))
321    }
322
323    /// Build a new tonic metrics exporter
324    #[cfg(feature = "metrics")]
325    pub fn build_metrics_exporter(
326        self,
327        aggregation_selector: Box<dyn opentelemetry_sdk::metrics::reader::AggregationSelector>,
328        temporality_selector: Box<dyn opentelemetry_sdk::metrics::reader::TemporalitySelector>,
329    ) -> opentelemetry::metrics::Result<crate::MetricsExporter> {
330        use crate::MetricsExporter;
331        use metrics::TonicMetricsClient;
332
333        let (channel, interceptor, compression) = self.build_channel(
334            crate::metric::OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
335            crate::metric::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
336            crate::metric::OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
337            crate::metric::OTEL_EXPORTER_OTLP_METRICS_HEADERS,
338        )?;
339
340        let client = TonicMetricsClient::new(channel, interceptor, compression);
341
342        Ok(MetricsExporter::new(
343            client,
344            temporality_selector,
345            aggregation_selector,
346        ))
347    }
348
349    /// Build a new tonic span exporter
350    #[cfg(feature = "trace")]
351    pub fn build_span_exporter(
352        self,
353    ) -> Result<crate::SpanExporter, opentelemetry::trace::TraceError> {
354        use crate::exporter::tonic::trace::TonicTracesClient;
355
356        let (channel, interceptor, compression) = self.build_channel(
357            crate::span::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
358            crate::span::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
359            crate::span::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
360            crate::span::OTEL_EXPORTER_OTLP_TRACES_HEADERS,
361        )?;
362
363        let client = TonicTracesClient::new(channel, interceptor, compression);
364
365        Ok(crate::SpanExporter::new(client))
366    }
367}
368
369fn merge_metadata_with_headers_from_env(
370    metadata: MetadataMap,
371    headers_from_env: HeaderMap,
372) -> MetadataMap {
373    if headers_from_env.is_empty() {
374        metadata
375    } else {
376        let mut existing_headers: HeaderMap = metadata.into_headers();
377        existing_headers.extend(headers_from_env);
378
379        MetadataMap::from_headers(existing_headers)
380    }
381}
382
383fn parse_headers_from_env(signal_headers_var: &str) -> HeaderMap {
384    env::var(signal_headers_var)
385        .or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
386        .map(|input| {
387            parse_header_string(&input)
388                .filter_map(|(key, value)| {
389                    Some((
390                        HeaderName::from_str(key).ok()?,
391                        HeaderValue::from_str(&value).ok()?,
392                    ))
393                })
394                .collect::<HeaderMap>()
395        })
396        .unwrap_or_default()
397}
398
399#[cfg(test)]
400mod tests {
401    use crate::exporter::tests::run_env_test;
402    #[cfg(feature = "gzip-tonic")]
403    use crate::exporter::Compression;
404    use crate::TonicExporterBuilder;
405    use crate::{OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TRACES_HEADERS};
406    use http::{HeaderMap, HeaderName, HeaderValue};
407    use tonic::metadata::{MetadataMap, MetadataValue};
408
409    #[test]
410    fn test_with_metadata() {
411        // metadata should merge with the current one with priority instead of just replacing it
412        let mut metadata = MetadataMap::new();
413        metadata.insert("foo", "bar".parse().unwrap());
414        let builder = TonicExporterBuilder::default().with_metadata(metadata);
415        let result = builder.tonic_config.metadata.unwrap();
416        let foo = result.get("foo").unwrap();
417        assert_eq!(foo, &MetadataValue::try_from("bar").unwrap());
418        assert!(result.get("User-Agent").is_some());
419
420        // metadata should override entries with the same key in the default one
421        let mut metadata = MetadataMap::new();
422        metadata.insert("user-agent", "baz".parse().unwrap());
423        let builder = TonicExporterBuilder::default().with_metadata(metadata);
424        let result = builder.tonic_config.metadata.unwrap();
425        assert_eq!(
426            result.get("User-Agent").unwrap(),
427            &MetadataValue::try_from("baz").unwrap()
428        );
429        assert_eq!(
430            result.len(),
431            TonicExporterBuilder::default()
432                .tonic_config
433                .metadata
434                .unwrap()
435                .len()
436        );
437    }
438
439    #[test]
440    #[cfg(feature = "gzip-tonic")]
441    fn test_with_compression() {
442        // metadata should merge with the current one with priority instead of just replacing it
443        let mut metadata = MetadataMap::new();
444        metadata.insert("foo", "bar".parse().unwrap());
445        let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
446        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
447    }
448
449    #[test]
450    fn test_parse_headers_from_env() {
451        run_env_test(
452            vec![
453                (OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2"),
454                (OTEL_EXPORTER_OTLP_HEADERS, "k3=v3"),
455            ],
456            || {
457                assert_eq!(
458                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS),
459                    HeaderMap::from_iter([
460                        (
461                            HeaderName::from_static("k1"),
462                            HeaderValue::from_static("v1")
463                        ),
464                        (
465                            HeaderName::from_static("k2"),
466                            HeaderValue::from_static("v2")
467                        ),
468                    ])
469                );
470
471                assert_eq!(
472                    super::parse_headers_from_env("EMPTY_ENV"),
473                    HeaderMap::from_iter([(
474                        HeaderName::from_static("k3"),
475                        HeaderValue::from_static("v3")
476                    )])
477                );
478            },
479        )
480    }
481
482    #[test]
483    fn test_merge_metadata_with_headers_from_env() {
484        run_env_test(
485            vec![(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2")],
486            || {
487                let headers_from_env =
488                    super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS);
489
490                let mut metadata = MetadataMap::new();
491                metadata.insert("foo", "bar".parse().unwrap());
492                metadata.insert("k1", "v0".parse().unwrap());
493
494                let result =
495                    super::merge_metadata_with_headers_from_env(metadata, headers_from_env);
496
497                assert_eq!(
498                    result.get("foo").unwrap(),
499                    MetadataValue::from_static("bar")
500                );
501                assert_eq!(result.get("k1").unwrap(), MetadataValue::from_static("v1"));
502                assert_eq!(result.get("k2").unwrap(), MetadataValue::from_static("v2"));
503            },
504        );
505    }
506}