#![allow(unknown_lints)]
#![allow(clippy::style)]
#![allow(clippy::complexity)]
#![allow(clippy::large_enum_variant)]
#![allow(clippy::mutable_key_type)]
#![allow(clippy::stable_sort_primitive)]
#![allow(clippy::map_entry)]
#![allow(clippy::box_default)]
#![allow(clippy::drain_collect)]
#![warn(clippy::bool_comparison)]
#![warn(clippy::clone_on_ref_ptr)]
#![warn(clippy::no_effect)]
#![warn(clippy::unnecessary_unwrap)]
#![warn(clippy::dbg_macro)]
#![warn(clippy::todo)]
#![warn(clippy::wildcard_dependencies)]
#![warn(clippy::zero_prefixed_literal)]
#![warn(clippy::borrowed_box)]
#![warn(clippy::deref_addrof)]
#![warn(clippy::double_must_use)]
#![warn(clippy::double_parens)]
#![warn(clippy::extra_unused_lifetimes)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_question_mark)]
#![warn(clippy::needless_return)]
#![warn(clippy::redundant_pattern)]
#![warn(clippy::redundant_slicing)]
#![warn(clippy::redundant_static_lifetimes)]
#![warn(clippy::single_component_path_imports)]
#![warn(clippy::unnecessary_cast)]
#![warn(clippy::useless_asref)]
#![warn(clippy::useless_conversion)]
#![warn(clippy::builtin_type_shadow)]
#![warn(clippy::duplicate_underscore_argument)]
#![warn(clippy::double_neg)]
#![warn(clippy::unnecessary_mut_passed)]
#![warn(clippy::wildcard_in_or_patterns)]
#![warn(clippy::crosspointer_transmute)]
#![warn(clippy::excessive_precision)]
#![warn(clippy::overflow_check_conditional)]
#![warn(clippy::as_conversions)]
#![warn(clippy::match_overlapping_arm)]
#![warn(clippy::zero_divided_by_zero)]
#![warn(clippy::must_use_unit)]
#![warn(clippy::suspicious_assignment_formatting)]
#![warn(clippy::suspicious_else_formatting)]
#![warn(clippy::suspicious_unary_op_formatting)]
#![warn(clippy::mut_mutex_lock)]
#![warn(clippy::print_literal)]
#![warn(clippy::same_item_push)]
#![warn(clippy::useless_format)]
#![warn(clippy::write_literal)]
#![warn(clippy::redundant_closure)]
#![warn(clippy::redundant_closure_call)]
#![warn(clippy::unnecessary_lazy_evaluations)]
#![warn(clippy::partialeq_ne_impl)]
#![warn(clippy::redundant_field_names)]
#![warn(clippy::transmutes_expressible_as_ptr_casts)]
#![warn(clippy::unused_async)]
#![warn(clippy::disallowed_methods)]
#![warn(clippy::disallowed_macros)]
#![warn(clippy::disallowed_types)]
#![warn(clippy::from_over_into)]
use std::collections::BTreeMap;
use std::ffi::OsString;
use std::fmt;
use std::sync::Arc;
#[cfg(feature = "tokio-console")]
use std::time::Duration;
use async_trait::async_trait;
use clap::{FromArgMatches, IntoApp};
use futures_core::stream::BoxStream;
use http::header::{HeaderName, HeaderValue};
use mz_build_info::BuildInfo;
#[cfg(feature = "tokio-console")]
use mz_orchestrator::ServicePort;
use mz_orchestrator::{
NamespacedOrchestrator, Orchestrator, Service, ServiceConfig, ServiceEvent,
ServiceProcessMetrics,
};
use mz_ore::cli::KeyValueArg;
use mz_ore::metrics::MetricsRegistry;
#[cfg(feature = "tokio-console")]
use mz_ore::netio::SocketAddr;
#[cfg(feature = "tokio-console")]
use mz_ore::tracing::TokioConsoleConfig;
use mz_ore::tracing::{
OpenTelemetryConfig, SentryConfig, StderrLogConfig, StderrLogFormat, TracingConfig,
TracingGuard, TracingHandle,
};
use mz_tracing::CloneableEnvFilter;
use opentelemetry::sdk::resource::Resource;
use opentelemetry::KeyValue;
#[derive(Debug, Clone, clap::Parser)]
pub struct TracingCliArgs {
#[clap(
long,
env = "STARTUP_LOG_FILTER",
value_name = "FILTER",
default_value = "info"
)]
pub startup_log_filter: CloneableEnvFilter,
#[clap(long, env = "LOG_FORMAT", default_value_t, value_enum)]
pub log_format: LogFormat,
#[clap(long, env = "LOG_PREFIX")]
pub log_prefix: Option<String>,
#[clap(long, env = "OPENTELEMETRY_ENDPOINT")]
pub opentelemetry_endpoint: Option<String>,
#[clap(
long,
env = "OPENTELEMETRY_HEADER",
requires = "opentelemetry-endpoint",
value_name = "NAME=VALUE",
use_value_delimiter = true
)]
pub opentelemetry_header: Vec<KeyValueArg<HeaderName, HeaderValue>>,
#[clap(
long,
env = "STARTUP_OPENTELEMETRY_FILTER",
requires = "opentelemetry-endpoint",
default_value = "off"
)]
pub startup_opentelemetry_filter: CloneableEnvFilter,
#[clap(
long,
env = "OPENTELEMETRY_RESOURCE",
value_name = "NAME=VALUE",
use_value_delimiter = true
)]
pub opentelemetry_resource: Vec<KeyValueArg<String, String>>,
#[cfg(feature = "tokio-console")]
#[clap(long, env = "TOKIO_CONSOLE_LISTEN_ADDR")]
pub tokio_console_listen_addr: Option<SocketAddr>,
#[cfg(feature = "tokio-console")]
#[clap(
long,
env = "TOKIO_CONSOLE_PUBLISH_INTERVAL",
requires = "tokio-console-listen-addr",
parse(try_from_str = humantime::parse_duration),
default_value = "1s",
)]
pub tokio_console_publish_interval: Duration,
#[cfg(feature = "tokio-console")]
#[clap(
long,
env = "TOKIO_CONSOLE_RETENTION",
requires = "tokio-console-listen-addr",
parse(try_from_str = humantime::parse_duration),
default_value = "1h",
)]
pub tokio_console_retention: Duration,
#[clap(long, env = "SENTRY_DSN")]
pub sentry_dsn: Option<String>,
#[clap(long, env = "SENTRY_ENVIRONMENT")]
pub sentry_environment: Option<String>,
}
impl Default for TracingCliArgs {
fn default() -> TracingCliArgs {
let matches = TracingCliArgs::command().get_matches_from::<_, OsString>([]);
TracingCliArgs::from_arg_matches(&matches)
.expect("no arguments produce valid TracingCliArgs")
}
}
impl TracingCliArgs {
pub async fn configure_tracing(
&self,
StaticTracingConfig {
service_name,
build_info,
}: StaticTracingConfig,
registry: MetricsRegistry,
) -> Result<(TracingHandle, TracingGuard), anyhow::Error> {
mz_ore::tracing::configure(TracingConfig {
service_name,
stderr_log: StderrLogConfig {
format: match self.log_format {
LogFormat::Text => StderrLogFormat::Text {
prefix: self.log_prefix.clone(),
},
LogFormat::Json => StderrLogFormat::Json,
},
filter: self.startup_log_filter.clone().into(),
},
opentelemetry: self.opentelemetry_endpoint.clone().map(|endpoint| {
OpenTelemetryConfig {
endpoint,
headers: self
.opentelemetry_header
.iter()
.map(|header| (header.key.clone(), header.value.clone()))
.collect(),
filter: self.startup_opentelemetry_filter.clone().into(),
resource: Resource::new(
self.opentelemetry_resource
.iter()
.cloned()
.map(|kv| KeyValue::new(kv.key, kv.value)),
),
}
}),
#[cfg(feature = "tokio-console")]
tokio_console: self.tokio_console_listen_addr.clone().map(|listen_addr| {
TokioConsoleConfig {
listen_addr,
publish_interval: self.tokio_console_publish_interval,
retention: self.tokio_console_retention,
}
}),
sentry: self.sentry_dsn.clone().map(|dsn| SentryConfig {
dsn,
environment: self.sentry_environment.clone(),
tags: self
.opentelemetry_resource
.iter()
.cloned()
.map(|kv| (kv.key, kv.value))
.collect(),
event_filter: mz_service::tracing::mz_sentry_event_filter,
}),
build_version: build_info.version,
build_sha: build_info.sha,
build_time: build_info.time,
registry,
})
.await
}
}
pub struct StaticTracingConfig {
pub service_name: &'static str,
pub build_info: BuildInfo,
}
#[derive(Debug)]
pub struct TracingOrchestrator {
inner: Arc<dyn Orchestrator>,
tracing_args: TracingCliArgs,
}
impl TracingOrchestrator {
pub fn new(inner: Arc<dyn Orchestrator>, tracing_args: TracingCliArgs) -> TracingOrchestrator {
TracingOrchestrator {
inner,
tracing_args,
}
}
}
impl Orchestrator for TracingOrchestrator {
fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
Arc::new(NamespacedTracingOrchestrator {
namespace: namespace.to_string(),
inner: self.inner.namespace(namespace),
tracing_args: self.tracing_args.clone(),
})
}
}
#[derive(Debug)]
struct NamespacedTracingOrchestrator {
namespace: String,
inner: Arc<dyn NamespacedOrchestrator>,
tracing_args: TracingCliArgs,
}
#[async_trait]
impl NamespacedOrchestrator for NamespacedTracingOrchestrator {
async fn fetch_service_metrics(
&self,
id: &str,
) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
self.inner.fetch_service_metrics(id).await
}
async fn ensure_service(
&self,
id: &str,
mut service_config: ServiceConfig<'_>,
) -> Result<Box<dyn Service>, anyhow::Error> {
let args_fn = |listen_addrs: &BTreeMap<String, String>| {
#[cfg(feature = "tokio-console")]
let tokio_console_listen_addr = listen_addrs.get("tokio-console");
let mut args = (service_config.args)(listen_addrs);
let TracingCliArgs {
startup_log_filter,
log_prefix,
log_format,
opentelemetry_endpoint,
opentelemetry_header,
startup_opentelemetry_filter: _,
opentelemetry_resource,
#[cfg(feature = "tokio-console")]
tokio_console_listen_addr: _,
#[cfg(feature = "tokio-console")]
tokio_console_publish_interval,
#[cfg(feature = "tokio-console")]
tokio_console_retention,
sentry_dsn,
sentry_environment,
} = &self.tracing_args;
args.push(format!("--startup-log-filter={startup_log_filter}"));
args.push(format!("--log-format={log_format}"));
if log_prefix.is_some() {
args.push(format!("--log-prefix={}-{}", self.namespace, id));
}
if let Some(endpoint) = opentelemetry_endpoint {
args.push(format!("--opentelemetry-endpoint={endpoint}"));
for kv in opentelemetry_header {
args.push(format!(
"--opentelemetry-header={}={}",
kv.key,
kv.value
.to_str()
.expect("opentelemetry-header had non-ascii value"),
));
}
for kv in opentelemetry_resource {
args.push(format!("--opentelemetry-resource={}={}", kv.key, kv.value));
}
}
#[cfg(feature = "tokio-console")]
if let Some(tokio_console_listen_addr) = tokio_console_listen_addr {
args.push(format!(
"--tokio-console-listen-addr={}",
tokio_console_listen_addr,
));
args.push(format!(
"--tokio-console-publish-interval={} us",
tokio_console_publish_interval.as_micros(),
));
args.push(format!(
"--tokio-console-retention={} us",
tokio_console_retention.as_micros(),
));
}
if let Some(dsn) = sentry_dsn {
args.push(format!("--sentry-dsn={dsn}"));
}
if let Some(environment) = sentry_environment {
args.push(format!("--sentry-environment={environment}"));
}
args
};
service_config.args = &args_fn;
#[cfg(feature = "tokio-console")]
if self.tracing_args.tokio_console_listen_addr.is_some() {
service_config.ports.push(ServicePort {
name: "tokio-console".into(),
port_hint: 6669,
});
}
self.inner.ensure_service(id, service_config).await
}
async fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
self.inner.drop_service(id).await
}
async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
self.inner.list_services().await
}
fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
self.inner.watch_services()
}
fn update_scheduling_config(
&self,
config: mz_orchestrator::scheduling_config::ServiceSchedulingConfig,
) {
self.inner.update_scheduling_config(config)
}
}
#[derive(Debug, Clone, Default, clap::ValueEnum)]
pub enum LogFormat {
#[default]
Text,
Json,
}
impl fmt::Display for LogFormat {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
LogFormat::Text => f.write_str("text"),
LogFormat::Json => f.write_str("json"),
}
}
}