1use std::collections::BTreeMap;
29use std::io;
30use std::io::IsTerminal;
31use std::str::FromStr;
32use std::sync::LazyLock;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::{Arc, OnceLock};
35use std::time::Duration;
36
37#[cfg(feature = "tokio-console")]
38use console_subscriber::ConsoleLayer;
39use derivative::Derivative;
40use http::HeaderMap;
41use hyper_tls::HttpsConnector;
42use hyper_util::client::legacy::connect::HttpConnector;
43use opentelemetry::global::Error;
44use opentelemetry::propagation::{Extractor, Injector};
45use opentelemetry::trace::TracerProvider;
46use opentelemetry::{KeyValue, global};
47use opentelemetry_sdk::propagation::TraceContextPropagator;
48use opentelemetry_sdk::{Resource, trace};
49use prometheus::IntCounter;
50use tonic::metadata::MetadataMap;
51use tonic::transport::Endpoint;
52use tracing::{Event, Level, Span, Subscriber, warn};
53#[cfg(feature = "capture")]
54use tracing_capture::{CaptureLayer, SharedStorage};
55use tracing_opentelemetry::OpenTelemetrySpanExt;
56use tracing_subscriber::filter::Directive;
57use tracing_subscriber::fmt::format::{Writer, format};
58use tracing_subscriber::fmt::{self, FmtContext, FormatEvent, FormatFields};
59use tracing_subscriber::layer::{Layer, SubscriberExt};
60use tracing_subscriber::registry::LookupSpan;
61use tracing_subscriber::util::SubscriberInitExt;
62use tracing_subscriber::{EnvFilter, Registry, reload};
63
64use crate::metric;
65use crate::metrics::MetricsRegistry;
66#[cfg(feature = "tokio-console")]
67use crate::netio::SocketAddr;
68
69#[derive(Derivative)]
73#[derivative(Debug)]
74pub struct TracingConfig<F> {
75 pub service_name: &'static str,
77 pub stderr_log: StderrLogConfig,
79 pub opentelemetry: Option<OpenTelemetryConfig>,
81 #[cfg_attr(nightly_doc_features, doc(cfg(feature = "tokio-console")))]
85 #[cfg(feature = "tokio-console")]
86 pub tokio_console: Option<TokioConsoleConfig>,
87 #[cfg(feature = "capture")]
89 #[derivative(Debug = "ignore")]
90 pub capture: Option<SharedStorage>,
91 pub sentry: Option<SentryConfig<F>>,
93 pub build_version: &'static str,
95 pub build_sha: &'static str,
97 pub registry: MetricsRegistry,
99}
100
/// Settings for reporting events to Sentry.
///
/// `F` is the type of the callback that decides how each tracing event is
/// forwarded; [`configure`] requires it to be
/// `Fn(&tracing::Metadata<'_>) -> sentry_tracing::EventFilter`.
#[derive(Debug, Clone)]
pub struct SentryConfig<F> {
    /// Sentry data source name (DSN) handed to `sentry::init`.
    pub dsn: String,
    /// Optional environment name set on the Sentry client options.
    pub environment: Option<String>,
    /// Extra tags set on the Sentry scope, in addition to the `service_name`
    /// and `build_sha` tags that [`configure`] always sets.
    pub tags: BTreeMap<String, String>,
    /// Callback installed as the event filter of the Sentry tracing layer.
    pub event_filter: F,
}
116
117#[derive(Debug)]
119pub struct StderrLogConfig {
120 pub format: StderrLogFormat,
122 pub filter: EnvFilter,
124}
125
/// Output format of the stderr log layer.
#[derive(Debug, Clone)]
pub enum StderrLogFormat {
    /// Text output using the default `tracing_subscriber` event format.
    Text {
        /// Optional prefix; when set, every event is preceded by
        /// `"<prefix>: "`.
        prefix: Option<String>,
    },
    /// JSON output that also includes the current span.
    Json,
}
141
142#[derive(Debug)]
144pub struct OpenTelemetryConfig {
145 pub endpoint: String,
149 pub headers: HeaderMap,
151 pub filter: EnvFilter,
153 pub max_batch_queue_size: usize,
155 pub max_export_batch_size: usize,
157 pub max_concurrent_exports: usize,
160 pub batch_scheduled_delay: Duration,
162 pub max_export_timeout: Duration,
164 pub resource: Resource,
167}
168
/// Settings for the Tokio console server.
#[cfg_attr(nightly_doc_features, doc(cfg(feature = "tokio-console")))]
#[cfg(feature = "tokio-console")]
#[derive(Debug, Clone)]
pub struct TokioConsoleConfig {
    /// Address the console server listens on. [`configure`] supports the
    /// `Inet` and `Unix` variants; `Turmoil` hits `unimplemented!()`.
    pub listen_addr: SocketAddr,
    /// Passed to the console builder's `publish_interval`.
    pub publish_interval: Duration,
    /// Passed to the console builder's `retention`.
    pub retention: Duration,
}
189
190type Reloader = Arc<dyn Fn(EnvFilter, Vec<Directive>) -> Result<(), anyhow::Error> + Send + Sync>;
191type DirectiveReloader = Arc<dyn Fn(Vec<Directive>) -> Result<(), anyhow::Error> + Send + Sync>;
192
193#[derive(Clone)]
195pub struct TracingHandle {
196 stderr_log: Reloader,
197 opentelemetry: Reloader,
198 sentry: DirectiveReloader,
199}
200
201impl TracingHandle {
202 pub fn disabled() -> TracingHandle {
206 TracingHandle {
207 stderr_log: Arc::new(|_, _| Ok(())),
208 opentelemetry: Arc::new(|_, _| Ok(())),
209 sentry: Arc::new(|_| Ok(())),
210 }
211 }
212
213 pub fn reload_stderr_log_filter(
215 &self,
216 filter: EnvFilter,
217 defaults: Vec<Directive>,
218 ) -> Result<(), anyhow::Error> {
219 (self.stderr_log)(filter, defaults)
220 }
221
222 pub fn reload_opentelemetry_filter(
224 &self,
225 filter: EnvFilter,
226 defaults: Vec<Directive>,
227 ) -> Result<(), anyhow::Error> {
228 (self.opentelemetry)(filter, defaults)
229 }
230
231 pub fn reload_sentry_directives(&self, defaults: Vec<Directive>) -> Result<(), anyhow::Error> {
233 (self.sentry)(defaults)
234 }
235}
236
237impl std::fmt::Debug for TracingHandle {
238 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
239 f.debug_struct("TracingHandle").finish_non_exhaustive()
240 }
241}
242
243pub const LOGGING_DEFAULTS_STR: [&str; 1] = ["kube_client::client::builder=off"];
253pub static LOGGING_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
255 LOGGING_DEFAULTS_STR
256 .into_iter()
257 .map(|directive| Directive::from_str(directive).expect("valid directive"))
258 .collect()
259});
260pub const OPENTELEMETRY_DEFAULTS_STR: [&str; 2] = ["h2=off", "hyper=off"];
266pub static OPENTELEMETRY_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
268 OPENTELEMETRY_DEFAULTS_STR
269 .into_iter()
270 .map(|directive| Directive::from_str(directive).expect("valid directive"))
271 .collect()
272});
273
274pub const SENTRY_DEFAULTS_STR: [&str; 2] =
277 ["kube_client::client::builder=off", "mysql_async::conn=off"];
278pub static SENTRY_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
280 SENTRY_DEFAULTS_STR
281 .into_iter()
282 .map(|directive| Directive::from_str(directive).expect("valid directive"))
283 .collect()
284});
285
286type GlobalSubscriber = Arc<dyn Subscriber + Send + Sync + 'static>;
288
289pub static GLOBAL_SUBSCRIBER: OnceLock<GlobalSubscriber> = OnceLock::new();
292
293#[allow(clippy::unused_async)]
314pub async fn configure<F>(config: TracingConfig<F>) -> Result<TracingHandle, anyhow::Error>
315where
316 F: Fn(&tracing::Metadata<'_>) -> sentry_tracing::EventFilter + Send + Sync + 'static,
317{
318 let stderr_log_layer: Box<dyn Layer<Registry> + Send + Sync> = match config.stderr_log.format {
319 StderrLogFormat::Text { prefix } => {
320 let no_color = std::env::var_os("NO_COLOR").unwrap_or_else(|| "".into()) != "";
322 Box::new(
323 fmt::layer()
324 .with_writer(io::stderr)
325 .event_format(PrefixFormat {
326 inner: format(),
327 prefix,
328 })
329 .with_ansi(!no_color && io::stderr().is_terminal()),
330 )
331 }
332 StderrLogFormat::Json => Box::new(
333 fmt::layer()
334 .with_writer(io::stderr)
335 .json()
336 .with_current_span(true),
337 ),
338 };
339 let (stderr_log_filter, stderr_log_filter_reloader) = reload::Layer::new({
340 let mut filter = config.stderr_log.filter;
341 for directive in LOGGING_DEFAULTS.iter() {
342 filter = filter.add_directive(directive.clone());
343 }
344 filter
345 });
346 let stderr_log_layer = stderr_log_layer.with_filter(stderr_log_filter);
347 let stderr_log_reloader = Arc::new(move |mut filter: EnvFilter, defaults: Vec<Directive>| {
348 for directive in &defaults {
349 filter = filter.add_directive(directive.clone());
350 }
351 Ok(stderr_log_filter_reloader.reload(filter)?)
352 });
353
354 let (otel_layer, otel_reloader): (_, Reloader) = if let Some(otel_config) = config.opentelemetry
355 {
356 opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
357
358 let channel = Endpoint::from_shared(otel_config.endpoint)?
362 .timeout(Duration::from_secs(
363 opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT,
364 ))
365 .connect_with_connector_lazy({
367 let mut http = HttpConnector::new();
368 http.enforce_http(false);
369 HttpsConnector::from((
370 http,
371 tokio_native_tls::TlsConnector::from(
373 native_tls::TlsConnector::builder()
374 .request_alpns(&["h2"])
375 .build()
376 .unwrap(),
377 ),
378 ))
379 });
380 let exporter = opentelemetry_otlp::new_exporter()
381 .tonic()
382 .with_channel(channel)
383 .with_metadata(MetadataMap::from_headers(otel_config.headers));
384 let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default()
385 .with_max_queue_size(otel_config.max_batch_queue_size)
386 .with_max_export_batch_size(otel_config.max_export_batch_size)
387 .with_max_concurrent_exports(otel_config.max_concurrent_exports)
388 .with_scheduled_delay(otel_config.batch_scheduled_delay)
389 .with_max_export_timeout(otel_config.max_export_timeout)
390 .build();
391 let tracer = opentelemetry_otlp::new_pipeline()
392 .tracing()
393 .with_trace_config(
394 trace::Config::default().with_resource(
395 Resource::new([KeyValue::new(
399 "service.name",
400 config.service_name.to_string(),
401 )])
402 .merge(&otel_config.resource),
403 ),
404 )
405 .with_exporter(exporter)
406 .with_batch_config(batch_config)
407 .install_batch(opentelemetry_sdk::runtime::Tokio)
408 .unwrap()
409 .tracer(config.service_name);
410
411 const OPENTELEMETRY_ERROR_MSG_BACKOFF_SECONDS: u64 = 30;
419 let last_log_in_epoch_seconds = AtomicU64::default();
420 opentelemetry::global::set_error_handler(move |err| {
421 let now = std::time::SystemTime::now()
422 .duration_since(std::time::SystemTime::UNIX_EPOCH)
423 .expect("Failed to get duration since Unix epoch")
424 .as_secs();
425 let last_log = last_log_in_epoch_seconds.load(Ordering::SeqCst);
426
427 if now.saturating_sub(last_log) >= OPENTELEMETRY_ERROR_MSG_BACKOFF_SECONDS {
428 if last_log_in_epoch_seconds
429 .compare_exchange_weak(last_log, now, Ordering::Relaxed, Ordering::Relaxed)
430 .is_err()
431 {
432 return;
433 }
434 use crate::error::ErrorExt;
435 match err {
436 Error::Trace(err) => {
437 warn!("OpenTelemetry error: {}", err.display_with_causes());
438 }
439 Error::Metric(err) => {
441 warn!("OpenTelemetry error: {}", err.display_with_causes());
442 }
443 Error::Other(err) => {
444 warn!("OpenTelemetry error: {}", err);
445 }
446 _ => {
447 warn!("unknown OpenTelemetry error");
448 }
449 }
450 }
451 })
452 .expect("valid error handler");
453
454 let (filter, filter_handle) = reload::Layer::new({
455 let mut filter = otel_config.filter;
456 for directive in OPENTELEMETRY_DEFAULTS.iter() {
457 filter = filter.add_directive(directive.clone());
458 }
459 filter
460 });
461 let metrics_layer = MetricsLayer::new(&config.registry);
462 let layer = tracing_opentelemetry::layer()
463 .max_events_per_span(2048)
469 .with_tracer(tracer)
470 .and_then(metrics_layer)
471 .with_filter(filter);
475 let reloader = Arc::new(move |mut filter: EnvFilter, defaults: Vec<Directive>| {
476 for directive in &defaults {
478 filter = filter.add_directive(directive.clone());
479 }
480 Ok(filter_handle.reload(filter)?)
481 });
482 (Some(layer), reloader)
483 } else {
484 let reloader = Arc::new(|_, _| Ok(()));
485 (None, reloader)
486 };
487
488 #[cfg(feature = "tokio-console")]
489 let tokio_console_layer = if let Some(console_config) = config.tokio_console.clone() {
490 let builder = ConsoleLayer::builder()
491 .publish_interval(console_config.publish_interval)
492 .retention(console_config.retention);
493 let builder = match console_config.listen_addr {
494 SocketAddr::Inet(addr) => builder.server_addr(addr),
495 SocketAddr::Unix(addr) => {
496 let path = addr.as_pathname().unwrap().as_ref();
497 builder.server_addr(path)
498 }
499 SocketAddr::Turmoil(_) => unimplemented!(),
500 };
501 Some(builder.spawn())
502 } else {
503 None
504 };
505
506 let (sentry_layer, sentry_reloader): (_, DirectiveReloader) =
507 if let Some(sentry_config) = config.sentry {
508 let guard = sentry::init((
509 sentry_config.dsn,
510 sentry::ClientOptions {
511 attach_stacktrace: true,
512 release: Some(format!("materialize@{0}", config.build_version).into()),
513 environment: sentry_config.environment.map(Into::into),
514 ..Default::default()
515 },
516 ));
517
518 std::mem::forget(guard);
521
522 sentry::configure_scope(|scope| {
523 scope.set_tag("service_name", config.service_name);
524 scope.set_tag("build_sha", config.build_sha.to_string());
525 for (k, v) in sentry_config.tags {
526 scope.set_tag(&k, v);
527 }
528 });
529
530 let (filter, filter_handle) = reload::Layer::new({
531 let mut filter = EnvFilter::new("info");
533 for directive in SENTRY_DEFAULTS.iter() {
534 filter = filter.add_directive(directive.clone());
535 }
536 filter
537 });
538 let layer = sentry_tracing::layer()
539 .event_filter(sentry_config.event_filter)
540 .with_filter(filter);
565 let reloader = Arc::new(move |defaults: Vec<Directive>| {
566 let mut filter = EnvFilter::new("info");
568 for directive in &defaults {
570 filter = filter.add_directive(directive.clone());
571 }
572 Ok(filter_handle.reload(filter)?)
573 });
574 (Some(layer), reloader)
575 } else {
576 let reloader = Arc::new(|_| Ok(()));
577 (None, reloader)
578 };
579
580 #[cfg(feature = "capture")]
581 let capture = config.capture.map(|storage| CaptureLayer::new(&storage));
582
583 let stack = tracing_subscriber::registry();
584 let stack = stack.with(stderr_log_layer);
585 #[cfg(feature = "capture")]
586 let stack = stack.with(capture);
587 let stack = stack.with(otel_layer);
588 #[cfg(feature = "tokio-console")]
589 let stack = stack.with(tokio_console_layer);
590 let stack = stack.with(sentry_layer);
591
592 assert!(GLOBAL_SUBSCRIBER.set(Arc::new(stack)).is_ok());
594 Arc::clone(GLOBAL_SUBSCRIBER.get().unwrap()).init();
596
597 #[cfg(feature = "tokio-console")]
598 if let Some(console_config) = config.tokio_console {
599 let endpoint = match console_config.listen_addr {
600 SocketAddr::Inet(addr) => format!("http://{addr}"),
601 SocketAddr::Unix(addr) => format!("file://localhost{addr}"),
602 SocketAddr::Turmoil(_) => unimplemented!(),
603 };
604 tracing::info!("starting tokio console on {endpoint}");
605 }
606
607 let handle = TracingHandle {
608 stderr_log: stderr_log_reloader,
609 opentelemetry: otel_reloader,
610 sentry: sentry_reloader,
611 };
612
613 Ok(handle)
614}
615
616pub fn crate_level(filter: &EnvFilter, crate_name: &'static str) -> Level {
619 let mut default_level = Level::ERROR;
625 for directive in format!("{}", filter).split(',') {
628 match directive.split('=').collect::<Vec<_>>().as_slice() {
629 [target, level] => {
630 if *target == crate_name {
631 match Level::from_str(*level) {
632 Ok(level) => return level,
633 Err(err) => warn!("invalid level for {}: {}", target, err),
634 }
635 }
636 }
637 [token] => match Level::from_str(*token) {
638 Ok(level) => default_level = default_level.max(level),
639 Err(_) => {
640 if *token == crate_name {
642 default_level = default_level.max(Level::TRACE);
643 }
644 }
645 },
646 _ => {}
647 }
648 }
649
650 default_level
651}
652
/// An event formatter that writes an optional prefix before delegating to an
/// inner formatter.
#[derive(Debug)]
pub struct PrefixFormat<F> {
    /// Formatter that renders the event itself.
    inner: F,
    /// Text written before each event, followed by `": "`; nothing is written
    /// when `None`.
    prefix: Option<String>,
}
660
661impl<F, C, N> FormatEvent<C, N> for PrefixFormat<F>
662where
663 C: Subscriber + for<'a> LookupSpan<'a>,
664 N: for<'a> FormatFields<'a> + 'static,
665 F: FormatEvent<C, N>,
666{
667 fn format_event(
668 &self,
669 ctx: &FmtContext<'_, C, N>,
670 mut writer: Writer<'_>,
671 event: &Event<'_>,
672 ) -> std::fmt::Result {
673 match &self.prefix {
674 None => self.inner.format_event(ctx, writer, event)?,
675 Some(prefix) => {
676 let mut prefix = yansi::Paint::new(prefix);
677 if writer.has_ansi_escapes() {
678 prefix = prefix.bold();
679 }
680 write!(writer, "{}: ", prefix)?;
681 self.inner.format_event(ctx, writer, event)?;
682 }
683 }
684 Ok(())
685 }
686}
687
688#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
692pub struct OpenTelemetryContext {
693 inner: BTreeMap<String, String>,
694}
695
696impl OpenTelemetryContext {
697 pub fn attach_as_parent(&self) {
704 let parent_cx = global::get_text_map_propagator(|prop| prop.extract(self));
705 tracing::Span::current().set_parent(parent_cx);
706 }
707
708 pub fn attach_as_parent_to(&self, span: &Span) {
715 let parent_cx = global::get_text_map_propagator(|prop| prop.extract(self));
716 span.set_parent(parent_cx);
717 }
718
719 pub fn obtain() -> Self {
721 let mut context = Self::empty();
722 global::get_text_map_propagator(|propagator| {
723 propagator.inject_context(&tracing::Span::current().context(), &mut context)
724 });
725
726 context
727 }
728
729 pub fn empty() -> Self {
731 Self {
732 inner: BTreeMap::new(),
733 }
734 }
735}
736
737impl Extractor for OpenTelemetryContext {
738 fn get(&self, key: &str) -> Option<&str> {
739 self.inner.get(&key.to_lowercase()).map(|v| v.as_str())
740 }
741
742 fn keys(&self) -> Vec<&str> {
743 self.inner.keys().map(|k| k.as_str()).collect::<Vec<_>>()
744 }
745}
746
747impl Injector for OpenTelemetryContext {
748 fn set(&mut self, key: &str, value: String) {
749 self.inner.insert(key.to_lowercase(), value);
750 }
751}
752
753impl From<OpenTelemetryContext> for BTreeMap<String, String> {
754 fn from(ctx: OpenTelemetryContext) -> Self {
755 ctx.inner
756 }
757}
758
759impl From<BTreeMap<String, String>> for OpenTelemetryContext {
760 fn from(map: BTreeMap<String, String>) -> Self {
761 Self { inner: map }
762 }
763}
764
765struct MetricsLayer {
766 on_close: IntCounter,
767}
768
769impl MetricsLayer {
770 fn new(registry: &MetricsRegistry) -> Self {
771 MetricsLayer {
772 on_close: registry.register(metric!(
773 name: "mz_otel_on_close",
774 help: "count of on_close events sent to otel",
775 )),
776 }
777 }
778}
779
780impl<S: tracing::Subscriber> Layer<S> for MetricsLayer {
781 fn on_close(&self, _id: tracing::span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
782 self.on_close.inc()
783 }
784}
785
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use tracing::Level;
    use tracing_subscriber::filter::{EnvFilter, LevelFilter, Targets};

    /// Parses `directives` into an [`EnvFilter`] and checks the level that
    /// `crate_level` reports for every `(crate name, expected level)` pair.
    fn assert_crate_levels(directives: &str, expected: &[(&'static str, Level)]) {
        let filter = EnvFilter::from_str(directives).expect("valid");
        for &(crate_name, level) in expected {
            assert_eq!(super::crate_level(&filter, crate_name), level);
        }
    }

    /// A target added later overrides an earlier entry for the same target.
    #[crate::test]
    fn overriding_targets() {
        let default = Targets::new().with_target("my_crate", LevelFilter::OFF);
        assert!(!default.would_enable("my_crate", &Level::INFO));

        let user_defined = Targets::new().with_target("my_crate", Level::INFO);
        let filters = Targets::new()
            .with_targets(default)
            .with_targets(user_defined);
        assert!(filters.would_enable("my_crate", &Level::INFO));
    }

    #[crate::test]
    fn crate_level() {
        // Without a bare level directive the fallback is `ERROR`.
        assert_crate_levels(
            "abc=trace,def=debug",
            &[
                ("abc", Level::TRACE),
                ("def", Level::DEBUG),
                ("def", Level::DEBUG),
                ("abc::doesnt::exist", Level::ERROR),
                ("doesnt::exist", Level::ERROR),
            ],
        );

        // A bare level becomes the fallback for unmatched crates.
        assert_crate_levels(
            "abc=trace,def=debug,info",
            &[
                ("abc", Level::TRACE),
                ("abc::doesnt:exist", Level::INFO),
                ("def", Level::DEBUG),
                ("nan", Level::INFO),
            ],
        );

        // Only exact target matches count; module paths do not match a crate.
        assert_crate_levels(
            "abc::def::ghi=trace,debug",
            &[
                ("abc", Level::DEBUG),
                ("def", Level::DEBUG),
                ("gets_the_default", Level::DEBUG),
            ],
        );

        // Span and field directives never match a plain crate name.
        assert_crate_levels(
            "abc[s]=trace,def[s{g=h}]=debug,[{s2}]=debug,info",
            &[
                ("abc", Level::INFO),
                ("def", Level::INFO),
                ("gets_the_default", Level::INFO),
            ],
        );

        // A bare target without a level enables everything for that crate.
        assert_crate_levels(
            "abc,info",
            &[
                ("abc", Level::TRACE),
                ("gets_the_default", Level::INFO),
                ("abc::def", Level::INFO),
            ],
        );
    }
}