use std::ffi::CStr;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::{Duration, Instant};
use std::{cmp, env, iter, thread};
use anyhow::{bail, Context};
use clap::{ArgEnum, Parser};
use fail::FailScenario;
use http::header::HeaderValue;
use ipnet::IpNet;
use itertools::Itertools;
use mz_adapter::ResultExt;
use mz_aws_secrets_controller::AwsSecretsController;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{
UnsafeBuiltinTableFingerprintWhitespace,
UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE,
};
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController};
use mz_controller::ControllerConfig;
use mz_environmentd::{CatalogConfig, Listeners, ListenersConfig, BUILD_INFO};
use mz_frontegg_auth::{Authenticator, FronteggCliArgs};
use mz_orchestrator::Orchestrator;
use mz_orchestrator_kubernetes::{
KubernetesImagePullPolicy, KubernetesOrchestrator, KubernetesOrchestratorConfig,
};
use mz_orchestrator_process::{
ProcessOrchestrator, ProcessOrchestratorConfig, ProcessOrchestratorTcpProxyConfig,
};
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs, TracingOrchestrator};
use mz_ore::cli::{self, CliConfig, KeyValueArg};
use mz_ore::error::ErrorExt;
use mz_ore::metric;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::task::RuntimeExt;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::{
MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
};
use mz_persist_client::PersistLocation;
use mz_secrets::SecretsController;
use mz_server_core::TlsCliArgs;
use mz_service::emit_boot_diagnostics;
use mz_service::secrets::{SecretsControllerKind, SecretsReaderCliArgs};
use mz_sql::catalog::EnvironmentId;
use mz_storage_types::connections::ConnectionContext;
use opentelemetry::trace::TraceContextExt;
use prometheus::IntGauge;
use tracing::{error, info, info_span, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use url::Url;
mod sys;
/// Short version string, computed lazily on first use and handed to clap as
/// the `version` of the command.
static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));

/// Long version string handed to clap as `long_version`: the short version
/// followed by one line per entry returned from `build_info()`.
static LONG_VERSION: LazyLock<String> = LazyLock::new(|| {
    let headline = iter::once(BUILD_INFO.human_version(None));
    let mut lines = headline.chain(build_info());
    lines.join("\n")
});
#[derive(Parser, Debug)]
#[clap(
name = "environmentd",
next_line_help = true,
version = VERSION.as_str(),
long_version = LONG_VERSION.as_str(),
)]
pub struct Args {
#[clap(long, env = "UNSAFE_MODE")]
unsafe_mode: bool,
#[clap(long, env = "ALL_FEATURES")]
all_features: bool,
#[clap(
long,
env = "SQL_LISTEN_ADDR",
value_name = "HOST:PORT",
default_value = "127.0.0.1:6875"
)]
sql_listen_addr: SocketAddr,
#[clap(
long,
env = "HTTP_LISTEN_ADDR",
value_name = "HOST:PORT",
default_value = "127.0.0.1:6876"
)]
http_listen_addr: SocketAddr,
#[clap(
long,
value_name = "HOST:PORT",
env = "INTERNAL_SQL_LISTEN_ADDR",
default_value = "127.0.0.1:6877"
)]
internal_sql_listen_addr: SocketAddr,
#[clap(
long,
value_name = "HOST:PORT",
env = "INTERNAL_HTTP_LISTEN_ADDR",
default_value = "127.0.0.1:6878"
)]
internal_http_listen_addr: SocketAddr,
#[clap(
long,
value_name = "HOST:PORT",
env = "INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR",
default_value = "127.0.0.1:6879"
)]
internal_persist_pubsub_listen_addr: SocketAddr,
#[clap(
long,
value_name = "HOST:PORT",
env = "BALANCER_SQL_LISTEN_ADDR",
default_value = "127.0.0.1:6880"
)]
balancer_sql_listen_addr: SocketAddr,
#[clap(
long,
value_name = "HOST:PORT",
env = "BALANCER_HTTP_LISTEN_ADDR",
default_value = "127.0.0.1:6881"
)]
balancer_http_listen_addr: SocketAddr,
#[structopt(long, env = "CORS_ALLOWED_ORIGIN")]
cors_allowed_origin: Vec<HeaderValue>,
#[clap(
long,
env = "ANNOUNCE_EGRESS_ADDRESS",
multiple = true,
use_delimiter = true
)]
announce_egress_address: Vec<IpNet>,
#[clap(long, env = "HTTP_HOST_NAME")]
http_host_name: Option<String>,
#[clap(long, env = "INTERNAL_CONSOLE_REDIRECT_URL")]
internal_console_redirect_url: Option<String>,
#[clap(flatten)]
tls: TlsCliArgs,
#[clap(flatten)]
frontegg: FronteggCliArgs,
#[structopt(long, arg_enum, env = "ORCHESTRATOR")]
orchestrator: OrchestratorKind,
#[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SCHEDULER_NAME")]
orchestrator_kubernetes_scheduler_name: Option<String>,
#[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_LABEL")]
orchestrator_kubernetes_service_label: Vec<KeyValueArg<String, String>>,
#[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_NODE_SELECTOR")]
orchestrator_kubernetes_service_node_selector: Vec<KeyValueArg<String, String>>,
#[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ACCOUNT")]
orchestrator_kubernetes_service_account: Option<String>,
#[structopt(
long,
env = "ORCHESTRATOR_KUBERNETES_CONTEXT",
default_value = "minikube"
)]
orchestrator_kubernetes_context: String,
#[structopt(
long,
env = "ORCHESTRATOR_KUBERNETES_IMAGE_PULL_POLICY",
default_value = "always",
arg_enum
)]
orchestrator_kubernetes_image_pull_policy: KubernetesImagePullPolicy,
#[clap(long, env = "ORCHESTRATOR_KUBERNETES_INIT_CONTAINER_IMAGE")]
orchestrator_kubernetes_init_container_image: Option<String>,
#[clap(long, env = "ORCHESTRATOR_KUBERNETES_EPHEMERAL_VOLUME_CLASS")]
orchestrator_kubernetes_ephemeral_volume_class: Option<String>,
#[clap(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_FS_GROUP")]
orchestrator_kubernetes_service_fs_group: Option<i64>,
#[clap(long, env = "ORCHESTRATOR_KUBERNETES_NAME_PREFIX")]
orchestrator_kubernetes_name_prefix: Option<String>,
#[clap(long, env = "ORCHESTRATOR_PROCESS_WRAPPER")]
orchestrator_process_wrapper: Option<String>,
#[clap(
long,
env = "ORCHESTRATOR_PROCESS_SECRETS_DIRECTORY",
value_name = "PATH",
required_if_eq("orchestrator", "process")
)]
orchestrator_process_secrets_directory: Option<PathBuf>,
#[clap(long, env = "ORCHESTRATOR_PROCESS_PROPAGATE_CRASHES")]
orchestrator_process_propagate_crashes: bool,
#[clap(long, env = "ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR")]
orchestrator_process_tcp_proxy_listen_addr: Option<IpAddr>,
#[clap(
long,
env = "ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY"
)]
orchestrator_process_prometheus_service_discovery_directory: Option<PathBuf>,
#[clap(
long,
env = "ORCHESTRATOR_PROCESS_SCRATCH_DIRECTORY",
value_name = "PATH"
)]
orchestrator_process_scratch_directory: Option<PathBuf>,
#[structopt(long, env = "ORCHESTRATOR_KUBERNETES_COVERAGE")]
orchestrator_kubernetes_coverage: bool,
#[structopt(
long,
arg_enum,
env = "SECRETS_CONTROLLER",
default_value_ifs(&[
("orchestrator", Some("kubernetes"), Some("kubernetes")),
("orchestrator", Some("process"), Some("local-file"))
]),
default_value("kubernetes"), )]
secrets_controller: SecretsControllerKind,
#[clap(
long,
env = "AWS_SECRETS_CONTROLLER_TAGS",
multiple = true,
value_delimiter = ';',
required_if_eq("secrets-controller", "aws-secrets-manager")
)]
aws_secrets_controller_tags: Vec<KeyValueArg<String, String>>,
#[structopt(
long,
env = "CLUSTERD_IMAGE",
required_if_eq("orchestrator", "kubernetes"),
default_value_if("orchestrator", Some("process"), Some("clusterd"))
)]
clusterd_image: Option<String>,
#[clap(long, env = "DEPLOY_GENERATION", default_value = "0")]
deploy_generation: u64,
#[clap(
long,
env = "METADATA_BACKEND_URL",
conflicts_with_all = &[
"persist-consensus-url",
"timestamp-oracle-url",
],
)]
metadata_backend_url: Option<SensitiveUrl>,
#[clap(long, env = "HELM_CHART_VERSION")]
helm_chart_version: Option<String>,
#[clap(long, env = "PERSIST_BLOB_URL")]
persist_blob_url: SensitiveUrl,
#[clap(long, env = "PERSIST_CONSENSUS_URL")]
persist_consensus_url: Option<SensitiveUrl>,
#[clap(
long,
env = "PERSIST_PUBSUB_URL",
default_value = "http://localhost:6879"
)]
persist_pubsub_url: String,
#[clap(long, env = "PERSIST_ISOLATED_RUNTIME_THREADS")]
persist_isolated_runtime_threads: Option<isize>,
#[clap(
long,
env = "STORAGE_USAGE_COLLECTION_INTERVAL",
parse(try_from_str = humantime::parse_duration),
default_value = "3600s"
)]
storage_usage_collection_interval_sec: Duration,
#[clap(long, env = "STORAGE_USAGE_RETENTION_PERIOD", parse(try_from_str = humantime::parse_duration))]
storage_usage_retention_period: Option<Duration>,
#[clap(long, env = "TIMESTAMP_ORACLE_URL", value_name = "POSTGRES_URL")]
timestamp_oracle_url: Option<SensitiveUrl>,
#[clap(long, env = "AVAILABILITY_ZONE", use_value_delimiter = true)]
availability_zone: Vec<String>,
#[clap(
long,
env = "CLUSTER_REPLICA_SIZES",
requires = "bootstrap-default-cluster-replica-size"
)]
cluster_replica_sizes: Option<String>,
#[clap(long, env = "SEGMENT_API_KEY")]
segment_api_key: Option<String>,
#[clap(long, env = "SEGMENT_CLIENT_SIDE")]
segment_client_side: bool,
#[clap(long, env = "LAUNCHDARKLY_SDK_KEY")]
launchdarkly_sdk_key: Option<String>,
#[clap(
long,
env = "LAUNCHDARKLY_KEY_MAP",
multiple = true,
value_delimiter = ';'
)]
launchdarkly_key_map: Vec<KeyValueArg<String, String>>,
#[clap(
long,
env = "CONFIG_SYNC_TIMEOUT",
parse(try_from_str = humantime::parse_duration),
default_value = "30s"
)]
config_sync_timeout: Duration,
#[clap(
long,
env = "CONFIG_SYNC_LOOP_INTERVAL",
parse(try_from_str = humantime::parse_duration),
)]
config_sync_loop_interval: Option<Duration>,
#[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
scratch_directory: Option<PathBuf>,
#[clap(
long,
env = "ENVIRONMENT_ID",
value_name = "<CLOUD>-<REGION>-<ORG-ID>-<ORDINAL>"
)]
environment_id: EnvironmentId,
#[clap(long, env = "BOOTSTRAP_ROLE")]
bootstrap_role: Option<String>,
#[clap(
long,
env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE",
default_value = "1"
)]
bootstrap_default_cluster_replica_size: String,
#[clap(
long,
env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE",
default_value = "1"
)]
bootstrap_builtin_system_cluster_replica_size: String,
#[clap(
long,
env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE",
default_value = "1"
)]
bootstrap_builtin_catalog_server_cluster_replica_size: String,
#[clap(
long,
env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE",
default_value = "1"
)]
bootstrap_builtin_probe_cluster_replica_size: String,
#[clap(
long,
env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE",
default_value = "1"
)]
bootstrap_builtin_support_cluster_replica_size: String,
#[clap(
long,
env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE",
default_value = "1"
)]
bootstrap_builtin_analytics_cluster_replica_size: String,
#[clap(
long,
env = "SYSTEM_PARAMETER_DEFAULT",
multiple = true,
value_delimiter = ';'
)]
system_parameter_default: Vec<KeyValueArg<String, String>>,
#[clap(long, env = "AWS_ACCOUNT_ID")]
aws_account_id: Option<String>,
#[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
aws_connection_role_arn: Option<String>,
#[clap(long, env = "AWS_EXTERNAL_ID_PREFIX", value_name = "ID", parse(from_str = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable))]
aws_external_id_prefix: Option<AwsExternalIdPrefix>,
#[clap(
long,
env = "AWS_PRIVATELINK_AVAILABILITY_ZONES",
multiple = true,
use_delimiter = true
)]
aws_privatelink_availability_zones: Option<Vec<String>>,
#[clap(flatten)]
tracing: TracingCliArgs,
#[clap(long, arg_enum, requires = "unsafe-mode")]
unsafe_builtin_table_fingerprint_whitespace: Option<UnsafeBuiltinTableFingerprintWhitespace>,
#[clap(long, requires = "unsafe-mode", default_value = "1")]
unsafe_builtin_table_fingerprint_whitespace_version: usize,
}
// Orchestrator backend selected with `--orchestrator`.
//
// Plain `//` comments are used because clap's derive consumes `///` doc
// comments on argument enums.
#[derive(Clone, Debug, ArgEnum)]
enum OrchestratorKind {
    // `run` builds a `KubernetesOrchestrator` for this variant.
    Kubernetes,
    // `run` builds a `ProcessOrchestrator` for this variant.
    Process,
}
/// Builds the prefix passed to the AWS secrets controller for this
/// environment: `/user-managed/<env_id>/`.
fn aws_secrets_controller_prefix(env_id: &EnvironmentId) -> String {
    format!("/user-managed/{env_id}/")
}
/// Builds the key alias passed to the AWS secrets controller for this
/// environment: `alias/customer_key_<env_id>`.
fn aws_secrets_controller_key_alias(env_id: &EnvironmentId) -> String {
    format!("alias/customer_key_{env_id}")
}
/// Process entry point: parses [`Args`] and hands them to [`run`].
///
/// Any error returned by `run` is turned into a panic whose message includes
/// the full cause chain.
fn main() {
    let cli_config = CliConfig {
        env_prefix: Some("MZ_"),
        enable_version_flag: true,
    };
    let args = cli::parse_args(cli_config);
    match run(args) {
        Ok(()) => {}
        Err(err) => panic!("environmentd: fatal: {}", err.display_with_causes()),
    }
}
/// Boots `environmentd` from the parsed arguments and then parks the calling
/// thread forever.
///
/// On the success path this function never returns: it ends in an infinite
/// `thread::park` loop. It returns `Err` only when one of the fallible startup
/// steps (`?` / `bail!`) fails. Several configuration problems are reported by
/// panicking instead (`expect` / `unwrap` / `assert!`), as noted inline.
///
/// `args` is consumed piecemeal: fields are moved out of it as the individual
/// components are configured, so the order of the statements below matters.
fn run(mut args: Args) -> Result<(), anyhow::Error> {
mz_ore::panic::install_enhanced_handler();
// Reference point for the startup timings logged and recorded below.
let envd_start = Instant::now();
// Signal handling lives in the local `sys` module; going by the function
// names: a coverage dump on SIGUSR2 and cleanup on termination signals.
sys::enable_sigusr2_coverage_dump()?;
sys::enable_termination_signal_cleanup()?;
// Testing hook: publish the requested whitespace kind together with N newline
// characters (N = the `_version` argument) in the global shared with
// `mz_catalog::builtin`.
if let Some(fingerprint_whitespace) = args.unsafe_builtin_table_fingerprint_whitespace {
// clap declares `requires = "unsafe-mode"` for this argument.
assert!(args.unsafe_mode);
let whitespace = "\n".repeat(args.unsafe_builtin_table_fingerprint_whitespace_version);
*UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE
.lock()
.expect("lock poisoned") = Some((fingerprint_whitespace, whitespace));
}
// Worker thread count: the smaller of the logical and physical CPU counts,
// but never less than one.
let ncpus_useful = usize::max(1, cmp::min(num_cpus::get(), num_cpus::get_physical()));
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(ncpus_useful)
.thread_name_fn(|| {
// Name worker threads `tokio:work-<n>` using a process-wide counter.
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("tokio:work-{}", id)
})
.enable_all()
.build()?,
);
// Only the process orchestrator gets a log prefix; any value parsed from the
// command line is overwritten here.
args.tracing.log_prefix = if matches!(args.orchestrator, OrchestratorKind::Process) {
Some("environmentd".to_string())
} else {
None
};
let metrics_registry = MetricsRegistry::new();
// `_tracing_guard` is bound to a name so that it stays alive for the rest of
// the function.
let (tracing_handle, _tracing_guard) = runtime.block_on(args.tracing.configure_tracing(
StaticTracingConfig {
service_name: "environmentd",
build_info: BUILD_INFO,
},
metrics_registry.clone(),
))?;
// Root span for startup; its trace ID is printed in the banner at the end.
let span = tracing::info_span!("environmentd::run").entered();
info!("startup: envd init: beginning");
info!("startup: envd init: preamble beginning");
let metrics = Metrics::register_into(&metrics_registry, BUILD_INFO);
runtime.block_on(mz_alloc::register_metrics_into(&metrics_registry));
runtime.block_on(mz_metrics::rusage::register_metrics_into(&metrics_registry));
// Kept in a named binding so the failpoint scenario is not torn down early.
let _failpoint_scenario = FailScenario::setup();
let tls = args.tls.into_config()?;
let frontegg = Authenticator::from_args(args.frontegg, &metrics_registry)?;
// CORS: use the explicitly configured origins if there are any; otherwise
// allow the http and https loopback origins on the HTTP listen port.
let allowed_origins = if !args.cors_allowed_origin.is_empty() {
args.cors_allowed_origin
} else {
let port = args.http_listen_addr.port();
vec![
HeaderValue::from_str(&format!("http://localhost:{}", port)).unwrap(),
HeaderValue::from_str(&format!("http://127.0.0.1:{}", port)).unwrap(),
HeaderValue::from_str(&format!("http://[::1]:{}", port)).unwrap(),
HeaderValue::from_str(&format!("https://localhost:{}", port)).unwrap(),
HeaderValue::from_str(&format!("https://127.0.0.1:{}", port)).unwrap(),
HeaderValue::from_str(&format!("https://[::1]:{}", port)).unwrap(),
]
};
let cors_allowed_origin = mz_http_util::build_cors_allowed_origin(&allowed_origins);
// Build the orchestrator, the secrets controller and (kubernetes only) the
// cloud resource controller for the selected orchestrator kind.
let entered = info_span!("environmentd::configure_controller").entered();
let (orchestrator, secrets_controller, cloud_resource_controller): (
Arc<dyn Orchestrator>,
Arc<dyn SecretsController>,
Option<Arc<dyn CloudResourceController>>,
) = match args.orchestrator {
OrchestratorKind::Kubernetes => {
// Reject an option that only applies to the process orchestrator.
if args.orchestrator_process_scratch_directory.is_some() {
bail!(
"--orchestrator-process-scratch-directory is \
not currently usable with the kubernetes orchestrator"
);
}
let orchestrator = Arc::new(
runtime
.block_on(KubernetesOrchestrator::new(KubernetesOrchestratorConfig {
context: args.orchestrator_kubernetes_context.clone(),
scheduler_name: args.orchestrator_kubernetes_scheduler_name,
service_labels: args
.orchestrator_kubernetes_service_label
.into_iter()
.map(|l| (l.key, l.value))
.collect(),
service_node_selector: args
.orchestrator_kubernetes_service_node_selector
.into_iter()
.map(|l| (l.key, l.value))
.collect(),
service_account: args.orchestrator_kubernetes_service_account,
image_pull_policy: args.orchestrator_kubernetes_image_pull_policy,
aws_external_id_prefix: args.aws_external_id_prefix.clone(),
coverage: args.orchestrator_kubernetes_coverage,
ephemeral_volume_storage_class: args
.orchestrator_kubernetes_ephemeral_volume_class
.clone(),
service_fs_group: args.orchestrator_kubernetes_service_fs_group.clone(),
name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
}))
.context("creating kubernetes orchestrator")?,
);
let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
SecretsControllerKind::Kubernetes => {
// The kubernetes orchestrator itself serves as the secrets
// controller; the two-step binding coerces the concrete `Arc`
// into a trait object.
let sc = Arc::clone(&orchestrator);
let sc: Arc<dyn SecretsController> = sc;
sc
}
SecretsControllerKind::AwsSecretsManager => {
Arc::new(
runtime.block_on(AwsSecretsController::new(
&aws_secrets_controller_prefix(&args.environment_id),
&aws_secrets_controller_key_alias(&args.environment_id),
args.aws_secrets_controller_tags
.into_iter()
.map(|tag| (tag.key, tag.value))
.collect(),
)),
)
}
SecretsControllerKind::LocalFile => bail!(
"SecretsControllerKind::LocalFile is not compatible with Orchestrator::Kubernetes."
),
};
// The kubernetes orchestrator also acts as the cloud resource controller.
let cloud_resource_controller = Arc::clone(&orchestrator);
(
orchestrator,
secrets_controller,
Some(cloud_resource_controller),
)
}
OrchestratorKind::Process => {
// Reject an option that only applies to the kubernetes orchestrator.
if args
.orchestrator_kubernetes_ephemeral_volume_class
.is_some()
{
bail!(
"--orchestrator-kubernetes-ephemeral-volume-class is \
not usable with the process orchestrator"
);
}
let orchestrator = Arc::new(
runtime
.block_on(ProcessOrchestrator::new(ProcessOrchestratorConfig {
// Images are looked up in the directory containing this executable.
image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
suppress_output: false,
environment_id: args.environment_id.to_string(),
// The argument declares `required_if_eq("orchestrator", "process")`.
secrets_dir: args
.orchestrator_process_secrets_directory
.clone()
.expect("clap enforced"),
// An absent wrapper means an empty command prefix; a present one is
// split using shell-word rules.
command_wrapper: args
.orchestrator_process_wrapper
.map_or(Ok(vec![]), |s| shell_words::split(&s))?,
propagate_crashes: args.orchestrator_process_propagate_crashes,
tcp_proxy: args.orchestrator_process_tcp_proxy_listen_addr.map(
|listen_addr| ProcessOrchestratorTcpProxyConfig {
listen_addr,
prometheus_service_discovery_dir: args
.orchestrator_process_prometheus_service_discovery_directory,
},
),
// NOTE(review): this panics when the scratch directory was not
// supplied; unlike the secrets directory, the argument declares no
// clap requirement for the process orchestrator.
scratch_directory: args
.orchestrator_process_scratch_directory
.expect("process orchestrator requires scratch directory"),
}))
.context("creating process orchestrator")?,
);
let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
SecretsControllerKind::Kubernetes => bail!(
"SecretsControllerKind::Kubernetes is not compatible with Orchestrator::Process."
),
SecretsControllerKind::AwsSecretsManager => {
Arc::new(
runtime.block_on(AwsSecretsController::new(
&aws_secrets_controller_prefix(&args.environment_id),
&aws_secrets_controller_key_alias(&args.environment_id),
args.aws_secrets_controller_tags
.into_iter()
.map(|tag| (tag.key, tag.value))
.collect(),
)),
)
}
SecretsControllerKind::LocalFile => {
// The process orchestrator itself serves as the secrets controller.
let sc = Arc::clone(&orchestrator);
let sc: Arc<dyn SecretsController> = sc;
sc
}
};
// No cloud resource controller is available with the process orchestrator.
(orchestrator, secrets_controller, None)
}
};
drop(entered);
let cloud_resource_reader = cloud_resource_controller.as_ref().map(|c| c.reader());
let secrets_reader = secrets_controller.reader();
let now = SYSTEM_TIME.clone();
// Persist: configuration, the in-process PubSub server and the client cache.
let mut persist_config = PersistConfig::new(
&mz_environmentd::BUILD_INFO,
now.clone(),
mz_dyncfgs::all_dyncfgs(),
);
let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
// Isolated runtime thread count:
//   unset or 0 -> keep the persist default,
//   negative   -> CPU count plus the (negative) value, but at least 1,
//   positive   -> exactly that many threads.
match args.persist_isolated_runtime_threads {
None | Some(0) => (),
Some(x @ ..=-1) => {
let threads = num_cpus::get().saturating_add_signed(x).max(1);
persist_config.isolated_runtime_worker_threads = threads;
}
Some(x @ 1..) => {
let threads = usize::try_from(x).expect("pattern matched a positive value");
persist_config.isolated_runtime_worker_threads = threads;
}
};
// Serve Persist PubSub in a background task. The task is never awaited; if
// the server ever exits, that is only logged at error level.
let _server = runtime.spawn_named(
|| "persist::rpc::server",
async move {
info!(
"listening for Persist PubSub connections on {}",
args.internal_persist_pubsub_listen_addr
);
let res = persist_pubsub_server
.serve(args.internal_persist_pubsub_listen_addr)
.await;
error!("Persist Pubsub server exited {:?}", res);
}
.instrument(tracing::info_span!("persist::rpc::server")),
);
let persist_clients = {
// The cache is constructed while the runtime context is entered.
let _tokio_guard = runtime.enter();
PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
cfg,
persist_pubsub_client.sender,
metrics,
));
PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
})
};
// Consensus URL: the explicit `--persist-consensus-url` wins; otherwise it is
// derived from `--metadata-backend-url` by appending the query parameter
// `options=--search_path=consensus`. Panics when neither was provided.
let consensus_uri = args.persist_consensus_url.unwrap_or_else(|| {
args.metadata_backend_url
.as_ref()
.map(|metadata_backend_url| {
SensitiveUrl(
Url::parse_with_params(
metadata_backend_url.0.as_ref(),
&[("options", "--search_path=consensus")],
)
.unwrap(),
)
})
.expect("either --persist-consensus-url or --metadata-backend-url must be provided")
});
// Timestamp oracle URL: derived the same way (with `--search_path=tsoracle`),
// but it stays `None` when neither URL was provided.
let timestamp_oracle_url = args.timestamp_oracle_url.or_else(|| {
args.metadata_backend_url
.as_ref()
.map(|metadata_backend_url| {
SensitiveUrl(
Url::parse_with_params(
metadata_backend_url.0.as_ref(),
&[("options", "--search_path=tsoracle")],
)
.unwrap(),
)
})
});
let persist_clients = Arc::new(persist_clients);
let connection_context = ConnectionContext::from_cli_args(
args.environment_id.to_string(),
&args.tracing.startup_log_filter,
args.aws_external_id_prefix,
args.aws_connection_role_arn,
secrets_reader,
cloud_resource_reader,
);
// Wrap the orchestrator together with the tracing CLI arguments.
let orchestrator = Arc::new(TracingOrchestrator::new(orchestrator, args.tracing.clone()));
let controller = ControllerConfig {
build_info: &mz_environmentd::BUILD_INFO,
orchestrator,
persist_location: PersistLocation {
blob_uri: args.persist_blob_url,
consensus_uri,
},
persist_clients: Arc::clone(&persist_clients),
// clap requires the image for kubernetes and defaults it to `clusterd` for
// the process orchestrator, so a value is always present here.
clusterd_image: args.clusterd_image.expect("clap enforced"),
init_container_image: args.orchestrator_kubernetes_init_container_image,
deploy_generation: args.deploy_generation,
now: SYSTEM_TIME.clone(),
metrics_registry: metrics_registry.clone(),
persist_pubsub_url: args.persist_pubsub_url,
connection_context,
secrets_args: SecretsReaderCliArgs {
secrets_reader: args.secrets_controller,
secrets_reader_local_file_dir: args.orchestrator_process_secrets_directory,
secrets_reader_kubernetes_context: Some(args.orchestrator_kubernetes_context),
secrets_reader_aws_prefix: Some(aws_secrets_controller_prefix(&args.environment_id)),
},
};
// An absent size map means the default map; otherwise the argument is JSON.
let cluster_replica_sizes: ClusterReplicaSizeMap = match args.cluster_replica_sizes {
None => Default::default(),
Some(json) => serde_json::from_str(&json).context("parsing replica size map")?,
};
emit_boot_diagnostics!(&BUILD_INFO);
sys::adjust_rlimits();
info!(
"startup: envd init: preamble complete in {:?}",
envd_start.elapsed()
);
let serve_start = Instant::now();
info!("startup: envd init: serving beginning");
// Bind every listener and start the server on the runtime; the resulting
// handle is kept for the banner below.
let server = runtime.block_on(async {
let listeners = Listeners::bind(ListenersConfig {
sql_listen_addr: args.sql_listen_addr,
http_listen_addr: args.http_listen_addr,
balancer_sql_listen_addr: args.balancer_sql_listen_addr,
balancer_http_listen_addr: args.balancer_http_listen_addr,
internal_sql_listen_addr: args.internal_sql_listen_addr,
internal_http_listen_addr: args.internal_http_listen_addr,
})
.await?;
let catalog_config = CatalogConfig {
persist_clients,
metrics: Arc::new(mz_catalog::durable::Metrics::new(&metrics_registry)),
};
let server = listeners
.serve(mz_environmentd::Config {
unsafe_mode: args.unsafe_mode,
all_features: args.all_features,
tls,
tls_reload_certs: mz_server_core::default_cert_reload_ticker(),
frontegg,
cors_allowed_origin,
egress_addresses: args.announce_egress_address,
http_host_name: args.http_host_name,
internal_console_redirect_url: args.internal_console_redirect_url,
controller,
secrets_controller,
cloud_resource_controller,
storage_usage_collection_interval: args.storage_usage_collection_interval_sec,
storage_usage_retention_period: args.storage_usage_retention_period,
catalog_config,
availability_zones: args.availability_zone,
cluster_replica_sizes,
timestamp_oracle_url,
segment_api_key: args.segment_api_key,
segment_client_side: args.segment_client_side,
launchdarkly_sdk_key: args.launchdarkly_sdk_key,
launchdarkly_key_map: args
.launchdarkly_key_map
.into_iter()
.map(|kv| (kv.key, kv.value))
.collect(),
config_sync_timeout: args.config_sync_timeout,
config_sync_loop_interval: args.config_sync_loop_interval,
environment_id: args.environment_id,
bootstrap_role: args.bootstrap_role,
bootstrap_default_cluster_replica_size: args.bootstrap_default_cluster_replica_size,
bootstrap_builtin_system_cluster_replica_size: args
.bootstrap_builtin_system_cluster_replica_size,
bootstrap_builtin_catalog_server_cluster_replica_size: args
.bootstrap_builtin_catalog_server_cluster_replica_size,
bootstrap_builtin_probe_cluster_replica_size: args
.bootstrap_builtin_probe_cluster_replica_size,
bootstrap_builtin_support_cluster_replica_size: args
.bootstrap_builtin_support_cluster_replica_size,
bootstrap_builtin_analytics_cluster_replica_size: args
.bootstrap_builtin_analytics_cluster_replica_size,
system_parameter_defaults: args
.system_parameter_default
.into_iter()
.map(|kv| (kv.key, kv.value))
.collect(),
// Cloned because the value is used again for the startup banner.
helm_chart_version: args.helm_chart_version.clone(),
aws_account_id: args.aws_account_id,
aws_privatelink_availability_zones: args.aws_privatelink_availability_zones,
metrics_registry,
tracing_handle,
now,
})
.await
.maybe_terminate("booting server")?;
Ok::<_, anyhow::Error>(server)
})?;
info!(
"startup: envd init: serving complete in {:?}",
serve_start.elapsed()
);
// Record the total startup time, in milliseconds, in the startup gauge.
let start_duration = envd_start.elapsed();
metrics
.start_time_environmentd
.set(start_duration.as_millis().try_into().expect("must fit"));
// Leave the root span, keeping only its trace ID for the banner.
let span = span.exit();
let id = span.context().span().span_context().trace_id();
drop(span);
info!("startup: envd init: complete in {start_duration:?}");
// Startup banner on stdout. The server addresses are the locally bound
// addresses reported by the server handle; the PubSub address is the one that
// was configured.
println!(
"environmentd {} listening...",
mz_environmentd::BUILD_INFO.human_version(args.helm_chart_version)
);
println!(" SQL address: {}", server.sql_local_addr());
println!(" HTTP address: {}", server.http_local_addr());
println!(
" Internal SQL address: {}",
server.internal_sql_local_addr()
);
println!(
" Internal HTTP address: {}",
server.internal_http_local_addr()
);
println!(
" Balancerd SQL address: {}",
server.balancer_sql_local_addr()
);
println!(
" Balancerd HTTP address: {}",
server.balancer_http_local_addr()
);
println!(
" Internal Persist PubSub address: {}",
args.internal_persist_pubsub_listen_addr
);
println!(" Root trace ID: {id}");
// Keep the main thread (and with it `server`, `runtime` and the guards above)
// alive indefinitely. `park` may return spuriously, hence the loop.
loop {
thread::park();
}
}
/// Returns version strings for the native libraries this binary links
/// against: the OpenSSL version string first, then `librdkafka v<version>`.
fn build_info() -> Vec<String> {
    // SAFETY: relies on `OpenSSL_version` returning a valid, NUL-terminated
    // string that stays alive while it is read here (per the OpenSSL docs it
    // points at static data; confirm against the linked OpenSSL version).
    let openssl = unsafe {
        let raw = openssl_sys::OpenSSL_version(openssl_sys::OPENSSL_VERSION);
        CStr::from_ptr(raw)
    }
    .to_string_lossy()
    .into_owned();

    // SAFETY: relies on `rd_kafka_version_str` returning a valid,
    // NUL-terminated string that stays alive while it is read here.
    let rdkafka = unsafe {
        let raw = rdkafka_sys::bindings::rd_kafka_version_str();
        CStr::from_ptr(raw)
    }
    .to_string_lossy();

    vec![openssl, format!("librdkafka v{rdkafka}")]
}
/// Metrics owned by the `environmentd` entry point itself.
#[derive(Clone, Debug)]
struct Metrics {
    /// Gauge that `run` sets once startup has finished, to the elapsed time
    /// since process start in milliseconds.
    pub start_time_environmentd: IntGauge,
}
impl Metrics {
    /// Registers the entry point's metrics in `registry` and returns handles
    /// to them.
    ///
    /// The gauge carries two constant labels: `version` (taken from
    /// `build_info`) and `build_type` (`"debug"` or `"release"`).
    pub fn register_into(registry: &MetricsRegistry, build_info: BuildInfo) -> Metrics {
        // `release` is not a cfg that rustc or Cargo ever sets, so
        // `cfg!(release)` is false unless the build passes a custom
        // `--cfg release`, which would label every build "debug".
        // `debug_assertions` is the standard cfg: on for Cargo's dev profile
        // and off for its release profile by default.
        let build_type = if cfg!(debug_assertions) {
            "debug"
        } else {
            "release"
        };
        Metrics {
            start_time_environmentd: registry.register(metric!(
                name: "mz_start_time_environmentd",
                help: "Time in milliseconds from environmentd start until the adapter is ready.",
                const_labels: {
                    "version" => build_info.version,
                    "build_type" => build_type
                },
            )),
        }
    }
}