1use std::ffi::OsString;
13use std::fmt;
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use clap::{CommandFactory, FromArgMatches};
19use derivative::Derivative;
20use futures_core::stream::BoxStream;
21use http::header::{HeaderName, HeaderValue};
22use mz_build_info::BuildInfo;
23#[cfg(feature = "tokio-console")]
24use mz_orchestrator::ServicePort;
25use mz_orchestrator::{
26 NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
27 ServiceProcessMetrics,
28};
29use mz_ore::cli::KeyValueArg;
30use mz_ore::metrics::MetricsRegistry;
31#[cfg(feature = "tokio-console")]
32use mz_ore::netio::SocketAddr;
33#[cfg(feature = "tokio-console")]
34use mz_ore::tracing::TokioConsoleConfig;
35use mz_ore::tracing::{
36 OpenTelemetryConfig, SentryConfig, StderrLogConfig, StderrLogFormat, TracingConfig,
37 TracingGuard, TracingHandle,
38};
39use mz_tracing::CloneableEnvFilter;
40use opentelemetry::KeyValue;
41use opentelemetry_sdk::resource::Resource;
42
/// Command line arguments for configuring tracing: stderr logging, OpenTelemetry
/// export, Sentry reporting and, with the `tokio-console` feature, the tokio console.
#[derive(Derivative, Clone, clap::Parser)]
#[derivative(Debug)]
pub struct TracingCliArgs {
    /// Filter for events logged to stderr, installed when tracing is configured.
    #[clap(
        long,
        env = "STARTUP_LOG_FILTER",
        value_name = "FILTER",
        default_value = "info"
    )]
    pub startup_log_filter: CloneableEnvFilter,
    /// Format of the log lines written to stderr.
    #[clap(long, env = "LOG_FORMAT", default_value_t, value_enum)]
    pub log_format: LogFormat,
    /// Prefix for log lines written to stderr in the text format.
    #[clap(long, env = "LOG_PREFIX")]
    pub log_prefix: Option<String>,
    /// Maximum size of the OpenTelemetry batch export queue.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_BATCH_QUEUE_SIZE",
        default_value = "2048",
        requires = "opentelemetry_endpoint"
    )]
    pub opentelemetry_max_batch_queue_size: usize,
    /// Maximum size of a single OpenTelemetry export batch.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_EXPORT_BATCH_SIZE",
        default_value = "512",
        requires = "opentelemetry_endpoint"
    )]
    pub opentelemetry_max_export_batch_size: usize,
    /// Maximum number of concurrent OpenTelemetry exports.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_CONCURRENT_EXPORTS",
        default_value = "1",
        requires = "opentelemetry_endpoint"
    )]
    pub opentelemetry_max_concurrent_exports: usize,
    /// Delay between scheduled OpenTelemetry batch exports (e.g. `5000ms`).
    #[clap(
        long,
        env = "OPENTELEMETRY_SCHED_DELAY",
        default_value = "5000ms",
        requires = "opentelemetry_endpoint",
        value_parser = humantime::parse_duration,
    )]
    pub opentelemetry_sched_delay: Duration,
    /// Timeout for a single OpenTelemetry export (e.g. `30s`).
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_EXPORT_TIMEOUT",
        default_value = "30s",
        requires = "opentelemetry_endpoint",
        value_parser = humantime::parse_duration,
    )]
    pub opentelemetry_max_export_timeout: Duration,
    /// OpenTelemetry endpoint to export to; OpenTelemetry export is enabled only
    /// when this is set.
    #[clap(long, env = "OPENTELEMETRY_ENDPOINT")]
    pub opentelemetry_endpoint: Option<String>,
    /// Headers for the OpenTelemetry exporter, as comma-separated `NAME=VALUE` pairs.
    #[clap(
        long,
        env = "OPENTELEMETRY_HEADER",
        requires = "opentelemetry_endpoint",
        value_name = "NAME=VALUE",
        use_value_delimiter = true
    )]
    pub opentelemetry_header: Vec<KeyValueArg<HeaderName, HeaderValue>>,
    /// Filter for events exported to OpenTelemetry, installed when tracing is
    /// configured.
    #[clap(
        long,
        env = "STARTUP_OPENTELEMETRY_FILTER",
        requires = "opentelemetry_endpoint",
        default_value = "info"
    )]
    pub startup_opentelemetry_filter: CloneableEnvFilter,
    /// OpenTelemetry resource attributes, as comma-separated `NAME=VALUE` pairs.
    /// When Sentry is enabled they are also sent as Sentry tags.
    #[clap(
        long,
        env = "OPENTELEMETRY_RESOURCE",
        value_name = "NAME=VALUE",
        use_value_delimiter = true
    )]
    pub opentelemetry_resource: Vec<KeyValueArg<String, String>>,
    #[cfg(feature = "tokio-console")]
    /// Address on which to serve the tokio console; the console is enabled only
    /// when this is set.
    #[clap(long, env = "TOKIO_CONSOLE_LISTEN_ADDR")]
    pub tokio_console_listen_addr: Option<SocketAddr>,
    #[cfg(feature = "tokio-console")]
    /// How often the tokio console publishes updates.
    #[clap(
        long,
        env = "TOKIO_CONSOLE_PUBLISH_INTERVAL",
        requires = "tokio_console_listen_addr",
        value_parser = humantime::parse_duration,
        default_value = "1s",
    )]
    pub tokio_console_publish_interval: Duration,
    #[cfg(feature = "tokio-console")]
    /// Retention period passed to the tokio console configuration.
    #[clap(
        long,
        env = "TOKIO_CONSOLE_RETENTION",
        requires = "tokio_console_listen_addr",
        value_parser = humantime::parse_duration,
        default_value = "1h",
    )]
    pub tokio_console_retention: Duration,
    /// Sentry DSN to report to; Sentry reporting is enabled only when this is set.
    #[clap(long, env = "SENTRY_DSN")]
    pub sentry_dsn: Option<String>,
    /// Sentry environment to report events under.
    #[clap(long, env = "SENTRY_ENVIRONMENT")]
    pub sentry_environment: Option<String>,
    /// Additional Sentry tags, as comma-separated `NAME=VALUE` pairs.
    #[clap(
        long,
        env = "SENTRY_TAG",
        value_name = "NAME=VALUE",
        use_value_delimiter = true
    )]
    pub sentry_tag: Vec<KeyValueArg<String, String>>,
    #[cfg(feature = "capture")]
    /// Optional `tracing_capture` storage passed through to the tracing
    /// configuration. Not a command-line option, and omitted from `Debug` output.
    #[derivative(Debug = "ignore")]
    #[clap(skip)]
    pub capture: Option<tracing_capture::SharedStorage>,
}
254
255impl Default for TracingCliArgs {
256 fn default() -> TracingCliArgs {
257 let matches = TracingCliArgs::command().get_matches_from::<_, OsString>([]);
258 TracingCliArgs::from_arg_matches(&matches)
259 .expect("no arguments produce valid TracingCliArgs")
260 }
261}
262
263impl TracingCliArgs {
264 pub async fn configure_tracing(
265 &self,
266 StaticTracingConfig {
267 service_name,
268 build_info,
269 }: StaticTracingConfig,
270 registry: MetricsRegistry,
271 ) -> Result<(TracingHandle, TracingGuard), anyhow::Error> {
272 mz_ore::tracing::configure(TracingConfig {
273 service_name,
274 stderr_log: StderrLogConfig {
275 format: match self.log_format {
276 LogFormat::Text => StderrLogFormat::Text {
277 prefix: self.log_prefix.clone(),
278 },
279 LogFormat::Json => StderrLogFormat::Json,
280 },
281 filter: self.startup_log_filter.clone().into(),
282 },
283 opentelemetry: self.opentelemetry_endpoint.clone().map(|endpoint| {
284 OpenTelemetryConfig {
285 endpoint,
286 headers: self
287 .opentelemetry_header
288 .iter()
289 .map(|header| (header.key.clone(), header.value.clone()))
290 .collect(),
291 filter: self.startup_opentelemetry_filter.clone().into(),
292 max_batch_queue_size: self.opentelemetry_max_batch_queue_size,
293 max_export_batch_size: self.opentelemetry_max_export_batch_size,
294 max_concurrent_exports: self.opentelemetry_max_concurrent_exports,
295 batch_scheduled_delay: self.opentelemetry_sched_delay,
296 max_export_timeout: self.opentelemetry_max_export_timeout,
297 resource: Resource::new(
298 self.opentelemetry_resource
299 .iter()
300 .cloned()
301 .map(|kv| KeyValue::new(kv.key, kv.value)),
302 ),
303 }
304 }),
305 #[cfg(feature = "tokio-console")]
306 tokio_console: self.tokio_console_listen_addr.clone().map(|listen_addr| {
307 TokioConsoleConfig {
308 listen_addr,
309 publish_interval: self.tokio_console_publish_interval,
310 retention: self.tokio_console_retention,
311 }
312 }),
313 sentry: self.sentry_dsn.clone().map(|dsn| SentryConfig {
314 dsn,
315 environment: self.sentry_environment.clone(),
316 tags: self
317 .opentelemetry_resource
318 .iter()
319 .cloned()
320 .chain(self.sentry_tag.iter().cloned())
321 .map(|kv| (kv.key, kv.value))
322 .collect(),
323 event_filter: mz_service::tracing::mz_sentry_event_filter,
324 }),
325 build_version: build_info.version,
326 build_sha: build_info.sha,
327 registry,
328 #[cfg(feature = "capture")]
329 capture: self.capture.clone(),
330 })
331 .await
332 }
333}
334
335pub struct StaticTracingConfig {
337 pub service_name: &'static str,
339 pub build_info: BuildInfo,
341}
342
/// An [`Orchestrator`] wrapper whose namespaced orchestrators forward this
/// process's tracing arguments to the services they create.
#[derive(Debug)]
pub struct TracingOrchestrator {
    /// The wrapped orchestrator that all operations are delegated to.
    inner: Arc<dyn Orchestrator>,
    /// Tracing arguments forwarded to services as command-line flags.
    tracing_args: TracingCliArgs,
}
349
350impl TracingOrchestrator {
351 pub fn new(inner: Arc<dyn Orchestrator>, tracing_args: TracingCliArgs) -> TracingOrchestrator {
360 TracingOrchestrator {
361 inner,
362 tracing_args,
363 }
364 }
365}
366
367impl Orchestrator for TracingOrchestrator {
368 fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
369 Arc::new(NamespacedTracingOrchestrator {
370 namespace: namespace.to_string(),
371 inner: self.inner.namespace(namespace),
372 tracing_args: self.tracing_args.clone(),
373 })
374 }
375}
376
/// A [`NamespacedOrchestrator`] that forwards tracing arguments to the services
/// it creates and otherwise delegates to an inner namespaced orchestrator.
#[derive(Debug)]
struct NamespacedTracingOrchestrator {
    /// Namespace name; combined with the service ID to form a service's log prefix.
    namespace: String,
    /// The wrapped orchestrator for the same namespace.
    inner: Arc<dyn NamespacedOrchestrator>,
    /// Tracing arguments forwarded to each service as command-line flags.
    tracing_args: TracingCliArgs,
}
383
384#[async_trait]
385impl NamespacedOrchestrator for NamespacedTracingOrchestrator {
386 async fn fetch_service_metrics(
387 &self,
388 id: &str,
389 ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
390 self.inner.fetch_service_metrics(id).await
391 }
392
393 fn ensure_service(
394 &self,
395 id: &str,
396 mut service_config: ServiceConfig,
397 ) -> Result<Box<dyn Service>, anyhow::Error> {
398 let tracing_args = self.tracing_args.clone();
399 let log_prefix_arg = format!("{}-{}", self.namespace, id);
400 let args_fn = move |assigned: ServiceAssignments| {
401 #[cfg(feature = "tokio-console")]
402 let tokio_console_listen_addr = assigned.listen_addrs.get("tokio-console");
403 let mut args = (service_config.args)(assigned);
404 let TracingCliArgs {
405 startup_log_filter,
406 log_prefix,
407 log_format,
408 opentelemetry_max_batch_queue_size,
409 opentelemetry_max_export_batch_size,
410 opentelemetry_max_concurrent_exports,
411 opentelemetry_sched_delay,
412 opentelemetry_max_export_timeout,
413 opentelemetry_endpoint,
414 opentelemetry_header,
415 startup_opentelemetry_filter: _,
416 opentelemetry_resource,
417 #[cfg(feature = "tokio-console")]
418 tokio_console_listen_addr: _,
419 #[cfg(feature = "tokio-console")]
420 tokio_console_publish_interval,
421 #[cfg(feature = "tokio-console")]
422 tokio_console_retention,
423 sentry_dsn,
424 sentry_environment,
425 sentry_tag,
426 #[cfg(feature = "capture")]
427 capture: _,
428 } = &tracing_args;
429 args.push(format!("--startup-log-filter={startup_log_filter}"));
430 args.push(format!("--log-format={log_format}"));
431 if log_prefix.is_some() {
432 args.push(format!("--log-prefix={log_prefix_arg}"));
433 }
434 if let Some(endpoint) = opentelemetry_endpoint {
435 args.push(format!("--opentelemetry-endpoint={endpoint}"));
436 for kv in opentelemetry_header {
437 args.push(format!(
438 "--opentelemetry-header={}={}",
439 kv.key,
440 kv.value
441 .to_str()
442 .expect("opentelemetry-header had non-ascii value"),
443 ));
444 }
445 args.push(format!(
446 "--opentelemetry-max-batch-queue-size={opentelemetry_max_batch_queue_size}",
447 ));
448 args.push(format!(
449 "--opentelemetry-max-export-batch-size={opentelemetry_max_export_batch_size}",
450 ));
451 args.push(format!(
452 "--opentelemetry-max-concurrent-exports={opentelemetry_max_concurrent_exports}",
453 ));
454 args.push(format!(
455 "--opentelemetry-sched-delay={}ms",
456 opentelemetry_sched_delay.as_millis(),
457 ));
458 args.push(format!(
459 "--opentelemetry-max-export-timeout={}ms",
460 opentelemetry_max_export_timeout.as_millis(),
461 ));
462 }
463 #[cfg(feature = "tokio-console")]
464 if let Some(tokio_console_listen_addr) = tokio_console_listen_addr {
465 args.push(format!(
466 "--tokio-console-listen-addr={}",
467 tokio_console_listen_addr,
468 ));
469 args.push(format!(
470 "--tokio-console-publish-interval={} us",
471 tokio_console_publish_interval.as_micros(),
472 ));
473 args.push(format!(
474 "--tokio-console-retention={} us",
475 tokio_console_retention.as_micros(),
476 ));
477 }
478 if let Some(dsn) = sentry_dsn {
479 args.push(format!("--sentry-dsn={dsn}"));
480 for kv in sentry_tag {
481 args.push(format!("--sentry-tag={}={}", kv.key, kv.value));
482 }
483 }
484 if let Some(environment) = sentry_environment {
485 args.push(format!("--sentry-environment={environment}"));
486 }
487
488 if opentelemetry_endpoint.is_some() || sentry_dsn.is_some() {
489 for kv in opentelemetry_resource {
490 args.push(format!("--opentelemetry-resource={}={}", kv.key, kv.value));
491 }
492 }
493
494 args
495 };
496 service_config.args = Box::new(args_fn);
497 #[cfg(feature = "tokio-console")]
498 if self.tracing_args.tokio_console_listen_addr.is_some() {
499 service_config.ports.push(ServicePort {
500 name: "tokio-console".into(),
501 port_hint: 6669,
502 });
503 }
504 self.inner.ensure_service(id, service_config)
505 }
506
507 fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
508 self.inner.drop_service(id)
509 }
510
511 async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
512 self.inner.list_services().await
513 }
514
515 fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
516 self.inner.watch_services()
517 }
518
519 fn update_scheduling_config(
520 &self,
521 config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
522 ) {
523 self.inner.update_scheduling_config(config)
524 }
525}
526
/// Format of the log lines written to stderr.
#[derive(Debug, Clone, Default, clap::ValueEnum)]
pub enum LogFormat {
    /// Plain text lines; `--log-prefix`, if set, applies only in this format.
    #[default]
    Text,
    /// JSON-formatted lines.
    Json,
}
540
541impl fmt::Display for LogFormat {
542 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
543 match self {
544 LogFormat::Text => f.write_str("text"),
545 LogFormat::Json => f.write_str("json"),
546 }
547 }
548}