1use std::str::FromStr;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use mz_dyncfg::{Config, ConfigSet, ConfigUpdates};
17use mz_tracing::params::TracingParameters;
18use mz_tracing::{CloneableEnvFilter, SerializableDirective};
19use tracing_subscriber::filter::Directive;
20
21pub const SIGTERM_CONNECTION_WAIT: Config<Duration> = Config::new(
27 "balancerd_sigterm_connection_wait",
28 Duration::from_secs(60 * 9),
29 "Duration to wait after listeners closed via SIGTERM for outstanding connections to complete.",
30);
31
32pub const SIGTERM_LISTEN_WAIT: Config<Duration> = Config::new(
34 "balancerd_sigterm_listen_wait",
35 Duration::from_secs(60),
36 "Duration to wait after SIGTERM to begin shutdown of servers.",
37);
38
39pub const INJECT_PROXY_PROTOCOL_HEADER_HTTP: Config<bool> = Config::new(
41 "balancerd_inject_proxy_protocol_header_http",
42 false,
43 "Whether to inject tcp proxy protocol headers to downstream http servers.",
44);
45
46pub const LOGGING_FILTER: Config<&str> = Config::new(
48 "balancerd_log_filter",
49 "info",
50 "Sets the filter to apply to stderr logging.",
51);
52
53pub const OPENTELEMETRY_FILTER: Config<&str> = Config::new(
55 "balancerd_opentelemetry_filter",
56 "info",
57 "Sets the filter to apply to OpenTelemetry-backed distributed tracing.",
58);
59
60pub const LOGGING_FILTER_DEFAULTS: Config<fn() -> String> = Config::new(
64 "balancerd_log_filter_defaults",
65 || mz_ore::tracing::LOGGING_DEFAULTS_STR.join(","),
66 "Sets additional default directives to apply to stderr logging. \
67 These apply to all variations of `log_filter`. Directives other than \
68 `module=off` are likely incorrect. Comma separated list.",
69);
70
71pub const OPENTELEMETRY_FILTER_DEFAULTS: Config<fn() -> String> = Config::new(
76 "balancerd_opentelemetry_filter_defaults",
77 || mz_ore::tracing::OPENTELEMETRY_DEFAULTS_STR.join(","),
78 "Sets additional default directives to apply to OpenTelemetry-backed \
79 distributed tracing. \
80 These apply to all variations of `opentelemetry_filter`. Directives other than \
81 `module=off` are likely incorrect. Comma separated list.",
82);
83
84pub const SENTRY_FILTERS: Config<fn() -> String> = Config::new(
88 "balancerd_sentry_filters",
89 || mz_ore::tracing::SENTRY_DEFAULTS_STR.join(","),
90 "Sets additional default directives to apply to sentry logging. \
91 These apply on top of a default `info` directive. Directives other than \
92 `module=off` are likely incorrect. Comma separated list.",
93);
94
95pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
97 configs
98 .add(&SIGTERM_CONNECTION_WAIT)
99 .add(&SIGTERM_LISTEN_WAIT)
100 .add(&INJECT_PROXY_PROTOCOL_HEADER_HTTP)
101 .add(&LOGGING_FILTER)
102 .add(&OPENTELEMETRY_FILTER)
103 .add(&LOGGING_FILTER_DEFAULTS)
104 .add(&OPENTELEMETRY_FILTER_DEFAULTS)
105 .add(&SENTRY_FILTERS)
106}
107
108pub(crate) fn set_defaults(
117 config_set: &ConfigSet,
118 default_config: Vec<(String, String)>,
119) -> Result<(), anyhow::Error> {
120 let mut config_updates = ConfigUpdates::default();
121 for (k, v) in default_config.iter() {
122 if k.as_str() == INJECT_PROXY_PROTOCOL_HEADER_HTTP.name() {
123 config_updates.add_dynamic(
124 INJECT_PROXY_PROTOCOL_HEADER_HTTP.name(),
125 mz_dyncfg::ConfigVal::Bool(bool::from_str(v)?),
126 )
127 } else {
128 return Err(anyhow!("Invalid default config value {k}"));
129 }
130 }
131 config_updates.apply(config_set);
132 Ok(())
133}
134
135pub fn tracing_config(configs: &ConfigSet) -> Result<TracingParameters, String> {
137 fn to_serializable_directives(
138 config: &Config<fn() -> String>,
139 configs: &ConfigSet,
140 ) -> Result<Vec<SerializableDirective>, String> {
141 let directives = config.get(configs);
142 let directives: Vec<_> = directives
143 .split(',')
144 .map(Directive::from_str)
145 .collect::<Result<_, _>>()
146 .map_err(|e| e.to_string())?;
147 Ok(directives.into_iter().map(|d| d.into()).collect())
148 }
149
150 let log_filter = LOGGING_FILTER.get(configs);
151 let log_filter = CloneableEnvFilter::from_str(&log_filter).map_err(|e| e.to_string())?;
152
153 let opentelemetry_filter = OPENTELEMETRY_FILTER.get(configs);
154 let opentelemetry_filter =
155 CloneableEnvFilter::from_str(&opentelemetry_filter).map_err(|e| e.to_string())?;
156
157 let log_filter_defaults = to_serializable_directives(&LOGGING_FILTER_DEFAULTS, configs)?;
158
159 let opentelemetry_filter_defaults =
160 to_serializable_directives(&OPENTELEMETRY_FILTER_DEFAULTS, configs)?;
161
162 let sentry_filters = to_serializable_directives(&SENTRY_FILTERS, configs)?;
163
164 Ok(TracingParameters {
165 log_filter: Some(log_filter),
166 opentelemetry_filter: Some(opentelemetry_filter),
167 log_filter_defaults,
168 opentelemetry_filter_defaults,
169 sentry_filters,
170 })
171}
172
173pub fn has_tracing_config_update(updates: &ConfigUpdates) -> bool {
175 [
176 LOGGING_FILTER.name(),
177 OPENTELEMETRY_FILTER.name(),
178 LOGGING_FILTER_DEFAULTS.name(),
179 OPENTELEMETRY_FILTER_DEFAULTS.name(),
180 SENTRY_FILTERS.name(),
181 ]
182 .into_iter()
183 .any(|name| updates.updates.contains_key(name))
184}