mz_orchestrator_tracing/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Service orchestration for tracing-aware services.
11
12use std::collections::BTreeMap;
13use std::ffi::OsString;
14use std::fmt;
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use clap::{CommandFactory, FromArgMatches};
20use derivative::Derivative;
21use futures_core::stream::BoxStream;
22use http::header::{HeaderName, HeaderValue};
23use mz_build_info::BuildInfo;
24#[cfg(feature = "tokio-console")]
25use mz_orchestrator::ServicePort;
26use mz_orchestrator::{
27    NamespacedOrchestrator, Orchestrator, Service, ServiceConfig, ServiceEvent,
28    ServiceProcessMetrics,
29};
30use mz_ore::cli::KeyValueArg;
31use mz_ore::metrics::MetricsRegistry;
32#[cfg(feature = "tokio-console")]
33use mz_ore::netio::SocketAddr;
34#[cfg(feature = "tokio-console")]
35use mz_ore::tracing::TokioConsoleConfig;
36use mz_ore::tracing::{
37    OpenTelemetryConfig, SentryConfig, StderrLogConfig, StderrLogFormat, TracingConfig,
38    TracingGuard, TracingHandle,
39};
40use mz_tracing::CloneableEnvFilter;
41use opentelemetry::KeyValue;
42use opentelemetry_sdk::resource::Resource;
43
44/// Command line arguments for application tracing.
45///
46/// These arguments correspond directly to parameters in [`TracingConfig`], and
47/// this type can be directly converted into a `TracingConfig` via the supplied
48/// `From<TracingCliArgs>` implementation.
49///
50/// This logic is separated from `mz_ore::tracing` because the details of how
51/// these command-line arguments are parsed and unparsed is specific to
52/// orchestrators and does not belong in a foundational crate like `mz_ore`.
53#[derive(Derivative, Clone, clap::Parser)]
54#[derivative(Debug)]
55pub struct TracingCliArgs {
56    /// Which tracing events to log to stderr during startup, before the
57    /// real log filter is synced from LaunchDarkly.
58    ///
59    /// WARNING: you probably don't want to set this for `environmentd`. This
60    /// parameter only controls logging for the brief moment before the log
61    /// filter is synced from LaunchDarkly. You probably instead want to pass
62    /// `--system-parameter-default=log_filter=<filter>`, which will set the
63    /// default log filter to use unless overridden by the LaunchDarkly sync.
64    ///
65    /// This value is a comma-separated list of filter directives. Each filter
66    /// directive has the following format:
67    ///
68    /// ```text
69    /// [module::path=]level
70    /// ```
71    ///
72    /// A directive indicates that log messages from the specified module that
73    /// are at least as severe as the specified level should be emitted. If a
74    /// directive omits the module, then it implicitly applies to all modules.
75    /// When directives conflict, the last directive wins. If a log message does
76    /// not match any directive, it is not emitted.
77    ///
78    /// The module path of a log message reflects its location in Materialize's
79    /// source code. Choosing module paths for filter directives requires
80    /// familiarity with Materialize's codebase and is intended for advanced
81    /// users. Note that module paths change frequency from release to release.
82    ///
83    /// The valid levels for a log message are, in increasing order of severity:
84    /// trace, debug, info, warn, and error. The special level "off" may be used
85    /// in a directive to suppress all log messages, even errors.
86    ///
87    /// The default value for this option is "info".
88    #[clap(
89        long,
90        env = "STARTUP_LOG_FILTER",
91        value_name = "FILTER",
92        default_value = "info"
93    )]
94    pub startup_log_filter: CloneableEnvFilter,
95    /// The format to use for stderr log messages.
96    #[clap(long, env = "LOG_FORMAT", default_value_t, value_enum)]
97    pub log_format: LogFormat,
98    /// An optional prefix for each stderr log line.
99    ///
100    /// Only respected when `--log-format` is `text`.
101    #[clap(long, env = "LOG_PREFIX")]
102    pub log_prefix: Option<String>,
103    /// OpenTelemetry batch flag defaults are based on the
104    /// `BatchConfig::default()` in the opentelemetry_sdk crate.
105    /// <https://docs.rs/opentelemetry_sdk/0.21.2/opentelemetry_sdk/trace/struct.BatchConfig.html>
106    ///
107    /// The max number of tracing spans to queue before dropping.
108    #[clap(
109        long,
110        env = "OPENTELEMETRY_MAX_BATCH_QUEUE_SIZE",
111        default_value = "2048",
112        requires = "opentelemetry_endpoint"
113    )]
114    pub opentelemetry_max_batch_queue_size: usize,
115    /// The max number of spans to export in a single batch.
116    #[clap(
117        long,
118        env = "OPENTELEMETRY_MAX_EXPORT_BATCH_SIZE",
119        default_value = "512",
120        requires = "opentelemetry_endpoint"
121    )]
122    pub opentelemetry_max_export_batch_size: usize,
123    /// The max number of concurrent export tasks.
124    #[clap(
125        long,
126        env = "OPENTELEMETRY_MAX_CONCURRENT_EXPORTS",
127        default_value = "1",
128        requires = "opentelemetry_endpoint"
129    )]
130    pub opentelemetry_max_concurrent_exports: usize,
131    /// The delay between sequential sending of batches.
132    #[clap(
133        long,
134        env = "OPENTELEMETRY_SCHED_DELAY",
135        default_value = "5000ms",
136        requires = "opentelemetry_endpoint",
137        value_parser = humantime::parse_duration,
138    )]
139    pub opentelemetry_sched_delay: Duration,
140    /// The max time to attempt exporting a batch.
141    #[clap(
142        long,
143        env = "OPENTELEMETRY_MAX_EXPORT_TIMEOUT",
144        default_value = "30s",
145        requires = "opentelemetry_endpoint",
146        value_parser = humantime::parse_duration,
147    )]
148    pub opentelemetry_max_export_timeout: Duration,
149    /// Export OpenTelemetry tracing events to the provided endpoint.
150    ///
151    /// The specified endpoint should speak the OTLP/HTTP protocol. If the
152    /// backend requires authentication, you can pass authentication metadata
153    /// via the `--opentelemetry-header` option.
154    #[clap(long, env = "OPENTELEMETRY_ENDPOINT")]
155    pub opentelemetry_endpoint: Option<String>,
156    /// A header to pass with every request to the OpenTelemetry endpoint
157    /// specified by `--opentelemetry-endpoint` in the form `NAME=VALUE`.
158    ///
159    /// Requires that the `--opentelemetry-endpoint` option is specified.
160    /// To specify multiple headers, either specify this option multiple times,
161    /// or specify it once with multiple `NAME=VALUE` pairs separated by commas.
162    #[clap(
163        long,
164        env = "OPENTELEMETRY_HEADER",
165        requires = "opentelemetry_endpoint",
166        value_name = "NAME=VALUE",
167        use_value_delimiter = true
168    )]
169    pub opentelemetry_header: Vec<KeyValueArg<HeaderName, HeaderValue>>,
170    /// Which tracing events to export to the OpenTelemetry endpoint specified
171    /// by `--opentelemetry-endpoint`.
172    ///
173    /// The syntax of this option is the same as the syntax of the
174    /// `--startup-log-filter` option.
175    ///
176    /// Requires that the `--opentelemetry-endpoint` option is specified.
177    #[clap(
178        long,
179        env = "STARTUP_OPENTELEMETRY_FILTER",
180        requires = "opentelemetry_endpoint",
181        default_value = "info"
182    )]
183    pub startup_opentelemetry_filter: CloneableEnvFilter,
184    /// Additional key-value pairs to send with all opentelemetry traces.
185    /// Also used as Sentry tags.
186    ///
187    /// Requires that one of the `--opentelemetry-endpoint` or `--sentry-dsn`
188    /// options is specified.
189    #[clap(
190        long,
191        env = "OPENTELEMETRY_RESOURCE",
192        value_name = "NAME=VALUE",
193        use_value_delimiter = true
194    )]
195    pub opentelemetry_resource: Vec<KeyValueArg<String, String>>,
196    /// The address on which to listen for Tokio console connections.
197    ///
198    /// For details about Tokio console, see: <https://github.com/tokio-rs/console>
199    ///
200    /// Requires that the `--tokio-console` option is specified.
201    #[cfg(feature = "tokio-console")]
202    #[clap(long, env = "TOKIO_CONSOLE_LISTEN_ADDR")]
203    pub tokio_console_listen_addr: Option<SocketAddr>,
204    /// How frequently to publish updates to Tokio console clients.
205    ///
206    /// Requires that the `--tokio-console` option is specified.
207    #[cfg(feature = "tokio-console")]
208    #[clap(
209        long,
210        env = "TOKIO_CONSOLE_PUBLISH_INTERVAL",
211        requires = "tokio_console_listen_addr",
212        value_parser = humantime::parse_duration,
213        default_value = "1s",
214    )]
215    pub tokio_console_publish_interval: Duration,
216    /// How long Tokio console data is retained for completed tasks.
217    ///
218    /// Requires that the `--tokio-console` option is specified.
219    #[cfg(feature = "tokio-console")]
220    #[clap(
221        long,
222        env = "TOKIO_CONSOLE_RETENTION",
223        requires = "tokio_console_listen_addr",
224        value_parser = humantime::parse_duration,
225        default_value = "1h",
226    )]
227    pub tokio_console_retention: Duration,
228    /// Sentry data source to submit events and exceptions (e.g. panics) to.
229    #[clap(long, env = "SENTRY_DSN")]
230    pub sentry_dsn: Option<String>,
231    /// The environment name to report to Sentry.
232    ///
233    /// Ignored unless the `--sentry-dsn` option is specified.
234    ///
235    /// See: <https://docs.sentry.io/platforms/rust/configuration/options/#environment>
236    #[clap(long, env = "SENTRY_ENVIRONMENT")]
237    pub sentry_environment: Option<String>,
238    /// Tags to send with all Sentry events, in addition to the tags specified by
239    /// `--opentelemetry-resource`.
240    ///
241    /// Requires that the `--sentry-dsn` option is specified.
242    #[clap(
243        long,
244        env = "SENTRY_TAG",
245        value_name = "NAME=VALUE",
246        use_value_delimiter = true
247    )]
248    pub sentry_tag: Vec<KeyValueArg<String, String>>,
249    /// Test-only feature to enable tracing assertions.
250    #[cfg(feature = "capture")]
251    #[derivative(Debug = "ignore")]
252    #[clap(skip)]
253    pub capture: Option<tracing_capture::SharedStorage>,
254}
255
256impl Default for TracingCliArgs {
257    fn default() -> TracingCliArgs {
258        let matches = TracingCliArgs::command().get_matches_from::<_, OsString>([]);
259        TracingCliArgs::from_arg_matches(&matches)
260            .expect("no arguments produce valid TracingCliArgs")
261    }
262}
263
264impl TracingCliArgs {
265    pub async fn configure_tracing(
266        &self,
267        StaticTracingConfig {
268            service_name,
269            build_info,
270        }: StaticTracingConfig,
271        registry: MetricsRegistry,
272    ) -> Result<(TracingHandle, TracingGuard), anyhow::Error> {
273        mz_ore::tracing::configure(TracingConfig {
274            service_name,
275            stderr_log: StderrLogConfig {
276                format: match self.log_format {
277                    LogFormat::Text => StderrLogFormat::Text {
278                        prefix: self.log_prefix.clone(),
279                    },
280                    LogFormat::Json => StderrLogFormat::Json,
281                },
282                filter: self.startup_log_filter.clone().into(),
283            },
284            opentelemetry: self.opentelemetry_endpoint.clone().map(|endpoint| {
285                OpenTelemetryConfig {
286                    endpoint,
287                    headers: self
288                        .opentelemetry_header
289                        .iter()
290                        .map(|header| (header.key.clone(), header.value.clone()))
291                        .collect(),
292                    filter: self.startup_opentelemetry_filter.clone().into(),
293                    max_batch_queue_size: self.opentelemetry_max_batch_queue_size,
294                    max_export_batch_size: self.opentelemetry_max_export_batch_size,
295                    max_concurrent_exports: self.opentelemetry_max_concurrent_exports,
296                    batch_scheduled_delay: self.opentelemetry_sched_delay,
297                    max_export_timeout: self.opentelemetry_max_export_timeout,
298                    resource: Resource::new(
299                        self.opentelemetry_resource
300                            .iter()
301                            .cloned()
302                            .map(|kv| KeyValue::new(kv.key, kv.value)),
303                    ),
304                }
305            }),
306            #[cfg(feature = "tokio-console")]
307            tokio_console: self.tokio_console_listen_addr.clone().map(|listen_addr| {
308                TokioConsoleConfig {
309                    listen_addr,
310                    publish_interval: self.tokio_console_publish_interval,
311                    retention: self.tokio_console_retention,
312                }
313            }),
314            sentry: self.sentry_dsn.clone().map(|dsn| SentryConfig {
315                dsn,
316                environment: self.sentry_environment.clone(),
317                tags: self
318                    .opentelemetry_resource
319                    .iter()
320                    .cloned()
321                    .chain(self.sentry_tag.iter().cloned())
322                    .map(|kv| (kv.key, kv.value))
323                    .collect(),
324                event_filter: mz_service::tracing::mz_sentry_event_filter,
325            }),
326            build_version: build_info.version,
327            build_sha: build_info.sha,
328            registry,
329            #[cfg(feature = "capture")]
330            capture: self.capture.clone(),
331        })
332        .await
333    }
334}
335
/// The fields of [`TracingConfig`] that are not set by command-line arguments.
///
/// Passed to [`TracingCliArgs::configure_tracing`] alongside the parsed
/// arguments to complete the tracing configuration.
pub struct StaticTracingConfig {
    /// See [`TracingConfig::service_name`].
    pub service_name: &'static str,
    /// The build information for this service.
    ///
    /// Its `version` and `sha` fields are forwarded as the tracing
    /// configuration's build version and build SHA.
    pub build_info: BuildInfo,
}
343
344/// Wraps an [`Orchestrator`] to inject tracing into all created services.
345#[derive(Debug)]
346pub struct TracingOrchestrator {
347    inner: Arc<dyn Orchestrator>,
348    tracing_args: TracingCliArgs,
349}
350
351impl TracingOrchestrator {
352    /// Constructs a new tracing orchestrator.
353    ///
354    /// The orchestrator wraps the provided `inner` orchestrator. It mutates
355    /// [`ServiceConfig`]s to inject the tracing configuration specified by
356    /// `tracing_args`.
357    ///
358    /// All services created by the orchestrator **must** embed the
359    /// [`TracingCliArgs`] in their command-line argument parser.
360    pub fn new(inner: Arc<dyn Orchestrator>, tracing_args: TracingCliArgs) -> TracingOrchestrator {
361        TracingOrchestrator {
362            inner,
363            tracing_args,
364        }
365    }
366}
367
368impl Orchestrator for TracingOrchestrator {
369    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
370        Arc::new(NamespacedTracingOrchestrator {
371            namespace: namespace.to_string(),
372            inner: self.inner.namespace(namespace),
373            tracing_args: self.tracing_args.clone(),
374        })
375    }
376}
377
/// The namespaced counterpart of [`TracingOrchestrator`].
///
/// Forwards every operation to `inner`; `ensure_service` additionally
/// appends tracing command-line arguments to the service's arguments.
#[derive(Debug)]
struct NamespacedTracingOrchestrator {
    /// The namespace name, used together with the service ID to build the
    /// `--log-prefix` value passed to services.
    namespace: String,
    /// The wrapped orchestrator's handle for the same namespace.
    inner: Arc<dyn NamespacedOrchestrator>,
    /// The tracing arguments to re-serialize onto each service's command line.
    tracing_args: TracingCliArgs,
}
384
385#[async_trait]
386impl NamespacedOrchestrator for NamespacedTracingOrchestrator {
387    async fn fetch_service_metrics(
388        &self,
389        id: &str,
390    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
391        self.inner.fetch_service_metrics(id).await
392    }
393
394    fn ensure_service(
395        &self,
396        id: &str,
397        mut service_config: ServiceConfig,
398    ) -> Result<Box<dyn Service>, anyhow::Error> {
399        let tracing_args = self.tracing_args.clone();
400        let log_prefix_arg = format!("{}-{}", self.namespace, id);
401        let args_fn = move |listen_addrs: &BTreeMap<String, String>| {
402            #[cfg(feature = "tokio-console")]
403            let tokio_console_listen_addr = listen_addrs.get("tokio-console");
404            let mut args = (service_config.args)(listen_addrs);
405            let TracingCliArgs {
406                startup_log_filter,
407                log_prefix,
408                log_format,
409                opentelemetry_max_batch_queue_size,
410                opentelemetry_max_export_batch_size,
411                opentelemetry_max_concurrent_exports,
412                opentelemetry_sched_delay,
413                opentelemetry_max_export_timeout,
414                opentelemetry_endpoint,
415                opentelemetry_header,
416                startup_opentelemetry_filter: _,
417                opentelemetry_resource,
418                #[cfg(feature = "tokio-console")]
419                    tokio_console_listen_addr: _,
420                #[cfg(feature = "tokio-console")]
421                tokio_console_publish_interval,
422                #[cfg(feature = "tokio-console")]
423                tokio_console_retention,
424                sentry_dsn,
425                sentry_environment,
426                sentry_tag,
427                #[cfg(feature = "capture")]
428                    capture: _,
429            } = &tracing_args;
430            args.push(format!("--startup-log-filter={startup_log_filter}"));
431            args.push(format!("--log-format={log_format}"));
432            if log_prefix.is_some() {
433                args.push(format!("--log-prefix={log_prefix_arg}"));
434            }
435            if let Some(endpoint) = opentelemetry_endpoint {
436                args.push(format!("--opentelemetry-endpoint={endpoint}"));
437                for kv in opentelemetry_header {
438                    args.push(format!(
439                        "--opentelemetry-header={}={}",
440                        kv.key,
441                        kv.value
442                            .to_str()
443                            .expect("opentelemetry-header had non-ascii value"),
444                    ));
445                }
446                args.push(format!(
447                    "--opentelemetry-max-batch-queue-size={opentelemetry_max_batch_queue_size}",
448                ));
449                args.push(format!(
450                    "--opentelemetry-max-export-batch-size={opentelemetry_max_export_batch_size}",
451                ));
452                args.push(format!(
453                    "--opentelemetry-max-concurrent-exports={opentelemetry_max_concurrent_exports}",
454                ));
455                args.push(format!(
456                    "--opentelemetry-sched-delay={}ms",
457                    opentelemetry_sched_delay.as_millis(),
458                ));
459                args.push(format!(
460                    "--opentelemetry-max-export-timeout={}ms",
461                    opentelemetry_max_export_timeout.as_millis(),
462                ));
463            }
464            #[cfg(feature = "tokio-console")]
465            if let Some(tokio_console_listen_addr) = tokio_console_listen_addr {
466                args.push(format!(
467                    "--tokio-console-listen-addr={}",
468                    tokio_console_listen_addr,
469                ));
470                args.push(format!(
471                    "--tokio-console-publish-interval={} us",
472                    tokio_console_publish_interval.as_micros(),
473                ));
474                args.push(format!(
475                    "--tokio-console-retention={} us",
476                    tokio_console_retention.as_micros(),
477                ));
478            }
479            if let Some(dsn) = sentry_dsn {
480                args.push(format!("--sentry-dsn={dsn}"));
481                for kv in sentry_tag {
482                    args.push(format!("--sentry-tag={}={}", kv.key, kv.value));
483                }
484            }
485            if let Some(environment) = sentry_environment {
486                args.push(format!("--sentry-environment={environment}"));
487            }
488
489            if opentelemetry_endpoint.is_some() || sentry_dsn.is_some() {
490                for kv in opentelemetry_resource {
491                    args.push(format!("--opentelemetry-resource={}={}", kv.key, kv.value));
492                }
493            }
494
495            args
496        };
497        service_config.args = Box::new(args_fn);
498        #[cfg(feature = "tokio-console")]
499        if self.tracing_args.tokio_console_listen_addr.is_some() {
500            service_config.ports.push(ServicePort {
501                name: "tokio-console".into(),
502                port_hint: 6669,
503            });
504        }
505        self.inner.ensure_service(id, service_config)
506    }
507
508    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
509        self.inner.drop_service(id)
510    }
511
512    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
513        self.inner.list_services().await
514    }
515
516    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
517        self.inner.watch_services()
518    }
519
520    fn update_scheduling_config(
521        &self,
522        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
523    ) {
524        self.inner.update_scheduling_config(config)
525    }
526}
527
/// Specifies the format of a stderr log message.
// `Text` is marked `#[default]`, which makes it the value of `--log-format`
// when that option is not given (the option is declared with
// `default_value_t`). The `Display` impl for this type produces the lowercase
// names that are written into `--log-format=...` for child services.
#[derive(Debug, Clone, Default, clap::ValueEnum)]
pub enum LogFormat {
    /// Format as human readable, optionally colored text.
    ///
    /// Best suited for direct human consumption in a terminal.
    #[default]
    Text,
    /// Format as JSON (in reality, JSONL).
    ///
    /// Best suited for ingestion in structured logging aggregators.
    Json,
}
541
542impl fmt::Display for LogFormat {
543    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
544        match self {
545            LogFormat::Text => f.write_str("text"),
546            LogFormat::Json => f.write_str("json"),
547        }
548    }
549}