1use std::collections::BTreeMap;
29use std::io;
30use std::io::IsTerminal;
31use std::str::FromStr;
32use std::sync::LazyLock;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::{Arc, OnceLock};
35use std::time::Duration;
36
37#[cfg(feature = "tokio-console")]
38use console_subscriber::ConsoleLayer;
39use derivative::Derivative;
40use http::HeaderMap;
41use hyper_tls::HttpsConnector;
42use hyper_util::client::legacy::connect::HttpConnector;
43use opentelemetry::global::Error;
44use opentelemetry::propagation::{Extractor, Injector};
45use opentelemetry::trace::TracerProvider;
46use opentelemetry::{KeyValue, global};
47use opentelemetry_sdk::propagation::TraceContextPropagator;
48use opentelemetry_sdk::{Resource, trace};
49use prometheus::IntCounter;
50use tonic::metadata::MetadataMap;
51use tonic::transport::Endpoint;
52use tracing::{Event, Level, Span, Subscriber, warn};
53#[cfg(feature = "capture")]
54use tracing_capture::{CaptureLayer, SharedStorage};
55use tracing_opentelemetry::OpenTelemetrySpanExt;
56use tracing_subscriber::filter::Directive;
57use tracing_subscriber::fmt::format::{Writer, format};
58use tracing_subscriber::fmt::{self, FmtContext, FormatEvent, FormatFields};
59use tracing_subscriber::layer::{Layer, SubscriberExt};
60use tracing_subscriber::registry::LookupSpan;
61use tracing_subscriber::util::SubscriberInitExt;
62use tracing_subscriber::{EnvFilter, Registry, reload};
63
64use crate::metric;
65use crate::metrics::MetricsRegistry;
66#[cfg(feature = "tokio-console")]
67use crate::netio::SocketAddr;
68
/// Application tracing configuration, consumed by [`configure`].
///
/// The type parameter `F` is the type of the Sentry event filter stored in
/// [`SentryConfig::event_filter`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct TracingConfig<F> {
    /// The name of the service.
    ///
    /// Used as the OpenTelemetry `service.name` resource attribute, as the
    /// tracer name, and as the Sentry `service_name` tag.
    pub service_name: &'static str,
    /// Configuration of the stderr log.
    pub stderr_log: StderrLogConfig,
    /// Optional configuration of the OpenTelemetry exporter; when `None` no
    /// OpenTelemetry layer is installed.
    pub opentelemetry: Option<OpenTelemetryConfig>,
    /// Optional configuration for the Tokio console integration.
    #[cfg_attr(nightly_doc_features, doc(cfg(feature = "tokio-console")))]
    #[cfg(feature = "tokio-console")]
    pub tokio_console: Option<TokioConsoleConfig>,
    /// Optional storage handed to a `CaptureLayer`.
    #[cfg(feature = "capture")]
    #[derivative(Debug = "ignore")]
    pub capture: Option<SharedStorage>,
    /// Optional Sentry configuration; when `None` Sentry is not initialized.
    pub sentry: Option<SentryConfig<F>>,
    /// The version of this build; reported to Sentry as the release
    /// `materialize@<build_version>`.
    pub build_version: &'static str,
    /// The commit SHA of this build; reported to Sentry as the `build_sha` tag.
    pub build_sha: &'static str,
    /// Registry in which the OpenTelemetry metrics layer registers its metrics.
    pub registry: MetricsRegistry,
}
100
/// Configures Sentry reporting.
#[derive(Debug, Clone)]
pub struct SentryConfig<F> {
    /// The DSN passed to `sentry::init`.
    pub dsn: String,
    /// The environment name reported to Sentry, if any.
    pub environment: Option<String>,
    /// Additional tags set on the Sentry scope (key/value pairs).
    pub tags: BTreeMap<String, String>,
    /// The filter installed on the `sentry_tracing` layer via `event_filter`.
    pub event_filter: F,
}
116
/// Configures the stderr log.
#[derive(Debug)]
pub struct StderrLogConfig {
    /// The format in which events are written to stderr.
    pub format: StderrLogFormat,
    /// The initial filter for the stderr log; [`configure`] appends the
    /// [`LOGGING_DEFAULTS`] directives to it.
    pub filter: EnvFilter,
}
125
/// Specifies the format of a stderr log message.
#[derive(Debug, Clone)]
pub enum StderrLogFormat {
    /// Text output produced by the default `tracing_subscriber` event format.
    Text {
        /// An optional prefix; when set, each event is preceded by
        /// `"<prefix>: "` (see [`PrefixFormat`]).
        prefix: Option<String>,
    },
    /// JSON output (the `fmt` layer's `json()` mode with the current span
    /// included).
    Json,
}
141
/// Configuration for the OpenTelemetry exporter set up by [`configure`].
#[derive(Debug)]
pub struct OpenTelemetryConfig {
    /// The endpoint the OTLP (tonic/gRPC) exporter connects to.
    pub endpoint: String,
    /// Headers attached to export requests; converted to gRPC metadata.
    pub headers: HeaderMap,
    /// The initial filter for the OpenTelemetry layer; [`configure`] appends
    /// the [`OPENTELEMETRY_DEFAULTS`] directives to it.
    pub filter: EnvFilter,
    /// Passed to the batch config as `with_max_queue_size`.
    pub max_batch_queue_size: usize,
    /// Passed to the batch config as `with_max_export_batch_size`.
    pub max_export_batch_size: usize,
    /// Passed to the batch config as `with_max_concurrent_exports`.
    pub max_concurrent_exports: usize,
    /// Passed to the batch config as `with_scheduled_delay`.
    pub batch_scheduled_delay: Duration,
    /// Passed to the batch config as `with_max_export_timeout`.
    pub max_export_timeout: Duration,
    /// Additional resource attributes, merged with the `service.name`
    /// attribute derived from [`TracingConfig::service_name`].
    pub resource: Resource,
}
168
/// Configuration of the Tokio console integration.
#[cfg_attr(nightly_doc_features, doc(cfg(feature = "tokio-console")))]
#[cfg(feature = "tokio-console")]
#[derive(Debug, Clone)]
pub struct TokioConsoleConfig {
    /// The address on which the console server listens.
    pub listen_addr: SocketAddr,
    /// Passed to the console layer builder as `publish_interval`.
    pub publish_interval: Duration,
    /// Passed to the console layer builder as `retention`.
    pub retention: Duration,
}
189
/// A shareable callback that installs a new filter; it receives the new
/// filter plus a list of default directives to add to it.
type Reloader = Arc<dyn Fn(EnvFilter, Vec<Directive>) -> Result<(), anyhow::Error> + Send + Sync>;
/// A shareable callback that installs a new filter built only from a list of
/// directives.
type DirectiveReloader = Arc<dyn Fn(Vec<Directive>) -> Result<(), anyhow::Error> + Send + Sync>;
192
/// A handle to the tracing infrastructure configured with [`configure`],
/// used to reload filters at runtime.
#[derive(Clone)]
pub struct TracingHandle {
    /// Reloads the stderr log filter.
    stderr_log: Reloader,
    /// Reloads the OpenTelemetry filter.
    opentelemetry: Reloader,
    /// Reloads the Sentry filter directives.
    sentry: DirectiveReloader,
}
200
201impl TracingHandle {
202 pub fn disabled() -> TracingHandle {
206 TracingHandle {
207 stderr_log: Arc::new(|_, _| Ok(())),
208 opentelemetry: Arc::new(|_, _| Ok(())),
209 sentry: Arc::new(|_| Ok(())),
210 }
211 }
212
213 pub fn reload_stderr_log_filter(
215 &self,
216 filter: EnvFilter,
217 defaults: Vec<Directive>,
218 ) -> Result<(), anyhow::Error> {
219 (self.stderr_log)(filter, defaults)
220 }
221
222 pub fn reload_opentelemetry_filter(
224 &self,
225 filter: EnvFilter,
226 defaults: Vec<Directive>,
227 ) -> Result<(), anyhow::Error> {
228 (self.opentelemetry)(filter, defaults)
229 }
230
231 pub fn reload_sentry_directives(&self, defaults: Vec<Directive>) -> Result<(), anyhow::Error> {
233 (self.sentry)(defaults)
234 }
235}
236
237impl std::fmt::Debug for TracingHandle {
238 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
239 f.debug_struct("TracingHandle").finish_non_exhaustive()
240 }
241}
242
/// A guard for the tracing infrastructure configured with [`configure`].
///
/// Dropping it shuts down the global OpenTelemetry tracer provider and drops
/// the Sentry client guard, if one was created.
#[must_use = "Must hold for the lifetime of the program, otherwise tracing will be shutdown"]
pub struct TracingGuard {
    /// Keeps the Sentry client alive; `None` when Sentry is not configured.
    _sentry_guard: Option<sentry::ClientInitGuard>,
}
250
251impl Drop for TracingGuard {
252 fn drop(&mut self) {
253 opentelemetry::global::shutdown_tracer_provider();
254 }
255}
256
257impl std::fmt::Debug for TracingGuard {
258 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
259 f.debug_struct("TracingGuard").finish_non_exhaustive()
260 }
261}
262
263pub const LOGGING_DEFAULTS_STR: [&str; 1] = ["kube_client::client::builder=off"];
273pub static LOGGING_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
275 LOGGING_DEFAULTS_STR
276 .into_iter()
277 .map(|directive| Directive::from_str(directive).expect("valid directive"))
278 .collect()
279});
280pub const OPENTELEMETRY_DEFAULTS_STR: [&str; 2] = ["h2=off", "hyper=off"];
286pub static OPENTELEMETRY_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
288 OPENTELEMETRY_DEFAULTS_STR
289 .into_iter()
290 .map(|directive| Directive::from_str(directive).expect("valid directive"))
291 .collect()
292});
293
294pub const SENTRY_DEFAULTS_STR: [&str; 2] =
297 ["kube_client::client::builder=off", "mysql_async::conn=off"];
298pub static SENTRY_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
300 SENTRY_DEFAULTS_STR
301 .into_iter()
302 .map(|directive| Directive::from_str(directive).expect("valid directive"))
303 .collect()
304});
305
/// The type of the subscriber stored in [`GLOBAL_SUBSCRIBER`].
type GlobalSubscriber = Arc<dyn Subscriber + Send + Sync + 'static>;

/// The subscriber stack installed by [`configure`]; unset until `configure`
/// has run.
pub static GLOBAL_SUBSCRIBER: OnceLock<GlobalSubscriber> = OnceLock::new();
312
313#[allow(clippy::unused_async)]
334pub async fn configure<F>(
335 config: TracingConfig<F>,
336) -> Result<(TracingHandle, TracingGuard), anyhow::Error>
337where
338 F: Fn(&tracing::Metadata<'_>) -> sentry_tracing::EventFilter + Send + Sync + 'static,
339{
340 let stderr_log_layer: Box<dyn Layer<Registry> + Send + Sync> = match config.stderr_log.format {
341 StderrLogFormat::Text { prefix } => {
342 let no_color = std::env::var_os("NO_COLOR").unwrap_or_else(|| "".into()) != "";
344 Box::new(
345 fmt::layer()
346 .with_writer(io::stderr)
347 .event_format(PrefixFormat {
348 inner: format(),
349 prefix,
350 })
351 .with_ansi(!no_color && io::stderr().is_terminal()),
352 )
353 }
354 StderrLogFormat::Json => Box::new(
355 fmt::layer()
356 .with_writer(io::stderr)
357 .json()
358 .with_current_span(true),
359 ),
360 };
361 let (stderr_log_filter, stderr_log_filter_reloader) = reload::Layer::new({
362 let mut filter = config.stderr_log.filter;
363 for directive in LOGGING_DEFAULTS.iter() {
364 filter = filter.add_directive(directive.clone());
365 }
366 filter
367 });
368 let stderr_log_layer = stderr_log_layer.with_filter(stderr_log_filter);
369 let stderr_log_reloader = Arc::new(move |mut filter: EnvFilter, defaults: Vec<Directive>| {
370 for directive in &defaults {
371 filter = filter.add_directive(directive.clone());
372 }
373 Ok(stderr_log_filter_reloader.reload(filter)?)
374 });
375
376 let (otel_layer, otel_reloader): (_, Reloader) = if let Some(otel_config) = config.opentelemetry
377 {
378 opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
379
380 let channel = Endpoint::from_shared(otel_config.endpoint)?
384 .timeout(Duration::from_secs(
385 opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT,
386 ))
387 .connect_with_connector_lazy({
389 let mut http = HttpConnector::new();
390 http.enforce_http(false);
391 HttpsConnector::from((
392 http,
393 tokio_native_tls::TlsConnector::from(
395 native_tls::TlsConnector::builder()
396 .request_alpns(&["h2"])
397 .build()
398 .unwrap(),
399 ),
400 ))
401 });
402 let exporter = opentelemetry_otlp::new_exporter()
403 .tonic()
404 .with_channel(channel)
405 .with_metadata(MetadataMap::from_headers(otel_config.headers));
406 let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default()
407 .with_max_queue_size(otel_config.max_batch_queue_size)
408 .with_max_export_batch_size(otel_config.max_export_batch_size)
409 .with_max_concurrent_exports(otel_config.max_concurrent_exports)
410 .with_scheduled_delay(otel_config.batch_scheduled_delay)
411 .with_max_export_timeout(otel_config.max_export_timeout)
412 .build();
413 let tracer = opentelemetry_otlp::new_pipeline()
414 .tracing()
415 .with_trace_config(
416 trace::Config::default().with_resource(
417 Resource::new([KeyValue::new(
421 "service.name",
422 config.service_name.to_string(),
423 )])
424 .merge(&otel_config.resource),
425 ),
426 )
427 .with_exporter(exporter)
428 .with_batch_config(batch_config)
429 .install_batch(opentelemetry_sdk::runtime::Tokio)
430 .unwrap()
431 .tracer(config.service_name);
432
433 const OPENTELEMETRY_ERROR_MSG_BACKOFF_SECONDS: u64 = 30;
441 let last_log_in_epoch_seconds = AtomicU64::default();
442 opentelemetry::global::set_error_handler(move |err| {
443 let now = std::time::SystemTime::now()
444 .duration_since(std::time::SystemTime::UNIX_EPOCH)
445 .expect("Failed to get duration since Unix epoch")
446 .as_secs();
447 let last_log = last_log_in_epoch_seconds.load(Ordering::SeqCst);
448
449 if now.saturating_sub(last_log) >= OPENTELEMETRY_ERROR_MSG_BACKOFF_SECONDS {
450 if last_log_in_epoch_seconds
451 .compare_exchange_weak(last_log, now, Ordering::Relaxed, Ordering::Relaxed)
452 .is_err()
453 {
454 return;
455 }
456 use crate::error::ErrorExt;
457 match err {
458 Error::Trace(err) => {
459 warn!("OpenTelemetry error: {}", err.display_with_causes());
460 }
461 Error::Metric(err) => {
463 warn!("OpenTelemetry error: {}", err.display_with_causes());
464 }
465 Error::Other(err) => {
466 warn!("OpenTelemetry error: {}", err);
467 }
468 _ => {
469 warn!("unknown OpenTelemetry error");
470 }
471 }
472 }
473 })
474 .expect("valid error handler");
475
476 let (filter, filter_handle) = reload::Layer::new({
477 let mut filter = otel_config.filter;
478 for directive in OPENTELEMETRY_DEFAULTS.iter() {
479 filter = filter.add_directive(directive.clone());
480 }
481 filter
482 });
483 let metrics_layer = MetricsLayer::new(&config.registry);
484 let layer = tracing_opentelemetry::layer()
485 .max_events_per_span(2048)
491 .with_tracer(tracer)
492 .and_then(metrics_layer)
493 .with_filter(filter);
497 let reloader = Arc::new(move |mut filter: EnvFilter, defaults: Vec<Directive>| {
498 for directive in &defaults {
500 filter = filter.add_directive(directive.clone());
501 }
502 Ok(filter_handle.reload(filter)?)
503 });
504 (Some(layer), reloader)
505 } else {
506 let reloader = Arc::new(|_, _| Ok(()));
507 (None, reloader)
508 };
509
510 #[cfg(feature = "tokio-console")]
511 let tokio_console_layer = if let Some(console_config) = config.tokio_console.clone() {
512 let builder = ConsoleLayer::builder()
513 .publish_interval(console_config.publish_interval)
514 .retention(console_config.retention);
515 let builder = match console_config.listen_addr {
516 SocketAddr::Inet(addr) => builder.server_addr(addr),
517 SocketAddr::Unix(addr) => {
518 let path = addr.as_pathname().unwrap().as_ref();
519 builder.server_addr(path)
520 }
521 SocketAddr::Turmoil(_) => unimplemented!(),
522 };
523 Some(builder.spawn())
524 } else {
525 None
526 };
527
528 let (sentry_guard, sentry_layer, sentry_reloader): (_, _, DirectiveReloader) =
529 if let Some(sentry_config) = config.sentry {
530 let guard = sentry::init((
531 sentry_config.dsn,
532 sentry::ClientOptions {
533 attach_stacktrace: true,
534 release: Some(format!("materialize@{0}", config.build_version).into()),
535 environment: sentry_config.environment.map(Into::into),
536 ..Default::default()
537 },
538 ));
539
540 sentry::configure_scope(|scope| {
541 scope.set_tag("service_name", config.service_name);
542 scope.set_tag("build_sha", config.build_sha.to_string());
543 for (k, v) in sentry_config.tags {
544 scope.set_tag(&k, v);
545 }
546 });
547
548 let (filter, filter_handle) = reload::Layer::new({
549 let mut filter = EnvFilter::new("info");
551 for directive in SENTRY_DEFAULTS.iter() {
552 filter = filter.add_directive(directive.clone());
553 }
554 filter
555 });
556 let layer = sentry_tracing::layer()
557 .event_filter(sentry_config.event_filter)
558 .with_filter(filter);
583 let reloader = Arc::new(move |defaults: Vec<Directive>| {
584 let mut filter = EnvFilter::new("info");
586 for directive in &defaults {
588 filter = filter.add_directive(directive.clone());
589 }
590 Ok(filter_handle.reload(filter)?)
591 });
592 (Some(guard), Some(layer), reloader)
593 } else {
594 let reloader = Arc::new(|_| Ok(()));
595 (None, None, reloader)
596 };
597
598 #[cfg(feature = "capture")]
599 let capture = config.capture.map(|storage| CaptureLayer::new(&storage));
600
601 let stack = tracing_subscriber::registry();
602 let stack = stack.with(stderr_log_layer);
603 #[cfg(feature = "capture")]
604 let stack = stack.with(capture);
605 let stack = stack.with(otel_layer);
606 #[cfg(feature = "tokio-console")]
607 let stack = stack.with(tokio_console_layer);
608 let stack = stack.with(sentry_layer);
609
610 assert!(GLOBAL_SUBSCRIBER.set(Arc::new(stack)).is_ok());
612 Arc::clone(GLOBAL_SUBSCRIBER.get().unwrap()).init();
614
615 #[cfg(feature = "tokio-console")]
616 if let Some(console_config) = config.tokio_console {
617 let endpoint = match console_config.listen_addr {
618 SocketAddr::Inet(addr) => format!("http://{addr}"),
619 SocketAddr::Unix(addr) => format!("file://localhost{addr}"),
620 SocketAddr::Turmoil(_) => unimplemented!(),
621 };
622 tracing::info!("starting tokio console on {endpoint}");
623 }
624
625 let handle = TracingHandle {
626 stderr_log: stderr_log_reloader,
627 opentelemetry: otel_reloader,
628 sentry: sentry_reloader,
629 };
630 let guard = TracingGuard {
631 _sentry_guard: sentry_guard,
632 };
633
634 Ok((handle, guard))
635}
636
637pub fn crate_level(filter: &EnvFilter, crate_name: &'static str) -> Level {
640 let mut default_level = Level::ERROR;
646 for directive in format!("{}", filter).split(',') {
649 match directive.split('=').collect::<Vec<_>>().as_slice() {
650 [target, level] => {
651 if *target == crate_name {
652 match Level::from_str(*level) {
653 Ok(level) => return level,
654 Err(err) => warn!("invalid level for {}: {}", target, err),
655 }
656 }
657 }
658 [token] => match Level::from_str(*token) {
659 Ok(level) => default_level = default_level.max(level),
660 Err(_) => {
661 if *token == crate_name {
663 default_level = default_level.max(Level::TRACE);
664 }
665 }
666 },
667 _ => {}
668 }
669 }
670
671 default_level
672}
673
/// An event formatter that writes an optional prefix before delegating to an
/// inner formatter.
#[derive(Debug)]
pub struct PrefixFormat<F> {
    /// The formatter that renders the event itself.
    inner: F,
    /// The prefix written before each event, if any.
    prefix: Option<String>,
}
681
682impl<F, C, N> FormatEvent<C, N> for PrefixFormat<F>
683where
684 C: Subscriber + for<'a> LookupSpan<'a>,
685 N: for<'a> FormatFields<'a> + 'static,
686 F: FormatEvent<C, N>,
687{
688 fn format_event(
689 &self,
690 ctx: &FmtContext<'_, C, N>,
691 mut writer: Writer<'_>,
692 event: &Event<'_>,
693 ) -> std::fmt::Result {
694 match &self.prefix {
695 None => self.inner.format_event(ctx, writer, event)?,
696 Some(prefix) => {
697 let mut prefix = yansi::Paint::new(prefix);
698 if writer.has_ansi_escapes() {
699 prefix = prefix.bold();
700 }
701 write!(writer, "{}: ", prefix)?;
702 self.inner.format_event(ctx, writer, event)?;
703 }
704 }
705 Ok(())
706 }
707}
708
/// A serializable carrier for OpenTelemetry context, stored as string
/// key/value pairs.
///
/// It implements `Extractor` and `Injector`, so it can be handed to the
/// global text map propagator.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct OpenTelemetryContext {
    /// The propagated fields; keys written through `Injector::set` are
    /// lowercased.
    inner: BTreeMap<String, String>,
}
716
717impl OpenTelemetryContext {
718 pub fn attach_as_parent(&self) {
725 let parent_cx = global::get_text_map_propagator(|prop| prop.extract(self));
726 tracing::Span::current().set_parent(parent_cx);
727 }
728
729 pub fn attach_as_parent_to(&self, span: &Span) {
736 let parent_cx = global::get_text_map_propagator(|prop| prop.extract(self));
737 span.set_parent(parent_cx);
738 }
739
740 pub fn obtain() -> Self {
742 let mut context = Self::empty();
743 global::get_text_map_propagator(|propagator| {
744 propagator.inject_context(&tracing::Span::current().context(), &mut context)
745 });
746
747 context
748 }
749
750 pub fn empty() -> Self {
752 Self {
753 inner: BTreeMap::new(),
754 }
755 }
756}
757
758impl Extractor for OpenTelemetryContext {
759 fn get(&self, key: &str) -> Option<&str> {
760 self.inner.get(&key.to_lowercase()).map(|v| v.as_str())
761 }
762
763 fn keys(&self) -> Vec<&str> {
764 self.inner.keys().map(|k| k.as_str()).collect::<Vec<_>>()
765 }
766}
767
768impl Injector for OpenTelemetryContext {
769 fn set(&mut self, key: &str, value: String) {
770 self.inner.insert(key.to_lowercase(), value);
771 }
772}
773
774impl From<OpenTelemetryContext> for BTreeMap<String, String> {
775 fn from(ctx: OpenTelemetryContext) -> Self {
776 ctx.inner
777 }
778}
779
780impl From<BTreeMap<String, String>> for OpenTelemetryContext {
781 fn from(map: BTreeMap<String, String>) -> Self {
782 Self { inner: map }
783 }
784}
785
/// A tracing layer that counts span closures.
struct MetricsLayer {
    /// Incremented each time the layer's `on_close` hook runs.
    on_close: IntCounter,
}
789
790impl MetricsLayer {
791 fn new(registry: &MetricsRegistry) -> Self {
792 MetricsLayer {
793 on_close: registry.register(metric!(
794 name: "mz_otel_on_close",
795 help: "count of on_close events sent to otel",
796 )),
797 }
798 }
799}
800
801impl<S: tracing::Subscriber> Layer<S> for MetricsLayer {
802 fn on_close(&self, _id: tracing::span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
803 self.on_close.inc()
804 }
805}
806
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use tracing::Level;
    use tracing_subscriber::filter::{EnvFilter, LevelFilter, Targets};

    /// Targets added later override earlier entries for the same target.
    #[crate::test]
    fn overriding_targets() {
        let user_defined = Targets::new().with_target("my_crate", Level::INFO);

        let default = Targets::new().with_target("my_crate", LevelFilter::OFF);
        assert!(!default.would_enable("my_crate", &Level::INFO));

        // The user-defined target is added last and therefore wins.
        let filters = Targets::new()
            .with_targets(default)
            .with_targets(user_defined);
        assert!(filters.would_enable("my_crate", &Level::INFO));
    }

    /// Exercises `crate_level` against a range of filter shapes.
    #[crate::test]
    fn crate_level() {
        // Without a global level, unmatched names fall back to ERROR.
        let filter = EnvFilter::from_str("abc=trace,def=debug").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(
            super::crate_level(&filter, "abc::doesnt::exist"),
            Level::ERROR
        );
        assert_eq!(super::crate_level(&filter, "doesnt::exist"), Level::ERROR);

        // A global level becomes the fallback for unmatched names.
        let filter = EnvFilter::from_str("abc=trace,def=debug,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(
            super::crate_level(&filter, "abc::doesnt::exist"),
            Level::INFO
        );
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(super::crate_level(&filter, "nan"), Level::INFO);

        // Only exact target matches count; a module-path directive does not
        // apply to its parent crate name.
        let filter = EnvFilter::from_str("abc::def::ghi=trace,debug").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::DEBUG);
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(
            super::crate_level(&filter, "gets_the_default"),
            Level::DEBUG
        );

        // Directives carrying span or field filters are not treated as
        // matches for the bare crate name.
        let filter =
            EnvFilter::from_str("abc[s]=trace,def[s{g=h}]=debug,[{s2}]=debug,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::INFO);
        assert_eq!(super::crate_level(&filter, "def"), Level::INFO);
        assert_eq!(super::crate_level(&filter, "gets_the_default"), Level::INFO);

        // A bare target with no level is treated as TRACE for that target.
        let filter = EnvFilter::from_str("abc,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(super::crate_level(&filter, "gets_the_default"), Level::INFO);
        assert_eq!(super::crate_level(&filter, "abc::def"), Level::INFO);
    }
}