mz_orchestrator_tracing/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Service orchestration for tracing-aware services.
11
12use std::ffi::OsString;
13use std::fmt;
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use clap::{CommandFactory, FromArgMatches};
19use derivative::Derivative;
20use futures_core::stream::BoxStream;
21use http::header::{HeaderName, HeaderValue};
22use mz_build_info::BuildInfo;
23#[cfg(feature = "tokio-console")]
24use mz_orchestrator::ServicePort;
25use mz_orchestrator::{
26    NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
27    ServiceProcessMetrics,
28};
29use mz_ore::cli::KeyValueArg;
30use mz_ore::metrics::MetricsRegistry;
31#[cfg(feature = "tokio-console")]
32use mz_ore::netio::SocketAddr;
33#[cfg(feature = "tokio-console")]
34use mz_ore::tracing::TokioConsoleConfig;
35use mz_ore::tracing::{
36    OpenTelemetryConfig, SentryConfig, StderrLogConfig, StderrLogFormat, TracingConfig,
37    TracingGuard, TracingHandle,
38};
39use mz_tracing::CloneableEnvFilter;
40use opentelemetry::KeyValue;
41use opentelemetry_sdk::resource::Resource;
42
/// Command line arguments for application tracing.
///
/// These arguments correspond directly to parameters in [`TracingConfig`], and
/// this type can be directly converted into a `TracingConfig` via the supplied
/// `From<TracingCliArgs>` implementation.
///
/// This logic is separated from `mz_ore::tracing` because the details of how
/// these command-line arguments are parsed and unparsed is specific to
/// orchestrators and does not belong in a foundational crate like `mz_ore`.
#[derive(Derivative, Clone, clap::Parser)]
#[derivative(Debug)]
pub struct TracingCliArgs {
    /// Which tracing events to log to stderr during startup, before the
    /// real log filter is synced from LaunchDarkly.
    ///
    /// WARNING: you probably don't want to set this for `environmentd`. This
    /// parameter only controls logging for the brief moment before the log
    /// filter is synced from LaunchDarkly. You probably instead want to pass
    /// `--system-parameter-default=log_filter=<filter>`, which will set the
    /// default log filter to use unless overridden by the LaunchDarkly sync.
    ///
    /// This value is a comma-separated list of filter directives. Each filter
    /// directive has the following format:
    ///
    /// ```text
    /// [module::path=]level
    /// ```
    ///
    /// A directive indicates that log messages from the specified module that
    /// are at least as severe as the specified level should be emitted. If a
    /// directive omits the module, then it implicitly applies to all modules.
    /// When directives conflict, the last directive wins. If a log message does
    /// not match any directive, it is not emitted.
    ///
    /// The module path of a log message reflects its location in Materialize's
    /// source code. Choosing module paths for filter directives requires
    /// familiarity with Materialize's codebase and is intended for advanced
    /// users. Note that module paths change frequently from release to release.
    ///
    /// The valid levels for a log message are, in increasing order of severity:
    /// trace, debug, info, warn, and error. The special level "off" may be used
    /// in a directive to suppress all log messages, even errors.
    ///
    /// The default value for this option is "info".
    #[clap(
        long,
        env = "STARTUP_LOG_FILTER",
        value_name = "FILTER",
        default_value = "info"
    )]
    pub startup_log_filter: CloneableEnvFilter,
    /// The format to use for stderr log messages.
    #[clap(long, env = "LOG_FORMAT", default_value_t, value_enum)]
    pub log_format: LogFormat,
    /// An optional prefix for each stderr log line.
    ///
    /// Only respected when `--log-format` is `text`.
    #[clap(long, env = "LOG_PREFIX")]
    pub log_prefix: Option<String>,
    /// The max number of tracing spans to queue before dropping.
    ///
    /// The defaults of the OpenTelemetry batch flags are based on the
    /// `BatchConfig::default()` in the opentelemetry_sdk crate.
    /// <https://docs.rs/opentelemetry_sdk/0.21.2/opentelemetry_sdk/trace/struct.BatchConfig.html>
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_BATCH_QUEUE_SIZE",
        default_value = "2048",
        requires = "opentelemetry_endpoint"
    )]
    pub opentelemetry_max_batch_queue_size: usize,
    /// The max number of spans to export in a single batch.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_EXPORT_BATCH_SIZE",
        default_value = "512",
        requires = "opentelemetry_endpoint"
    )]
    pub opentelemetry_max_export_batch_size: usize,
    /// The max number of concurrent export tasks.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_CONCURRENT_EXPORTS",
        default_value = "1",
        requires = "opentelemetry_endpoint"
    )]
    pub opentelemetry_max_concurrent_exports: usize,
    /// The delay between sequential sending of batches.
    #[clap(
        long,
        env = "OPENTELEMETRY_SCHED_DELAY",
        default_value = "5000ms",
        requires = "opentelemetry_endpoint",
        value_parser = humantime::parse_duration,
    )]
    pub opentelemetry_sched_delay: Duration,
    /// The max time to attempt exporting a batch.
    #[clap(
        long,
        env = "OPENTELEMETRY_MAX_EXPORT_TIMEOUT",
        default_value = "30s",
        requires = "opentelemetry_endpoint",
        value_parser = humantime::parse_duration,
    )]
    pub opentelemetry_max_export_timeout: Duration,
    /// Export OpenTelemetry tracing events to the provided endpoint.
    ///
    /// The specified endpoint should speak the OTLP/HTTP protocol. If the
    /// backend requires authentication, you can pass authentication metadata
    /// via the `--opentelemetry-header` option.
    #[clap(long, env = "OPENTELEMETRY_ENDPOINT")]
    pub opentelemetry_endpoint: Option<String>,
    /// A header to pass with every request to the OpenTelemetry endpoint
    /// specified by `--opentelemetry-endpoint` in the form `NAME=VALUE`.
    ///
    /// Requires that the `--opentelemetry-endpoint` option is specified.
    /// To specify multiple headers, either specify this option multiple times,
    /// or specify it once with multiple `NAME=VALUE` pairs separated by commas.
    #[clap(
        long,
        env = "OPENTELEMETRY_HEADER",
        requires = "opentelemetry_endpoint",
        value_name = "NAME=VALUE",
        use_value_delimiter = true
    )]
    pub opentelemetry_header: Vec<KeyValueArg<HeaderName, HeaderValue>>,
    /// Which tracing events to export to the OpenTelemetry endpoint specified
    /// by `--opentelemetry-endpoint`.
    ///
    /// The syntax of this option is the same as the syntax of the
    /// `--startup-log-filter` option.
    ///
    /// Requires that the `--opentelemetry-endpoint` option is specified.
    #[clap(
        long,
        env = "STARTUP_OPENTELEMETRY_FILTER",
        requires = "opentelemetry_endpoint",
        default_value = "info"
    )]
    pub startup_opentelemetry_filter: CloneableEnvFilter,
    /// Additional key-value pairs to send with all opentelemetry traces.
    /// Also used as Sentry tags.
    ///
    /// Ignored unless one of the `--opentelemetry-endpoint` or `--sentry-dsn`
    /// options is specified.
    #[clap(
        long,
        env = "OPENTELEMETRY_RESOURCE",
        value_name = "NAME=VALUE",
        use_value_delimiter = true
    )]
    pub opentelemetry_resource: Vec<KeyValueArg<String, String>>,
    /// The address on which to listen for Tokio console connections.
    ///
    /// For details about Tokio console, see: <https://github.com/tokio-rs/console>
    ///
    /// Specifying this option enables Tokio console support.
    #[cfg(feature = "tokio-console")]
    #[clap(long, env = "TOKIO_CONSOLE_LISTEN_ADDR")]
    pub tokio_console_listen_addr: Option<SocketAddr>,
    /// How frequently to publish updates to Tokio console clients.
    ///
    /// Requires that the `--tokio-console-listen-addr` option is specified.
    #[cfg(feature = "tokio-console")]
    #[clap(
        long,
        env = "TOKIO_CONSOLE_PUBLISH_INTERVAL",
        requires = "tokio_console_listen_addr",
        value_parser = humantime::parse_duration,
        default_value = "1s",
    )]
    pub tokio_console_publish_interval: Duration,
    /// How long Tokio console data is retained for completed tasks.
    ///
    /// Requires that the `--tokio-console-listen-addr` option is specified.
    #[cfg(feature = "tokio-console")]
    #[clap(
        long,
        env = "TOKIO_CONSOLE_RETENTION",
        requires = "tokio_console_listen_addr",
        value_parser = humantime::parse_duration,
        default_value = "1h",
    )]
    pub tokio_console_retention: Duration,
    /// Sentry data source to submit events and exceptions (e.g. panics) to.
    #[clap(long, env = "SENTRY_DSN")]
    pub sentry_dsn: Option<String>,
    /// The environment name to report to Sentry.
    ///
    /// Ignored unless the `--sentry-dsn` option is specified.
    ///
    /// See: <https://docs.sentry.io/platforms/rust/configuration/options/#environment>
    #[clap(long, env = "SENTRY_ENVIRONMENT")]
    pub sentry_environment: Option<String>,
    /// Tags to send with all Sentry events, in addition to the tags specified by
    /// `--opentelemetry-resource`.
    ///
    /// Ignored unless the `--sentry-dsn` option is specified.
    #[clap(
        long,
        env = "SENTRY_TAG",
        value_name = "NAME=VALUE",
        use_value_delimiter = true
    )]
    pub sentry_tag: Vec<KeyValueArg<String, String>>,
    /// Test-only feature to enable tracing assertions.
    ///
    /// Never populated from the command line (`clap(skip)`) and omitted from
    /// the `Debug` output.
    #[cfg(feature = "capture")]
    #[derivative(Debug = "ignore")]
    #[clap(skip)]
    pub capture: Option<tracing_capture::SharedStorage>,
}
254
255impl Default for TracingCliArgs {
256    fn default() -> TracingCliArgs {
257        let matches = TracingCliArgs::command().get_matches_from::<_, OsString>([]);
258        TracingCliArgs::from_arg_matches(&matches)
259            .expect("no arguments produce valid TracingCliArgs")
260    }
261}
262
263impl TracingCliArgs {
264    pub async fn configure_tracing(
265        &self,
266        StaticTracingConfig {
267            service_name,
268            build_info,
269        }: StaticTracingConfig,
270        registry: MetricsRegistry,
271    ) -> Result<(TracingHandle, TracingGuard), anyhow::Error> {
272        mz_ore::tracing::configure(TracingConfig {
273            service_name,
274            stderr_log: StderrLogConfig {
275                format: match self.log_format {
276                    LogFormat::Text => StderrLogFormat::Text {
277                        prefix: self.log_prefix.clone(),
278                    },
279                    LogFormat::Json => StderrLogFormat::Json,
280                },
281                filter: self.startup_log_filter.clone().into(),
282            },
283            opentelemetry: self.opentelemetry_endpoint.clone().map(|endpoint| {
284                OpenTelemetryConfig {
285                    endpoint,
286                    headers: self
287                        .opentelemetry_header
288                        .iter()
289                        .map(|header| (header.key.clone(), header.value.clone()))
290                        .collect(),
291                    filter: self.startup_opentelemetry_filter.clone().into(),
292                    max_batch_queue_size: self.opentelemetry_max_batch_queue_size,
293                    max_export_batch_size: self.opentelemetry_max_export_batch_size,
294                    max_concurrent_exports: self.opentelemetry_max_concurrent_exports,
295                    batch_scheduled_delay: self.opentelemetry_sched_delay,
296                    max_export_timeout: self.opentelemetry_max_export_timeout,
297                    resource: Resource::new(
298                        self.opentelemetry_resource
299                            .iter()
300                            .cloned()
301                            .map(|kv| KeyValue::new(kv.key, kv.value)),
302                    ),
303                }
304            }),
305            #[cfg(feature = "tokio-console")]
306            tokio_console: self.tokio_console_listen_addr.clone().map(|listen_addr| {
307                TokioConsoleConfig {
308                    listen_addr,
309                    publish_interval: self.tokio_console_publish_interval,
310                    retention: self.tokio_console_retention,
311                }
312            }),
313            sentry: self.sentry_dsn.clone().map(|dsn| SentryConfig {
314                dsn,
315                environment: self.sentry_environment.clone(),
316                tags: self
317                    .opentelemetry_resource
318                    .iter()
319                    .cloned()
320                    .chain(self.sentry_tag.iter().cloned())
321                    .map(|kv| (kv.key, kv.value))
322                    .collect(),
323                event_filter: mz_service::tracing::mz_sentry_event_filter,
324            }),
325            build_version: build_info.version,
326            build_sha: build_info.sha,
327            registry,
328            #[cfg(feature = "capture")]
329            capture: self.capture.clone(),
330        })
331        .await
332    }
333}
334
/// The fields of [`TracingConfig`] that are not set by command-line arguments.
///
/// Passed to [`TracingCliArgs::configure_tracing`] alongside the parsed
/// arguments.
pub struct StaticTracingConfig {
    /// See [`TracingConfig::service_name`].
    pub service_name: &'static str,
    /// The build information for this service.
    ///
    /// Its `version` and `sha` are reported as the build version and build SHA
    /// of the tracing configuration.
    pub build_info: BuildInfo,
}
342
/// Wraps an [`Orchestrator`] to inject tracing into all created services.
#[derive(Debug)]
pub struct TracingOrchestrator {
    /// The wrapped orchestrator that namespaces are obtained from.
    inner: Arc<dyn Orchestrator>,
    /// The tracing arguments cloned into every namespace created by this
    /// orchestrator.
    tracing_args: TracingCliArgs,
}
349
350impl TracingOrchestrator {
351    /// Constructs a new tracing orchestrator.
352    ///
353    /// The orchestrator wraps the provided `inner` orchestrator. It mutates
354    /// [`ServiceConfig`]s to inject the tracing configuration specified by
355    /// `tracing_args`.
356    ///
357    /// All services created by the orchestrator **must** embed the
358    /// [`TracingCliArgs`] in their command-line argument parser.
359    pub fn new(inner: Arc<dyn Orchestrator>, tracing_args: TracingCliArgs) -> TracingOrchestrator {
360        TracingOrchestrator {
361            inner,
362            tracing_args,
363        }
364    }
365}
366
367impl Orchestrator for TracingOrchestrator {
368    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
369        Arc::new(NamespacedTracingOrchestrator {
370            namespace: namespace.to_string(),
371            inner: self.inner.namespace(namespace),
372            tracing_args: self.tracing_args.clone(),
373        })
374    }
375}
376
/// Wraps a [`NamespacedOrchestrator`] and appends tracing arguments to the
/// command line of every service it creates.
#[derive(Debug)]
struct NamespacedTracingOrchestrator {
    /// The namespace name, used to build the `--log-prefix` passed to services.
    namespace: String,
    /// The wrapped orchestrator that all operations are delegated to.
    inner: Arc<dyn NamespacedOrchestrator>,
    /// The tracing arguments forwarded to created services.
    tracing_args: TracingCliArgs,
}
383
384#[async_trait]
385impl NamespacedOrchestrator for NamespacedTracingOrchestrator {
386    async fn fetch_service_metrics(
387        &self,
388        id: &str,
389    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
390        self.inner.fetch_service_metrics(id).await
391    }
392
393    fn ensure_service(
394        &self,
395        id: &str,
396        mut service_config: ServiceConfig,
397    ) -> Result<Box<dyn Service>, anyhow::Error> {
398        let tracing_args = self.tracing_args.clone();
399        let log_prefix_arg = format!("{}-{}", self.namespace, id);
400        let args_fn = move |assigned: ServiceAssignments| {
401            #[cfg(feature = "tokio-console")]
402            let tokio_console_listen_addr = assigned.listen_addrs.get("tokio-console");
403            let mut args = (service_config.args)(assigned);
404            let TracingCliArgs {
405                startup_log_filter,
406                log_prefix,
407                log_format,
408                opentelemetry_max_batch_queue_size,
409                opentelemetry_max_export_batch_size,
410                opentelemetry_max_concurrent_exports,
411                opentelemetry_sched_delay,
412                opentelemetry_max_export_timeout,
413                opentelemetry_endpoint,
414                opentelemetry_header,
415                startup_opentelemetry_filter: _,
416                opentelemetry_resource,
417                #[cfg(feature = "tokio-console")]
418                    tokio_console_listen_addr: _,
419                #[cfg(feature = "tokio-console")]
420                tokio_console_publish_interval,
421                #[cfg(feature = "tokio-console")]
422                tokio_console_retention,
423                sentry_dsn,
424                sentry_environment,
425                sentry_tag,
426                #[cfg(feature = "capture")]
427                    capture: _,
428            } = &tracing_args;
429            args.push(format!("--startup-log-filter={startup_log_filter}"));
430            args.push(format!("--log-format={log_format}"));
431            if log_prefix.is_some() {
432                args.push(format!("--log-prefix={log_prefix_arg}"));
433            }
434            if let Some(endpoint) = opentelemetry_endpoint {
435                args.push(format!("--opentelemetry-endpoint={endpoint}"));
436                for kv in opentelemetry_header {
437                    args.push(format!(
438                        "--opentelemetry-header={}={}",
439                        kv.key,
440                        kv.value
441                            .to_str()
442                            .expect("opentelemetry-header had non-ascii value"),
443                    ));
444                }
445                args.push(format!(
446                    "--opentelemetry-max-batch-queue-size={opentelemetry_max_batch_queue_size}",
447                ));
448                args.push(format!(
449                    "--opentelemetry-max-export-batch-size={opentelemetry_max_export_batch_size}",
450                ));
451                args.push(format!(
452                    "--opentelemetry-max-concurrent-exports={opentelemetry_max_concurrent_exports}",
453                ));
454                args.push(format!(
455                    "--opentelemetry-sched-delay={}ms",
456                    opentelemetry_sched_delay.as_millis(),
457                ));
458                args.push(format!(
459                    "--opentelemetry-max-export-timeout={}ms",
460                    opentelemetry_max_export_timeout.as_millis(),
461                ));
462            }
463            #[cfg(feature = "tokio-console")]
464            if let Some(tokio_console_listen_addr) = tokio_console_listen_addr {
465                args.push(format!(
466                    "--tokio-console-listen-addr={}",
467                    tokio_console_listen_addr,
468                ));
469                args.push(format!(
470                    "--tokio-console-publish-interval={} us",
471                    tokio_console_publish_interval.as_micros(),
472                ));
473                args.push(format!(
474                    "--tokio-console-retention={} us",
475                    tokio_console_retention.as_micros(),
476                ));
477            }
478            if let Some(dsn) = sentry_dsn {
479                args.push(format!("--sentry-dsn={dsn}"));
480                for kv in sentry_tag {
481                    args.push(format!("--sentry-tag={}={}", kv.key, kv.value));
482                }
483            }
484            if let Some(environment) = sentry_environment {
485                args.push(format!("--sentry-environment={environment}"));
486            }
487
488            if opentelemetry_endpoint.is_some() || sentry_dsn.is_some() {
489                for kv in opentelemetry_resource {
490                    args.push(format!("--opentelemetry-resource={}={}", kv.key, kv.value));
491                }
492            }
493
494            args
495        };
496        service_config.args = Box::new(args_fn);
497        #[cfg(feature = "tokio-console")]
498        if self.tracing_args.tokio_console_listen_addr.is_some() {
499            service_config.ports.push(ServicePort {
500                name: "tokio-console".into(),
501                port_hint: 6669,
502            });
503        }
504        self.inner.ensure_service(id, service_config)
505    }
506
507    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
508        self.inner.drop_service(id)
509    }
510
511    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
512        self.inner.list_services().await
513    }
514
515    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
516        self.inner.watch_services()
517    }
518
519    fn update_scheduling_config(
520        &self,
521        config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
522    ) {
523        self.inner.update_scheduling_config(config)
524    }
525}
526
/// Specifies the format of a stderr log message.
///
/// `Text` is the default format.
#[derive(Debug, Clone, Default, clap::ValueEnum)]
pub enum LogFormat {
    /// Format as human readable, optionally colored text.
    ///
    /// Best suited for direct human consumption in a terminal.
    #[default]
    Text,
    /// Format as JSON (in reality, JSONL).
    ///
    /// Best suited for ingestion in structured logging aggregators.
    Json,
}
540
541impl fmt::Display for LogFormat {
542    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
543        match self {
544            LogFormat::Text => f.write_str("text"),
545            LogFormat::Json => f.write_str("json"),
546        }
547    }
548}