1use std::collections::BTreeMap;
29use std::hash::{Hash, Hasher};
30use std::io;
31use std::io::IsTerminal;
32use std::str::FromStr;
33use std::sync::LazyLock;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::{Arc, Mutex, OnceLock};
36use std::time::Duration;
37
38#[cfg(feature = "tokio-console")]
39use console_subscriber::ConsoleLayer;
40use derivative::Derivative;
41use http::HeaderMap;
42use hyper_tls::HttpsConnector;
43use hyper_util::client::legacy::connect::HttpConnector;
44use opentelemetry::propagation::{Extractor, Injector};
45use opentelemetry::trace::TracerProvider;
46use opentelemetry::{KeyValue, global};
47use opentelemetry_otlp::WithTonicConfig;
48use opentelemetry_sdk::propagation::TraceContextPropagator;
49use opentelemetry_sdk::runtime::Tokio;
50use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
51use opentelemetry_sdk::{Resource, trace};
52use prometheus::IntCounter;
53use tonic::metadata::MetadataMap;
54use tonic::transport::Endpoint;
55use tracing::{Event, Level, Span, Subscriber, warn};
56#[cfg(feature = "capture")]
57use tracing_capture::{CaptureLayer, SharedStorage};
58use tracing_opentelemetry::OpenTelemetrySpanExt;
59use tracing_subscriber::filter::Directive;
60use tracing_subscriber::fmt::format::{Writer, format};
61use tracing_subscriber::fmt::{self, FmtContext, FormatEvent, FormatFields};
62use tracing_subscriber::layer::{Layer, SubscriberExt};
63use tracing_subscriber::registry::LookupSpan;
64use tracing_subscriber::util::SubscriberInitExt;
65use tracing_subscriber::{EnvFilter, Registry, reload};
66
67use crate::metric;
68use crate::metrics::MetricsRegistry;
69#[cfg(feature = "tokio-console")]
70use crate::netio::SocketAddr;
71use crate::now::{EpochMillis, NowFn, SYSTEM_TIME};
72
/// Configuration for the tracing stack assembled by [`configure`].
///
/// The type parameter `F` is the type of the Sentry event-filter callback
/// stored in [`SentryConfig::event_filter`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct TracingConfig<F> {
    /// Name of the running service. Used as the OpenTelemetry service and
    /// tracer name and as the value of the `service_name` Sentry tag.
    pub service_name: &'static str,
    /// Configuration of the stderr log layer, which is always installed.
    pub stderr_log: StderrLogConfig,
    /// Configuration of the OpenTelemetry layer; `None` disables the layer.
    pub opentelemetry: Option<OpenTelemetryConfig>,
    /// Configuration of the Tokio console layer; `None` disables the layer.
    #[cfg_attr(nightly_doc_features, doc(cfg(feature = "tokio-console")))]
    #[cfg(feature = "tokio-console")]
    pub tokio_console: Option<TokioConsoleConfig>,
    /// Optional shared storage handed to a `CaptureLayer`; `None` disables
    /// the layer. Excluded from the `Debug` output.
    #[cfg(feature = "capture")]
    #[derivative(Debug = "ignore")]
    pub capture: Option<SharedStorage>,
    /// Configuration of the Sentry layer; `None` disables the layer.
    pub sentry: Option<SentryConfig<F>>,
    /// Build version, reported to Sentry as the release
    /// `materialize@<build_version>`.
    pub build_version: &'static str,
    /// Build commit SHA, attached to Sentry events as the `build_sha` tag.
    pub build_sha: &'static str,
    /// Metrics registry in which the OpenTelemetry layer's span-close counter
    /// is registered.
    pub registry: MetricsRegistry,
}
104
/// Configuration of the Sentry integration set up by [`configure`].
#[derive(Debug, Clone)]
pub struct SentryConfig<F> {
    /// DSN passed to `sentry::init`.
    pub dsn: String,
    /// Optional environment name forwarded to the Sentry client options.
    pub environment: Option<String>,
    /// Extra tags set on the Sentry scope, in addition to `service_name` and
    /// `build_sha`.
    pub tags: BTreeMap<String, String>,
    /// Callback installed as the `sentry_tracing` layer's event filter.
    pub event_filter: F,
}
120
/// Configuration of the stderr log layer.
#[derive(Debug)]
pub struct StderrLogConfig {
    /// Output format of the log lines.
    pub format: StderrLogFormat,
    /// Initial filter for the layer. [`configure`] appends
    /// [`LOGGING_DEFAULTS`] to it before installing it.
    pub filter: EnvFilter,
}
129
/// Output format of the stderr log layer.
#[derive(Debug, Clone)]
pub enum StderrLogFormat {
    /// Text output produced by the default `tracing_subscriber` event
    /// formatter.
    Text {
        /// Optional prefix written before every event, followed by `": "`.
        prefix: Option<String>,
    },
    /// JSON output that includes the current span.
    Json,
}
145
/// Configuration of the OpenTelemetry layer and its OTLP span exporter.
#[derive(Debug)]
pub struct OpenTelemetryConfig {
    /// Endpoint of the OTLP collector, parsed with
    /// `tonic::transport::Endpoint::from_shared`.
    pub endpoint: String,
    /// Headers sent to the collector as gRPC metadata with every export.
    pub headers: HeaderMap,
    /// Initial filter for the layer. [`configure`] appends
    /// [`OPENTELEMETRY_DEFAULTS`] to it before installing it.
    pub filter: EnvFilter,
    /// Value passed to the batch config's `with_max_queue_size`.
    pub max_batch_queue_size: usize,
    /// Value passed to the batch config's `with_max_export_batch_size`.
    pub max_export_batch_size: usize,
    /// Value passed to the batch config's `with_max_concurrent_exports`.
    pub max_concurrent_exports: usize,
    /// Value passed to the batch config's `with_scheduled_delay`.
    pub batch_scheduled_delay: Duration,
    /// Value passed to the batch config's `with_max_export_timeout`.
    pub max_export_timeout: Duration,
    /// Resource whose attributes are copied onto the tracer's resource,
    /// alongside the service name.
    pub resource: Resource,
}
172
/// Configuration of the Tokio console layer.
#[cfg_attr(nightly_doc_features, doc(cfg(feature = "tokio-console")))]
#[cfg(feature = "tokio-console")]
#[derive(Debug, Clone)]
pub struct TokioConsoleConfig {
    /// Address the console server listens on. [`configure`] supports the
    /// `Inet` and `Unix` variants; `Turmoil` is unimplemented.
    pub listen_addr: SocketAddr,
    /// Value passed to the console builder's `publish_interval`.
    pub publish_interval: Duration,
    /// Value passed to the console builder's `retention`.
    pub retention: Duration,
}
193
/// Callback that replaces a layer's filter with the given `EnvFilter`
/// extended by the given default directives.
type Reloader = Arc<dyn Fn(EnvFilter, Vec<Directive>) -> Result<(), anyhow::Error> + Send + Sync>;
/// Callback that rebuilds a layer's filter from default directives alone.
type DirectiveReloader = Arc<dyn Fn(Vec<Directive>) -> Result<(), anyhow::Error> + Send + Sync>;
196
/// Cloneable handle for reloading the filters of the layers installed by
/// [`configure`].
#[derive(Clone)]
pub struct TracingHandle {
    /// Reloads the stderr log layer's filter.
    stderr_log: Reloader,
    /// Reloads the OpenTelemetry layer's filter.
    opentelemetry: Reloader,
    /// Reloads the Sentry layer's filter.
    sentry: DirectiveReloader,
}
204
205impl TracingHandle {
206 pub fn disabled() -> TracingHandle {
210 TracingHandle {
211 stderr_log: Arc::new(|_, _| Ok(())),
212 opentelemetry: Arc::new(|_, _| Ok(())),
213 sentry: Arc::new(|_| Ok(())),
214 }
215 }
216
217 pub fn reload_stderr_log_filter(
219 &self,
220 filter: EnvFilter,
221 defaults: Vec<Directive>,
222 ) -> Result<(), anyhow::Error> {
223 (self.stderr_log)(filter, defaults)
224 }
225
226 pub fn reload_opentelemetry_filter(
228 &self,
229 filter: EnvFilter,
230 defaults: Vec<Directive>,
231 ) -> Result<(), anyhow::Error> {
232 (self.opentelemetry)(filter, defaults)
233 }
234
235 pub fn reload_sentry_directives(&self, defaults: Vec<Directive>) -> Result<(), anyhow::Error> {
237 (self.sentry)(defaults)
238 }
239}
240
241impl std::fmt::Debug for TracingHandle {
242 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
243 f.debug_struct("TracingHandle").finish_non_exhaustive()
244 }
245}
246
247pub const LOGGING_DEFAULTS_STR: [&str; 2] = [
257 "kube_client::client::builder=off",
258 "aws_config::profile::credentials=off",
261];
262pub static LOGGING_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
264 LOGGING_DEFAULTS_STR
265 .into_iter()
266 .map(|directive| Directive::from_str(directive).expect("valid directive"))
267 .collect()
268});
269pub const OPENTELEMETRY_DEFAULTS_STR: [&str; 2] = ["h2=off", "hyper=off"];
275pub static OPENTELEMETRY_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
277 OPENTELEMETRY_DEFAULTS_STR
278 .into_iter()
279 .map(|directive| Directive::from_str(directive).expect("valid directive"))
280 .collect()
281});
282
283pub const SENTRY_DEFAULTS_STR: [&str; 2] =
286 ["kube_client::client::builder=off", "mysql_async::conn=off"];
287pub static SENTRY_DEFAULTS: LazyLock<Vec<Directive>> = LazyLock::new(|| {
289 SENTRY_DEFAULTS_STR
290 .into_iter()
291 .map(|directive| Directive::from_str(directive).expect("valid directive"))
292 .collect()
293});
294
/// Type-erased, shareable subscriber as stored in [`GLOBAL_SUBSCRIBER`].
type GlobalSubscriber = Arc<dyn Subscriber + Send + Sync + 'static>;

/// The subscriber stack built by [`configure`]. It is set exactly once, right
/// before the stack is installed as the global default.
pub static GLOBAL_SUBSCRIBER: OnceLock<GlobalSubscriber> = OnceLock::new();
301
/// Builds the tracing stack described by `config` and installs it as the
/// global default subscriber.
///
/// Layers are added to a `tracing_subscriber` registry in this order: the
/// stderr log layer (always present), the capture layer (feature `capture`),
/// the OpenTelemetry layer, the Tokio console layer (feature
/// `tokio-console`), and the Sentry layer. An optional layer is `None` when
/// its configuration is absent.
///
/// The returned [`TracingHandle`] reloads the stderr, OpenTelemetry, and
/// Sentry filters at runtime. Reloaders for layers that were not configured
/// are no-ops that return `Ok(())`.
///
/// # Errors
///
/// Returns an error if the OpenTelemetry endpoint cannot be parsed or the
/// OTLP span exporter cannot be built.
///
/// # Panics
///
/// Panics if the TLS connector for the OTLP channel cannot be built, if a
/// Unix-domain Tokio console address has no pathname, if the Tokio console
/// address is a `Turmoil` address, or if [`GLOBAL_SUBSCRIBER`] has already
/// been set. The final `init()` call is also expected to panic if another
/// global default subscriber is already installed.
// NOTE(review): the body contains no `.await`. Presumably the function stays
// `async` so that it is called from inside a Tokio runtime, which the
// `Tokio`-backed batch span processor would need — confirm.
#[allow(clippy::unused_async)]
pub async fn configure<F>(config: TracingConfig<F>) -> Result<TracingHandle, anyhow::Error>
where
    F: Fn(&tracing::Metadata<'_>) -> sentry_tracing::EventFilter + Send + Sync + 'static,
{
    // Stderr layer. It is boxed so that both format variants share one type.
    let stderr_log_layer: Box<dyn Layer<Registry> + Send + Sync> = match config.stderr_log.format {
        StderrLogFormat::Text { prefix } => {
            // Any non-empty `NO_COLOR` value disables ANSI colors.
            let no_color = std::env::var_os("NO_COLOR").unwrap_or_else(|| "".into()) != "";
            Box::new(
                fmt::layer()
                    .with_writer(io::stderr)
                    .event_format(PrefixFormat {
                        inner: format(),
                        prefix,
                    })
                    // Colors are emitted only when stderr is a terminal.
                    .with_ansi(!no_color && io::stderr().is_terminal()),
            )
        }
        StderrLogFormat::Json => Box::new(
            fmt::layer()
                .with_writer(io::stderr)
                .json()
                .with_current_span(true),
        ),
    };
    // Wrap the user-supplied filter, extended with the logging defaults, in a
    // reloadable layer so it can be swapped at runtime.
    let (stderr_log_filter, stderr_log_filter_reloader) = reload::Layer::new({
        let mut filter = config.stderr_log.filter;
        for directive in LOGGING_DEFAULTS.iter() {
            filter = filter.add_directive(directive.clone());
        }
        filter
    });
    // Rate limits events whose target starts with the OpenTelemetry prefix.
    let otel_rate_limit_filter = OpenTelemetryRateLimitingFilter::new(Duration::from_secs(
        OPENTELEMETRY_RATE_LIMIT_BACKOFF_SECS,
    ));
    let stderr_log_layer = stderr_log_layer
        .with_filter(otel_rate_limit_filter)
        .with_filter(stderr_log_filter);
    let stderr_log_reloader = Arc::new(move |mut filter: EnvFilter, defaults: Vec<Directive>| {
        for directive in &defaults {
            filter = filter.add_directive(directive.clone());
        }
        Ok(stderr_log_filter_reloader.reload(filter)?)
    });

    // OpenTelemetry layer, plus the callback that reloads its filter.
    let (otel_layer, otel_reloader): (_, Reloader) = if let Some(otel_config) = config.opentelemetry
    {
        opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

        // The channel is connected lazily, so an unreachable collector does
        // not fail this call.
        let channel = Endpoint::from_shared(otel_config.endpoint)?
            .timeout(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT)
            .connect_with_connector_lazy({
                let mut http = HttpConnector::new();
                // Do not restrict the connector to `http://` URIs; TLS is
                // layered on top by the `HttpsConnector`.
                http.enforce_http(false);
                HttpsConnector::from((
                    http,
                    tokio_native_tls::TlsConnector::from(
                        // Request HTTP/2 via ALPN during the TLS handshake.
                        native_tls::TlsConnector::builder()
                            .request_alpns(&["h2"])
                            .build()
                            .unwrap(),
                    ),
                ))
            });
        let exporter = opentelemetry_otlp::SpanExporter::builder()
            .with_tonic()
            .with_channel(channel)
            .with_metadata(MetadataMap::from_headers(otel_config.headers))
            .build()?;
        let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default()
            .with_max_queue_size(otel_config.max_batch_queue_size)
            .with_max_export_batch_size(otel_config.max_export_batch_size)
            .with_max_concurrent_exports(otel_config.max_concurrent_exports)
            .with_scheduled_delay(otel_config.batch_scheduled_delay)
            .with_max_export_timeout(otel_config.max_export_timeout)
            .build();
        let batch_span_processor = BatchSpanProcessor::builder(exporter, Tokio)
            .with_batch_config(batch_config)
            .build();
        let tracer = trace::SdkTracerProvider::builder()
            .with_resource(
                // Service name plus every attribute of the configured resource.
                Resource::builder()
                    .with_service_name(config.service_name.to_string())
                    .with_attributes(
                        otel_config
                            .resource
                            .iter()
                            .map(|(k, v)| KeyValue::new(k.clone(), v.clone())),
                    )
                    .build(),
            )
            .with_span_processor(batch_span_processor)
            .with_max_events_per_span(2048)
            .build()
            .tracer(config.service_name);

        let (filter, filter_handle) = reload::Layer::new({
            let mut filter = otel_config.filter;
            for directive in OPENTELEMETRY_DEFAULTS.iter() {
                filter = filter.add_directive(directive.clone());
            }
            filter
        });
        let metrics_layer = MetricsLayer::new(&config.registry);
        // The metrics layer sits under the same filter as the OpenTelemetry
        // layer, so it counts only spans that pass that filter.
        let layer = tracing_opentelemetry::layer()
            .with_tracer(tracer)
            .and_then(metrics_layer)
            .with_filter(filter);
        let reloader = Arc::new(move |mut filter: EnvFilter, defaults: Vec<Directive>| {
            for directive in &defaults {
                filter = filter.add_directive(directive.clone());
            }
            Ok(filter_handle.reload(filter)?)
        });
        (Some(layer), reloader)
    } else {
        let reloader = Arc::new(|_, _| Ok(()));
        (None, reloader)
    };

    // Tokio console layer. The config is cloned because it is read again
    // after the subscriber is installed, to log the listen address.
    #[cfg(feature = "tokio-console")]
    let tokio_console_layer = if let Some(console_config) = config.tokio_console.clone() {
        let builder = ConsoleLayer::builder()
            .publish_interval(console_config.publish_interval)
            .retention(console_config.retention);
        let builder = match console_config.listen_addr {
            SocketAddr::Inet(addr) => builder.server_addr(addr),
            SocketAddr::Unix(addr) => {
                let path = addr.as_pathname().unwrap().as_ref();
                builder.server_addr(path)
            }
            SocketAddr::Turmoil(_) => unimplemented!(),
        };
        Some(builder.spawn())
    } else {
        None
    };

    // Sentry layer, plus the callback that reloads its filter.
    let (sentry_layer, sentry_reloader): (_, DirectiveReloader) =
        if let Some(sentry_config) = config.sentry {
            let guard = sentry::init((
                sentry_config.dsn,
                sentry::ClientOptions {
                    attach_stacktrace: true,
                    release: Some(format!("materialize@{0}", config.build_version).into()),
                    environment: sentry_config.environment.map(Into::into),
                    ..Default::default()
                },
            ));

            // Leak the guard so its destructor never runs.
            // NOTE(review): presumably this keeps the Sentry client alive for
            // the rest of the process — confirm.
            std::mem::forget(guard);

            sentry::configure_scope(|scope| {
                scope.set_tag("service_name", config.service_name);
                scope.set_tag("build_sha", config.build_sha.to_string());
                for (k, v) in sentry_config.tags {
                    scope.set_tag(&k, v);
                }
            });

            // The Sentry filter always starts from `info`; only the extra
            // directives are configurable.
            let (filter, filter_handle) = reload::Layer::new({
                let mut filter = EnvFilter::new("info");
                for directive in SENTRY_DEFAULTS.iter() {
                    filter = filter.add_directive(directive.clone());
                }
                filter
            });
            let layer = sentry_tracing::layer()
                .event_filter(sentry_config.event_filter)
                .with_filter(filter);
            let reloader = Arc::new(move |defaults: Vec<Directive>| {
                // Rebuild from the same `info` base used at startup.
                let mut filter = EnvFilter::new("info");
                for directive in &defaults {
                    filter = filter.add_directive(directive.clone());
                }
                Ok(filter_handle.reload(filter)?)
            });
            (Some(layer), reloader)
        } else {
            let reloader = Arc::new(|_| Ok(()));
            (None, reloader)
        };

    #[cfg(feature = "capture")]
    let capture = config.capture.map(|storage| CaptureLayer::new(&storage));

    // Assemble the stack.
    let stack = tracing_subscriber::registry();
    let stack = stack.with(stderr_log_layer);
    #[cfg(feature = "capture")]
    let stack = stack.with(capture);
    let stack = stack.with(otel_layer);
    #[cfg(feature = "tokio-console")]
    let stack = stack.with(tokio_console_layer);
    let stack = stack.with(sentry_layer);

    // Publish the stack through `GLOBAL_SUBSCRIBER`, then install the same
    // `Arc` as the global default. A second call to `configure` fails the
    // assertion.
    assert!(GLOBAL_SUBSCRIBER.set(Arc::new(stack)).is_ok());
    Arc::clone(GLOBAL_SUBSCRIBER.get().unwrap()).init();

    // Logged only now that a subscriber is installed to receive the event.
    #[cfg(feature = "tokio-console")]
    if let Some(console_config) = config.tokio_console {
        let endpoint = match console_config.listen_addr {
            SocketAddr::Inet(addr) => format!("http://{addr}"),
            SocketAddr::Unix(addr) => format!("file://localhost{addr}"),
            SocketAddr::Turmoil(_) => unimplemented!(),
        };
        tracing::info!("starting tokio console on {endpoint}");
    }

    let handle = TracingHandle {
        stderr_log: stderr_log_reloader,
        opentelemetry: otel_reloader,
        sentry: sentry_reloader,
    };

    Ok(handle)
}
592
593pub fn crate_level(filter: &EnvFilter, crate_name: &'static str) -> Level {
596 let mut default_level = Level::ERROR;
602 for directive in format!("{}", filter).split(',') {
605 match directive.split('=').collect::<Vec<_>>().as_slice() {
606 [target, level] => {
607 if *target == crate_name {
608 match Level::from_str(*level) {
609 Ok(level) => return level,
610 Err(err) => warn!("invalid level for {}: {}", target, err),
611 }
612 }
613 }
614 [token] => match Level::from_str(*token) {
615 Ok(level) => default_level = default_level.max(level),
616 Err(_) => {
617 if *token == crate_name {
619 default_level = default_level.max(Level::TRACE);
620 }
621 }
622 },
623 _ => {}
624 }
625 }
626
627 default_level
628}
629
/// Event formatter that writes an optional prefix before delegating to an
/// inner formatter.
#[derive(Debug)]
pub struct PrefixFormat<F> {
    /// Formatter that renders the event itself.
    inner: F,
    /// Text written before each event, followed by `": "`; `None` writes
    /// nothing extra.
    prefix: Option<String>,
}
637
638impl<F, C, N> FormatEvent<C, N> for PrefixFormat<F>
639where
640 C: Subscriber + for<'a> LookupSpan<'a>,
641 N: for<'a> FormatFields<'a> + 'static,
642 F: FormatEvent<C, N>,
643{
644 fn format_event(
645 &self,
646 ctx: &FmtContext<'_, C, N>,
647 mut writer: Writer<'_>,
648 event: &Event<'_>,
649 ) -> std::fmt::Result {
650 match &self.prefix {
651 None => self.inner.format_event(ctx, writer, event)?,
652 Some(prefix) => {
653 let mut prefix = yansi::Paint::new(prefix);
654 if writer.has_ansi_escapes() {
655 prefix = prefix.bold();
656 }
657 write!(writer, "{}: ", prefix)?;
658 self.inner.format_event(ctx, writer, event)?;
659 }
660 }
661 Ok(())
662 }
663}
664
/// A serializable carrier for propagated OpenTelemetry context.
///
/// It stores the key/value pairs that the globally configured text-map
/// propagator injects and extracts.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct OpenTelemetryContext {
    /// Propagation fields. Keys written through `Injector::set` are
    /// lowercased.
    inner: BTreeMap<String, String>,
}
672
673impl OpenTelemetryContext {
674 pub fn attach_as_parent(&self) {
681 self.attach_as_parent_to(&tracing::Span::current())
682 }
683
684 pub fn attach_as_parent_to(&self, span: &Span) {
690 let parent_cx = global::get_text_map_propagator(|prop| prop.extract(self));
691 let _ = span.set_parent(parent_cx);
692 }
693
694 pub fn obtain() -> Self {
696 let mut context = Self::empty();
697 global::get_text_map_propagator(|propagator| {
698 propagator.inject_context(&tracing::Span::current().context(), &mut context)
699 });
700
701 context
702 }
703
704 pub fn empty() -> Self {
706 Self {
707 inner: BTreeMap::new(),
708 }
709 }
710}
711
712impl Extractor for OpenTelemetryContext {
713 fn get(&self, key: &str) -> Option<&str> {
714 self.inner.get(&key.to_lowercase()).map(|v| v.as_str())
715 }
716
717 fn keys(&self) -> Vec<&str> {
718 self.inner.keys().map(|k| k.as_str()).collect::<Vec<_>>()
719 }
720}
721
722impl Injector for OpenTelemetryContext {
723 fn set(&mut self, key: &str, value: String) {
724 self.inner.insert(key.to_lowercase(), value);
725 }
726}
727
728impl From<OpenTelemetryContext> for BTreeMap<String, String> {
729 fn from(ctx: OpenTelemetryContext) -> Self {
730 ctx.inner
731 }
732}
733
734impl From<BTreeMap<String, String>> for OpenTelemetryContext {
735 fn from(map: BTreeMap<String, String>) -> Self {
736 Self { inner: map }
737 }
738}
739
/// Layer that counts span-close notifications.
struct MetricsLayer {
    /// Counter incremented once per `on_close` callback.
    on_close: IntCounter,
}
743
744impl MetricsLayer {
745 fn new(registry: &MetricsRegistry) -> Self {
746 MetricsLayer {
747 on_close: registry.register(metric!(
748 name: "mz_otel_on_close",
749 help: "count of on_close events sent to otel",
750 )),
751 }
752 }
753}
754
755impl<S: tracing::Subscriber> Layer<S> for MetricsLayer {
756 fn on_close(&self, _id: tracing::span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
757 self.on_close.inc()
758 }
759}
760
/// Events whose target starts with this prefix are subject to rate limiting
/// by [`OpenTelemetryRateLimitingFilter`].
const OPENTELEMETRY_TARGET_PREFIX: &str = "opentelemetry";

/// Backoff, in seconds, that [`configure`] passes to the rate limiting
/// filter on the stderr log layer.
const OPENTELEMETRY_RATE_LIMIT_BACKOFF_SECS: u64 = 30;
768
/// Filter that rate limits events whose target starts with
/// [`OPENTELEMETRY_TARGET_PREFIX`].
///
/// Each distinct callsite (keyed by a hash of target, name, file, and line)
/// is let through at most once per `backoff_duration`. Events with any other
/// target always pass.
#[derive(Debug)]
pub struct OpenTelemetryRateLimitingFilter {
    /// Minimum time between two admitted events from the same callsite.
    backoff_duration: Duration,
    /// Time at which each callsite key was last admitted.
    last_logged: Mutex<BTreeMap<u64, EpochMillis>>,
    /// Number of events suppressed so far.
    suppressed_count: AtomicU64,
    /// Clock used to timestamp events; replaceable in tests.
    now_fn: NowFn,
}
790
791impl OpenTelemetryRateLimitingFilter {
792 pub fn new(backoff_duration: Duration) -> Self {
797 Self {
798 backoff_duration,
799 last_logged: Mutex::new(BTreeMap::new()),
800 suppressed_count: AtomicU64::new(0),
801 now_fn: SYSTEM_TIME.clone(),
802 }
803 }
804
805 #[cfg(test)]
807 fn with_now_fn(mut self, now_fn: NowFn) -> Self {
808 self.now_fn = now_fn;
809 self
810 }
811
812 fn compute_key(metadata: &tracing::Metadata<'_>) -> u64 {
814 let mut hasher = std::hash::DefaultHasher::new();
815 metadata.target().hash(&mut hasher);
816 metadata.name().hash(&mut hasher);
817 if let Some(file) = metadata.file() {
819 file.hash(&mut hasher);
820 }
821 if let Some(line) = metadata.line() {
822 line.hash(&mut hasher);
823 }
824 hasher.finish()
825 }
826
827 fn should_log(&self, metadata: &tracing::Metadata<'_>) -> bool {
829 if !metadata.target().starts_with(OPENTELEMETRY_TARGET_PREFIX) {
831 return true;
832 }
833
834 let key = Self::compute_key(metadata);
835 let now = (self.now_fn)();
836
837 let mut last_logged = self.last_logged.lock().unwrap();
838
839 if let Some(last_time) = last_logged.get(&key) {
840 if Duration::from_millis(now - last_time) < self.backoff_duration {
841 self.suppressed_count.fetch_add(1, Ordering::Relaxed);
842 return false;
843 }
844 }
845
846 last_logged.insert(key, now);
847
848 if last_logged.len() > 1000 {
851 last_logged
852 .retain(|_, time| Duration::from_millis(now - *time) < self.backoff_duration);
853 }
854
855 true
856 }
857
858 pub fn suppressed_count(&self) -> u64 {
860 self.suppressed_count.load(Ordering::Relaxed)
861 }
862}
863
864impl<S> tracing_subscriber::layer::Filter<S> for OpenTelemetryRateLimitingFilter
865where
866 S: tracing::Subscriber + for<'lookup> LookupSpan<'lookup>,
867{
868 fn enabled(
869 &self,
870 metadata: &tracing::Metadata<'_>,
871 _ctx: &tracing_subscriber::layer::Context<'_, S>,
872 ) -> bool {
873 self.should_log(metadata)
874 }
875
876 fn event_enabled(
877 &self,
878 event: &Event<'_>,
879 _ctx: &tracing_subscriber::layer::Context<'_, S>,
880 ) -> bool {
881 self.should_log(event.metadata())
882 }
883}
884
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use tracing::Level;
    use tracing_subscriber::filter::{EnvFilter, LevelFilter, Targets};

    /// Targets added later override earlier ones for the same target name.
    #[crate::test]
    fn overriding_targets() {
        let user_defined = Targets::new().with_target("my_crate", Level::INFO);

        let default = Targets::new().with_target("my_crate", LevelFilter::OFF);
        assert!(!default.would_enable("my_crate", &Level::INFO));

        // The user-defined targets are applied last, so they win.
        let filters = Targets::new()
            .with_targets(default)
            .with_targets(user_defined);
        assert!(filters.would_enable("my_crate", &Level::INFO));
    }

    /// Exercises `crate_level` against exact, default, nested, span-scoped,
    /// and bare-target directives.
    #[crate::test]
    fn crate_level() {
        // Exact matches only; no default level, so the fallback is ERROR.
        let filter = EnvFilter::from_str("abc=trace,def=debug").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(
            super::crate_level(&filter, "abc::doesnt::exist"),
            Level::ERROR
        );
        assert_eq!(super::crate_level(&filter, "doesnt::exist"), Level::ERROR);

        // Exact matches plus a bare default level.
        let filter = EnvFilter::from_str("abc=trace,def=debug,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(
            super::crate_level(&filter, "abc::doesnt:exist"),
            Level::INFO
        );
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(super::crate_level(&filter, "nan"), Level::INFO);

        // A directive for a nested module does not apply to its parent.
        let filter = EnvFilter::from_str("abc::def::ghi=trace,debug").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::DEBUG);
        assert_eq!(super::crate_level(&filter, "def"), Level::DEBUG);
        assert_eq!(
            super::crate_level(&filter, "gets_the_default"),
            Level::DEBUG
        );

        // Span- and field-scoped directives are not exact target matches.
        let filter =
            EnvFilter::from_str("abc[s]=trace,def[s{g=h}]=debug,[{s2}]=debug,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::INFO);
        assert_eq!(super::crate_level(&filter, "def"), Level::INFO);
        assert_eq!(super::crate_level(&filter, "gets_the_default"), Level::INFO);

        // A bare target with no level means TRACE for that target.
        let filter = EnvFilter::from_str("abc,info").expect("valid");
        assert_eq!(super::crate_level(&filter, "abc"), Level::TRACE);
        assert_eq!(super::crate_level(&filter, "gets_the_default"), Level::INFO);
        assert_eq!(super::crate_level(&filter, "abc::def"), Level::INFO);
    }

    /// The rate limiting filter passes non-OpenTelemetry events untouched and
    /// admits an OpenTelemetry callsite at most once per backoff window.
    #[crate::test]
    fn otel_rate_limiting_filter_backoff() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::time::Duration;
        use tracing::Callsite;

        use crate::now::NowFn;

        // A fake clock that the test advances by hand.
        let current_time = Arc::new(AtomicU64::new(0));
        let time_for_closure = Arc::clone(&current_time);
        let now_fn: NowFn = NowFn::from(move || time_for_closure.load(Ordering::SeqCst));

        let filter = super::OpenTelemetryRateLimitingFilter::new(Duration::from_millis(100))
            .with_now_fn(now_fn);

        // Callsite whose target matches the OpenTelemetry prefix.
        static OTEL_CALLSITE: tracing::callsite::DefaultCallsite =
            tracing::callsite::DefaultCallsite::new(&tracing::Metadata::new(
                "test_event",
                "opentelemetry_sdk::trace",
                Level::WARN,
                Some(file!()),
                Some(line!()),
                Some(module_path!()),
                tracing::field::FieldSet::new(&[], tracing::callsite::Identifier(&OTEL_CALLSITE)),
                tracing::metadata::Kind::EVENT,
            ));
        let otel_meta = OTEL_CALLSITE.metadata();

        // Callsite with an unrelated target.
        static OTHER_CALLSITE: tracing::callsite::DefaultCallsite =
            tracing::callsite::DefaultCallsite::new(&tracing::Metadata::new(
                "test_event",
                "my_app::module",
                Level::WARN,
                Some(file!()),
                Some(line!()),
                Some(module_path!()),
                tracing::field::FieldSet::new(&[], tracing::callsite::Identifier(&OTHER_CALLSITE)),
                tracing::metadata::Kind::EVENT,
            ));
        let other_meta = OTHER_CALLSITE.metadata();

        // Non-OpenTelemetry events are never rate limited.
        assert!(filter.should_log(other_meta));
        assert!(filter.should_log(other_meta));
        assert!(filter.should_log(other_meta));
        assert_eq!(filter.suppressed_count(), 0);

        // The first OpenTelemetry event is admitted.
        assert!(filter.should_log(otel_meta));
        assert_eq!(filter.suppressed_count(), 0);

        // Inside the 100ms backoff window: suppressed.
        current_time.store(50, Ordering::SeqCst);
        assert!(!filter.should_log(otel_meta));
        assert!(!filter.should_log(otel_meta));
        assert_eq!(filter.suppressed_count(), 2);

        // After the backoff window has elapsed: admitted again.
        current_time.store(150, Ordering::SeqCst);
        assert!(filter.should_log(otel_meta));
        assert_eq!(filter.suppressed_count(), 2);
    }
}