mz_ore/
tracing.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Tracing utilities.
17//!
18//! This module contains application tracing utilities built on top of the
19//! [`tracing`] and [`opentelemetry`] libraries. The key exports are:
20//!
21//!  * The **[`configure`]** function, which configures the `tracing` and
22//!    `opentelemetry` crates with sensible defaults and should be called during
23//!    initialization of every Materialize binary.
24//!
25//!  * The **[`OpenTelemetryContext`]** type, which carries a tracing span
26//!    across thread or task boundaries within a process.
27
28use std::collections::BTreeMap;
29use std::io;
30use std::io::IsTerminal;
31use std::str::FromStr;
32use std::sync::LazyLock;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::{Arc, OnceLock};
35use std::time::Duration;
36
37#[cfg(feature = "tokio-console")]
38use console_subscriber::ConsoleLayer;
39use derivative::Derivative;
40use http::HeaderMap;
41use hyper_tls::HttpsConnector;
42use hyper_util::client::legacy::connect::HttpConnector;
43use opentelemetry::global::Error;
44use opentelemetry::propagation::{Extractor, Injector};
45use opentelemetry::trace::TracerProvider;
46use opentelemetry::{KeyValue, global};
47use opentelemetry_sdk::propagation::TraceContextPropagator;
48use opentelemetry_sdk::{Resource, trace};
49use prometheus::IntCounter;
50use tonic::metadata::MetadataMap;
51use tonic::transport::Endpoint;
52use tracing::{Event, Level, Span, Subscriber, warn};
53#[cfg(feature = "capture")]
54use tracing_capture::{CaptureLayer, SharedStorage};
55use tracing_opentelemetry::OpenTelemetrySpanExt;
56use tracing_subscriber::filter::Directive;
57use tracing_subscriber::fmt::format::{Writer, format};
58use tracing_subscriber::fmt::{self, FmtContext, FormatEvent, FormatFields};
59use tracing_subscriber::layer::{Layer, SubscriberExt};
60use tracing_subscriber::registry::LookupSpan;
61use tracing_subscriber::util::SubscriberInitExt;
62use tracing_subscriber::{EnvFilter, Registry, reload};
63
64use crate::metric;
65use crate::metrics::MetricsRegistry;
66#[cfg(feature = "tokio-console")]
67use crate::netio::SocketAddr;
68
/// Application tracing configuration.
///
/// See the [`configure`] function for details.
///
/// The type parameter `F` is the type of the Sentry event filter carried by
/// [`SentryConfig`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct TracingConfig<F> {
    /// The name of the service.
    ///
    /// [`configure`] uses this as the OpenTelemetry `service.name` resource
    /// attribute, as the tracer name, and as the `service_name` Sentry tag.
    pub service_name: &'static str,
    /// Configuration of the stderr log.
    pub stderr_log: StderrLogConfig,
    /// Optional configuration of the [`opentelemetry`] library.
    ///
    /// When `None`, no OpenTelemetry layer is installed.
    pub opentelemetry: Option<OpenTelemetryConfig>,
    /// Optional configuration for the [Tokio console] integration.
    ///
    /// [Tokio console]: https://github.com/tokio-rs/console
    #[cfg_attr(nightly_doc_features, doc(cfg(feature = "tokio-console")))]
    #[cfg(feature = "tokio-console")]
    pub tokio_console: Option<TokioConsoleConfig>,
    /// Optional configuration for capturing spans during tests.
    ///
    /// When set, [`configure`] wraps the storage in a [`CaptureLayer`].
    #[cfg(feature = "capture")]
    #[derivative(Debug = "ignore")]
    pub capture: Option<SharedStorage>,
    /// Optional Sentry configuration.
    ///
    /// When `None`, Sentry is not initialized and no Sentry layer is installed.
    pub sentry: Option<SentryConfig<F>>,
    /// The version of this build of the service.
    ///
    /// Reported to Sentry as the release `materialize@<build_version>`.
    pub build_version: &'static str,
    /// The commit SHA of this build of the service.
    ///
    /// Reported to Sentry as the `build_sha` tag.
    pub build_sha: &'static str,
    /// Registry for prometheus metrics.
    ///
    /// Handed to the metrics layer that [`configure`] attaches alongside the
    /// OpenTelemetry layer.
    pub registry: MetricsRegistry,
}
100
/// Configures Sentry reporting.
///
/// The type parameter `F` is the type of the event filter; [`configure`]
/// requires it to be a function from event metadata to a
/// `sentry_tracing::EventFilter`.
#[derive(Debug, Clone)]
pub struct SentryConfig<F> {
    /// Sentry data source name to submit events to.
    pub dsn: String,
    /// The environment name to report to Sentry.
    ///
    /// If unset, the Sentry SDK will attempt to read the value from the
    /// `SENTRY_ENVIRONMENT` environment variable.
    pub environment: Option<String>,
    /// Additional tags to include on each Sentry event/exception.
    ///
    /// These are set on the Sentry scope in addition to the `service_name`
    /// and `build_sha` tags that [`configure`] always sets.
    pub tags: BTreeMap<String, String>,
    /// A filter that classifies events before sending them to Sentry.
    ///
    /// Installed on the Sentry tracing layer via its `event_filter` method.
    pub event_filter: F,
}
116
/// Configures the stderr log.
#[derive(Debug)]
pub struct StderrLogConfig {
    /// The format in which to emit messages.
    pub format: StderrLogFormat,
    /// A filter which determines which events are emitted to the log.
    ///
    /// On startup, [`configure`] adds the [`LOGGING_DEFAULTS`] directives to
    /// this filter.
    pub filter: EnvFilter,
}
125
/// Specifies the format of a stderr log message.
#[derive(Debug, Clone)]
pub enum StderrLogFormat {
    /// Format as human readable, optionally colored text.
    ///
    /// Best suited for direct human consumption in a terminal.
    ///
    /// [`configure`] enables ANSI colors only when stderr is a terminal and
    /// the `NO_COLOR` environment variable is unset or empty.
    Text {
        /// An optional prefix for each log message.
        ///
        /// When present, each message is preceded by the prefix followed by
        /// `": "`.
        prefix: Option<String>,
    },
    /// Format as JSON (in reality, JSONL).
    ///
    /// Best suited for ingestion in structured logging aggregators.
    ///
    /// [`configure`] enables inclusion of the current span in this format.
    Json,
}
141
/// Configuration for the [`opentelemetry`] library.
#[derive(Debug)]
pub struct OpenTelemetryConfig {
    // NOTE(review): `configure` builds a tonic channel from this endpoint and
    // uses the tonic exporter, so "OTLP/HTTP" below may be out of date —
    // confirm which OTLP transport is actually expected here.
    /// The [OTLP/HTTP] endpoint to export OpenTelemetry data to.
    ///
    /// [OTLP/HTTP]: https://github.com/open-telemetry/opentelemetry-specification/blob/b13c1648bae16323868a5caf614bc10c917cc6ca/specification/protocol/otlp.md#otlphttp
    pub endpoint: String,
    /// Additional headers to send with every request to the endpoint.
    pub headers: HeaderMap,
    /// A filter which determines which events are exported.
    ///
    /// On startup, [`configure`] adds the [`OPENTELEMETRY_DEFAULTS`]
    /// directives to this filter.
    pub filter: EnvFilter,
    /// How many spans can be queued before dropping.
    pub max_batch_queue_size: usize,
    /// How many spans to process in a single batch.
    pub max_export_batch_size: usize,
    /// How many concurrent export tasks to allow.
    /// More tasks can lead to more memory consumed by the exporter.
    pub max_concurrent_exports: usize,
    /// Delay between consecutive batch exports.
    pub batch_scheduled_delay: Duration,
    /// How long to wait for a batch to be sent before dropping it.
    pub max_export_timeout: Duration,
    /// `opentelemetry::sdk::resource::Resource` to include with all
    /// traces.
    ///
    /// This is merged over the `service.name` resource that [`configure`]
    /// derives from the service name, so a `service.name` given here takes
    /// precedence.
    pub resource: Resource,
}
168
/// Configuration of the [Tokio console] integration.
///
/// [Tokio console]: https://github.com/tokio-rs/console
#[cfg_attr(nightly_doc_features, doc(cfg(feature = "tokio-console")))]
#[cfg(feature = "tokio-console")]
#[derive(Debug, Clone)]
pub struct TokioConsoleConfig {
    /// The address on which to listen for Tokio console connections.
    ///
    /// [`configure`] logs this address at `info` level once the subscriber is
    /// installed.
    ///
    /// See [`console_subscriber::Builder::server_addr`].
    pub listen_addr: SocketAddr,
    /// How frequently to publish updates to clients.
    ///
    /// See [`console_subscriber::Builder::publish_interval`].
    pub publish_interval: Duration,
    /// How long data is retained for completed tasks.
    ///
    /// See [`console_subscriber::Builder::retention`].
    pub retention: Duration,
}
189
/// A shareable callback that installs a new [`EnvFilter`] on a layer, given
/// the filter and a list of default [`Directive`]s to go with it.
type Reloader = Arc<dyn Fn(EnvFilter, Vec<Directive>) -> Result<(), anyhow::Error> + Send + Sync>;
/// A shareable callback that reloads a layer's filter from a list of default
/// [`Directive`]s alone.
type DirectiveReloader = Arc<dyn Fn(Vec<Directive>) -> Result<(), anyhow::Error> + Send + Sync>;
192
/// A handle to the tracing infrastructure configured with [`configure`].
///
/// The handle is cheap to clone: every field is a reference-counted callback.
#[derive(Clone)]
pub struct TracingHandle {
    /// Callback that reloads the stderr log filter.
    stderr_log: Reloader,
    /// Callback that reloads the OpenTelemetry filter.
    opentelemetry: Reloader,
    /// Callback that reloads the Sentry filter directives.
    sentry: DirectiveReloader,
}
200
201impl TracingHandle {
202    /// Creates a inoperative tracing handle.
203    ///
204    /// Primarily useful in tests.
205    pub fn disabled() -> TracingHandle {
206        TracingHandle {
207            stderr_log: Arc::new(|_, _| Ok(())),
208            opentelemetry: Arc::new(|_, _| Ok(())),
209            sentry: Arc::new(|_| Ok(())),
210        }
211    }
212
213    /// Dynamically reloads the stderr log filter.
214    pub fn reload_stderr_log_filter(
215        &self,
216        filter: EnvFilter,
217        defaults: Vec<Directive>,
218    ) -> Result<(), anyhow::Error> {
219        (self.stderr_log)(filter, defaults)
220    }
221
222    /// Dynamically reloads the OpenTelemetry log filter.
223    pub fn reload_opentelemetry_filter(
224        &self,
225        filter: EnvFilter,
226        defaults: Vec<Directive>,
227    ) -> Result<(), anyhow::Error> {
228        (self.opentelemetry)(filter, defaults)
229    }
230
231    /// Dynamically reloads the additional sentry directives.
232    pub fn reload_sentry_directives(&self, defaults: Vec<Directive>) -> Result<(), anyhow::Error> {
233        (self.sentry)(defaults)
234    }
235}
236
237impl std::fmt::Debug for TracingHandle {
238    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
239        f.debug_struct("TracingHandle").finish_non_exhaustive()
240    }
241}
242
/// A guard for the tracing infrastructure configured with [`configure`].
///
/// This guard should be kept alive for the lifetime of the program.
/// Dropping it shuts down the global OpenTelemetry tracer provider and
/// releases the Sentry client guard, if any.
#[must_use = "Must hold for the lifetime of the program, otherwise tracing will be shutdown"]
pub struct TracingGuard {
    /// The guard returned by `sentry::init`, present only when Sentry was
    /// configured. Held solely so that it is dropped together with this guard.
    _sentry_guard: Option<sentry::ClientInitGuard>,
}
250
251impl Drop for TracingGuard {
252    fn drop(&mut self) {
253        opentelemetry::global::shutdown_tracer_provider();
254    }
255}
256
257impl std::fmt::Debug for TracingGuard {
258    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
259        f.debug_struct("TracingGuard").finish_non_exhaustive()
260    }
261}
262
// Note that the following defaults are used on startup, regardless of the
// parameters in LaunchDarkly. If we need to, we can add CLI flags to control
// them going forward.
266
267/// By default we turn off tracing from the following crates, because they
268/// have error spans which are noisy.
269///
270/// Note: folks should feel free to add more crates here if we find more
271/// with long lived Spans.
272pub const LOGGING_DEFAULTS_STR: [&str; 1] = ["kube_client::client::builder=off"];
273/// Same as [`LOGGING_DEFAULTS_STR`], but structured as [`Directive`]s.
274pub static LOGGING_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
275    LOGGING_DEFAULTS_STR
276        .into_iter()
277        .map(|directive| Directive::from_str(directive).expect("valid directive"))
278        .collect()
279});
280/// By default we turn off tracing from the following crates, because they
281/// have long-lived Spans, which OpenTelemetry does not handle well.
282///
283/// Note: folks should feel free to add more crates here if we find more
284/// with long lived Spans.
285pub const OPENTELEMETRY_DEFAULTS_STR: [&str; 2] = ["h2=off", "hyper=off"];
286/// Same as [`OPENTELEMETRY_DEFAULTS_STR`], but structured as [`Directive`]s.
287pub static OPENTELEMETRY_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
288    OPENTELEMETRY_DEFAULTS_STR
289        .into_iter()
290        .map(|directive| Directive::from_str(directive).expect("valid directive"))
291        .collect()
292});
293
294/// By default we turn off tracing from the following crates, because they
295/// have error spans which are noisy.
296pub const SENTRY_DEFAULTS_STR: [&str; 2] =
297    ["kube_client::client::builder=off", "mysql_async::conn=off"];
298/// Same as [`SENTRY_DEFAULTS_STR`], but structured as [`Directive`]s.
299pub static SENTRY_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
300    SENTRY_DEFAULTS_STR
301        .into_iter()
302        .map(|directive| Directive::from_str(directive).expect("valid directive"))
303        .collect()
304});
305
/// The [`GLOBAL_SUBSCRIBER`] type.
type GlobalSubscriber = Arc<dyn Subscriber + Send + Sync + 'static>;

/// An [`Arc`] of the tracing [`Subscriber`] constructed and initialized in
/// [`configure`]. The value is written when [`configure`] runs.
///
/// Because this is a [`OnceLock`], it can be written only once; [`configure`]
/// asserts that its write succeeds.
pub static GLOBAL_SUBSCRIBER: OnceLock<GlobalSubscriber> = OnceLock::new();
312
313/// Enables application tracing via the [`tracing`] and [`opentelemetry`]
314/// libraries.
315///
316/// The `tracing` library is configured to emit events as textual log lines to
317/// stderr. [`StderrLogConfig`] offer a small degree of control over this
318/// behavior.
319///
320/// If the `opentelemetry` parameter is `Some`, the `tracing` library is
321/// additionally configured to export events to an observability backend, like
322/// [Jaeger] or [Honeycomb].
323///
324/// The `tokio_console` parameter enables integration with the [Tokio console].
325/// When enabled, `tracing` events are collected and made available to the Tokio
326/// console via a server running on port
327///
328/// [Jaeger]: https://jaegertracing.io
329/// [Honeycomb]: https://www.honeycomb.io
330/// [Tokio console]: https://github.com/tokio-rs/console
331// Setting up OpenTelemetry in the background requires we are in a Tokio runtime
332// context, hence the `async`.
333#[allow(clippy::unused_async)]
334pub async fn configure<F>(
335    config: TracingConfig<F>,
336) -> Result<(TracingHandle, TracingGuard), anyhow::Error>
337where
338    F: Fn(&tracing::Metadata<'_>) -> sentry_tracing::EventFilter + Send + Sync + 'static,
339{
340    let stderr_log_layer: Box<dyn Layer<Registry> + Send + Sync> = match config.stderr_log.format {
341        StderrLogFormat::Text { prefix } => {
342            // See: https://no-color.org/
343            let no_color = std::env::var_os("NO_COLOR").unwrap_or_else(|| "".into()) != "";
344            Box::new(
345                fmt::layer()
346                    .with_writer(io::stderr)
347                    .event_format(PrefixFormat {
348                        inner: format(),
349                        prefix,
350                    })
351                    .with_ansi(!no_color && io::stderr().is_terminal()),
352            )
353        }
354        StderrLogFormat::Json => Box::new(
355            fmt::layer()
356                .with_writer(io::stderr)
357                .json()
358                .with_current_span(true),
359        ),
360    };
361    let (stderr_log_filter, stderr_log_filter_reloader) = reload::Layer::new({
362        let mut filter = config.stderr_log.filter;
363        for directive in LOGGING_DEFAULTS.iter() {
364            filter = filter.add_directive(directive.clone());
365        }
366        filter
367    });
368    let stderr_log_layer = stderr_log_layer.with_filter(stderr_log_filter);
369    let stderr_log_reloader = Arc::new(move |mut filter: EnvFilter, defaults: Vec<Directive>| {
370        for directive in &defaults {
371            filter = filter.add_directive(directive.clone());
372        }
373        Ok(stderr_log_filter_reloader.reload(filter)?)
374    });
375
376    let (otel_layer, otel_reloader): (_, Reloader) = if let Some(otel_config) = config.opentelemetry
377    {
378        opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
379
380        // Manually set up an OpenSSL-backed, h2, proxied `Channel`,
381        // with the timeout configured according to:
382        // https://docs.rs/opentelemetry-otlp/latest/opentelemetry_otlp/struct.TonicExporterBuilder.html#method.with_channel
383        let channel = Endpoint::from_shared(otel_config.endpoint)?
384            .timeout(Duration::from_secs(
385                opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT,
386            ))
387            // TODO(guswynn): investigate if this should be non-lazy.
388            .connect_with_connector_lazy({
389                let mut http = HttpConnector::new();
390                http.enforce_http(false);
391                HttpsConnector::from((
392                    http,
393                    // This is the same as the default, plus an h2 ALPN request.
394                    tokio_native_tls::TlsConnector::from(
395                        native_tls::TlsConnector::builder()
396                            .request_alpns(&["h2"])
397                            .build()
398                            .unwrap(),
399                    ),
400                ))
401            });
402        let exporter = opentelemetry_otlp::new_exporter()
403            .tonic()
404            .with_channel(channel)
405            .with_metadata(MetadataMap::from_headers(otel_config.headers));
406        let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default()
407            .with_max_queue_size(otel_config.max_batch_queue_size)
408            .with_max_export_batch_size(otel_config.max_export_batch_size)
409            .with_max_concurrent_exports(otel_config.max_concurrent_exports)
410            .with_scheduled_delay(otel_config.batch_scheduled_delay)
411            .with_max_export_timeout(otel_config.max_export_timeout)
412            .build();
413        let tracer = opentelemetry_otlp::new_pipeline()
414            .tracing()
415            .with_trace_config(
416                trace::Config::default().with_resource(
417                    // The latter resources win, so if the user specifies
418                    // `service.name` in the configuration, it will override the
419                    // `service.name` value we configure here.
420                    Resource::new([KeyValue::new(
421                        "service.name",
422                        config.service_name.to_string(),
423                    )])
424                    .merge(&otel_config.resource),
425                ),
426            )
427            .with_exporter(exporter)
428            .with_batch_config(batch_config)
429            .install_batch(opentelemetry_sdk::runtime::Tokio)
430            .unwrap()
431            .tracer(config.service_name);
432
433        // Create our own error handler to:
434        //   1. Rate limit the number of error logs. By default the OTel library will emit
435        //      an enormous number of duplicate logs if any errors occur, one per batch
436        //      send attempt, until the error is resolved.
437        //   2. Log the errors via our tracing layer, so they are formatted consistently
438        //      with the rest of our logs, rather than the direct `eprintln` used by the
439        //      OTel library.
440        const OPENTELEMETRY_ERROR_MSG_BACKOFF_SECONDS: u64 = 30;
441        let last_log_in_epoch_seconds = AtomicU64::default();
442        opentelemetry::global::set_error_handler(move |err| {
443            let now = std::time::SystemTime::now()
444                .duration_since(std::time::SystemTime::UNIX_EPOCH)
445                .expect("Failed to get duration since Unix epoch")
446                .as_secs();
447            let last_log = last_log_in_epoch_seconds.load(Ordering::SeqCst);
448
449            if now.saturating_sub(last_log) >= OPENTELEMETRY_ERROR_MSG_BACKOFF_SECONDS {
450                if last_log_in_epoch_seconds
451                    .compare_exchange_weak(last_log, now, Ordering::Relaxed, Ordering::Relaxed)
452                    .is_err()
453                {
454                    return;
455                }
456                use crate::error::ErrorExt;
457                match err {
458                    Error::Trace(err) => {
459                        warn!("OpenTelemetry error: {}", err.display_with_causes());
460                    }
461                    // TODO(guswynn): turn off the metrics feature?
462                    Error::Metric(err) => {
463                        warn!("OpenTelemetry error: {}", err.display_with_causes());
464                    }
465                    Error::Other(err) => {
466                        warn!("OpenTelemetry error: {}", err);
467                    }
468                    _ => {
469                        warn!("unknown OpenTelemetry error");
470                    }
471                }
472            }
473        })
474        .expect("valid error handler");
475
476        let (filter, filter_handle) = reload::Layer::new({
477            let mut filter = otel_config.filter;
478            for directive in OPENTELEMETRY_DEFAULTS.iter() {
479                filter = filter.add_directive(directive.clone());
480            }
481            filter
482        });
483        let metrics_layer = MetricsLayer::new(&config.registry);
484        let layer = tracing_opentelemetry::layer()
485            // OpenTelemetry does not handle long-lived Spans well, and they end up continuously
486            // eating memory until OOM. So we set a max number of events that are allowed to be
487            // logged to a Span, once this max is passed, old events will get dropped
488            //
489            // TODO(parker-timmerman|guswynn): make this configurable with LaunchDarkly
490            .max_events_per_span(2048)
491            .with_tracer(tracer)
492            .and_then(metrics_layer)
493            // WARNING, ENTERING SPOOKY ZONE 2.0
494            //
495            // Notice we use `with_filter` here. `and_then` will apply the filter globally.
496            .with_filter(filter);
497        let reloader = Arc::new(move |mut filter: EnvFilter, defaults: Vec<Directive>| {
498            // Re-apply our defaults on reload.
499            for directive in &defaults {
500                filter = filter.add_directive(directive.clone());
501            }
502            Ok(filter_handle.reload(filter)?)
503        });
504        (Some(layer), reloader)
505    } else {
506        let reloader = Arc::new(|_, _| Ok(()));
507        (None, reloader)
508    };
509
510    #[cfg(feature = "tokio-console")]
511    let tokio_console_layer = if let Some(console_config) = config.tokio_console.clone() {
512        let builder = ConsoleLayer::builder()
513            .publish_interval(console_config.publish_interval)
514            .retention(console_config.retention);
515        let builder = match console_config.listen_addr {
516            SocketAddr::Inet(addr) => builder.server_addr(addr),
517            SocketAddr::Unix(addr) => {
518                let path = addr.as_pathname().unwrap().as_ref();
519                builder.server_addr(path)
520            }
521            SocketAddr::Turmoil(_) => unimplemented!(),
522        };
523        Some(builder.spawn())
524    } else {
525        None
526    };
527
528    let (sentry_guard, sentry_layer, sentry_reloader): (_, _, DirectiveReloader) =
529        if let Some(sentry_config) = config.sentry {
530            let guard = sentry::init((
531                sentry_config.dsn,
532                sentry::ClientOptions {
533                    attach_stacktrace: true,
534                    release: Some(format!("materialize@{0}", config.build_version).into()),
535                    environment: sentry_config.environment.map(Into::into),
536                    ..Default::default()
537                },
538            ));
539
540            sentry::configure_scope(|scope| {
541                scope.set_tag("service_name", config.service_name);
542                scope.set_tag("build_sha", config.build_sha.to_string());
543                for (k, v) in sentry_config.tags {
544                    scope.set_tag(&k, v);
545                }
546            });
547
548            let (filter, filter_handle) = reload::Layer::new({
549                // Please see the comment on `with_filter` below.
550                let mut filter = EnvFilter::new("info");
551                for directive in SENTRY_DEFAULTS.iter() {
552                    filter = filter.add_directive(directive.clone());
553                }
554                filter
555            });
556            let layer = sentry_tracing::layer()
557                .event_filter(sentry_config.event_filter)
558                // WARNING, ENTERING THE SPOOKY ZONE
559                //
560                // While sentry provides an event filter above that maps events to types of sentry events, its `Layer`
561                // implementation does not participate in `tracing`'s level-fast-path implementation, which depends on
562                // a hidden api (<https://github.com/tokio-rs/tracing/blob/b28c9351dd4f34ed3c7d5df88bb5c2e694d9c951/tracing-subscriber/src/layer/mod.rs#L861-L867>)
563                // which is primarily manged by filters (like below). The fast path skips verbose log
564                // (and span) levels that no layer is interested by reading a single atomic. Usually, not implementing this
565                // api means "give me everything, including `trace`, unless you attach a filter to me.
566                //
567                // The curious thing here (and a bug in tracing) is that _some configurations of our layer stack above_,
568                // if you don't have this filter can cause the fast-path to trigger, despite the fact
569                // that the sentry layer would specifically communicating that it wants to see
570                // everything. This bug appears to be related to the presence of a `reload::Layer`
571                // _around a filter, not a layer_, and guswynn is tracking investigating it here:
572                // <https://github.com/MaterializeInc/database-issues/issues/4794>. Because we don't
573                // enable a reload-able filter in CI/locally, but DO in production (the otel layer), it
574                // was once possible to trigger and rely on the fast path in CI, but not notice that it
575                // was disabled in production.
576                //
577                // The behavior of this optimization is now tested in various scenarios (in
578                // `test/tracing`). Regardless, when the upstream bug is fixed/resolved,
579                // we will continue to place this here, as the sentry layer only cares about
580                // events <= INFO, so we want to use the fast-path if no other layer
581                // is interested in high-fidelity events.
582                .with_filter(filter);
583            let reloader = Arc::new(move |defaults: Vec<Directive>| {
584                // Please see the comment on `with_filter` above.
585                let mut filter = EnvFilter::new("info");
586                // Re-apply our defaults on reload.
587                for directive in &defaults {
588                    filter = filter.add_directive(directive.clone());
589                }
590                Ok(filter_handle.reload(filter)?)
591            });
592            (Some(guard), Some(layer), reloader)
593        } else {
594            let reloader = Arc::new(|_| Ok(()));
595            (None, None, reloader)
596        };
597
598    #[cfg(feature = "capture")]
599    let capture = config.capture.map(|storage| CaptureLayer::new(&storage));
600
601    let stack = tracing_subscriber::registry();
602    let stack = stack.with(stderr_log_layer);
603    #[cfg(feature = "capture")]
604    let stack = stack.with(capture);
605    let stack = stack.with(otel_layer);
606    #[cfg(feature = "tokio-console")]
607    let stack = stack.with(tokio_console_layer);
608    let stack = stack.with(sentry_layer);
609
610    // Set the stack as a global subscriber.
611    assert!(GLOBAL_SUBSCRIBER.set(Arc::new(stack)).is_ok());
612    // Initialize the subscriber.
613    Arc::clone(GLOBAL_SUBSCRIBER.get().unwrap()).init();
614
615    #[cfg(feature = "tokio-console")]
616    if let Some(console_config) = config.tokio_console {
617        let endpoint = match console_config.listen_addr {
618            SocketAddr::Inet(addr) => format!("http://{addr}"),
619            SocketAddr::Unix(addr) => format!("file://localhost{addr}"),
620            SocketAddr::Turmoil(_) => unimplemented!(),
621        };
622        tracing::info!("starting tokio console on {endpoint}");
623    }
624
625    let handle = TracingHandle {
626        stderr_log: stderr_log_reloader,
627        opentelemetry: otel_reloader,
628        sentry: sentry_reloader,
629    };
630    let guard = TracingGuard {
631        _sentry_guard: sentry_guard,
632    };
633
634    Ok((handle, guard))
635}
636
637/// Returns the [`Level`] of a crate from an [`EnvFilter`] by performing an
638/// exact match between `crate` and the original `EnvFilter` directive.
639pub fn crate_level(filter: &EnvFilter, crate_name: &'static str) -> Level {
640    // TODO: implement `would_enable` on `EnvFilter` or equivalent
641    // to avoid having to manually parse out the directives. This
642    // would also significantly broaden the lookups the fn is able
643    // to do (modules, spans, fields, etc).
644
645    let mut default_level = Level::ERROR;
646    // EnvFilter roundtrips through its Display fmt, so it
647    // is safe to split out its individual directives here
648    for directive in format!("{}", filter).split(',') {
649        match directive.split('=').collect::<Vec<_>>().as_slice() {
650            [target, level] => {
651                if *target == crate_name {
652                    match Level::from_str(*level) {
653                        Ok(level) => return level,
654                        Err(err) => warn!("invalid level for {}: {}", target, err),
655                    }
656                }
657            }
658            [token] => match Level::from_str(*token) {
659                Ok(level) => default_level = default_level.max(level),
660                Err(_) => {
661                    // a target without a level is interpreted as trace
662                    if *token == crate_name {
663                        default_level = default_level.max(Level::TRACE);
664                    }
665                }
666            },
667            _ => {}
668        }
669    }
670
671    default_level
672}
673
/// A wrapper around a [`FormatEvent`] that adds an optional prefix to each
/// event.
#[derive(Debug)]
pub struct PrefixFormat<F> {
    /// The wrapped formatter that renders the event itself.
    inner: F,
    /// The prefix written before each event, followed by `": "`. When `None`,
    /// events are formatted by `inner` alone.
    prefix: Option<String>,
}
681
682impl<F, C, N> FormatEvent<C, N> for PrefixFormat<F>
683where
684    C: Subscriber + for<'a> LookupSpan<'a>,
685    N: for<'a> FormatFields<'a> + 'static,
686    F: FormatEvent<C, N>,
687{
688    fn format_event(
689        &self,
690        ctx: &FmtContext<'_, C, N>,
691        mut writer: Writer<'_>,
692        event: &Event<'_>,
693    ) -> std::fmt::Result {
694        match &self.prefix {
695            None => self.inner.format_event(ctx, writer, event)?,
696            Some(prefix) => {
697                let mut prefix = yansi::Paint::new(prefix);
698                if writer.has_ansi_escapes() {
699                    prefix = prefix.bold();
700                }
701                write!(writer, "{}: ", prefix)?;
702                self.inner.format_event(ctx, writer, event)?;
703            }
704        }
705        Ok(())
706    }
707}
708
/// An OpenTelemetry context.
///
/// Allows associating [`tracing`] spans across task or thread boundaries.
///
/// The context is stored as a string key/value map, written and read by the
/// global OpenTelemetry text map propagator.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct OpenTelemetryContext {
    /// The propagated key/value pairs. Keys are lowercased when written
    /// through the `Injector` impl and when looked up through the
    /// `Extractor` impl.
    inner: BTreeMap<String, String>,
}
716
717impl OpenTelemetryContext {
718    /// Attaches this `Context` to the current [`tracing`] span,
719    /// as its parent.
720    ///
721    /// If there is not enough information in this `OpenTelemetryContext`
722    /// to create a context, then the current thread's `Context` is used
723    /// defaulting to the default `Context`.
724    pub fn attach_as_parent(&self) {
725        let parent_cx = global::get_text_map_propagator(|prop| prop.extract(self));
726        tracing::Span::current().set_parent(parent_cx);
727    }
728
729    /// Attaches this `Context` to the given [`tracing`] Span, as its parent.
730    /// as its parent.
731    ///
732    /// If there is not enough information in this `OpenTelemetryContext`
733    /// to create a context, then the current thread's `Context` is used
734    /// defaulting to the default `Context`.
735    pub fn attach_as_parent_to(&self, span: &Span) {
736        let parent_cx = global::get_text_map_propagator(|prop| prop.extract(self));
737        span.set_parent(parent_cx);
738    }
739
740    /// Obtains a `Context` from the current [`tracing`] span.
741    pub fn obtain() -> Self {
742        let mut context = Self::empty();
743        global::get_text_map_propagator(|propagator| {
744            propagator.inject_context(&tracing::Span::current().context(), &mut context)
745        });
746
747        context
748    }
749
750    /// Obtains an empty `Context`.
751    pub fn empty() -> Self {
752        Self {
753            inner: BTreeMap::new(),
754        }
755    }
756}
757
758impl Extractor for OpenTelemetryContext {
759    fn get(&self, key: &str) -> Option<&str> {
760        self.inner.get(&key.to_lowercase()).map(|v| v.as_str())
761    }
762
763    fn keys(&self) -> Vec<&str> {
764        self.inner.keys().map(|k| k.as_str()).collect::<Vec<_>>()
765    }
766}
767
768impl Injector for OpenTelemetryContext {
769    fn set(&mut self, key: &str, value: String) {
770        self.inner.insert(key.to_lowercase(), value);
771    }
772}
773
774impl From<OpenTelemetryContext> for BTreeMap<String, String> {
775    fn from(ctx: OpenTelemetryContext) -> Self {
776        ctx.inner
777    }
778}
779
780impl From<BTreeMap<String, String>> for OpenTelemetryContext {
781    fn from(map: BTreeMap<String, String>) -> Self {
782        Self { inner: map }
783    }
784}
785
/// A [`Layer`] that counts `on_close` callbacks.
struct MetricsLayer {
    /// Counter incremented by one on every `on_close` callback.
    on_close: IntCounter,
}
789
790impl MetricsLayer {
791    fn new(registry: &MetricsRegistry) -> Self {
792        MetricsLayer {
793            on_close: registry.register(metric!(
794                name: "mz_otel_on_close",
795                help: "count of on_close events sent to otel",
796            )),
797        }
798    }
799}
800
801impl<S: tracing::Subscriber> Layer<S> for MetricsLayer {
802    fn on_close(&self, _id: tracing::span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
803        self.on_close.inc()
804    }
805}
806
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use tracing::Level;
    use tracing_subscriber::filter::{EnvFilter, LevelFilter, Targets};

    // Checks that when two `Targets` sets are merged with `with_targets`, the
    // one added last wins for a target that both of them configure.
    #[crate::test]
    fn overriding_targets() {
        let user_defined = Targets::new().with_target("my_crate", Level::INFO);

        let default = Targets::new().with_target("my_crate", LevelFilter::OFF);
        assert!(!default.would_enable("my_crate", &Level::INFO));

        // The user_defined filters should override the default
        let filters = Targets::new()
            .with_targets(default)
            .with_targets(user_defined);
        assert!(filters.would_enable("my_crate", &Level::INFO));
    }

    // Exercises `crate_level` against the different shapes of `EnvFilter`
    // directive: plain `target=level`, a global default, module paths,
    // span/field directives, and bare targets.
    #[crate::test]
    fn crate_level() {
        // target=level directives only. should default to ERROR if unspecified
        let filter = EnvFilter::from_str("abc=trace,def=debug").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(
            super::crate_level(&filter, "abc::doesnt::exist"),
            Level::ERROR
        );
        assert_eq!(super::crate_level(&filter, "doesnt::exist"), Level::ERROR);

        // add in a global default
        let filter = EnvFilter::from_str("abc=trace,def=debug,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(
            super::crate_level(&filter, "abc::doesnt::exist"),
            Level::INFO
        );
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(super::crate_level(&filter, "nan"), Level::INFO);

        // a directive with mod path doesn't match the top-level crate
        let filter = EnvFilter::from_str("abc::def::ghi=trace,debug").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::DEBUG);
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(
            super::crate_level(&filter, "gets_the_default"),
            Level::DEBUG
        );

        // directives with spans and fields don't match the top-level crate
        let filter =
            EnvFilter::from_str("abc[s]=trace,def[s{g=h}]=debug,[{s2}]=debug,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::INFO);
        assert_eq!(super::crate_level(&filter, "def"), Level::INFO);
        assert_eq!(super::crate_level(&filter, "gets_the_default"), Level::INFO);

        // a bare target without a level is taken as trace
        let filter = EnvFilter::from_str("abc,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(super::crate_level(&filter, "gets_the_default"), Level::INFO);
        // the contract of `crate_level` is that it only matches top-level crates.
        // if we had a proper EnvFilter::would_match impl, this assertion should
        // be Level::TRACE
        assert_eq!(super::crate_level(&filter, "abc::def"), Level::INFO);
    }
}