mz_ore/
tracing.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Tracing utilities.
17//!
18//! This module contains application tracing utilities built on top of the
19//! [`tracing`] and [`opentelemetry`] libraries. The key exports are:
20//!
21//!  * The **[`configure`]** function, which configures the `tracing` and
22//!    `opentelemetry` crates with sensible defaults and should be called during
23//!    initialization of every Materialize binary.
24//!
25//!  * The **[`OpenTelemetryContext`]** type, which carries a tracing span
26//!    across thread or task boundaries within a process.
27
28use std::collections::BTreeMap;
29use std::io;
30use std::io::IsTerminal;
31use std::str::FromStr;
32use std::sync::LazyLock;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::{Arc, OnceLock};
35use std::time::Duration;
36
37#[cfg(feature = "tokio-console")]
38use console_subscriber::ConsoleLayer;
39use derivative::Derivative;
40use http::HeaderMap;
41use hyper_tls::HttpsConnector;
42use hyper_util::client::legacy::connect::HttpConnector;
43use opentelemetry::global::Error;
44use opentelemetry::propagation::{Extractor, Injector};
45use opentelemetry::trace::TracerProvider;
46use opentelemetry::{KeyValue, global};
47use opentelemetry_sdk::propagation::TraceContextPropagator;
48use opentelemetry_sdk::{Resource, trace};
49use prometheus::IntCounter;
50use tonic::metadata::MetadataMap;
51use tonic::transport::Endpoint;
52use tracing::{Event, Level, Span, Subscriber, warn};
53#[cfg(feature = "capture")]
54use tracing_capture::{CaptureLayer, SharedStorage};
55use tracing_opentelemetry::OpenTelemetrySpanExt;
56use tracing_subscriber::filter::Directive;
57use tracing_subscriber::fmt::format::{Writer, format};
58use tracing_subscriber::fmt::{self, FmtContext, FormatEvent, FormatFields};
59use tracing_subscriber::layer::{Layer, SubscriberExt};
60use tracing_subscriber::registry::LookupSpan;
61use tracing_subscriber::util::SubscriberInitExt;
62use tracing_subscriber::{EnvFilter, Registry, reload};
63
64use crate::metric;
65use crate::metrics::MetricsRegistry;
66#[cfg(feature = "tokio-console")]
67use crate::netio::SocketAddr;
68
69/// Application tracing configuration.
70///
71/// See the [`configure`] function for details.
72#[derive(Derivative)]
73#[derivative(Debug)]
74pub struct TracingConfig<F> {
75    /// The name of the service.
76    pub service_name: &'static str,
77    /// Configuration of the stderr log.
78    pub stderr_log: StderrLogConfig,
79    /// Optional configuration of the [`opentelemetry`] library.
80    pub opentelemetry: Option<OpenTelemetryConfig>,
81    /// Optional configuration for the [Tokio console] integration.
82    ///
83    /// [Tokio console]: https://github.com/tokio-rs/console
84    #[cfg_attr(nightly_doc_features, doc(cfg(feature = "tokio-console")))]
85    #[cfg(feature = "tokio-console")]
86    pub tokio_console: Option<TokioConsoleConfig>,
87    /// Optional configuration for capturing spans during tests.
88    #[cfg(feature = "capture")]
89    #[derivative(Debug = "ignore")]
90    pub capture: Option<SharedStorage>,
91    /// Optional Sentry configuration.
92    pub sentry: Option<SentryConfig<F>>,
93    /// The version of this build of the service.
94    pub build_version: &'static str,
95    /// The commit SHA of this build of the service.
96    pub build_sha: &'static str,
97    /// Registry for prometheus metrics.
98    pub registry: MetricsRegistry,
99}
100
/// Settings for reporting events to Sentry.
#[derive(Clone, Debug)]
pub struct SentryConfig<F> {
    /// The Sentry data source name (DSN) that events are submitted to.
    pub dsn: String,
    /// The environment name reported to Sentry.
    ///
    /// When `None`, the Sentry SDK will attempt to read the value from the
    /// `SENTRY_ENVIRONMENT` environment variable instead.
    pub environment: Option<String>,
    /// Extra tags attached to every Sentry event/exception.
    pub tags: BTreeMap<String, String>,
    /// Classifies each tracing event before it is sent to Sentry.
    pub event_filter: F,
}
116
117/// Configures the stderr log.
118#[derive(Debug)]
119pub struct StderrLogConfig {
120    /// The format in which to emit messages.
121    pub format: StderrLogFormat,
122    /// A filter which determines which events are emitted to the log.
123    pub filter: EnvFilter,
124}
125
/// The available formats for stderr log messages.
#[derive(Clone, Debug)]
pub enum StderrLogFormat {
    /// Human-readable text, optionally colored.
    ///
    /// The best choice when a person reads the output directly in a terminal.
    Text {
        /// A prefix to put in front of every log message, if any.
        prefix: Option<String>,
    },
    /// JSON, one object per line (i.e., JSONL).
    ///
    /// The best choice when logs are ingested by a structured logging
    /// aggregator.
    Json,
}
141
142/// Configuration for the [`opentelemetry`] library.
143#[derive(Debug)]
144pub struct OpenTelemetryConfig {
145    /// The [OTLP/HTTP] endpoint to export OpenTelemetry data to.
146    ///
147    /// [OTLP/HTTP]: https://github.com/open-telemetry/opentelemetry-specification/blob/b13c1648bae16323868a5caf614bc10c917cc6ca/specification/protocol/otlp.md#otlphttp
148    pub endpoint: String,
149    /// Additional headers to send with every request to the endpoint.
150    pub headers: HeaderMap,
151    /// A filter which determines which events are exported.
152    pub filter: EnvFilter,
153    /// How many spans can be queued before dropping.
154    pub max_batch_queue_size: usize,
155    /// How many spans to process in a single batch
156    pub max_export_batch_size: usize,
157    /// How many concurrent export tasks to allow.
158    /// More tasks can lead to more memory consumed by the exporter.
159    pub max_concurrent_exports: usize,
160    /// Delay between consecutive batch exports.
161    pub batch_scheduled_delay: Duration,
162    /// How long to wait for a batch to be sent before dropping it.
163    pub max_export_timeout: Duration,
164    /// `opentelemetry::sdk::resource::Resource` to include with all
165    /// traces.
166    pub resource: Resource,
167}
168
/// Settings for the [Tokio console] integration.
///
/// [Tokio console]: https://github.com/tokio-rs/console
#[cfg_attr(nightly_doc_features, doc(cfg(feature = "tokio-console")))]
#[cfg(feature = "tokio-console")]
#[derive(Clone, Debug)]
pub struct TokioConsoleConfig {
    /// Address on which Tokio console connections are accepted.
    ///
    /// See [`console_subscriber::Builder::server_addr`].
    pub listen_addr: SocketAddr,
    /// Interval at which updates are published to clients.
    ///
    /// See [`console_subscriber::Builder::publish_interval`].
    pub publish_interval: Duration,
    /// Duration for which data about completed tasks is kept.
    ///
    /// See [`console_subscriber::Builder::retention`].
    pub retention: Duration,
}
189
190type Reloader = Arc<dyn Fn(EnvFilter, Vec<Directive>) -> Result<(), anyhow::Error> + Send + Sync>;
191type DirectiveReloader = Arc<dyn Fn(Vec<Directive>) -> Result<(), anyhow::Error> + Send + Sync>;
192
193/// A handle to the tracing infrastructure configured with [`configure`].
194#[derive(Clone)]
195pub struct TracingHandle {
196    stderr_log: Reloader,
197    opentelemetry: Reloader,
198    sentry: DirectiveReloader,
199}
200
201impl TracingHandle {
202    /// Creates a inoperative tracing handle.
203    ///
204    /// Primarily useful in tests.
205    pub fn disabled() -> TracingHandle {
206        TracingHandle {
207            stderr_log: Arc::new(|_, _| Ok(())),
208            opentelemetry: Arc::new(|_, _| Ok(())),
209            sentry: Arc::new(|_| Ok(())),
210        }
211    }
212
213    /// Dynamically reloads the stderr log filter.
214    pub fn reload_stderr_log_filter(
215        &self,
216        filter: EnvFilter,
217        defaults: Vec<Directive>,
218    ) -> Result<(), anyhow::Error> {
219        (self.stderr_log)(filter, defaults)
220    }
221
222    /// Dynamically reloads the OpenTelemetry log filter.
223    pub fn reload_opentelemetry_filter(
224        &self,
225        filter: EnvFilter,
226        defaults: Vec<Directive>,
227    ) -> Result<(), anyhow::Error> {
228        (self.opentelemetry)(filter, defaults)
229    }
230
231    /// Dynamically reloads the additional sentry directives.
232    pub fn reload_sentry_directives(&self, defaults: Vec<Directive>) -> Result<(), anyhow::Error> {
233        (self.sentry)(defaults)
234    }
235}
236
237impl std::fmt::Debug for TracingHandle {
238    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
239        f.debug_struct("TracingHandle").finish_non_exhaustive()
240    }
241}
242
// Note that the following defaults are used on startup, regardless of the
// parameters in LaunchDarkly. If we need to, we can add CLI flags to control
// them going forward.
246
247/// By default we turn off tracing from the following crates, because they
248/// have error spans which are noisy.
249///
250/// Note: folks should feel free to add more crates here if we find more
251/// with long lived Spans.
252pub const LOGGING_DEFAULTS_STR: [&str; 1] = ["kube_client::client::builder=off"];
253/// Same as [`LOGGING_DEFAULTS_STR`], but structured as [`Directive`]s.
254pub static LOGGING_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
255    LOGGING_DEFAULTS_STR
256        .into_iter()
257        .map(|directive| Directive::from_str(directive).expect("valid directive"))
258        .collect()
259});
260/// By default we turn off tracing from the following crates, because they
261/// have long-lived Spans, which OpenTelemetry does not handle well.
262///
263/// Note: folks should feel free to add more crates here if we find more
264/// with long lived Spans.
265pub const OPENTELEMETRY_DEFAULTS_STR: [&str; 2] = ["h2=off", "hyper=off"];
266/// Same as [`OPENTELEMETRY_DEFAULTS_STR`], but structured as [`Directive`]s.
267pub static OPENTELEMETRY_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
268    OPENTELEMETRY_DEFAULTS_STR
269        .into_iter()
270        .map(|directive| Directive::from_str(directive).expect("valid directive"))
271        .collect()
272});
273
274/// By default we turn off tracing from the following crates, because they
275/// have error spans which are noisy.
276pub const SENTRY_DEFAULTS_STR: [&str; 2] =
277    ["kube_client::client::builder=off", "mysql_async::conn=off"];
278/// Same as [`SENTRY_DEFAULTS_STR`], but structured as [`Directive`]s.
279pub static SENTRY_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
280    SENTRY_DEFAULTS_STR
281        .into_iter()
282        .map(|directive| Directive::from_str(directive).expect("valid directive"))
283        .collect()
284});
285
286/// The [`GLOBAL_SUBSCRIBER`] type.
287type GlobalSubscriber = Arc<dyn Subscriber + Send + Sync + 'static>;
288
289/// An [`Arc`] of the tracing [`Subscriber`] constructed and initialized in
290/// [`configure`]. The value is written when [`configure`] runs.
291pub static GLOBAL_SUBSCRIBER: OnceLock<GlobalSubscriber> = OnceLock::new();
292
293/// Enables application tracing via the [`tracing`] and [`opentelemetry`]
294/// libraries.
295///
296/// The `tracing` library is configured to emit events as textual log lines to
297/// stderr. [`StderrLogConfig`] offer a small degree of control over this
298/// behavior.
299///
300/// If the `opentelemetry` parameter is `Some`, the `tracing` library is
301/// additionally configured to export events to an observability backend, like
302/// [Jaeger] or [Honeycomb].
303///
304/// The `tokio_console` parameter enables integration with the [Tokio console].
305/// When enabled, `tracing` events are collected and made available to the Tokio
306/// console via a server running on port
307///
308/// [Jaeger]: https://jaegertracing.io
309/// [Honeycomb]: https://www.honeycomb.io
310/// [Tokio console]: https://github.com/tokio-rs/console
311// Setting up OpenTelemetry in the background requires we are in a Tokio runtime
312// context, hence the `async`.
313#[allow(clippy::unused_async)]
314pub async fn configure<F>(config: TracingConfig<F>) -> Result<TracingHandle, anyhow::Error>
315where
316    F: Fn(&tracing::Metadata<'_>) -> sentry_tracing::EventFilter + Send + Sync + 'static,
317{
318    let stderr_log_layer: Box<dyn Layer<Registry> + Send + Sync> = match config.stderr_log.format {
319        StderrLogFormat::Text { prefix } => {
320            // See: https://no-color.org/
321            let no_color = std::env::var_os("NO_COLOR").unwrap_or_else(|| "".into()) != "";
322            Box::new(
323                fmt::layer()
324                    .with_writer(io::stderr)
325                    .event_format(PrefixFormat {
326                        inner: format(),
327                        prefix,
328                    })
329                    .with_ansi(!no_color && io::stderr().is_terminal()),
330            )
331        }
332        StderrLogFormat::Json => Box::new(
333            fmt::layer()
334                .with_writer(io::stderr)
335                .json()
336                .with_current_span(true),
337        ),
338    };
339    let (stderr_log_filter, stderr_log_filter_reloader) = reload::Layer::new({
340        let mut filter = config.stderr_log.filter;
341        for directive in LOGGING_DEFAULTS.iter() {
342            filter = filter.add_directive(directive.clone());
343        }
344        filter
345    });
346    let stderr_log_layer = stderr_log_layer.with_filter(stderr_log_filter);
347    let stderr_log_reloader = Arc::new(move |mut filter: EnvFilter, defaults: Vec<Directive>| {
348        for directive in &defaults {
349            filter = filter.add_directive(directive.clone());
350        }
351        Ok(stderr_log_filter_reloader.reload(filter)?)
352    });
353
354    let (otel_layer, otel_reloader): (_, Reloader) = if let Some(otel_config) = config.opentelemetry
355    {
356        opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
357
358        // Manually set up an OpenSSL-backed, h2, proxied `Channel`,
359        // with the timeout configured according to:
360        // https://docs.rs/opentelemetry-otlp/latest/opentelemetry_otlp/struct.TonicExporterBuilder.html#method.with_channel
361        let channel = Endpoint::from_shared(otel_config.endpoint)?
362            .timeout(Duration::from_secs(
363                opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT,
364            ))
365            // TODO(guswynn): investigate if this should be non-lazy.
366            .connect_with_connector_lazy({
367                let mut http = HttpConnector::new();
368                http.enforce_http(false);
369                HttpsConnector::from((
370                    http,
371                    // This is the same as the default, plus an h2 ALPN request.
372                    tokio_native_tls::TlsConnector::from(
373                        native_tls::TlsConnector::builder()
374                            .request_alpns(&["h2"])
375                            .build()
376                            .unwrap(),
377                    ),
378                ))
379            });
380        let exporter = opentelemetry_otlp::new_exporter()
381            .tonic()
382            .with_channel(channel)
383            .with_metadata(MetadataMap::from_headers(otel_config.headers));
384        let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default()
385            .with_max_queue_size(otel_config.max_batch_queue_size)
386            .with_max_export_batch_size(otel_config.max_export_batch_size)
387            .with_max_concurrent_exports(otel_config.max_concurrent_exports)
388            .with_scheduled_delay(otel_config.batch_scheduled_delay)
389            .with_max_export_timeout(otel_config.max_export_timeout)
390            .build();
391        let tracer = opentelemetry_otlp::new_pipeline()
392            .tracing()
393            .with_trace_config(
394                trace::Config::default().with_resource(
395                    // The latter resources win, so if the user specifies
396                    // `service.name` in the configuration, it will override the
397                    // `service.name` value we configure here.
398                    Resource::new([KeyValue::new(
399                        "service.name",
400                        config.service_name.to_string(),
401                    )])
402                    .merge(&otel_config.resource),
403                ),
404            )
405            .with_exporter(exporter)
406            .with_batch_config(batch_config)
407            .install_batch(opentelemetry_sdk::runtime::Tokio)
408            .unwrap()
409            .tracer(config.service_name);
410
411        // Create our own error handler to:
412        //   1. Rate limit the number of error logs. By default the OTel library will emit
413        //      an enormous number of duplicate logs if any errors occur, one per batch
414        //      send attempt, until the error is resolved.
415        //   2. Log the errors via our tracing layer, so they are formatted consistently
416        //      with the rest of our logs, rather than the direct `eprintln` used by the
417        //      OTel library.
418        const OPENTELEMETRY_ERROR_MSG_BACKOFF_SECONDS: u64 = 30;
419        let last_log_in_epoch_seconds = AtomicU64::default();
420        opentelemetry::global::set_error_handler(move |err| {
421            let now = std::time::SystemTime::now()
422                .duration_since(std::time::SystemTime::UNIX_EPOCH)
423                .expect("Failed to get duration since Unix epoch")
424                .as_secs();
425            let last_log = last_log_in_epoch_seconds.load(Ordering::SeqCst);
426
427            if now.saturating_sub(last_log) >= OPENTELEMETRY_ERROR_MSG_BACKOFF_SECONDS {
428                if last_log_in_epoch_seconds
429                    .compare_exchange_weak(last_log, now, Ordering::Relaxed, Ordering::Relaxed)
430                    .is_err()
431                {
432                    return;
433                }
434                use crate::error::ErrorExt;
435                match err {
436                    Error::Trace(err) => {
437                        warn!("OpenTelemetry error: {}", err.display_with_causes());
438                    }
439                    // TODO(guswynn): turn off the metrics feature?
440                    Error::Metric(err) => {
441                        warn!("OpenTelemetry error: {}", err.display_with_causes());
442                    }
443                    Error::Other(err) => {
444                        warn!("OpenTelemetry error: {}", err);
445                    }
446                    _ => {
447                        warn!("unknown OpenTelemetry error");
448                    }
449                }
450            }
451        })
452        .expect("valid error handler");
453
454        let (filter, filter_handle) = reload::Layer::new({
455            let mut filter = otel_config.filter;
456            for directive in OPENTELEMETRY_DEFAULTS.iter() {
457                filter = filter.add_directive(directive.clone());
458            }
459            filter
460        });
461        let metrics_layer = MetricsLayer::new(&config.registry);
462        let layer = tracing_opentelemetry::layer()
463            // OpenTelemetry does not handle long-lived Spans well, and they end up continuously
464            // eating memory until OOM. So we set a max number of events that are allowed to be
465            // logged to a Span, once this max is passed, old events will get dropped
466            //
467            // TODO(parker-timmerman|guswynn): make this configurable with LaunchDarkly
468            .max_events_per_span(2048)
469            .with_tracer(tracer)
470            .and_then(metrics_layer)
471            // WARNING, ENTERING SPOOKY ZONE 2.0
472            //
473            // Notice we use `with_filter` here. `and_then` will apply the filter globally.
474            .with_filter(filter);
475        let reloader = Arc::new(move |mut filter: EnvFilter, defaults: Vec<Directive>| {
476            // Re-apply our defaults on reload.
477            for directive in &defaults {
478                filter = filter.add_directive(directive.clone());
479            }
480            Ok(filter_handle.reload(filter)?)
481        });
482        (Some(layer), reloader)
483    } else {
484        let reloader = Arc::new(|_, _| Ok(()));
485        (None, reloader)
486    };
487
488    #[cfg(feature = "tokio-console")]
489    let tokio_console_layer = if let Some(console_config) = config.tokio_console.clone() {
490        let builder = ConsoleLayer::builder()
491            .publish_interval(console_config.publish_interval)
492            .retention(console_config.retention);
493        let builder = match console_config.listen_addr {
494            SocketAddr::Inet(addr) => builder.server_addr(addr),
495            SocketAddr::Unix(addr) => {
496                let path = addr.as_pathname().unwrap().as_ref();
497                builder.server_addr(path)
498            }
499            SocketAddr::Turmoil(_) => unimplemented!(),
500        };
501        Some(builder.spawn())
502    } else {
503        None
504    };
505
506    let (sentry_layer, sentry_reloader): (_, DirectiveReloader) =
507        if let Some(sentry_config) = config.sentry {
508            let guard = sentry::init((
509                sentry_config.dsn,
510                sentry::ClientOptions {
511                    attach_stacktrace: true,
512                    release: Some(format!("materialize@{0}", config.build_version).into()),
513                    environment: sentry_config.environment.map(Into::into),
514                    ..Default::default()
515                },
516            ));
517
518            // Forgetting the guard ensures that the Sentry transport won't shut down for the
519            // lifetime of the process.
520            std::mem::forget(guard);
521
522            sentry::configure_scope(|scope| {
523                scope.set_tag("service_name", config.service_name);
524                scope.set_tag("build_sha", config.build_sha.to_string());
525                for (k, v) in sentry_config.tags {
526                    scope.set_tag(&k, v);
527                }
528            });
529
530            let (filter, filter_handle) = reload::Layer::new({
531                // Please see the comment on `with_filter` below.
532                let mut filter = EnvFilter::new("info");
533                for directive in SENTRY_DEFAULTS.iter() {
534                    filter = filter.add_directive(directive.clone());
535                }
536                filter
537            });
538            let layer = sentry_tracing::layer()
539                .event_filter(sentry_config.event_filter)
540                // WARNING, ENTERING THE SPOOKY ZONE
541                //
542                // While sentry provides an event filter above that maps events to types of sentry events, its `Layer`
543                // implementation does not participate in `tracing`'s level-fast-path implementation, which depends on
544                // a hidden api (<https://github.com/tokio-rs/tracing/blob/b28c9351dd4f34ed3c7d5df88bb5c2e694d9c951/tracing-subscriber/src/layer/mod.rs#L861-L867>)
545                // which is primarily manged by filters (like below). The fast path skips verbose log
546                // (and span) levels that no layer is interested by reading a single atomic. Usually, not implementing this
547                // api means "give me everything, including `trace`, unless you attach a filter to me.
548                //
549                // The curious thing here (and a bug in tracing) is that _some configurations of our layer stack above_,
550                // if you don't have this filter can cause the fast-path to trigger, despite the fact
551                // that the sentry layer would specifically communicating that it wants to see
552                // everything. This bug appears to be related to the presence of a `reload::Layer`
553                // _around a filter, not a layer_, and guswynn is tracking investigating it here:
554                // <https://github.com/MaterializeInc/database-issues/issues/4794>. Because we don't
555                // enable a reload-able filter in CI/locally, but DO in production (the otel layer), it
556                // was once possible to trigger and rely on the fast path in CI, but not notice that it
557                // was disabled in production.
558                //
559                // The behavior of this optimization is now tested in various scenarios (in
560                // `test/tracing`). Regardless, when the upstream bug is fixed/resolved,
561                // we will continue to place this here, as the sentry layer only cares about
562                // events <= INFO, so we want to use the fast-path if no other layer
563                // is interested in high-fidelity events.
564                .with_filter(filter);
565            let reloader = Arc::new(move |defaults: Vec<Directive>| {
566                // Please see the comment on `with_filter` above.
567                let mut filter = EnvFilter::new("info");
568                // Re-apply our defaults on reload.
569                for directive in &defaults {
570                    filter = filter.add_directive(directive.clone());
571                }
572                Ok(filter_handle.reload(filter)?)
573            });
574            (Some(layer), reloader)
575        } else {
576            let reloader = Arc::new(|_| Ok(()));
577            (None, reloader)
578        };
579
580    #[cfg(feature = "capture")]
581    let capture = config.capture.map(|storage| CaptureLayer::new(&storage));
582
583    let stack = tracing_subscriber::registry();
584    let stack = stack.with(stderr_log_layer);
585    #[cfg(feature = "capture")]
586    let stack = stack.with(capture);
587    let stack = stack.with(otel_layer);
588    #[cfg(feature = "tokio-console")]
589    let stack = stack.with(tokio_console_layer);
590    let stack = stack.with(sentry_layer);
591
592    // Set the stack as a global subscriber.
593    assert!(GLOBAL_SUBSCRIBER.set(Arc::new(stack)).is_ok());
594    // Initialize the subscriber.
595    Arc::clone(GLOBAL_SUBSCRIBER.get().unwrap()).init();
596
597    #[cfg(feature = "tokio-console")]
598    if let Some(console_config) = config.tokio_console {
599        let endpoint = match console_config.listen_addr {
600            SocketAddr::Inet(addr) => format!("http://{addr}"),
601            SocketAddr::Unix(addr) => format!("file://localhost{addr}"),
602            SocketAddr::Turmoil(_) => unimplemented!(),
603        };
604        tracing::info!("starting tokio console on {endpoint}");
605    }
606
607    let handle = TracingHandle {
608        stderr_log: stderr_log_reloader,
609        opentelemetry: otel_reloader,
610        sentry: sentry_reloader,
611    };
612
613    Ok(handle)
614}
615
616/// Returns the [`Level`] of a crate from an [`EnvFilter`] by performing an
617/// exact match between `crate` and the original `EnvFilter` directive.
618pub fn crate_level(filter: &EnvFilter, crate_name: &'static str) -> Level {
619    // TODO: implement `would_enable` on `EnvFilter` or equivalent
620    // to avoid having to manually parse out the directives. This
621    // would also significantly broaden the lookups the fn is able
622    // to do (modules, spans, fields, etc).
623
624    let mut default_level = Level::ERROR;
625    // EnvFilter roundtrips through its Display fmt, so it
626    // is safe to split out its individual directives here
627    for directive in format!("{}", filter).split(',') {
628        match directive.split('=').collect::<Vec<_>>().as_slice() {
629            [target, level] => {
630                if *target == crate_name {
631                    match Level::from_str(*level) {
632                        Ok(level) => return level,
633                        Err(err) => warn!("invalid level for {}: {}", target, err),
634                    }
635                }
636            }
637            [token] => match Level::from_str(*token) {
638                Ok(level) => default_level = default_level.max(level),
639                Err(_) => {
640                    // a target without a level is interpreted as trace
641                    if *token == crate_name {
642                        default_level = default_level.max(Level::TRACE);
643                    }
644                }
645            },
646            _ => {}
647        }
648    }
649
650    default_level
651}
652
/// Decorates a [`FormatEvent`] so that every event it formats is preceded by
/// an optional prefix.
#[derive(Debug)]
pub struct PrefixFormat<F> {
    // The formatter that renders the event itself.
    inner: F,
    // The text written before each event, if any.
    prefix: Option<String>,
}
660
661impl<F, C, N> FormatEvent<C, N> for PrefixFormat<F>
662where
663    C: Subscriber + for<'a> LookupSpan<'a>,
664    N: for<'a> FormatFields<'a> + 'static,
665    F: FormatEvent<C, N>,
666{
667    fn format_event(
668        &self,
669        ctx: &FmtContext<'_, C, N>,
670        mut writer: Writer<'_>,
671        event: &Event<'_>,
672    ) -> std::fmt::Result {
673        match &self.prefix {
674            None => self.inner.format_event(ctx, writer, event)?,
675            Some(prefix) => {
676                let mut prefix = yansi::Paint::new(prefix);
677                if writer.has_ansi_escapes() {
678                    prefix = prefix.bold();
679                }
680                write!(writer, "{}: ", prefix)?;
681                self.inner.format_event(ctx, writer, event)?;
682            }
683        }
684        Ok(())
685    }
686}
687
688/// An OpenTelemetry context.
689///
690/// Allows associating [`tracing`] spans across task or thread boundaries.
691#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
692pub struct OpenTelemetryContext {
693    inner: BTreeMap<String, String>,
694}
695
696impl OpenTelemetryContext {
697    /// Attaches this `Context` to the current [`tracing`] span,
698    /// as its parent.
699    ///
700    /// If there is not enough information in this `OpenTelemetryContext`
701    /// to create a context, then the current thread's `Context` is used
702    /// defaulting to the default `Context`.
703    pub fn attach_as_parent(&self) {
704        let parent_cx = global::get_text_map_propagator(|prop| prop.extract(self));
705        tracing::Span::current().set_parent(parent_cx);
706    }
707
708    /// Attaches this `Context` to the given [`tracing`] Span, as its parent.
709    /// as its parent.
710    ///
711    /// If there is not enough information in this `OpenTelemetryContext`
712    /// to create a context, then the current thread's `Context` is used
713    /// defaulting to the default `Context`.
714    pub fn attach_as_parent_to(&self, span: &Span) {
715        let parent_cx = global::get_text_map_propagator(|prop| prop.extract(self));
716        span.set_parent(parent_cx);
717    }
718
719    /// Obtains a `Context` from the current [`tracing`] span.
720    pub fn obtain() -> Self {
721        let mut context = Self::empty();
722        global::get_text_map_propagator(|propagator| {
723            propagator.inject_context(&tracing::Span::current().context(), &mut context)
724        });
725
726        context
727    }
728
729    /// Obtains an empty `Context`.
730    pub fn empty() -> Self {
731        Self {
732            inner: BTreeMap::new(),
733        }
734    }
735}
736
737impl Extractor for OpenTelemetryContext {
738    fn get(&self, key: &str) -> Option<&str> {
739        self.inner.get(&key.to_lowercase()).map(|v| v.as_str())
740    }
741
742    fn keys(&self) -> Vec<&str> {
743        self.inner.keys().map(|k| k.as_str()).collect::<Vec<_>>()
744    }
745}
746
747impl Injector for OpenTelemetryContext {
748    fn set(&mut self, key: &str, value: String) {
749        self.inner.insert(key.to_lowercase(), value);
750    }
751}
752
753impl From<OpenTelemetryContext> for BTreeMap<String, String> {
754    fn from(ctx: OpenTelemetryContext) -> Self {
755        ctx.inner
756    }
757}
758
759impl From<BTreeMap<String, String>> for OpenTelemetryContext {
760    fn from(map: BTreeMap<String, String>) -> Self {
761        Self { inner: map }
762    }
763}
764
/// A tracing [`Layer`] that counts span close notifications.
struct MetricsLayer {
    /// Counter registered as `mz_otel_on_close`; incremented once per
    /// `on_close` callback received by this layer.
    on_close: IntCounter,
}
768
769impl MetricsLayer {
770    fn new(registry: &MetricsRegistry) -> Self {
771        MetricsLayer {
772            on_close: registry.register(metric!(
773                name: "mz_otel_on_close",
774                help: "count of on_close events sent to otel",
775            )),
776        }
777    }
778}
779
780impl<S: tracing::Subscriber> Layer<S> for MetricsLayer {
781    fn on_close(&self, _id: tracing::span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
782        self.on_close.inc()
783    }
784}
785
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use tracing::Level;
    use tracing_subscriber::filter::{EnvFilter, LevelFilter, Targets};

    /// Targets added by a later `with_targets` call replace earlier entries
    /// for the same target.
    #[crate::test]
    fn overriding_targets() {
        let user_defined = Targets::new().with_target("my_crate", Level::INFO);

        let default = Targets::new().with_target("my_crate", LevelFilter::OFF);
        assert!(!default.would_enable("my_crate", &Level::INFO));

        // The user_defined filters should override the default
        let filters = Targets::new()
            .with_targets(default)
            .with_targets(user_defined);
        assert!(filters.would_enable("my_crate", &Level::INFO));
    }

    /// Exercises `crate_level` against the directive shapes an `EnvFilter`
    /// can contain: plain `target=level`, a global default, module paths,
    /// span/field directives, and bare targets.
    #[crate::test]
    fn crate_level() {
        // target=level directives only. should default to ERROR if unspecified
        let filter = EnvFilter::from_str("abc=trace,def=debug").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(
            super::crate_level(&filter, "abc::doesnt::exist"),
            Level::ERROR
        );
        assert_eq!(super::crate_level(&filter, "doesnt::exist"), Level::ERROR);

        // add in a global default
        let filter = EnvFilter::from_str("abc=trace,def=debug,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(
            super::crate_level(&filter, "abc::doesnt::exist"),
            Level::INFO
        );
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(super::crate_level(&filter, "nan"), Level::INFO);

        // a directive with mod path doesn't match the top-level crate
        let filter = EnvFilter::from_str("abc::def::ghi=trace,debug").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::DEBUG);
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(
            super::crate_level(&filter, "gets_the_default"),
            Level::DEBUG
        );

        // directives with spans and fields don't match the top-level crate
        let filter =
            EnvFilter::from_str("abc[s]=trace,def[s{g=h}]=debug,[{s2}]=debug,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::INFO);
        assert_eq!(super::crate_level(&filter, "def"), Level::INFO);
        assert_eq!(super::crate_level(&filter, "gets_the_default"), Level::INFO);

        // a bare target without a level is taken as trace
        let filter = EnvFilter::from_str("abc,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(super::crate_level(&filter, "gets_the_default"), Level::INFO);
        // the contract of `crate_level` is that it only matches top-level crates.
        // if we had a proper EnvFilter::would_match impl, this assertion should
        // be Level::TRACE
        assert_eq!(super::crate_level(&filter, "abc::def"), Level::INFO);
    }
}