opentelemetry_otlp/exporter/tonic/
logs.rs

1use async_trait::async_trait;
2use core::fmt;
3use opentelemetry::logs::{LogError, LogResult};
4use opentelemetry_proto::tonic::collector::logs::v1::{
5    logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
6};
7use opentelemetry_sdk::export::logs::{LogData, LogExporter};
8use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
9
10use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
11
12use super::BoxInterceptor;
13
14pub(crate) struct TonicLogsClient {
15    inner: Option<ClientInner>,
16    #[allow(dead_code)]
17    // <allow dead> would be removed once we support set_resource for metrics.
18    resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
19}
20
21struct ClientInner {
22    client: LogsServiceClient<Channel>,
23    interceptor: BoxInterceptor,
24}
25
26impl fmt::Debug for TonicLogsClient {
27    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28        f.write_str("TonicLogsClient")
29    }
30}
31
32impl TonicLogsClient {
33    pub(super) fn new(
34        channel: Channel,
35        interceptor: BoxInterceptor,
36        compression: Option<CompressionEncoding>,
37    ) -> Self {
38        let mut client = LogsServiceClient::new(channel);
39        if let Some(compression) = compression {
40            client = client
41                .send_compressed(compression)
42                .accept_compressed(compression);
43        }
44
45        TonicLogsClient {
46            inner: Some(ClientInner {
47                client,
48                interceptor,
49            }),
50            resource: Default::default(),
51        }
52    }
53}
54
55#[async_trait]
56impl LogExporter for TonicLogsClient {
57    async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
58        let (mut client, metadata, extensions) = match &mut self.inner {
59            Some(inner) => {
60                let (m, e, _) = inner
61                    .interceptor
62                    .call(Request::new(()))
63                    .map_err(|e| LogError::Other(Box::new(e)))?
64                    .into_parts();
65                (inner.client.clone(), m, e)
66            }
67            None => return Err(LogError::Other("exporter is already shut down".into())),
68        };
69
70        //TODO: avoid cloning here.
71        let owned_batch = batch
72            .into_iter()
73            .map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
74            .collect::<Vec<LogData>>();
75
76        let resource_logs = group_logs_by_resource_and_scope(owned_batch, &self.resource);
77
78        client
79            .export(Request::from_parts(
80                metadata,
81                extensions,
82                ExportLogsServiceRequest { resource_logs },
83            ))
84            .await
85            .map_err(crate::Error::from)?;
86
87        Ok(())
88    }
89
90    fn shutdown(&mut self) {
91        let _ = self.inner.take();
92    }
93
94    fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
95        self.resource = resource.into();
96    }
97}