1use std::ffi::OsString;
13use std::fmt;
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use clap::{CommandFactory, FromArgMatches};
19use derivative::Derivative;
20use futures_core::stream::BoxStream;
21use http::header::{HeaderName, HeaderValue};
22use mz_build_info::BuildInfo;
23#[cfg(feature = "tokio-console")]
24use mz_orchestrator::ServicePort;
25use mz_orchestrator::{
26 NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
27 ServiceProcessMetrics,
28};
29use mz_ore::cli::KeyValueArg;
30use mz_ore::metrics::MetricsRegistry;
31#[cfg(feature = "tokio-console")]
32use mz_ore::netio::SocketAddr;
33#[cfg(feature = "tokio-console")]
34use mz_ore::tracing::TokioConsoleConfig;
35use mz_ore::tracing::{
36 OpenTelemetryConfig, SentryConfig, StderrLogConfig, StderrLogFormat, TracingConfig,
37 TracingHandle,
38};
39use mz_tracing::CloneableEnvFilter;
40use opentelemetry::KeyValue;
41use opentelemetry_sdk::resource::Resource;
42
// Command-line / environment-variable configuration for tracing: stderr
// logging, OpenTelemetry export, the tokio console (behind the
// `tokio-console` feature), and Sentry.
//
// NOTE: plain `//` comments are used inside this clap-derived struct on
// purpose; `///` doc comments would be picked up by clap as help text.
#[derive(Derivative, Clone, clap::Parser)]
#[derivative(Debug)]
pub struct TracingCliArgs {
    // Filter applied to stderr log output at startup.
    #[clap(
        long,
        env = "STARTUP_LOG_FILTER",
        value_name = "FILTER",
        default_value = "info"
    )]
    pub startup_log_filter: CloneableEnvFilter,
    // Format of stderr log output (`text` by default, or `json`).
    #[clap(long, env = "LOG_FORMAT", default_value_t, value_enum)]
    pub log_format: LogFormat,
    // Optional prefix for `text`-format stderr logs. When this is set, services
    // launched through `NamespacedTracingOrchestrator` receive
    // `<namespace>-<id>` as their prefix instead of this value.
    #[clap(long, env = "LOG_PREFIX")]
    pub log_prefix: Option<String>,
    // Passed through as `OpenTelemetryConfig::max_batch_queue_size`.
    // Only accepted together with `--opentelemetry-endpoint`.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_BATCH_QUEUE_SIZE",
        default_value = "2048",
        requires = "opentelemetry_endpoint"
    )]
    pub opentelemetry_max_batch_queue_size: usize,
    // Passed through as `OpenTelemetryConfig::max_export_batch_size`.
    // Only accepted together with `--opentelemetry-endpoint`.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_EXPORT_BATCH_SIZE",
        default_value = "512",
        requires = "opentelemetry_endpoint"
    )]
    pub opentelemetry_max_export_batch_size: usize,
    // Passed through as `OpenTelemetryConfig::max_concurrent_exports`.
    // Only accepted together with `--opentelemetry-endpoint`.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_CONCURRENT_EXPORTS",
        default_value = "1",
        requires = "opentelemetry_endpoint"
    )]
    pub opentelemetry_max_concurrent_exports: usize,
    // Passed through as `OpenTelemetryConfig::batch_scheduled_delay`; parsed
    // with `humantime` (e.g. `5000ms`). Requires `--opentelemetry-endpoint`.
    #[clap(
        long,
        env = "OPENTELEMETRY_SCHED_DELAY",
        default_value = "5000ms",
        requires = "opentelemetry_endpoint",
        value_parser = humantime::parse_duration,
    )]
    pub opentelemetry_sched_delay: Duration,
    // Passed through as `OpenTelemetryConfig::max_export_timeout`; parsed
    // with `humantime` (e.g. `30s`). Requires `--opentelemetry-endpoint`.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_EXPORT_TIMEOUT",
        default_value = "30s",
        requires = "opentelemetry_endpoint",
        value_parser = humantime::parse_duration,
    )]
    pub opentelemetry_max_export_timeout: Duration,
    // Endpoint for OpenTelemetry export. OpenTelemetry is configured only when
    // this is set.
    #[clap(long, env = "OPENTELEMETRY_ENDPOINT")]
    pub opentelemetry_endpoint: Option<String>,
    // `NAME=VALUE` headers (comma-delimited) placed in
    // `OpenTelemetryConfig::headers`. Requires `--opentelemetry-endpoint`.
    #[clap(
        long,
        env = "OPENTELEMETRY_HEADER",
        requires = "opentelemetry_endpoint",
        value_name = "NAME=VALUE",
        use_value_delimiter = true
    )]
    pub opentelemetry_header: Vec<KeyValueArg<HeaderName, HeaderValue>>,
    // Filter applied to OpenTelemetry output at startup. Requires
    // `--opentelemetry-endpoint`. Not forwarded to child services by
    // `NamespacedTracingOrchestrator`.
    #[clap(
        long,
        env = "STARTUP_OPENTELEMETRY_FILTER",
        requires = "opentelemetry_endpoint",
        default_value = "info"
    )]
    pub startup_opentelemetry_filter: CloneableEnvFilter,
    // `NAME=VALUE` attributes (comma-delimited) attached to the OpenTelemetry
    // resource. They are also included as Sentry tags.
    #[clap(
        long,
        env = "OPENTELEMETRY_RESOURCE",
        value_name = "NAME=VALUE",
        use_value_delimiter = true
    )]
    pub opentelemetry_resource: Vec<KeyValueArg<String, String>>,
    // Listen address for the tokio console; the console is configured only
    // when this is set.
    #[cfg(feature = "tokio-console")]
    #[clap(long, env = "TOKIO_CONSOLE_LISTEN_ADDR")]
    pub tokio_console_listen_addr: Option<SocketAddr>,
    // Tokio console publish interval (humantime syntax). Requires
    // `--tokio-console-listen-addr`.
    #[cfg(feature = "tokio-console")]
    #[clap(
        long,
        env = "TOKIO_CONSOLE_PUBLISH_INTERVAL",
        requires = "tokio_console_listen_addr",
        value_parser = humantime::parse_duration,
        default_value = "1s",
    )]
    pub tokio_console_publish_interval: Duration,
    // Tokio console retention (humantime syntax). Requires
    // `--tokio-console-listen-addr`.
    #[cfg(feature = "tokio-console")]
    #[clap(
        long,
        env = "TOKIO_CONSOLE_RETENTION",
        requires = "tokio_console_listen_addr",
        value_parser = humantime::parse_duration,
        default_value = "1h",
    )]
    pub tokio_console_retention: Duration,
    // Sentry DSN; Sentry is configured only when this is set.
    #[clap(long, env = "SENTRY_DSN")]
    pub sentry_dsn: Option<String>,
    // Optional environment name reported to Sentry.
    #[clap(long, env = "SENTRY_ENVIRONMENT")]
    pub sentry_environment: Option<String>,
    // Additional `NAME=VALUE` Sentry tags (comma-delimited), appended after
    // the OpenTelemetry resource attributes.
    #[clap(
        long,
        env = "SENTRY_TAG",
        value_name = "NAME=VALUE",
        use_value_delimiter = true
    )]
    pub sentry_tag: Vec<KeyValueArg<String, String>>,
    // Optional capture storage. It is never parsed from the command line
    // (`clap(skip)`) and is excluded from the `Debug` output.
    #[cfg(feature = "capture")]
    #[derivative(Debug = "ignore")]
    #[clap(skip)]
    pub capture: Option<tracing_capture::SharedStorage>,
}
254
255impl Default for TracingCliArgs {
256 fn default() -> TracingCliArgs {
257 let matches = TracingCliArgs::command().get_matches_from::<_, OsString>([]);
258 TracingCliArgs::from_arg_matches(&matches)
259 .expect("no arguments produce valid TracingCliArgs")
260 }
261}
262
263impl TracingCliArgs {
264 pub async fn configure_tracing(
265 &self,
266 StaticTracingConfig {
267 service_name,
268 build_info,
269 }: StaticTracingConfig,
270 registry: MetricsRegistry,
271 ) -> Result<TracingHandle, anyhow::Error> {
272 mz_ore::tracing::configure(TracingConfig {
273 service_name,
274 stderr_log: StderrLogConfig {
275 format: match self.log_format {
276 LogFormat::Text => StderrLogFormat::Text {
277 prefix: self.log_prefix.clone(),
278 },
279 LogFormat::Json => StderrLogFormat::Json,
280 },
281 filter: self.startup_log_filter.clone().into(),
282 },
283 opentelemetry: self.opentelemetry_endpoint.clone().map(|endpoint| {
284 OpenTelemetryConfig {
285 endpoint,
286 headers: self
287 .opentelemetry_header
288 .iter()
289 .map(|header| (header.key.clone(), header.value.clone()))
290 .collect(),
291 filter: self.startup_opentelemetry_filter.clone().into(),
292 max_batch_queue_size: self.opentelemetry_max_batch_queue_size,
293 max_export_batch_size: self.opentelemetry_max_export_batch_size,
294 max_concurrent_exports: self.opentelemetry_max_concurrent_exports,
295 batch_scheduled_delay: self.opentelemetry_sched_delay,
296 max_export_timeout: self.opentelemetry_max_export_timeout,
297 resource: {
298 Resource::builder()
299 .with_attributes(
300 self.opentelemetry_resource
301 .iter()
302 .cloned()
303 .map(|kv| KeyValue::new(kv.key, kv.value)),
304 )
305 .build()
306 },
307 }
308 }),
309 #[cfg(feature = "tokio-console")]
310 tokio_console: self.tokio_console_listen_addr.clone().map(|listen_addr| {
311 TokioConsoleConfig {
312 listen_addr,
313 publish_interval: self.tokio_console_publish_interval,
314 retention: self.tokio_console_retention,
315 }
316 }),
317 sentry: self.sentry_dsn.clone().map(|dsn| SentryConfig {
318 dsn,
319 environment: self.sentry_environment.clone(),
320 tags: self
321 .opentelemetry_resource
322 .iter()
323 .cloned()
324 .chain(self.sentry_tag.iter().cloned())
325 .map(|kv| (kv.key, kv.value))
326 .collect(),
327 event_filter: mz_service::tracing::mz_sentry_event_filter,
328 }),
329 build_version: build_info.version,
330 build_sha: build_info.sha,
331 registry,
332 #[cfg(feature = "capture")]
333 capture: self.capture.clone(),
334 })
335 .await
336 }
337}
338
/// Tracing configuration supplied by the program itself rather than parsed
/// from the command line; consumed by `TracingCliArgs::configure_tracing`.
pub struct StaticTracingConfig {
    /// Service name, passed through as the tracing `service_name`.
    pub service_name: &'static str,
    /// Build information. Its `version` and `sha` become the tracing build
    /// version and build SHA.
    pub build_info: BuildInfo,
}
346
/// An [`Orchestrator`] wrapper that forwards a tracing configuration to the
/// services it launches.
///
/// Each namespace obtained from it is a `NamespacedTracingOrchestrator`,
/// which appends tracing command-line flags to every service it ensures.
#[derive(Debug)]
pub struct TracingOrchestrator {
    // The wrapped orchestrator that actually manages services.
    inner: Arc<dyn Orchestrator>,
    // Tracing configuration to forward to launched services.
    tracing_args: TracingCliArgs,
}
353
354impl TracingOrchestrator {
355 pub fn new(inner: Arc<dyn Orchestrator>, tracing_args: TracingCliArgs) -> TracingOrchestrator {
364 TracingOrchestrator {
365 inner,
366 tracing_args,
367 }
368 }
369}
370
371impl Orchestrator for TracingOrchestrator {
372 fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
373 Arc::new(NamespacedTracingOrchestrator {
374 namespace: namespace.to_string(),
375 inner: self.inner.namespace(namespace),
376 tracing_args: self.tracing_args.clone(),
377 })
378 }
379}
380
/// A [`NamespacedOrchestrator`] wrapper, produced by `TracingOrchestrator`'s
/// `namespace` method, that appends tracing flags to the arguments of each
/// service it ensures and delegates everything else to `inner`.
#[derive(Debug)]
struct NamespacedTracingOrchestrator {
    // Namespace name. It is combined with the service id to form the child's
    // `--log-prefix` value.
    namespace: String,
    // The wrapped namespaced orchestrator that all calls are delegated to.
    inner: Arc<dyn NamespacedOrchestrator>,
    // Tracing configuration translated into command-line flags for children.
    tracing_args: TracingCliArgs,
}
387
388#[async_trait]
389impl NamespacedOrchestrator for NamespacedTracingOrchestrator {
390 async fn fetch_service_metrics(
391 &self,
392 id: &str,
393 ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
394 self.inner.fetch_service_metrics(id).await
395 }
396
397 fn ensure_service(
398 &self,
399 id: &str,
400 mut service_config: ServiceConfig,
401 ) -> Result<Box<dyn Service>, anyhow::Error> {
402 let tracing_args = self.tracing_args.clone();
403 let log_prefix_arg = format!("{}-{}", self.namespace, id);
404 let args_fn = move |assigned: ServiceAssignments| {
405 #[cfg(feature = "tokio-console")]
406 let tokio_console_listen_addr = assigned.listen_addrs.get("tokio-console");
407 let mut args = (service_config.args)(assigned);
408 let TracingCliArgs {
409 startup_log_filter,
410 log_prefix,
411 log_format,
412 opentelemetry_max_batch_queue_size,
413 opentelemetry_max_export_batch_size,
414 opentelemetry_max_concurrent_exports,
415 opentelemetry_sched_delay,
416 opentelemetry_max_export_timeout,
417 opentelemetry_endpoint,
418 opentelemetry_header,
419 startup_opentelemetry_filter: _,
420 opentelemetry_resource,
421 #[cfg(feature = "tokio-console")]
422 tokio_console_listen_addr: _,
423 #[cfg(feature = "tokio-console")]
424 tokio_console_publish_interval,
425 #[cfg(feature = "tokio-console")]
426 tokio_console_retention,
427 sentry_dsn,
428 sentry_environment,
429 sentry_tag,
430 #[cfg(feature = "capture")]
431 capture: _,
432 } = &tracing_args;
433 args.push(format!("--startup-log-filter={startup_log_filter}"));
434 args.push(format!("--log-format={log_format}"));
435 if log_prefix.is_some() {
436 args.push(format!("--log-prefix={log_prefix_arg}"));
437 }
438 if let Some(endpoint) = opentelemetry_endpoint {
439 args.push(format!("--opentelemetry-endpoint={endpoint}"));
440 for kv in opentelemetry_header {
441 args.push(format!(
442 "--opentelemetry-header={}={}",
443 kv.key,
444 kv.value
445 .to_str()
446 .expect("opentelemetry-header had non-ascii value"),
447 ));
448 }
449 args.push(format!(
450 "--opentelemetry-max-batch-queue-size={opentelemetry_max_batch_queue_size}",
451 ));
452 args.push(format!(
453 "--opentelemetry-max-export-batch-size={opentelemetry_max_export_batch_size}",
454 ));
455 args.push(format!(
456 "--opentelemetry-max-concurrent-exports={opentelemetry_max_concurrent_exports}",
457 ));
458 args.push(format!(
459 "--opentelemetry-sched-delay={}ms",
460 opentelemetry_sched_delay.as_millis(),
461 ));
462 args.push(format!(
463 "--opentelemetry-max-export-timeout={}ms",
464 opentelemetry_max_export_timeout.as_millis(),
465 ));
466 }
467 #[cfg(feature = "tokio-console")]
468 if let Some(tokio_console_listen_addr) = tokio_console_listen_addr {
469 args.push(format!(
470 "--tokio-console-listen-addr={}",
471 tokio_console_listen_addr,
472 ));
473 args.push(format!(
474 "--tokio-console-publish-interval={} us",
475 tokio_console_publish_interval.as_micros(),
476 ));
477 args.push(format!(
478 "--tokio-console-retention={} us",
479 tokio_console_retention.as_micros(),
480 ));
481 }
482 if let Some(dsn) = sentry_dsn {
483 args.push(format!("--sentry-dsn={dsn}"));
484 for kv in sentry_tag {
485 args.push(format!("--sentry-tag={}={}", kv.key, kv.value));
486 }
487 }
488 if let Some(environment) = sentry_environment {
489 args.push(format!("--sentry-environment={environment}"));
490 }
491
492 if opentelemetry_endpoint.is_some() || sentry_dsn.is_some() {
493 for kv in opentelemetry_resource {
494 args.push(format!("--opentelemetry-resource={}={}", kv.key, kv.value));
495 }
496 }
497
498 args
499 };
500 service_config.args = Box::new(args_fn);
501 #[cfg(feature = "tokio-console")]
502 if self.tracing_args.tokio_console_listen_addr.is_some() {
503 service_config.ports.push(ServicePort {
504 name: "tokio-console".into(),
505 port_hint: 6669,
506 });
507 }
508 self.inner.ensure_service(id, service_config)
509 }
510
511 fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
512 self.inner.drop_service(id)
513 }
514
515 async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
516 self.inner.list_services().await
517 }
518
519 fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
520 self.inner.watch_services()
521 }
522
523 fn update_scheduling_config(
524 &self,
525 config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
526 ) {
527 self.inner.update_scheduling_config(config)
528 }
529}
530
// Output format for stderr logs, selected with `--log-format`.
//
// NOTE: `//` rather than `///` so these comments do not become clap help text
// for the `ValueEnum` values.
#[derive(Debug, Clone, Default, clap::ValueEnum)]
pub enum LogFormat {
    // Text output (the default); honors `--log-prefix`.
    #[default]
    Text,
    // JSON output.
    Json,
}
544
545impl fmt::Display for LogFormat {
546 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
547 match self {
548 LogFormat::Text => f.write_str("text"),
549 LogFormat::Json => f.write_str("json"),
550 }
551 }
552}