Skip to main content

mz_ore/
tracing.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Tracing utilities.
17//!
18//! This module contains application tracing utilities built on top of the
19//! [`tracing`] and [`opentelemetry`] libraries. The key exports are:
20//!
21//!  * The **[`configure`]** function, which configures the `tracing` and
22//!    `opentelemetry` crates with sensible defaults and should be called during
23//!    initialization of every Materialize binary.
24//!
25//!  * The **[`OpenTelemetryContext`]** type, which carries a tracing span
26//!    across thread or task boundaries within a process.
27
28use std::collections::BTreeMap;
29use std::hash::{Hash, Hasher};
30use std::io;
31use std::io::IsTerminal;
32use std::str::FromStr;
33use std::sync::LazyLock;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::{Arc, Mutex, OnceLock};
36use std::time::Duration;
37
38#[cfg(feature = "tokio-console")]
39use console_subscriber::ConsoleLayer;
40use derivative::Derivative;
41use http::HeaderMap;
42use hyper_tls::HttpsConnector;
43use hyper_util::client::legacy::connect::HttpConnector;
44use opentelemetry::propagation::{Extractor, Injector};
45use opentelemetry::trace::TracerProvider;
46use opentelemetry::{KeyValue, global};
47use opentelemetry_otlp::WithTonicConfig;
48use opentelemetry_sdk::propagation::TraceContextPropagator;
49use opentelemetry_sdk::runtime::Tokio;
50use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
51use opentelemetry_sdk::{Resource, trace};
52use prometheus::IntCounter;
53use tonic::metadata::MetadataMap;
54use tonic::transport::Endpoint;
55use tracing::{Event, Level, Span, Subscriber, warn};
56#[cfg(feature = "capture")]
57use tracing_capture::{CaptureLayer, SharedStorage};
58use tracing_opentelemetry::OpenTelemetrySpanExt;
59use tracing_subscriber::filter::Directive;
60use tracing_subscriber::fmt::format::{Writer, format};
61use tracing_subscriber::fmt::{self, FmtContext, FormatEvent, FormatFields};
62use tracing_subscriber::layer::{Layer, SubscriberExt};
63use tracing_subscriber::registry::LookupSpan;
64use tracing_subscriber::util::SubscriberInitExt;
65use tracing_subscriber::{EnvFilter, Registry, reload};
66
67use crate::metric;
68use crate::metrics::MetricsRegistry;
69#[cfg(feature = "tokio-console")]
70use crate::netio::SocketAddr;
71use crate::now::{EpochMillis, NowFn, SYSTEM_TIME};
72
/// Application tracing configuration.
///
/// Consumed by value by [`configure`]; see that function for details on how
/// each field is used.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct TracingConfig<F> {
    /// The name of the service.
    ///
    /// Used as the OpenTelemetry service name and tracer name, and set as the
    /// `service_name` tag on the Sentry scope.
    pub service_name: &'static str,
    /// Configuration of the stderr log.
    pub stderr_log: StderrLogConfig,
    /// Optional configuration of the [`opentelemetry`] library.
    ///
    /// When `None`, no OpenTelemetry layer is installed and no spans are
    /// exported.
    pub opentelemetry: Option<OpenTelemetryConfig>,
    /// Optional configuration for the [Tokio console] integration.
    ///
    /// [Tokio console]: https://github.com/tokio-rs/console
    #[cfg_attr(nightly_doc_features, doc(cfg(feature = "tokio-console")))]
    #[cfg(feature = "tokio-console")]
    pub tokio_console: Option<TokioConsoleConfig>,
    /// Optional configuration for capturing spans during tests.
    #[cfg(feature = "capture")]
    // Excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    pub capture: Option<SharedStorage>,
    /// Optional Sentry configuration. `F` is the type of its event filter.
    ///
    /// When `None`, Sentry is not initialized.
    pub sentry: Option<SentryConfig<F>>,
    /// The version of this build of the service.
    ///
    /// Reported to Sentry as the release `materialize@{build_version}`.
    pub build_version: &'static str,
    /// The commit SHA of this build of the service.
    ///
    /// Set as the `build_sha` tag on the Sentry scope.
    pub build_sha: &'static str,
    /// Registry for prometheus metrics.
    ///
    /// Used by [`configure`] to register the OpenTelemetry span metrics.
    pub registry: MetricsRegistry,
}
104
/// Configures Sentry reporting.
///
/// Consumed by [`configure`], which initializes the Sentry client from it.
#[derive(Debug, Clone)]
pub struct SentryConfig<F> {
    /// Sentry data source name to submit events to.
    pub dsn: String,
    /// The environment name to report to Sentry.
    ///
    /// If unset, the Sentry SDK will attempt to read the value from the
    /// `SENTRY_ENVIRONMENT` environment variable.
    pub environment: Option<String>,
    /// Additional tags to include on each Sentry event/exception.
    ///
    /// These are set on the Sentry scope once, when [`configure`] runs.
    pub tags: BTreeMap<String, String>,
    /// A filter that classifies events before sending them to Sentry.
    ///
    /// [`configure`] requires it to map each event's [`tracing::Metadata`] to a
    /// `sentry_tracing::EventFilter`.
    pub event_filter: F,
}
120
/// Configures the stderr log.
#[derive(Debug)]
pub struct StderrLogConfig {
    /// The format in which to emit messages.
    pub format: StderrLogFormat,
    /// A filter which determines which events are emitted to the log.
    ///
    /// [`configure`] adds the [`LOGGING_DEFAULTS`] directives on top of this
    /// filter.
    pub filter: EnvFilter,
}
129
/// Specifies the format of a stderr log message.
#[derive(Debug, Clone)]
pub enum StderrLogFormat {
    /// Format as human readable, optionally colored text.
    ///
    /// Best suited for direct human consumption in a terminal. Color is used
    /// only when stderr is a terminal and `NO_COLOR` is unset or empty.
    Text {
        /// An optional prefix for each log message.
        ///
        /// Rendered in bold when the writer supports ANSI escapes.
        prefix: Option<String>,
    },
    /// Format as JSON (in reality, JSONL).
    ///
    /// Best suited for ingestion in structured logging aggregators. Each line
    /// includes the current span.
    Json,
}
145
/// Configuration for the [`opentelemetry`] library.
#[derive(Debug)]
pub struct OpenTelemetryConfig {
    /// The [OTLP] endpoint to export OpenTelemetry data to.
    ///
    /// Note that [`configure`] exports over gRPC (via `tonic`) with TLS
    /// support, so this must be a gRPC-capable OTLP endpoint URI.
    ///
    /// [OTLP]: https://github.com/open-telemetry/opentelemetry-specification/blob/b13c1648bae16323868a5caf614bc10c917cc6ca/specification/protocol/otlp.md
    pub endpoint: String,
    /// Additional headers to send with every request to the endpoint.
    ///
    /// Converted to gRPC metadata.
    pub headers: HeaderMap,
    /// A filter which determines which events are exported.
    ///
    /// [`configure`] adds the [`OPENTELEMETRY_DEFAULTS`] directives on top of
    /// this filter.
    pub filter: EnvFilter,
    /// How many spans can be queued before dropping.
    pub max_batch_queue_size: usize,
    /// How many spans to process in a single batch.
    pub max_export_batch_size: usize,
    /// How many concurrent export tasks to allow.
    /// More tasks can lead to more memory consumed by the exporter.
    pub max_concurrent_exports: usize,
    /// Delay between consecutive batch exports.
    pub batch_scheduled_delay: Duration,
    /// How long to wait for a batch to be sent before dropping it.
    pub max_export_timeout: Duration,
    /// An `opentelemetry_sdk::Resource` whose key/value attributes are
    /// included with all traces, alongside the service name.
    ///
    /// NOTE(review): only the attributes are copied; the resource's schema URL
    /// is currently not propagated.
    pub resource: Resource,
}
172
/// Configuration of the [Tokio console] integration.
///
/// [Tokio console]: https://github.com/tokio-rs/console
#[cfg_attr(nightly_doc_features, doc(cfg(feature = "tokio-console")))]
#[cfg(feature = "tokio-console")]
#[derive(Debug, Clone)]
pub struct TokioConsoleConfig {
    /// The address on which to listen for Tokio console connections.
    ///
    /// Both TCP and (named) Unix socket addresses are supported by
    /// [`configure`].
    ///
    /// See [`console_subscriber::Builder::server_addr`].
    pub listen_addr: SocketAddr,
    /// How frequently to publish updates to clients.
    ///
    /// See [`console_subscriber::Builder::publish_interval`].
    pub publish_interval: Duration,
    /// How long data is retained for completed tasks.
    ///
    /// See [`console_subscriber::Builder::retention`].
    pub retention: Duration,
}
193
/// A closure that replaces a reloadable [`EnvFilter`], layering the given
/// default [`Directive`]s on top of the new filter.
type Reloader = Arc<dyn Fn(EnvFilter, Vec<Directive>) -> Result<(), anyhow::Error> + Send + Sync>;
/// A closure that replaces a reloadable filter that is built from a fixed base
/// plus the given [`Directive`]s.
type DirectiveReloader = Arc<dyn Fn(Vec<Directive>) -> Result<(), anyhow::Error> + Send + Sync>;
196
/// A handle to the tracing infrastructure configured with [`configure`].
///
/// Cheap to clone; all clones reload the same underlying filters.
#[derive(Clone)]
pub struct TracingHandle {
    /// Reloads the stderr log filter.
    stderr_log: Reloader,
    /// Reloads the OpenTelemetry export filter (a no-op when OpenTelemetry is
    /// not configured).
    opentelemetry: Reloader,
    /// Reloads the Sentry directives (a no-op when Sentry is not configured).
    sentry: DirectiveReloader,
}
204
205impl TracingHandle {
206    /// Creates a inoperative tracing handle.
207    ///
208    /// Primarily useful in tests.
209    pub fn disabled() -> TracingHandle {
210        TracingHandle {
211            stderr_log: Arc::new(|_, _| Ok(())),
212            opentelemetry: Arc::new(|_, _| Ok(())),
213            sentry: Arc::new(|_| Ok(())),
214        }
215    }
216
217    /// Dynamically reloads the stderr log filter.
218    pub fn reload_stderr_log_filter(
219        &self,
220        filter: EnvFilter,
221        defaults: Vec<Directive>,
222    ) -> Result<(), anyhow::Error> {
223        (self.stderr_log)(filter, defaults)
224    }
225
226    /// Dynamically reloads the OpenTelemetry log filter.
227    pub fn reload_opentelemetry_filter(
228        &self,
229        filter: EnvFilter,
230        defaults: Vec<Directive>,
231    ) -> Result<(), anyhow::Error> {
232        (self.opentelemetry)(filter, defaults)
233    }
234
235    /// Dynamically reloads the additional sentry directives.
236    pub fn reload_sentry_directives(&self, defaults: Vec<Directive>) -> Result<(), anyhow::Error> {
237        (self.sentry)(defaults)
238    }
239}
240
241impl std::fmt::Debug for TracingHandle {
242    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
243        f.debug_struct("TracingHandle").finish_non_exhaustive()
244    }
245}
246
// Note that the following defaults are used on startup, regardless of the
// parameters in LaunchDarkly. If we need to, we can add CLI flags to control
// them going forward.
250
251/// By default we turn off tracing from the following crates, because they
252/// have error spans which are noisy.
253///
254/// Note: folks should feel free to add more crates here if we find more
255/// with long lived Spans.
256pub const LOGGING_DEFAULTS_STR: [&str; 2] = [
257    "kube_client::client::builder=off",
258    // aws_config is very noisy at the INFO level by default.
259    // It logs every time it successfully loads credentials.
260    "aws_config::profile::credentials=off",
261];
262/// Same as [`LOGGING_DEFAULTS_STR`], but structured as [`Directive`]s.
263pub static LOGGING_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
264    LOGGING_DEFAULTS_STR
265        .into_iter()
266        .map(|directive| Directive::from_str(directive).expect("valid directive"))
267        .collect()
268});
269/// By default we turn off tracing from the following crates, because they
270/// have long-lived Spans, which OpenTelemetry does not handle well.
271///
272/// Note: folks should feel free to add more crates here if we find more
273/// with long lived Spans.
274pub const OPENTELEMETRY_DEFAULTS_STR: [&str; 2] = ["h2=off", "hyper=off"];
275/// Same as [`OPENTELEMETRY_DEFAULTS_STR`], but structured as [`Directive`]s.
276pub static OPENTELEMETRY_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
277    OPENTELEMETRY_DEFAULTS_STR
278        .into_iter()
279        .map(|directive| Directive::from_str(directive).expect("valid directive"))
280        .collect()
281});
282
283/// By default we turn off tracing from the following crates, because they
284/// have error spans which are noisy.
285pub const SENTRY_DEFAULTS_STR: [&str; 2] =
286    ["kube_client::client::builder=off", "mysql_async::conn=off"];
287/// Same as [`SENTRY_DEFAULTS_STR`], but structured as [`Directive`]s.
288pub static SENTRY_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
289    SENTRY_DEFAULTS_STR
290        .into_iter()
291        .map(|directive| Directive::from_str(directive).expect("valid directive"))
292        .collect()
293});
294
/// The [`GLOBAL_SUBSCRIBER`] type: a shared, type-erased [`Subscriber`].
type GlobalSubscriber = Arc<dyn Subscriber + Send + Sync + 'static>;

/// An [`Arc`] of the tracing [`Subscriber`] constructed and initialized in
/// [`configure`]. The value is written when [`configure`] runs.
///
/// A [`OnceLock`] can only be set once, so a second call to [`configure`] in
/// the same process panics. Until [`configure`] runs, [`OnceLock::get`]
/// returns `None`.
pub static GLOBAL_SUBSCRIBER: OnceLock<GlobalSubscriber> = OnceLock::new();
301
302/// Enables application tracing via the [`tracing`] and [`opentelemetry`]
303/// libraries.
304///
305/// The `tracing` library is configured to emit events as textual log lines to
306/// stderr. [`StderrLogConfig`] offer a small degree of control over this
307/// behavior.
308///
309/// If the `opentelemetry` parameter is `Some`, the `tracing` library is
310/// additionally configured to export events to an observability backend, like
311/// [Jaeger] or [Honeycomb].
312///
313/// The `tokio_console` parameter enables integration with the [Tokio console].
314/// When enabled, `tracing` events are collected and made available to the Tokio
315/// console via a server running on port
316///
317/// [Jaeger]: https://jaegertracing.io
318/// [Honeycomb]: https://www.honeycomb.io
319/// [Tokio console]: https://github.com/tokio-rs/console
320// Setting up OpenTelemetry in the background requires we are in a Tokio runtime
321// context, hence the `async`.
322#[allow(clippy::unused_async)]
323pub async fn configure<F>(config: TracingConfig<F>) -> Result<TracingHandle, anyhow::Error>
324where
325    F: Fn(&tracing::Metadata<'_>) -> sentry_tracing::EventFilter + Send + Sync + 'static,
326{
327    let stderr_log_layer: Box<dyn Layer<Registry> + Send + Sync> = match config.stderr_log.format {
328        StderrLogFormat::Text { prefix } => {
329            // See: https://no-color.org/
330            let no_color = std::env::var_os("NO_COLOR").unwrap_or_else(|| "".into()) != "";
331            Box::new(
332                fmt::layer()
333                    .with_writer(io::stderr)
334                    .event_format(PrefixFormat {
335                        inner: format(),
336                        prefix,
337                    })
338                    .with_ansi(!no_color && io::stderr().is_terminal()),
339            )
340        }
341        StderrLogFormat::Json => Box::new(
342            fmt::layer()
343                .with_writer(io::stderr)
344                .json()
345                .with_current_span(true),
346        ),
347    };
348    let (stderr_log_filter, stderr_log_filter_reloader) = reload::Layer::new({
349        let mut filter = config.stderr_log.filter;
350        for directive in LOGGING_DEFAULTS.iter() {
351            filter = filter.add_directive(directive.clone());
352        }
353        filter
354    });
355    // Add rate limiting for OpenTelemetry internal logs to prevent log spam
356    // when there are issues with the OpenTelemetry pipeline (e.g., channel full,
357    // connection errors). This only affects logs from "opentelemetry*" targets.
358    let otel_rate_limit_filter = OpenTelemetryRateLimitingFilter::new(Duration::from_secs(
359        OPENTELEMETRY_RATE_LIMIT_BACKOFF_SECS,
360    ));
361    // IMPORTANT: The order matters here. The outer filter's `max_level_hint()` is used
362    // to determine the global tracing level. The `otel_rate_limit_filter` doesn't provide
363    // a `max_level_hint()` (defaults to TRACE), so the `stderr_log_filter` (EnvFilter)
364    // must be the outer filter to ensure the correct max level is reported.
365    let stderr_log_layer = stderr_log_layer
366        .with_filter(otel_rate_limit_filter)
367        .with_filter(stderr_log_filter);
368    let stderr_log_reloader = Arc::new(move |mut filter: EnvFilter, defaults: Vec<Directive>| {
369        for directive in &defaults {
370            filter = filter.add_directive(directive.clone());
371        }
372        Ok(stderr_log_filter_reloader.reload(filter)?)
373    });
374
375    let (otel_layer, otel_reloader): (_, Reloader) = if let Some(otel_config) = config.opentelemetry
376    {
377        opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
378
379        // Manually set up an OpenSSL-backed, h2, proxied `Channel`,
380        // with the timeout configured according to:
381        // https://docs.rs/opentelemetry-otlp/latest/opentelemetry_otlp/struct.TonicExporterBuilder.html#method.with_channel
382        let channel = Endpoint::from_shared(otel_config.endpoint)?
383            .timeout(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT)
384            // TODO(guswynn): investigate if this should be non-lazy.
385            .connect_with_connector_lazy({
386                let mut http = HttpConnector::new();
387                http.enforce_http(false);
388                HttpsConnector::from((
389                    http,
390                    // This is the same as the default, plus an h2 ALPN request.
391                    tokio_native_tls::TlsConnector::from(
392                        native_tls::TlsConnector::builder()
393                            .request_alpns(&["h2"])
394                            .build()
395                            .unwrap(),
396                    ),
397                ))
398            });
399        let exporter = opentelemetry_otlp::SpanExporter::builder()
400            .with_tonic()
401            .with_channel(channel)
402            .with_metadata(MetadataMap::from_headers(otel_config.headers))
403            .build()?;
404        let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default()
405            .with_max_queue_size(otel_config.max_batch_queue_size)
406            .with_max_export_batch_size(otel_config.max_export_batch_size)
407            .with_max_concurrent_exports(otel_config.max_concurrent_exports)
408            .with_scheduled_delay(otel_config.batch_scheduled_delay)
409            .with_max_export_timeout(otel_config.max_export_timeout)
410            .build();
411        let batch_span_processor = BatchSpanProcessor::builder(exporter, Tokio)
412            .with_batch_config(batch_config)
413            .build();
414        let tracer = trace::SdkTracerProvider::builder()
415            .with_resource(
416                Resource::builder()
417                    .with_service_name(config.service_name.to_string())
418                    .with_attributes(
419                        otel_config
420                            .resource
421                            .iter()
422                            .map(|(k, v)| KeyValue::new(k.clone(), v.clone())),
423                        // TODO handle schema url?
424                    )
425                    .build(),
426            )
427            .with_span_processor(batch_span_processor)
428            .with_max_events_per_span(2048)
429            .build()
430            .tracer(config.service_name);
431
432        let (filter, filter_handle) = reload::Layer::new({
433            let mut filter = otel_config.filter;
434            for directive in OPENTELEMETRY_DEFAULTS.iter() {
435                filter = filter.add_directive(directive.clone());
436            }
437            filter
438        });
439        let metrics_layer = MetricsLayer::new(&config.registry);
440        let layer = tracing_opentelemetry::layer()
441            // OpenTelemetry does not handle long-lived Spans well, and they end up continuously
442            // eating memory until OOM. So we set a max number of events that are allowed to be
443            // logged to a Span, once this max is passed, old events will get dropped
444            //
445            // TODO(parker-timmerman|guswynn): make this configurable with LaunchDarkly
446            .with_tracer(tracer)
447            .and_then(metrics_layer)
448            // WARNING, ENTERING SPOOKY ZONE 2.0
449            //
450            // Notice we use `with_filter` here. `and_then` will apply the filter globally.
451            .with_filter(filter);
452        let reloader = Arc::new(move |mut filter: EnvFilter, defaults: Vec<Directive>| {
453            // Re-apply our defaults on reload.
454            for directive in &defaults {
455                filter = filter.add_directive(directive.clone());
456            }
457            Ok(filter_handle.reload(filter)?)
458        });
459        (Some(layer), reloader)
460    } else {
461        let reloader = Arc::new(|_, _| Ok(()));
462        (None, reloader)
463    };
464
465    #[cfg(feature = "tokio-console")]
466    let tokio_console_layer = if let Some(console_config) = config.tokio_console.clone() {
467        let builder = ConsoleLayer::builder()
468            .publish_interval(console_config.publish_interval)
469            .retention(console_config.retention);
470        let builder = match console_config.listen_addr {
471            SocketAddr::Inet(addr) => builder.server_addr(addr),
472            SocketAddr::Unix(addr) => {
473                let path = addr.as_pathname().unwrap().as_ref();
474                builder.server_addr(path)
475            }
476            SocketAddr::Turmoil(_) => unimplemented!(),
477        };
478        Some(builder.spawn())
479    } else {
480        None
481    };
482
483    let (sentry_layer, sentry_reloader): (_, DirectiveReloader) =
484        if let Some(sentry_config) = config.sentry {
485            let guard = sentry::init((
486                sentry_config.dsn,
487                sentry::ClientOptions {
488                    attach_stacktrace: true,
489                    release: Some(format!("materialize@{0}", config.build_version).into()),
490                    environment: sentry_config.environment.map(Into::into),
491                    ..Default::default()
492                },
493            ));
494
495            // Forgetting the guard ensures that the Sentry transport won't shut down for the
496            // lifetime of the process.
497            std::mem::forget(guard);
498
499            sentry::configure_scope(|scope| {
500                scope.set_tag("service_name", config.service_name);
501                scope.set_tag("build_sha", config.build_sha.to_string());
502                for (k, v) in sentry_config.tags {
503                    scope.set_tag(&k, v);
504                }
505            });
506
507            let (filter, filter_handle) = reload::Layer::new({
508                // Please see the comment on `with_filter` below.
509                let mut filter = EnvFilter::new("info");
510                for directive in SENTRY_DEFAULTS.iter() {
511                    filter = filter.add_directive(directive.clone());
512                }
513                filter
514            });
515            let layer = sentry_tracing::layer()
516                .event_filter(sentry_config.event_filter)
517                // WARNING, ENTERING THE SPOOKY ZONE
518                //
519                // While sentry provides an event filter above that maps events to types of sentry events, its `Layer`
520                // implementation does not participate in `tracing`'s level-fast-path implementation, which depends on
521                // a hidden api (<https://github.com/tokio-rs/tracing/blob/b28c9351dd4f34ed3c7d5df88bb5c2e694d9c951/tracing-subscriber/src/layer/mod.rs#L861-L867>)
522                // which is primarily manged by filters (like below). The fast path skips verbose log
523                // (and span) levels that no layer is interested by reading a single atomic. Usually, not implementing this
524                // api means "give me everything, including `trace`, unless you attach a filter to me.
525                //
526                // The curious thing here (and a bug in tracing) is that _some configurations of our layer stack above_,
527                // if you don't have this filter can cause the fast-path to trigger, despite the fact
528                // that the sentry layer would specifically communicating that it wants to see
529                // everything. This bug appears to be related to the presence of a `reload::Layer`
530                // _around a filter, not a layer_, and guswynn is tracking investigating it here:
531                // <https://github.com/MaterializeInc/database-issues/issues/4794>. Because we don't
532                // enable a reload-able filter in CI/locally, but DO in production (the otel layer), it
533                // was once possible to trigger and rely on the fast path in CI, but not notice that it
534                // was disabled in production.
535                //
536                // The behavior of this optimization is now tested in various scenarios (in
537                // `test/tracing`). Regardless, when the upstream bug is fixed/resolved,
538                // we will continue to place this here, as the sentry layer only cares about
539                // events <= INFO, so we want to use the fast-path if no other layer
540                // is interested in high-fidelity events.
541                .with_filter(filter);
542            let reloader = Arc::new(move |defaults: Vec<Directive>| {
543                // Please see the comment on `with_filter` above.
544                let mut filter = EnvFilter::new("info");
545                // Re-apply our defaults on reload.
546                for directive in &defaults {
547                    filter = filter.add_directive(directive.clone());
548                }
549                Ok(filter_handle.reload(filter)?)
550            });
551            (Some(layer), reloader)
552        } else {
553            let reloader = Arc::new(|_| Ok(()));
554            (None, reloader)
555        };
556
557    #[cfg(feature = "capture")]
558    let capture = config.capture.map(|storage| CaptureLayer::new(&storage));
559
560    let stack = tracing_subscriber::registry();
561    let stack = stack.with(stderr_log_layer);
562    #[cfg(feature = "capture")]
563    let stack = stack.with(capture);
564    let stack = stack.with(otel_layer);
565    #[cfg(feature = "tokio-console")]
566    let stack = stack.with(tokio_console_layer);
567    let stack = stack.with(sentry_layer);
568
569    // Set the stack as a global subscriber.
570    assert!(GLOBAL_SUBSCRIBER.set(Arc::new(stack)).is_ok());
571    // Initialize the subscriber.
572    Arc::clone(GLOBAL_SUBSCRIBER.get().unwrap()).init();
573
574    #[cfg(feature = "tokio-console")]
575    if let Some(console_config) = config.tokio_console {
576        let endpoint = match console_config.listen_addr {
577            SocketAddr::Inet(addr) => format!("http://{addr}"),
578            SocketAddr::Unix(addr) => format!("file://localhost{addr}"),
579            SocketAddr::Turmoil(_) => unimplemented!(),
580        };
581        tracing::info!("starting tokio console on {endpoint}");
582    }
583
584    let handle = TracingHandle {
585        stderr_log: stderr_log_reloader,
586        opentelemetry: otel_reloader,
587        sentry: sentry_reloader,
588    };
589
590    Ok(handle)
591}
592
593/// Returns the [`Level`] of a crate from an [`EnvFilter`] by performing an
594/// exact match between `crate` and the original `EnvFilter` directive.
595pub fn crate_level(filter: &EnvFilter, crate_name: &'static str) -> Level {
596    // TODO: implement `would_enable` on `EnvFilter` or equivalent
597    // to avoid having to manually parse out the directives. This
598    // would also significantly broaden the lookups the fn is able
599    // to do (modules, spans, fields, etc).
600
601    let mut default_level = Level::ERROR;
602    // EnvFilter roundtrips through its Display fmt, so it
603    // is safe to split out its individual directives here
604    for directive in format!("{}", filter).split(',') {
605        match directive.split('=').collect::<Vec<_>>().as_slice() {
606            [target, level] => {
607                if *target == crate_name {
608                    match Level::from_str(*level) {
609                        Ok(level) => return level,
610                        Err(err) => warn!("invalid level for {}: {}", target, err),
611                    }
612                }
613            }
614            [token] => match Level::from_str(*token) {
615                Ok(level) => default_level = default_level.max(level),
616                Err(_) => {
617                    // a target without a level is interpreted as trace
618                    if *token == crate_name {
619                        default_level = default_level.max(Level::TRACE);
620                    }
621                }
622            },
623            _ => {}
624        }
625    }
626
627    default_level
628}
629
630/// A wrapper around a [`FormatEvent`] that adds an optional prefix to each
631/// event.
632#[derive(Debug)]
633pub struct PrefixFormat<F> {
634    inner: F,
635    prefix: Option<String>,
636}
637
638impl<F, C, N> FormatEvent<C, N> for PrefixFormat<F>
639where
640    C: Subscriber + for<'a> LookupSpan<'a>,
641    N: for<'a> FormatFields<'a> + 'static,
642    F: FormatEvent<C, N>,
643{
644    fn format_event(
645        &self,
646        ctx: &FmtContext<'_, C, N>,
647        mut writer: Writer<'_>,
648        event: &Event<'_>,
649    ) -> std::fmt::Result {
650        match &self.prefix {
651            None => self.inner.format_event(ctx, writer, event)?,
652            Some(prefix) => {
653                let mut prefix = yansi::Paint::new(prefix);
654                if writer.has_ansi_escapes() {
655                    prefix = prefix.bold();
656                }
657                write!(writer, "{}: ", prefix)?;
658                self.inner.format_event(ctx, writer, event)?;
659            }
660        }
661        Ok(())
662    }
663}
664
/// An OpenTelemetry context.
///
/// Allows associating [`tracing`] spans across task or thread boundaries.
///
/// Internally this is a map from propagation field names to values, as
/// written and read by the global text map propagator. With the W3C
/// `TraceContextPropagator` that [`configure`] installs, this presumably holds
/// `traceparent`/`tracestate` entries. Keys written through [`Injector::set`]
/// are lowercased. The type is serde-serializable.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct OpenTelemetryContext {
    inner: BTreeMap<String, String>,
}
672
673impl OpenTelemetryContext {
674    /// Attaches this `Context` to the current [`tracing`] span,
675    /// as its parent.
676    ///
677    /// If there is not enough information in this `OpenTelemetryContext`
678    /// to create a context, then the current thread's `Context` is used
679    /// defaulting to the default `Context`.
680    pub fn attach_as_parent(&self) {
681        self.attach_as_parent_to(&tracing::Span::current())
682    }
683
684    /// Attaches this `Context` to the given [`tracing`] Span, as its parent.
685    ///
686    /// If there is not enough information in this `OpenTelemetryContext`
687    /// to create a context, then the current thread's `Context` is used
688    /// defaulting to the default `Context`.
689    pub fn attach_as_parent_to(&self, span: &Span) {
690        let parent_cx = global::get_text_map_propagator(|prop| prop.extract(self));
691        let _ = span.set_parent(parent_cx);
692    }
693
694    /// Obtains a `Context` from the current [`tracing`] span.
695    pub fn obtain() -> Self {
696        let mut context = Self::empty();
697        global::get_text_map_propagator(|propagator| {
698            propagator.inject_context(&tracing::Span::current().context(), &mut context)
699        });
700
701        context
702    }
703
704    /// Obtains an empty `Context`.
705    pub fn empty() -> Self {
706        Self {
707            inner: BTreeMap::new(),
708        }
709    }
710}
711
712impl Extractor for OpenTelemetryContext {
713    fn get(&self, key: &str) -> Option<&str> {
714        self.inner.get(&key.to_lowercase()).map(|v| v.as_str())
715    }
716
717    fn keys(&self) -> Vec<&str> {
718        self.inner.keys().map(|k| k.as_str()).collect::<Vec<_>>()
719    }
720}
721
722impl Injector for OpenTelemetryContext {
723    fn set(&mut self, key: &str, value: String) {
724        self.inner.insert(key.to_lowercase(), value);
725    }
726}
727
728impl From<OpenTelemetryContext> for BTreeMap<String, String> {
729    fn from(ctx: OpenTelemetryContext) -> Self {
730        ctx.inner
731    }
732}
733
734impl From<BTreeMap<String, String>> for OpenTelemetryContext {
735    fn from(map: BTreeMap<String, String>) -> Self {
736        Self { inner: map }
737    }
738}
739
/// A `tracing_subscriber` layer that counts the span closures it observes.
struct MetricsLayer {
    /// Incremented once for every `on_close` notification this layer receives.
    on_close: IntCounter,
}
743
744impl MetricsLayer {
745    fn new(registry: &MetricsRegistry) -> Self {
746        MetricsLayer {
747            on_close: registry.register(metric!(
748                name: "mz_otel_on_close",
749                help: "count of on_close events sent to otel",
750            )),
751        }
752    }
753}
754
755impl<S: tracing::Subscriber> Layer<S> for MetricsLayer {
756    fn on_close(&self, _id: tracing::span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
757        self.on_close.inc()
758    }
759}
760
/// Common prefix of the `tracing` targets used by OpenTelemetry's own crates
/// (for example "opentelemetry", "opentelemetry_sdk" and "opentelemetry_otlp").
/// Events whose target begins with it are treated as OpenTelemetry-internal.
const OPENTELEMETRY_TARGET_PREFIX: &str = "opentelemetry";

/// Default suppression window, in seconds, for repeated OpenTelemetry
/// internal log messages.
const OPENTELEMETRY_RATE_LIMIT_BACKOFF_SECS: u64 = 30;
768
/// A rate-limiting filter that throttles OpenTelemetry internal log messages.
///
/// OpenTelemetry can emit a large number of duplicate error messages (e.g., when
/// the batch processor channel is full or when there are connection issues).
/// This filter rate-limits these messages to avoid log spam while still ensuring
/// the errors are visible: at most one message per key is let through per
/// backoff window.
///
/// Only logs from OpenTelemetry targets (those starting with "opentelemetry") are
/// rate-limited. All other logs pass through unchanged.
#[derive(Debug)]
pub struct OpenTelemetryRateLimitingFilter {
    /// How long to suppress repeats of an OpenTelemetry message after one has
    /// been let through.
    backoff_duration: Duration,
    /// The time (as returned by `now_fn`) at which each message key was last let
    /// through. A key is a hash of the event's target and name, plus its source
    /// file and line when those are known, so distinct call sites are throttled
    /// independently.
    last_logged: Mutex<BTreeMap<u64, EpochMillis>>,
    /// Running total of messages that have been suppressed.
    suppressed_count: AtomicU64,
    /// Source of the current time; replaceable so tests can control the clock.
    now_fn: NowFn,
}
790
791impl OpenTelemetryRateLimitingFilter {
792    /// Creates a new rate-limiting filter with the specified backoff duration.
793    ///
794    /// Messages from OpenTelemetry targets will be suppressed if they occur
795    /// more frequently than once per `backoff_duration`.
796    pub fn new(backoff_duration: Duration) -> Self {
797        Self {
798            backoff_duration,
799            last_logged: Mutex::new(BTreeMap::new()),
800            suppressed_count: AtomicU64::new(0),
801            now_fn: SYSTEM_TIME.clone(),
802        }
803    }
804
805    /// Sets a custom time function for testing purposes.
806    #[cfg(test)]
807    fn with_now_fn(mut self, now_fn: NowFn) -> Self {
808        self.now_fn = now_fn;
809        self
810    }
811
812    /// Computes a hash key for the given event metadata.
813    fn compute_key(metadata: &tracing::Metadata<'_>) -> u64 {
814        let mut hasher = std::hash::DefaultHasher::new();
815        metadata.target().hash(&mut hasher);
816        metadata.name().hash(&mut hasher);
817        // Include the file and line if available for more precise deduplication
818        if let Some(file) = metadata.file() {
819            file.hash(&mut hasher);
820        }
821        if let Some(line) = metadata.line() {
822            line.hash(&mut hasher);
823        }
824        hasher.finish()
825    }
826
827    /// Returns true if the event should be logged (not rate-limited).
828    fn should_log(&self, metadata: &tracing::Metadata<'_>) -> bool {
829        // Only rate-limit OpenTelemetry internal logs
830        if !metadata.target().starts_with(OPENTELEMETRY_TARGET_PREFIX) {
831            return true;
832        }
833
834        let key = Self::compute_key(metadata);
835        let now = (self.now_fn)();
836
837        let mut last_logged = self.last_logged.lock().unwrap();
838
839        if let Some(last_time) = last_logged.get(&key) {
840            if Duration::from_millis(now - last_time) < self.backoff_duration {
841                self.suppressed_count.fetch_add(1, Ordering::Relaxed);
842                return false;
843            }
844        }
845
846        last_logged.insert(key, now);
847
848        // Periodically clean up old entries to prevent unbounded growth
849        // Keep entries that are still within the backoff window
850        if last_logged.len() > 1000 {
851            last_logged
852                .retain(|_, time| Duration::from_millis(now - *time) < self.backoff_duration);
853        }
854
855        true
856    }
857
858    /// Returns the number of messages that have been suppressed.
859    pub fn suppressed_count(&self) -> u64 {
860        self.suppressed_count.load(Ordering::Relaxed)
861    }
862}
863
864impl<S> tracing_subscriber::layer::Filter<S> for OpenTelemetryRateLimitingFilter
865where
866    S: tracing::Subscriber + for<'lookup> LookupSpan<'lookup>,
867{
868    fn enabled(
869        &self,
870        metadata: &tracing::Metadata<'_>,
871        _ctx: &tracing_subscriber::layer::Context<'_, S>,
872    ) -> bool {
873        self.should_log(metadata)
874    }
875
876    fn event_enabled(
877        &self,
878        event: &Event<'_>,
879        _ctx: &tracing_subscriber::layer::Context<'_, S>,
880    ) -> bool {
881        self.should_log(event.metadata())
882    }
883}
884
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use tracing::Level;
    use tracing_subscriber::filter::{EnvFilter, LevelFilter, Targets};

    #[crate::test]
    fn overriding_targets() {
        let user_defined = Targets::new().with_target("my_crate", Level::INFO);

        let default = Targets::new().with_target("my_crate", LevelFilter::OFF);
        assert!(!default.would_enable("my_crate", &Level::INFO));

        // The user_defined filters should override the default
        let filters = Targets::new()
            .with_targets(default)
            .with_targets(user_defined);
        assert!(filters.would_enable("my_crate", &Level::INFO));
    }

    #[crate::test]
    fn crate_level() {
        // target=level directives only. should default to ERROR if unspecified
        let filter = EnvFilter::from_str("abc=trace,def=debug").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(
            super::crate_level(&filter, "abc::doesnt::exist"),
            Level::ERROR
        );
        assert_eq!(super::crate_level(&filter, "doesnt::exist"), Level::ERROR);

        // add in a global default
        let filter = EnvFilter::from_str("abc=trace,def=debug,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(
            super::crate_level(&filter, "abc::doesnt::exist"),
            Level::INFO
        );
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(super::crate_level(&filter, "nan"), Level::INFO);

        // a directive with mod path doesn't match the top-level crate
        let filter = EnvFilter::from_str("abc::def::ghi=trace,debug").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::DEBUG);
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(
            super::crate_level(&filter, "gets_the_default"),
            Level::DEBUG
        );

        // directives with spans and fields don't match the top-level crate
        let filter =
            EnvFilter::from_str("abc[s]=trace,def[s{g=h}]=debug,[{s2}]=debug,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::INFO);
        assert_eq!(super::crate_level(&filter, "def"), Level::INFO);
        assert_eq!(super::crate_level(&filter, "gets_the_default"), Level::INFO);

        // a bare target without a level is taken as trace
        let filter = EnvFilter::from_str("abc,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(super::crate_level(&filter, "gets_the_default"), Level::INFO);
        // the contract of `crate_level` is that it only matches top-level crates.
        // if we had a proper EnvFilter::would_match impl, this assertion should
        // be Level::TRACE
        assert_eq!(super::crate_level(&filter, "abc::def"), Level::INFO);
    }

    #[crate::test]
    fn otel_rate_limiting_filter_backoff() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::time::Duration;
        use tracing::Callsite;

        use crate::now::NowFn;

        // Create a controllable time source
        let current_time = Arc::new(AtomicU64::new(0));
        let time_for_closure = Arc::clone(&current_time);
        let now_fn: NowFn = NowFn::from(move || time_for_closure.load(Ordering::SeqCst));

        let filter = super::OpenTelemetryRateLimitingFilter::new(Duration::from_millis(100))
            .with_now_fn(now_fn);

        // Test that key computation is stable
        static OTEL_CALLSITE: tracing::callsite::DefaultCallsite =
            tracing::callsite::DefaultCallsite::new(&tracing::Metadata::new(
                "test_event",
                "opentelemetry_sdk::trace",
                Level::WARN,
                Some(file!()),
                Some(line!()),
                Some(module_path!()),
                tracing::field::FieldSet::new(&[], tracing::callsite::Identifier(&OTEL_CALLSITE)),
                tracing::metadata::Kind::EVENT,
            ));
        let otel_meta = OTEL_CALLSITE.metadata();

        static OTHER_CALLSITE: tracing::callsite::DefaultCallsite =
            tracing::callsite::DefaultCallsite::new(&tracing::Metadata::new(
                "test_event",
                "my_app::module",
                Level::WARN,
                Some(file!()),
                Some(line!()),
                Some(module_path!()),
                tracing::field::FieldSet::new(&[], tracing::callsite::Identifier(&OTHER_CALLSITE)),
                tracing::metadata::Kind::EVENT,
            ));
        let other_meta = OTHER_CALLSITE.metadata();

        // Non-OpenTelemetry events should always pass through
        assert!(filter.should_log(other_meta));
        assert!(filter.should_log(other_meta));
        assert!(filter.should_log(other_meta));
        assert_eq!(filter.suppressed_count(), 0);

        // First OpenTelemetry event should pass through
        assert!(filter.should_log(otel_meta));
        assert_eq!(filter.suppressed_count(), 0);

        // Subsequent OpenTelemetry events within backoff should be suppressed
        current_time.store(50, Ordering::SeqCst); // 50ms later, still within 100ms backoff
        assert!(!filter.should_log(otel_meta));
        assert!(!filter.should_log(otel_meta));
        assert_eq!(filter.suppressed_count(), 2);

        // After backoff, event should pass through again
        current_time.store(150, Ordering::SeqCst); // 150ms later, past 100ms backoff
        assert!(filter.should_log(otel_meta));
        assert_eq!(filter.suppressed_count(), 2);
    }
}