opentelemetry_proto/transform/
metrics.rs

1// The prost currently will generate a non optional deprecated field for labels.
2// We cannot assign value to it otherwise clippy will complain.
3// We cannot ignore it as it's not an optional field.
4// We can remove this after we removed the labels field from proto.
5#[allow(deprecated)]
6#[cfg(feature = "gen-tonic-messages")]
7pub mod tonic {
8    use std::any::Any;
9    use std::fmt;
10
11    use opentelemetry::{global, metrics::MetricsError, Key, Value};
12    use opentelemetry_sdk::metrics::data::{
13        self, Exemplar as SdkExemplar, ExponentialHistogram as SdkExponentialHistogram,
14        Gauge as SdkGauge, Histogram as SdkHistogram, Metric as SdkMetric,
15        ScopeMetrics as SdkScopeMetrics, Sum as SdkSum, Temporality,
16    };
17    use opentelemetry_sdk::Resource as SdkResource;
18
19    use crate::proto::tonic::{
20        collector::metrics::v1::ExportMetricsServiceRequest,
21        common::v1::KeyValue,
22        metrics::v1::{
23            exemplar, exemplar::Value as TonicExemplarValue,
24            exponential_histogram_data_point::Buckets as TonicBuckets,
25            metric::Data as TonicMetricData, number_data_point,
26            number_data_point::Value as TonicDataPointValue,
27            AggregationTemporality as TonicTemporality, AggregationTemporality,
28            DataPointFlags as TonicDataPointFlags, Exemplar as TonicExemplar,
29            ExponentialHistogram as TonicExponentialHistogram,
30            ExponentialHistogramDataPoint as TonicExponentialHistogramDataPoint,
31            Gauge as TonicGauge, Histogram as TonicHistogram,
32            HistogramDataPoint as TonicHistogramDataPoint, Metric as TonicMetric,
33            NumberDataPoint as TonicNumberDataPoint, ResourceMetrics as TonicResourceMetrics,
34            ScopeMetrics as TonicScopeMetrics, Sum as TonicSum,
35        },
36        resource::v1::Resource as TonicResource,
37    };
38    use crate::transform::common::to_nanos;
39
40    impl From<u64> for exemplar::Value {
41        fn from(value: u64) -> Self {
42            exemplar::Value::AsInt(i64::try_from(value).unwrap_or_default())
43        }
44    }
45
46    impl From<i64> for exemplar::Value {
47        fn from(value: i64) -> Self {
48            exemplar::Value::AsInt(value)
49        }
50    }
51
52    impl From<f64> for exemplar::Value {
53        fn from(value: f64) -> Self {
54            exemplar::Value::AsDouble(value)
55        }
56    }
57
58    impl From<u64> for number_data_point::Value {
59        fn from(value: u64) -> Self {
60            number_data_point::Value::AsInt(i64::try_from(value).unwrap_or_default())
61        }
62    }
63
64    impl From<i64> for number_data_point::Value {
65        fn from(value: i64) -> Self {
66            number_data_point::Value::AsInt(value)
67        }
68    }
69
70    impl From<f64> for number_data_point::Value {
71        fn from(value: f64) -> Self {
72            number_data_point::Value::AsDouble(value)
73        }
74    }
75
76    impl From<(&Key, &Value)> for KeyValue {
77        fn from(kv: (&Key, &Value)) -> Self {
78            KeyValue {
79                key: kv.0.to_string(),
80                value: Some(kv.1.clone().into()),
81            }
82        }
83    }
84
85    impl From<&opentelemetry::KeyValue> for KeyValue {
86        fn from(kv: &opentelemetry::KeyValue) -> Self {
87            KeyValue {
88                key: kv.key.to_string(),
89                value: Some(kv.value.clone().into()),
90            }
91        }
92    }
93
94    impl From<Temporality> for AggregationTemporality {
95        fn from(temporality: Temporality) -> Self {
96            match temporality {
97                Temporality::Cumulative => AggregationTemporality::Cumulative,
98                Temporality::Delta => AggregationTemporality::Delta,
99                other => {
100                    opentelemetry::global::handle_error(MetricsError::Other(format!(
101                        "Unknown temporality {:?}, using default instead.",
102                        other
103                    )));
104                    AggregationTemporality::Cumulative
105                }
106            }
107        }
108    }
109
110    impl From<&data::ResourceMetrics> for ExportMetricsServiceRequest {
111        fn from(rm: &data::ResourceMetrics) -> Self {
112            ExportMetricsServiceRequest {
113                resource_metrics: vec![TonicResourceMetrics {
114                    resource: Some((&rm.resource).into()),
115                    scope_metrics: rm.scope_metrics.iter().map(Into::into).collect(),
116                    schema_url: rm.resource.schema_url().map(Into::into).unwrap_or_default(),
117                }],
118            }
119        }
120    }
121
122    impl From<&SdkResource> for TonicResource {
123        fn from(resource: &SdkResource) -> Self {
124            TonicResource {
125                attributes: resource.iter().map(Into::into).collect(),
126                dropped_attributes_count: 0,
127            }
128        }
129    }
130
131    impl From<&SdkScopeMetrics> for TonicScopeMetrics {
132        fn from(sm: &SdkScopeMetrics) -> Self {
133            TonicScopeMetrics {
134                scope: Some((&sm.scope, None).into()),
135                metrics: sm.metrics.iter().map(Into::into).collect(),
136                schema_url: sm
137                    .scope
138                    .schema_url
139                    .as_ref()
140                    .map(ToString::to_string)
141                    .unwrap_or_default(),
142            }
143        }
144    }
145
146    impl From<&SdkMetric> for TonicMetric {
147        fn from(metric: &SdkMetric) -> Self {
148            TonicMetric {
149                name: metric.name.to_string(),
150                description: metric.description.to_string(),
151                unit: metric.unit.to_string(),
152                metadata: vec![], // internal and currently unused
153                data: metric.data.as_any().try_into().ok(),
154            }
155        }
156    }
157
158    impl TryFrom<&dyn Any> for TonicMetricData {
159        type Error = ();
160
161        fn try_from(data: &dyn Any) -> Result<Self, Self::Error> {
162            if let Some(hist) = data.downcast_ref::<SdkHistogram<i64>>() {
163                Ok(TonicMetricData::Histogram(hist.into()))
164            } else if let Some(hist) = data.downcast_ref::<SdkHistogram<u64>>() {
165                Ok(TonicMetricData::Histogram(hist.into()))
166            } else if let Some(hist) = data.downcast_ref::<SdkHistogram<f64>>() {
167                Ok(TonicMetricData::Histogram(hist.into()))
168            } else if let Some(hist) = data.downcast_ref::<SdkExponentialHistogram<i64>>() {
169                Ok(TonicMetricData::ExponentialHistogram(hist.into()))
170            } else if let Some(hist) = data.downcast_ref::<SdkExponentialHistogram<u64>>() {
171                Ok(TonicMetricData::ExponentialHistogram(hist.into()))
172            } else if let Some(hist) = data.downcast_ref::<SdkExponentialHistogram<f64>>() {
173                Ok(TonicMetricData::ExponentialHistogram(hist.into()))
174            } else if let Some(sum) = data.downcast_ref::<SdkSum<u64>>() {
175                Ok(TonicMetricData::Sum(sum.into()))
176            } else if let Some(sum) = data.downcast_ref::<SdkSum<i64>>() {
177                Ok(TonicMetricData::Sum(sum.into()))
178            } else if let Some(sum) = data.downcast_ref::<SdkSum<f64>>() {
179                Ok(TonicMetricData::Sum(sum.into()))
180            } else if let Some(gauge) = data.downcast_ref::<SdkGauge<u64>>() {
181                Ok(TonicMetricData::Gauge(gauge.into()))
182            } else if let Some(gauge) = data.downcast_ref::<SdkGauge<i64>>() {
183                Ok(TonicMetricData::Gauge(gauge.into()))
184            } else if let Some(gauge) = data.downcast_ref::<SdkGauge<f64>>() {
185                Ok(TonicMetricData::Gauge(gauge.into()))
186            } else {
187                global::handle_error(MetricsError::Other("unknown aggregator".into()));
188                Err(())
189            }
190        }
191    }
192
193    trait Numeric: Into<TonicExemplarValue> + Into<TonicDataPointValue> + Copy {
194        // lossy at large values for u64 and i64 but otlp histograms only handle float values
195        fn into_f64(self) -> f64;
196    }
197
198    impl Numeric for u64 {
199        fn into_f64(self) -> f64 {
200            self as f64
201        }
202    }
203
204    impl Numeric for i64 {
205        fn into_f64(self) -> f64 {
206            self as f64
207        }
208    }
209
210    impl Numeric for f64 {
211        fn into_f64(self) -> f64 {
212            self
213        }
214    }
215
216    impl<T> From<&SdkHistogram<T>> for TonicHistogram
217    where
218        T: Numeric,
219    {
220        fn from(hist: &SdkHistogram<T>) -> Self {
221            TonicHistogram {
222                data_points: hist
223                    .data_points
224                    .iter()
225                    .map(|dp| TonicHistogramDataPoint {
226                        attributes: dp.attributes.iter().map(Into::into).collect(),
227                        start_time_unix_nano: to_nanos(dp.start_time),
228                        time_unix_nano: to_nanos(dp.time),
229                        count: dp.count,
230                        sum: Some(dp.sum.into_f64()),
231                        bucket_counts: dp.bucket_counts.clone(),
232                        explicit_bounds: dp.bounds.clone(),
233                        exemplars: dp.exemplars.iter().map(Into::into).collect(),
234                        flags: TonicDataPointFlags::default() as u32,
235                        min: dp.min.map(Numeric::into_f64),
236                        max: dp.max.map(Numeric::into_f64),
237                    })
238                    .collect(),
239                aggregation_temporality: TonicTemporality::from(hist.temporality).into(),
240            }
241        }
242    }
243
244    impl<T> From<&SdkExponentialHistogram<T>> for TonicExponentialHistogram
245    where
246        T: Numeric,
247    {
248        fn from(hist: &SdkExponentialHistogram<T>) -> Self {
249            TonicExponentialHistogram {
250                data_points: hist
251                    .data_points
252                    .iter()
253                    .map(|dp| TonicExponentialHistogramDataPoint {
254                        attributes: dp.attributes.iter().map(Into::into).collect(),
255                        start_time_unix_nano: to_nanos(dp.start_time),
256                        time_unix_nano: to_nanos(dp.time),
257                        count: dp.count as u64,
258                        sum: Some(dp.sum.into_f64()),
259                        scale: dp.scale.into(),
260                        zero_count: dp.zero_count,
261                        positive: Some(TonicBuckets {
262                            offset: dp.positive_bucket.offset,
263                            bucket_counts: dp.positive_bucket.counts.clone(),
264                        }),
265                        negative: Some(TonicBuckets {
266                            offset: dp.negative_bucket.offset,
267                            bucket_counts: dp.negative_bucket.counts.clone(),
268                        }),
269                        flags: TonicDataPointFlags::default() as u32,
270                        exemplars: dp.exemplars.iter().map(Into::into).collect(),
271                        min: dp.min.map(Numeric::into_f64),
272                        max: dp.max.map(Numeric::into_f64),
273                        zero_threshold: dp.zero_threshold,
274                    })
275                    .collect(),
276                aggregation_temporality: TonicTemporality::from(hist.temporality).into(),
277            }
278        }
279    }
280
281    impl<T> From<&SdkSum<T>> for TonicSum
282    where
283        T: fmt::Debug + Into<TonicExemplarValue> + Into<TonicDataPointValue> + Copy,
284    {
285        fn from(sum: &SdkSum<T>) -> Self {
286            TonicSum {
287                data_points: sum
288                    .data_points
289                    .iter()
290                    .map(|dp| TonicNumberDataPoint {
291                        attributes: dp.attributes.iter().map(Into::into).collect(),
292                        start_time_unix_nano: dp.start_time.map(to_nanos).unwrap_or_default(),
293                        time_unix_nano: dp.time.map(to_nanos).unwrap_or_default(),
294                        exemplars: dp.exemplars.iter().map(Into::into).collect(),
295                        flags: TonicDataPointFlags::default() as u32,
296                        value: Some(dp.value.into()),
297                    })
298                    .collect(),
299                aggregation_temporality: TonicTemporality::from(sum.temporality).into(),
300                is_monotonic: sum.is_monotonic,
301            }
302        }
303    }
304
305    impl<T> From<&SdkGauge<T>> for TonicGauge
306    where
307        T: fmt::Debug + Into<TonicExemplarValue> + Into<TonicDataPointValue> + Copy,
308    {
309        fn from(gauge: &SdkGauge<T>) -> Self {
310            TonicGauge {
311                data_points: gauge
312                    .data_points
313                    .iter()
314                    .map(|dp| TonicNumberDataPoint {
315                        attributes: dp.attributes.iter().map(Into::into).collect(),
316                        start_time_unix_nano: dp.start_time.map(to_nanos).unwrap_or_default(),
317                        time_unix_nano: dp.time.map(to_nanos).unwrap_or_default(),
318                        exemplars: dp.exemplars.iter().map(Into::into).collect(),
319                        flags: TonicDataPointFlags::default() as u32,
320                        value: Some(dp.value.into()),
321                    })
322                    .collect(),
323            }
324        }
325    }
326
327    impl<T> From<&SdkExemplar<T>> for TonicExemplar
328    where
329        T: Into<TonicExemplarValue> + Copy,
330    {
331        fn from(ex: &SdkExemplar<T>) -> Self {
332            TonicExemplar {
333                filtered_attributes: ex
334                    .filtered_attributes
335                    .iter()
336                    .map(|kv| (&kv.key, &kv.value).into())
337                    .collect(),
338                time_unix_nano: to_nanos(ex.time),
339                span_id: ex.span_id.into(),
340                trace_id: ex.trace_id.into(),
341                value: Some(ex.value.into()),
342            }
343        }
344    }
345}