1use std::collections::BTreeMap;
13use std::ffi::OsString;
14use std::fmt;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use clap::{CommandFactory, FromArgMatches};
20use derivative::Derivative;
21use futures_core::stream::BoxStream;
22use http::header::{HeaderName, HeaderValue};
23use mz_build_info::BuildInfo;
24#[cfg(feature = "tokio-console")]
25use mz_orchestrator::ServicePort;
26use mz_orchestrator::{
27 NamespacedOrchestrator, Orchestrator, Service, ServiceConfig, ServiceEvent,
28 ServiceProcessMetrics,
29};
30use mz_ore::cli::KeyValueArg;
31use mz_ore::metrics::MetricsRegistry;
32#[cfg(feature = "tokio-console")]
33use mz_ore::netio::SocketAddr;
34#[cfg(feature = "tokio-console")]
35use mz_ore::tracing::TokioConsoleConfig;
36use mz_ore::tracing::{
37 OpenTelemetryConfig, SentryConfig, StderrLogConfig, StderrLogFormat, TracingConfig,
38 TracingGuard, TracingHandle,
39};
40use mz_tracing::CloneableEnvFilter;
41use opentelemetry::KeyValue;
42use opentelemetry_sdk::resource::Resource;
43
/// Command-line (and environment-variable) options that configure logging,
/// OpenTelemetry export, tokio-console, and Sentry for a process.
///
/// [`TracingCliArgs::configure_tracing`] turns these options into a
/// [`TracingConfig`] for the current process, and [`TracingOrchestrator`]
/// re-serializes them as flags for the services it launches.
#[derive(Derivative, Clone, clap::Parser)]
#[derivative(Debug)]
pub struct TracingCliArgs {
    /// Which tracing events to log to stderr during startup.
    ///
    /// Used as the stderr log filter by `configure_tracing`.
    #[clap(
        long,
        env = "STARTUP_LOG_FILTER",
        value_name = "FILTER",
        default_value = "info"
    )]
    pub startup_log_filter: CloneableEnvFilter,
    /// The format used for stderr log messages.
    #[clap(long, env = "LOG_FORMAT", default_value_t, value_enum)]
    pub log_format: LogFormat,
    /// An optional prefix for stderr log lines.
    ///
    /// Only used with the text log format. Services launched through
    /// `TracingOrchestrator` receive `{namespace}-{id}` as their prefix
    /// instead of this value when it is set.
    #[clap(long, env = "LOG_PREFIX")]
    pub log_prefix: Option<String>,
    /// Passed as `max_batch_queue_size` in the OpenTelemetry configuration.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_BATCH_QUEUE_SIZE",
        default_value = "2048",
        requires = "opentelemetry_endpoint"
    )]
    pub opentelemetry_max_batch_queue_size: usize,
    /// Passed as `max_export_batch_size` in the OpenTelemetry configuration.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_EXPORT_BATCH_SIZE",
        default_value = "512",
        requires = "opentelemetry_endpoint"
    )]
    pub opentelemetry_max_export_batch_size: usize,
    /// Passed as `max_concurrent_exports` in the OpenTelemetry configuration.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_CONCURRENT_EXPORTS",
        default_value = "1",
        requires = "opentelemetry_endpoint"
    )]
    pub opentelemetry_max_concurrent_exports: usize,
    /// Passed as `batch_scheduled_delay` in the OpenTelemetry configuration.
    ///
    /// Parsed as a human-readable duration (e.g. `5000ms`).
    #[clap(
        long,
        env = "OPENTELEMETRY_SCHED_DELAY",
        default_value = "5000ms",
        requires = "opentelemetry_endpoint",
        value_parser = humantime::parse_duration,
    )]
    pub opentelemetry_sched_delay: Duration,
    /// Passed as `max_export_timeout` in the OpenTelemetry configuration.
    ///
    /// Parsed as a human-readable duration (e.g. `30s`).
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_EXPORT_TIMEOUT",
        default_value = "30s",
        requires = "opentelemetry_endpoint",
        value_parser = humantime::parse_duration,
    )]
    pub opentelemetry_max_export_timeout: Duration,
    /// The OpenTelemetry endpoint to export to.
    ///
    /// When unset, no OpenTelemetry configuration is built.
    #[clap(long, env = "OPENTELEMETRY_ENDPOINT")]
    pub opentelemetry_endpoint: Option<String>,
    /// Additional `NAME=VALUE` headers for the OpenTelemetry exporter.
    ///
    /// May be repeated or given as a delimited list.
    #[clap(
        long,
        env = "OPENTELEMETRY_HEADER",
        requires = "opentelemetry_endpoint",
        value_name = "NAME=VALUE",
        use_value_delimiter = true
    )]
    pub opentelemetry_header: Vec<KeyValueArg<HeaderName, HeaderValue>>,
    /// Which tracing events to export to OpenTelemetry during startup.
    #[clap(
        long,
        env = "STARTUP_OPENTELEMETRY_FILTER",
        requires = "opentelemetry_endpoint",
        default_value = "info"
    )]
    pub startup_opentelemetry_filter: CloneableEnvFilter,
    /// `NAME=VALUE` attributes for the OpenTelemetry resource.
    ///
    /// These are also included in the Sentry tags.
    #[clap(
        long,
        env = "OPENTELEMETRY_RESOURCE",
        value_name = "NAME=VALUE",
        use_value_delimiter = true
    )]
    pub opentelemetry_resource: Vec<KeyValueArg<String, String>>,
    /// The address on which to serve tokio-console.
    ///
    /// When unset, tokio-console is not configured.
    #[cfg(feature = "tokio-console")]
    #[clap(long, env = "TOKIO_CONSOLE_LISTEN_ADDR")]
    pub tokio_console_listen_addr: Option<SocketAddr>,
    /// The tokio-console publish interval, as a human-readable duration.
    #[cfg(feature = "tokio-console")]
    #[clap(
        long,
        env = "TOKIO_CONSOLE_PUBLISH_INTERVAL",
        requires = "tokio_console_listen_addr",
        value_parser = humantime::parse_duration,
        default_value = "1s",
    )]
    pub tokio_console_publish_interval: Duration,
    /// The tokio-console retention period, as a human-readable duration.
    #[cfg(feature = "tokio-console")]
    #[clap(
        long,
        env = "TOKIO_CONSOLE_RETENTION",
        requires = "tokio_console_listen_addr",
        value_parser = humantime::parse_duration,
        default_value = "1h",
    )]
    pub tokio_console_retention: Duration,
    /// The Sentry DSN. When unset, no Sentry configuration is built.
    #[clap(long, env = "SENTRY_DSN")]
    pub sentry_dsn: Option<String>,
    /// The Sentry environment name.
    #[clap(long, env = "SENTRY_ENVIRONMENT")]
    pub sentry_environment: Option<String>,
    /// Additional `NAME=VALUE` Sentry tags.
    ///
    /// These are combined with the `--opentelemetry-resource` attributes.
    #[clap(
        long,
        env = "SENTRY_TAG",
        value_name = "NAME=VALUE",
        use_value_delimiter = true
    )]
    pub sentry_tag: Vec<KeyValueArg<String, String>>,
    // Optional tracing capture storage (only with the `capture` feature).
    // Not settable from the command line and omitted from `Debug` output.
    #[cfg(feature = "capture")]
    #[derivative(Debug = "ignore")]
    #[clap(skip)]
    pub capture: Option<tracing_capture::SharedStorage>,
}
255
256impl Default for TracingCliArgs {
257 fn default() -> TracingCliArgs {
258 let matches = TracingCliArgs::command().get_matches_from::<_, OsString>([]);
259 TracingCliArgs::from_arg_matches(&matches)
260 .expect("no arguments produce valid TracingCliArgs")
261 }
262}
263
264impl TracingCliArgs {
265 pub async fn configure_tracing(
266 &self,
267 StaticTracingConfig {
268 service_name,
269 build_info,
270 }: StaticTracingConfig,
271 registry: MetricsRegistry,
272 ) -> Result<(TracingHandle, TracingGuard), anyhow::Error> {
273 mz_ore::tracing::configure(TracingConfig {
274 service_name,
275 stderr_log: StderrLogConfig {
276 format: match self.log_format {
277 LogFormat::Text => StderrLogFormat::Text {
278 prefix: self.log_prefix.clone(),
279 },
280 LogFormat::Json => StderrLogFormat::Json,
281 },
282 filter: self.startup_log_filter.clone().into(),
283 },
284 opentelemetry: self.opentelemetry_endpoint.clone().map(|endpoint| {
285 OpenTelemetryConfig {
286 endpoint,
287 headers: self
288 .opentelemetry_header
289 .iter()
290 .map(|header| (header.key.clone(), header.value.clone()))
291 .collect(),
292 filter: self.startup_opentelemetry_filter.clone().into(),
293 max_batch_queue_size: self.opentelemetry_max_batch_queue_size,
294 max_export_batch_size: self.opentelemetry_max_export_batch_size,
295 max_concurrent_exports: self.opentelemetry_max_concurrent_exports,
296 batch_scheduled_delay: self.opentelemetry_sched_delay,
297 max_export_timeout: self.opentelemetry_max_export_timeout,
298 resource: Resource::new(
299 self.opentelemetry_resource
300 .iter()
301 .cloned()
302 .map(|kv| KeyValue::new(kv.key, kv.value)),
303 ),
304 }
305 }),
306 #[cfg(feature = "tokio-console")]
307 tokio_console: self.tokio_console_listen_addr.clone().map(|listen_addr| {
308 TokioConsoleConfig {
309 listen_addr,
310 publish_interval: self.tokio_console_publish_interval,
311 retention: self.tokio_console_retention,
312 }
313 }),
314 sentry: self.sentry_dsn.clone().map(|dsn| SentryConfig {
315 dsn,
316 environment: self.sentry_environment.clone(),
317 tags: self
318 .opentelemetry_resource
319 .iter()
320 .cloned()
321 .chain(self.sentry_tag.iter().cloned())
322 .map(|kv| (kv.key, kv.value))
323 .collect(),
324 event_filter: mz_service::tracing::mz_sentry_event_filter,
325 }),
326 build_version: build_info.version,
327 build_sha: build_info.sha,
328 registry,
329 #[cfg(feature = "capture")]
330 capture: self.capture.clone(),
331 })
332 .await
333 }
334}
335
/// Tracing settings that are fixed by the program rather than supplied on
/// the command line; consumed by [`TracingCliArgs::configure_tracing`].
pub struct StaticTracingConfig {
    /// The service name, passed through as the tracing config's `service_name`.
    pub service_name: &'static str,
    /// Build metadata; its `version` and `sha` are recorded in the tracing
    /// config as `build_version` and `build_sha`.
    pub build_info: BuildInfo,
}
343
/// An [`Orchestrator`] wrapper that forwards this process's tracing
/// configuration, as command-line flags, to every service it creates.
#[derive(Debug)]
pub struct TracingOrchestrator {
    // The wrapped orchestrator to which all namespaces are delegated.
    inner: Arc<dyn Orchestrator>,
    // The tracing arguments forwarded to services in every namespace.
    tracing_args: TracingCliArgs,
}
350
351impl TracingOrchestrator {
352 pub fn new(inner: Arc<dyn Orchestrator>, tracing_args: TracingCliArgs) -> TracingOrchestrator {
361 TracingOrchestrator {
362 inner,
363 tracing_args,
364 }
365 }
366}
367
368impl Orchestrator for TracingOrchestrator {
369 fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
370 Arc::new(NamespacedTracingOrchestrator {
371 namespace: namespace.to_string(),
372 inner: self.inner.namespace(namespace),
373 tracing_args: self.tracing_args.clone(),
374 })
375 }
376}
377
/// The per-namespace half of [`TracingOrchestrator`]: it delegates to an
/// inner [`NamespacedOrchestrator`], adding tracing flags to each service.
#[derive(Debug)]
struct NamespacedTracingOrchestrator {
    // The namespace name; combined with the service id to form child log prefixes.
    namespace: String,
    // The wrapped namespaced orchestrator that actually manages services.
    inner: Arc<dyn NamespacedOrchestrator>,
    // The tracing arguments re-serialized into each service's command line.
    tracing_args: TracingCliArgs,
}
384
385#[async_trait]
386impl NamespacedOrchestrator for NamespacedTracingOrchestrator {
387 async fn fetch_service_metrics(
388 &self,
389 id: &str,
390 ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
391 self.inner.fetch_service_metrics(id).await
392 }
393
394 fn ensure_service(
395 &self,
396 id: &str,
397 mut service_config: ServiceConfig,
398 ) -> Result<Box<dyn Service>, anyhow::Error> {
399 let tracing_args = self.tracing_args.clone();
400 let log_prefix_arg = format!("{}-{}", self.namespace, id);
401 let args_fn = move |listen_addrs: &BTreeMap<String, String>| {
402 #[cfg(feature = "tokio-console")]
403 let tokio_console_listen_addr = listen_addrs.get("tokio-console");
404 let mut args = (service_config.args)(listen_addrs);
405 let TracingCliArgs {
406 startup_log_filter,
407 log_prefix,
408 log_format,
409 opentelemetry_max_batch_queue_size,
410 opentelemetry_max_export_batch_size,
411 opentelemetry_max_concurrent_exports,
412 opentelemetry_sched_delay,
413 opentelemetry_max_export_timeout,
414 opentelemetry_endpoint,
415 opentelemetry_header,
416 startup_opentelemetry_filter: _,
417 opentelemetry_resource,
418 #[cfg(feature = "tokio-console")]
419 tokio_console_listen_addr: _,
420 #[cfg(feature = "tokio-console")]
421 tokio_console_publish_interval,
422 #[cfg(feature = "tokio-console")]
423 tokio_console_retention,
424 sentry_dsn,
425 sentry_environment,
426 sentry_tag,
427 #[cfg(feature = "capture")]
428 capture: _,
429 } = &tracing_args;
430 args.push(format!("--startup-log-filter={startup_log_filter}"));
431 args.push(format!("--log-format={log_format}"));
432 if log_prefix.is_some() {
433 args.push(format!("--log-prefix={log_prefix_arg}"));
434 }
435 if let Some(endpoint) = opentelemetry_endpoint {
436 args.push(format!("--opentelemetry-endpoint={endpoint}"));
437 for kv in opentelemetry_header {
438 args.push(format!(
439 "--opentelemetry-header={}={}",
440 kv.key,
441 kv.value
442 .to_str()
443 .expect("opentelemetry-header had non-ascii value"),
444 ));
445 }
446 args.push(format!(
447 "--opentelemetry-max-batch-queue-size={opentelemetry_max_batch_queue_size}",
448 ));
449 args.push(format!(
450 "--opentelemetry-max-export-batch-size={opentelemetry_max_export_batch_size}",
451 ));
452 args.push(format!(
453 "--opentelemetry-max-concurrent-exports={opentelemetry_max_concurrent_exports}",
454 ));
455 args.push(format!(
456 "--opentelemetry-sched-delay={}ms",
457 opentelemetry_sched_delay.as_millis(),
458 ));
459 args.push(format!(
460 "--opentelemetry-max-export-timeout={}ms",
461 opentelemetry_max_export_timeout.as_millis(),
462 ));
463 }
464 #[cfg(feature = "tokio-console")]
465 if let Some(tokio_console_listen_addr) = tokio_console_listen_addr {
466 args.push(format!(
467 "--tokio-console-listen-addr={}",
468 tokio_console_listen_addr,
469 ));
470 args.push(format!(
471 "--tokio-console-publish-interval={} us",
472 tokio_console_publish_interval.as_micros(),
473 ));
474 args.push(format!(
475 "--tokio-console-retention={} us",
476 tokio_console_retention.as_micros(),
477 ));
478 }
479 if let Some(dsn) = sentry_dsn {
480 args.push(format!("--sentry-dsn={dsn}"));
481 for kv in sentry_tag {
482 args.push(format!("--sentry-tag={}={}", kv.key, kv.value));
483 }
484 }
485 if let Some(environment) = sentry_environment {
486 args.push(format!("--sentry-environment={environment}"));
487 }
488
489 if opentelemetry_endpoint.is_some() || sentry_dsn.is_some() {
490 for kv in opentelemetry_resource {
491 args.push(format!("--opentelemetry-resource={}={}", kv.key, kv.value));
492 }
493 }
494
495 args
496 };
497 service_config.args = Box::new(args_fn);
498 #[cfg(feature = "tokio-console")]
499 if self.tracing_args.tokio_console_listen_addr.is_some() {
500 service_config.ports.push(ServicePort {
501 name: "tokio-console".into(),
502 port_hint: 6669,
503 });
504 }
505 self.inner.ensure_service(id, service_config)
506 }
507
508 fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
509 self.inner.drop_service(id)
510 }
511
512 async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
513 self.inner.list_services().await
514 }
515
516 fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
517 self.inner.watch_services()
518 }
519
520 fn update_scheduling_config(
521 &self,
522 config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
523 ) {
524 self.inner.update_scheduling_config(config)
525 }
526}
527
/// The format of log messages written to stderr.
#[derive(Debug, Clone, Default, clap::ValueEnum)]
pub enum LogFormat {
    /// Human-readable text. This is the default, and it is the only format
    /// that honors `--log-prefix`.
    #[default]
    Text,
    /// JSON.
    Json,
}
541
542impl fmt::Display for LogFormat {
543 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
544 match self {
545 LogFormat::Text => f.write_str("text"),
546 LogFormat::Json => f.write_str("json"),
547 }
548 }
549}