1use std::ffi::CStr;
16use std::net::{IpAddr, SocketAddr};
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::sync::LazyLock;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::{Duration, Instant};
22use std::{cmp, env, iter, thread};
23
24use anyhow::{Context, bail};
25use clap::{ArgAction, Parser, ValueEnum};
26use fail::FailScenario;
27use http::header::HeaderValue;
28use ipnet::IpNet;
29use itertools::Itertools;
30use mz_adapter::ResultExt;
31use mz_adapter_types::bootstrap_builtin_cluster_config::{
32 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
33 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, DEFAULT_REPLICATION_FACTOR,
34 PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR, SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
35 SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
36};
37use mz_aws_secrets_controller::AwsSecretsController;
38use mz_build_info::BuildInfo;
39use mz_catalog::builtin::{
40 UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE,
41 UnsafeBuiltinTableFingerprintWhitespace,
42};
43use mz_catalog::config::ClusterReplicaSizeMap;
44use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController};
45use mz_controller::ControllerConfig;
46use mz_frontegg_auth::{Authenticator, FronteggCliArgs};
47use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
48use mz_orchestrator::Orchestrator;
49use mz_orchestrator_kubernetes::{
50 KubernetesImagePullPolicy, KubernetesOrchestrator, KubernetesOrchestratorConfig,
51};
52use mz_orchestrator_process::{
53 ProcessOrchestrator, ProcessOrchestratorConfig, ProcessOrchestratorTcpProxyConfig,
54};
55use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs, TracingOrchestrator};
56use mz_ore::cli::{self, CliConfig, KeyValueArg};
57use mz_ore::error::ErrorExt;
58use mz_ore::metric;
59use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
60use mz_ore::now::SYSTEM_TIME;
61use mz_ore::task::RuntimeExt;
62use mz_ore::url::SensitiveUrl;
63use mz_persist_client::PersistLocation;
64use mz_persist_client::cache::PersistClientCache;
65use mz_persist_client::cfg::PersistConfig;
66use mz_persist_client::rpc::{
67 MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
68};
69use mz_secrets::SecretsController;
70use mz_server_core::TlsCliArgs;
71use mz_service::emit_boot_diagnostics;
72use mz_service::secrets::{SecretsControllerKind, SecretsReaderCliArgs};
73use mz_sql::catalog::EnvironmentId;
74use mz_storage_types::connections::ConnectionContext;
75use opentelemetry::trace::TraceContextExt;
76use prometheus::IntGauge;
77use tracing::{Instrument, error, info, info_span, warn};
78use tracing_opentelemetry::OpenTelemetrySpanExt;
79use url::Url;
80
81use crate::environmentd::sys;
82use crate::{BUILD_INFO, CatalogConfig, Listeners, ListenersConfig};
83
84static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
85static LONG_VERSION: LazyLock<String> = LazyLock::new(|| {
86 iter::once(BUILD_INFO.human_version(None))
87 .chain(build_info())
88 .join("\n")
89});
90
// Command-line arguments for environmentd.
//
// Each flag can also be supplied through the environment variable named in its
// `env = "..."` attribute. `main` parses with `env_prefix: Some("MZ_")`, so the
// effective variable name is presumably `MZ_<NAME>` — confirm in `mz_ore::cli`.
//
// NOTE(review): plain `//` comments are used on purpose. With clap's derive,
// `///` doc comments on this struct and its fields become `--help` output.
#[derive(Parser, Debug)]
#[clap(
    name = "environmentd",
    next_line_help = true,
    version = VERSION.as_str(),
    long_version = LONG_VERSION.as_str(),
)]
pub struct Args {
    // Enables unsafe mode. It is forwarded to the server config, and the
    // `--unsafe-builtin-table-fingerprint-whitespace*` flags require it.
    #[clap(long, env = "UNSAFE_MODE")]
    unsafe_mode: bool,
    // Forwarded to the server config as `all_features`.
    #[clap(long, env = "ALL_FEATURES")]
    all_features: bool,

    // Listen address for external SQL connections (see `Listeners::bind`).
    #[clap(
        long,
        env = "SQL_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6875",
        action = ArgAction::Set,
    )]
    sql_listen_addr: SocketAddr,
    // Listen address for external HTTP. Its port also seeds the default CORS
    // origins when `--cors-allowed-origin` is not given.
    #[clap(
        long,
        env = "HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6876",
        action = ArgAction::Set,
    )]
    http_listen_addr: SocketAddr,
    // Listen address for internal SQL connections.
    #[clap(
        long,
        value_name = "HOST:PORT",
        env = "INTERNAL_SQL_LISTEN_ADDR",
        default_value = "127.0.0.1:6877",
        action = ArgAction::Set,
    )]
    internal_sql_listen_addr: SocketAddr,
    // Listen address for internal HTTP.
    #[clap(
        long,
        value_name = "HOST:PORT",
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        default_value = "127.0.0.1:6878",
        action = ArgAction::Set,
    )]
    internal_http_listen_addr: SocketAddr,
    // Address the in-process persist PubSub gRPC server serves on.
    #[clap(
        long,
        value_name = "HOST:PORT",
        env = "INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR",
        default_value = "127.0.0.1:6879",
        action = ArgAction::Set,
    )]
    internal_persist_pubsub_listen_addr: SocketAddr,
    // Allowed CORS origins. When empty, `run` falls back to `localhost`,
    // `127.0.0.1` and `[::1]` on the HTTP listen port, over http and https.
    #[structopt(long, env = "CORS_ALLOWED_ORIGIN")]
    cors_allowed_origin: Vec<HeaderValue>,
    // Egress addresses forwarded as `egress_addresses`. Repeatable, and each
    // value may be comma-delimited.
    #[clap(
        long,
        env = "ANNOUNCE_EGRESS_ADDRESS",
        action = ArgAction::Append,
        use_value_delimiter = true
    )]
    announce_egress_address: Vec<IpNet>,
    // Forwarded to the server config as `http_host_name`.
    #[clap(long, env = "HTTP_HOST_NAME")]
    http_host_name: Option<String>,
    // Forwarded to the server config as `internal_console_redirect_url`.
    #[clap(long, env = "INTERNAL_CONSOLE_REDIRECT_URL")]
    internal_console_redirect_url: Option<String>,
    // TLS flags, converted with `TlsCliArgs::into_config`.
    #[clap(flatten)]
    tls: TlsCliArgs,
    // Frontegg authentication flags, consumed by `Authenticator::from_args`.
    #[clap(flatten)]
    frontegg: FronteggCliArgs,
    // Forwarded to the server config as `self_hosted_auth`.
    #[clap(long, env = "ENABLE_SELF_HOSTED_AUTH")]
    enable_self_hosted_auth: bool,
    // Forwarded to the server config as `self_hosted_auth_internal`.
    #[clap(long, env = "ENABLE_SELF_HOSTED_AUTH_INTERNAL")]
    enable_self_hosted_auth_internal: bool,
    // Orchestrator implementation to construct (`kubernetes` or `process`).
    #[structopt(long, value_enum, env = "ORCHESTRATOR")]
    orchestrator: OrchestratorKind,

    // --- Kubernetes orchestrator options (passed to `KubernetesOrchestratorConfig`) ---
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SCHEDULER_NAME")]
    orchestrator_kubernetes_scheduler_name: Option<String>,
    // `KEY=VALUE` pairs collected into `service_labels`.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_LABEL")]
    orchestrator_kubernetes_service_label: Vec<KeyValueArg<String, String>>,
    // `KEY=VALUE` pairs collected into `service_node_selector`.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_NODE_SELECTOR")]
    orchestrator_kubernetes_service_node_selector: Vec<KeyValueArg<String, String>>,
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ACCOUNT")]
    orchestrator_kubernetes_service_account: Option<String>,
    // Kubernetes context. It is also forwarded to the secrets reader args.
    #[structopt(
        long,
        env = "ORCHESTRATOR_KUBERNETES_CONTEXT",
        default_value = "minikube"
    )]
    orchestrator_kubernetes_context: String,
    #[structopt(
        long,
        env = "ORCHESTRATOR_KUBERNETES_IMAGE_PULL_POLICY",
        default_value = "always",
        value_enum
    )]
    orchestrator_kubernetes_image_pull_policy: KubernetesImagePullPolicy,
    // Forwarded to the controller config as `init_container_image`.
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_INIT_CONTAINER_IMAGE")]
    orchestrator_kubernetes_init_container_image: Option<String>,
    // Rejected by `run` when the process orchestrator is selected.
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_EPHEMERAL_VOLUME_CLASS")]
    orchestrator_kubernetes_ephemeral_volume_class: Option<String>,
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_FS_GROUP")]
    orchestrator_kubernetes_service_fs_group: Option<i64>,
    // Name prefix. It is also forwarded to the secrets reader args.
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_NAME_PREFIX")]
    orchestrator_kubernetes_name_prefix: Option<String>,
    // Negated into `collect_pod_metrics`.
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_DISABLE_POD_METRICS_COLLECTION")]
    orchestrator_kubernetes_disable_pod_metrics_collection: bool,
    #[clap(
        long,
        env = "ORCHESTRATOR_KUBERNETES_ENABLE_PROMETHEUS_SCRAPE_ANNOTATIONS"
    )]
    orchestrator_kubernetes_enable_prometheus_scrape_annotations: bool,

    // --- Process orchestrator options (passed to `ProcessOrchestratorConfig`) ---
    // Wrapper command, split with `shell_words::split` into `command_wrapper`.
    #[clap(long, env = "ORCHESTRATOR_PROCESS_WRAPPER")]
    orchestrator_process_wrapper: Option<String>,
    // Local secrets directory. Required when `--orchestrator=process`.
    #[clap(
        long,
        env = "ORCHESTRATOR_PROCESS_SECRETS_DIRECTORY",
        value_name = "PATH",
        required_if_eq("orchestrator", "process")
    )]
    orchestrator_process_secrets_directory: Option<PathBuf>,
    #[clap(long, env = "ORCHESTRATOR_PROCESS_PROPAGATE_CRASHES")]
    orchestrator_process_propagate_crashes: bool,
    // If set, a `ProcessOrchestratorTcpProxyConfig` is built with this listen address.
    #[clap(long, env = "ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR")]
    orchestrator_process_tcp_proxy_listen_addr: Option<IpAddr>,
    // Used only within the TCP proxy config, so it has no effect unless
    // `--orchestrator-process-tcp-proxy-listen-addr` is also set.
    #[clap(
        long,
        env = "ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY"
    )]
    orchestrator_process_prometheus_service_discovery_directory: Option<PathBuf>,
    // Scratch directory for the process orchestrator. `run` needs it with
    // `--orchestrator=process` and rejects it with `kubernetes`. clap does
    // not enforce the requirement.
    #[clap(
        long,
        env = "ORCHESTRATOR_PROCESS_SCRATCH_DIRECTORY",
        value_name = "PATH"
    )]
    orchestrator_process_scratch_directory: Option<PathBuf>,
    // Forwarded as `coverage` to the Kubernetes orchestrator config.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_COVERAGE")]
    orchestrator_kubernetes_coverage: bool,

    // Secrets controller. Defaults to `kubernetes` with the Kubernetes
    // orchestrator and `local-file` with the process orchestrator.
    #[structopt(
        long,
        value_enum,
        env = "SECRETS_CONTROLLER",
        default_value_ifs([
            ("orchestrator", "kubernetes", Some("kubernetes")),
            ("orchestrator", "process", Some("local-file"))
        ]),
        default_value("kubernetes"),
    )]
    secrets_controller: SecretsControllerKind,
    // `;`-delimited `KEY=VALUE` tags for the AWS secrets controller.
    // Required when `--secrets-controller=aws-secrets-manager`.
    #[clap(
        long,
        env = "AWS_SECRETS_CONTROLLER_TAGS",
        action = ArgAction::Append,
        value_delimiter = ';',
        required_if_eq("secrets_controller", "aws-secrets-manager")
    )]
    aws_secrets_controller_tags: Vec<KeyValueArg<String, String>>,
    // clusterd image. Required with Kubernetes; defaults to `clusterd` with
    // the process orchestrator.
    #[structopt(
        long,
        env = "CLUSTERD_IMAGE",
        required_if_eq("orchestrator", "kubernetes"),
        default_value_if("orchestrator", "process", Some("clusterd"))
    )]
    clusterd_image: Option<String>,
    // Forwarded to the controller config as `deploy_generation`.
    #[clap(long, env = "DEPLOY_GENERATION", default_value = "0")]
    deploy_generation: u64,

    // Shared metadata database URL. When the dedicated URLs are absent, `run`
    // derives the persist consensus URL (`search_path=consensus`) and the
    // timestamp oracle URL (`search_path=tsoracle`) from it. Conflicts with
    // setting those URLs directly.
    #[clap(
        long,
        env = "METADATA_BACKEND_URL",
        conflicts_with_all = &[
            "persist_consensus_url",
            "timestamp_oracle_url",
        ],
    )]
    metadata_backend_url: Option<SensitiveUrl>,

    // Helm chart version. It is included in the printed version line and
    // forwarded to the server config.
    #[clap(long, env = "HELM_CHART_VERSION")]
    helm_chart_version: Option<String>,

    // Persist blob storage location.
    #[clap(long, env = "PERSIST_BLOB_URL")]
    persist_blob_url: SensitiveUrl,
    // Persist consensus location. `run` needs either this or `--metadata-backend-url`.
    #[clap(long, env = "PERSIST_CONSENSUS_URL")]
    persist_consensus_url: Option<SensitiveUrl>,
    // Persist PubSub URL forwarded to the controller config.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,
    // Worker threads for persist's isolated runtime. Unset or 0 keeps the
    // `PersistConfig` value. A negative `x` means "CPUs + x", floored at 1.
    // A positive value is used as-is.
    #[clap(long, env = "PERSIST_ISOLATED_RUNTIME_THREADS")]
    persist_isolated_runtime_threads: Option<isize>,
    // Parsed with `humantime` (e.g. `3600s`). Despite the `_sec` suffix, this is a `Duration`.
    #[clap(
        long,
        env = "STORAGE_USAGE_COLLECTION_INTERVAL",
        value_parser = humantime::parse_duration,
        default_value = "3600s"
    )]
    storage_usage_collection_interval_sec: Duration,
    #[clap(long, env = "STORAGE_USAGE_RETENTION_PERIOD", value_parser = humantime::parse_duration)]
    storage_usage_retention_period: Option<Duration>,

    // Timestamp oracle URL. If absent, `run` may derive it from `--metadata-backend-url`.
    #[clap(long, env = "TIMESTAMP_ORACLE_URL", value_name = "POSTGRES_URL")]
    timestamp_oracle_url: Option<SensitiveUrl>,
    // Availability zones; comma-delimited.
    #[clap(long, env = "AVAILABILITY_ZONE", use_value_delimiter = true)]
    availability_zone: Vec<String>,
    // Replica size map, parsed by `ClusterReplicaSizeMap::parse_from_str`.
    #[clap(
        long,
        env = "CLUSTER_REPLICA_SIZES",
        requires = "bootstrap_default_cluster_replica_size"
    )]
    cluster_replica_sizes: String,
    #[clap(long, env = "SEGMENT_API_KEY")]
    segment_api_key: Option<String>,
    #[clap(long, env = "SEGMENT_CLIENT_SIDE")]
    segment_client_side: bool,
    #[clap(long, env = "LAUNCHDARKLY_SDK_KEY")]
    launchdarkly_sdk_key: Option<String>,
    // `;`-delimited `KEY=VALUE` pairs.
    #[clap(
        long,
        env = "LAUNCHDARKLY_KEY_MAP",
        action = ArgAction::Append,
        value_delimiter = ';'
    )]
    launchdarkly_key_map: Vec<KeyValueArg<String, String>>,
    #[clap(
        long,
        env = "CONFIG_SYNC_TIMEOUT",
        value_parser = humantime::parse_duration,
        default_value = "30s"
    )]
    config_sync_timeout: Duration,
    #[clap(
        long,
        env = "CONFIG_SYNC_LOOP_INTERVAL",
        value_parser = humantime::parse_duration,
    )]
    config_sync_loop_interval: Option<Duration>,
    // NOTE(review): `run` in this file does not read this field. Confirm
    // whether it is still needed.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

    // Environment ID. Its organization ID is used for license key validation.
    #[clap(
        long,
        env = "ENVIRONMENT_ID",
        value_name = "<CLOUD>-<REGION>-<ORG-ID>-<ORDINAL>"
    )]
    environment_id: EnvironmentId,
    #[clap(long, env = "BOOTSTRAP_ROLE")]
    bootstrap_role: Option<String>,

    // --- Bootstrap replica sizes for the default and builtin clusters ---
    #[clap(
        long,
        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE",
        default_value = "1"
    )]
    bootstrap_default_cluster_replica_size: String,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE",
        default_value = "1"
    )]
    bootstrap_builtin_system_cluster_replica_size: String,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE",
        default_value = "1"
    )]
    bootstrap_builtin_catalog_server_cluster_replica_size: String,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE",
        default_value = "1"
    )]
    bootstrap_builtin_probe_cluster_replica_size: String,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE",
        default_value = "1"
    )]
    bootstrap_builtin_support_cluster_replica_size: String,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE",
        default_value = "1"
    )]
    bootstrap_builtin_analytics_cluster_replica_size: String,

    // --- Bootstrap replication factors; clap restricts each to 0..=2 ---
    #[clap(
        long,
        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR",
        default_value = DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_default_cluster_replication_factor: u32,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR",
        default_value = SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_system_cluster_replication_factor: u32,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICATION_FACTOR",
        default_value = CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_catalog_server_cluster_replication_factor: u32,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR",
        default_value = PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_probe_cluster_replication_factor: u32,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICATION_FACTOR",
        default_value = SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_support_cluster_replication_factor: u32,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICATION_FACTOR",
        default_value = ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_analytics_cluster_replication_factor: u32,
    // `;`-delimited `KEY=VALUE` system parameter defaults.
    #[clap(
        long,
        env = "SYSTEM_PARAMETER_DEFAULT",
        action = ArgAction::Append,
        value_delimiter = ';'
    )]
    system_parameter_default: Vec<KeyValueArg<String, String>>,
    // Path to a license key file; `run` reads it from disk. Required with
    // the Kubernetes orchestrator unless license key checks are disabled.
    #[clap(long, env = "LICENSE_KEY")]
    license_key: Option<String>,
    // Hidden flag that skips license key validation entirely.
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,

    // --- AWS integration ---
    #[clap(long, env = "AWS_ACCOUNT_ID")]
    aws_account_id: Option<String>,
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,
    #[clap(long, env = "AWS_EXTERNAL_ID_PREFIX", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    // Repeatable; each value may be comma-delimited.
    #[clap(
        long,
        env = "AWS_PRIVATELINK_AVAILABILITY_ZONES",
        action = ArgAction::Append,
        use_value_delimiter = true
    )]
    aws_privatelink_availability_zones: Option<Vec<String>>,

    // Tracing/logging flags, consumed by `configure_tracing`.
    #[clap(flatten)]
    tracing: TracingCliArgs,

    // Testing-only knobs that perturb builtin table fingerprints. They
    // require `--unsafe-mode`. The version is the number of `\n` characters
    // `run` appends.
    #[clap(long, value_enum, requires = "unsafe_mode")]
    unsafe_builtin_table_fingerprint_whitespace: Option<UnsafeBuiltinTableFingerprintWhitespace>,
    #[clap(long, requires = "unsafe_mode", default_value = "1")]
    unsafe_builtin_table_fingerprint_whitespace_version: usize,
}
669
// Orchestrator implementation that `run` constructs, selected by
// `--orchestrator`. The `ValueEnum` derive maps the variants to the values
// `kubernetes` and `process`. Other flags refer to these strings, e.g.
// `required_if_eq("orchestrator", "process")`, so do not rename or reorder them.
// (`//` rather than `///` because variant doc comments become clap help text.)
#[derive(ValueEnum, Debug, Clone)]
enum OrchestratorKind {
    // Build a `KubernetesOrchestrator`. Requires `--license-key` unless
    // license key checks are disabled.
    Kubernetes,
    // Build a `ProcessOrchestrator`.
    Process,
}
675
676fn aws_secrets_controller_prefix(env_id: &EnvironmentId) -> String {
679 format!("/user-managed/{}/", env_id)
680}
681fn aws_secrets_controller_key_alias(env_id: &EnvironmentId) -> String {
682 format!("alias/customer_key_{}", env_id)
685}
686
687pub fn main() {
688 let args = cli::parse_args(CliConfig {
689 env_prefix: Some("MZ_"),
690 enable_version_flag: true,
691 });
692 if let Err(err) = run(args) {
693 panic!("environmentd: fatal: {}", err.display_with_causes());
694 }
695}
696
697fn run(mut args: Args) -> Result<(), anyhow::Error> {
698 mz_ore::panic::install_enhanced_handler();
699 let envd_start = Instant::now();
700
701 sys::enable_sigusr2_coverage_dump()?;
704 sys::enable_termination_signal_cleanup()?;
705
706 let license_key = if args.disable_license_key_checks {
707 ValidatedLicenseKey::disabled()
708 } else if let Some(license_key_file) = args.license_key {
709 let license_key_text = std::fs::read_to_string(&license_key_file)
710 .context("failed to open license key file")?;
711 let license_key = mz_license_keys::validate(
712 license_key_text.trim(),
713 &args.environment_id.organization_id().to_string(),
714 )
715 .context("failed to validate license key file")?;
716 if license_key.expired {
717 let message = format!(
718 "The license key provided at {license_key_file} is expired! Please contact Materialize for assistance."
719 );
720 match license_key.expiration_behavior {
721 ExpirationBehavior::Warn | ExpirationBehavior::DisableClusterCreation => {
722 warn!("{message}");
723 }
724 ExpirationBehavior::Disable => bail!("{message}"),
725 }
726 }
727 license_key
728 } else if matches!(args.orchestrator, OrchestratorKind::Kubernetes) {
729 bail!("--license-key is required when running in Kubernetes");
730 } else {
731 ValidatedLicenseKey::default()
732 };
733
734 if let Some(fingerprint_whitespace) = args.unsafe_builtin_table_fingerprint_whitespace {
736 assert!(args.unsafe_mode);
737 let whitespace = "\n".repeat(args.unsafe_builtin_table_fingerprint_whitespace_version);
738 *UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE
739 .lock()
740 .expect("lock poisoned") = Some((fingerprint_whitespace, whitespace));
741 }
742
743 let ncpus_useful = usize::max(1, cmp::min(num_cpus::get(), num_cpus::get_physical()));
746 let runtime = Arc::new(
747 tokio::runtime::Builder::new_multi_thread()
748 .worker_threads(ncpus_useful)
749 .thread_name_fn(|| {
752 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
753 let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
754 format!("tokio:work-{}", id)
755 })
756 .enable_all()
757 .build()?,
758 );
759
760 args.tracing.log_prefix = if matches!(args.orchestrator, OrchestratorKind::Process) {
764 Some("environmentd".to_string())
765 } else {
766 None
767 };
768
769 let metrics_registry = MetricsRegistry::new();
770 let (tracing_handle, _tracing_guard) = runtime.block_on(args.tracing.configure_tracing(
771 StaticTracingConfig {
772 service_name: "environmentd",
773 build_info: BUILD_INFO,
774 },
775 metrics_registry.clone(),
776 ))?;
777 register_runtime_metrics("main", runtime.metrics(), &metrics_registry);
778
779 let span = tracing::info_span!("environmentd::run").entered();
780
781 info!("startup: envd init: beginning");
782 info!("startup: envd init: preamble beginning");
783
784 let metrics = Metrics::register_into(&metrics_registry, BUILD_INFO);
785
786 runtime.block_on(mz_alloc::register_metrics_into(&metrics_registry));
787 runtime.block_on(mz_metrics::register_metrics_into(
788 &metrics_registry,
789 mz_dyncfgs::all_dyncfgs(),
790 ));
791
792 let _failpoint_scenario = FailScenario::setup();
794
795 let tls = args.tls.into_config()?;
797 let frontegg = Authenticator::from_args(args.frontegg, &metrics_registry)?;
798
799 let allowed_origins = if !args.cors_allowed_origin.is_empty() {
801 args.cors_allowed_origin
802 } else {
803 let port = args.http_listen_addr.port();
804 vec![
805 HeaderValue::from_str(&format!("http://localhost:{}", port)).unwrap(),
806 HeaderValue::from_str(&format!("http://127.0.0.1:{}", port)).unwrap(),
807 HeaderValue::from_str(&format!("http://[::1]:{}", port)).unwrap(),
808 HeaderValue::from_str(&format!("https://localhost:{}", port)).unwrap(),
809 HeaderValue::from_str(&format!("https://127.0.0.1:{}", port)).unwrap(),
810 HeaderValue::from_str(&format!("https://[::1]:{}", port)).unwrap(),
811 ]
812 };
813 let cors_allowed_origin = mz_http_util::build_cors_allowed_origin(&allowed_origins);
814
815 let entered = info_span!("environmentd::configure_controller").entered();
817 let (orchestrator, secrets_controller, cloud_resource_controller): (
818 Arc<dyn Orchestrator>,
819 Arc<dyn SecretsController>,
820 Option<Arc<dyn CloudResourceController>>,
821 ) = match args.orchestrator {
822 OrchestratorKind::Kubernetes => {
823 if args.orchestrator_process_scratch_directory.is_some() {
824 bail!(
825 "--orchestrator-process-scratch-directory is \
826 not currently usable with the kubernetes orchestrator"
827 );
828 }
829
830 let orchestrator = Arc::new(
831 runtime
832 .block_on(KubernetesOrchestrator::new(KubernetesOrchestratorConfig {
833 context: args.orchestrator_kubernetes_context.clone(),
834 scheduler_name: args.orchestrator_kubernetes_scheduler_name,
835 service_labels: args
836 .orchestrator_kubernetes_service_label
837 .into_iter()
838 .map(|l| (l.key, l.value))
839 .collect(),
840 service_node_selector: args
841 .orchestrator_kubernetes_service_node_selector
842 .into_iter()
843 .map(|l| (l.key, l.value))
844 .collect(),
845 service_account: args.orchestrator_kubernetes_service_account,
846 image_pull_policy: args.orchestrator_kubernetes_image_pull_policy,
847 aws_external_id_prefix: args.aws_external_id_prefix.clone(),
848 coverage: args.orchestrator_kubernetes_coverage,
849 ephemeral_volume_storage_class: args
850 .orchestrator_kubernetes_ephemeral_volume_class
851 .clone(),
852 service_fs_group: args.orchestrator_kubernetes_service_fs_group.clone(),
853 name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
854 collect_pod_metrics: !args
855 .orchestrator_kubernetes_disable_pod_metrics_collection,
856 enable_prometheus_scrape_annotations: args
857 .orchestrator_kubernetes_enable_prometheus_scrape_annotations,
858 }))
859 .context("creating kubernetes orchestrator")?,
860 );
861 let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
862 SecretsControllerKind::Kubernetes => {
863 let sc = Arc::clone(&orchestrator);
864 let sc: Arc<dyn SecretsController> = sc;
865 sc
866 }
867 SecretsControllerKind::AwsSecretsManager => {
868 Arc::new(
869 runtime.block_on(AwsSecretsController::new(
870 &aws_secrets_controller_prefix(&args.environment_id),
873 &aws_secrets_controller_key_alias(&args.environment_id),
874 args.aws_secrets_controller_tags
875 .into_iter()
876 .map(|tag| (tag.key, tag.value))
877 .collect(),
878 )),
879 )
880 }
881 SecretsControllerKind::LocalFile => bail!(
882 "SecretsControllerKind::LocalFile is not compatible with Orchestrator::Kubernetes."
883 ),
884 };
885 let cloud_resource_controller = Arc::clone(&orchestrator);
886 (
887 orchestrator,
888 secrets_controller,
889 Some(cloud_resource_controller),
890 )
891 }
892 OrchestratorKind::Process => {
893 if args
894 .orchestrator_kubernetes_ephemeral_volume_class
895 .is_some()
896 {
897 bail!(
898 "--orchestrator-kubernetes-ephemeral-volume-class is \
899 not usable with the process orchestrator"
900 );
901 }
902 let orchestrator = Arc::new(
903 runtime
904 .block_on(ProcessOrchestrator::new(ProcessOrchestratorConfig {
905 image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
911 suppress_output: false,
912 environment_id: args.environment_id.to_string(),
913 secrets_dir: args
914 .orchestrator_process_secrets_directory
915 .clone()
916 .expect("clap enforced"),
917 command_wrapper: args
918 .orchestrator_process_wrapper
919 .map_or(Ok(vec![]), |s| shell_words::split(&s))?,
920 propagate_crashes: args.orchestrator_process_propagate_crashes,
921 tcp_proxy: args.orchestrator_process_tcp_proxy_listen_addr.map(
922 |listen_addr| ProcessOrchestratorTcpProxyConfig {
923 listen_addr,
924 prometheus_service_discovery_dir: args
925 .orchestrator_process_prometheus_service_discovery_directory,
926 },
927 ),
928 scratch_directory: args
929 .orchestrator_process_scratch_directory
930 .expect("process orchestrator requires scratch directory"),
931 }))
932 .context("creating process orchestrator")?,
933 );
934 let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
935 SecretsControllerKind::Kubernetes => bail!(
936 "SecretsControllerKind::Kubernetes is not compatible with Orchestrator::Process."
937 ),
938 SecretsControllerKind::AwsSecretsManager => Arc::new(
939 runtime.block_on(AwsSecretsController::new(
940 &aws_secrets_controller_prefix(&args.environment_id),
941 &aws_secrets_controller_key_alias(&args.environment_id),
942 args.aws_secrets_controller_tags
943 .into_iter()
944 .map(|tag| (tag.key, tag.value))
945 .collect(),
946 )),
947 ),
948 SecretsControllerKind::LocalFile => {
949 let sc = Arc::clone(&orchestrator);
950 let sc: Arc<dyn SecretsController> = sc;
951 sc
952 }
953 };
954 (orchestrator, secrets_controller, None)
955 }
956 };
957 drop(entered);
958 let cloud_resource_reader = cloud_resource_controller.as_ref().map(|c| c.reader());
959 let secrets_reader = secrets_controller.reader();
960 let now = SYSTEM_TIME.clone();
961
962 let mut persist_config =
963 PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs());
964 persist_config.disable_compaction();
966
967 let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
968 let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
969
970 match args.persist_isolated_runtime_threads {
971 None | Some(0) => (),
973 Some(x @ ..=-1) => {
974 let threads = num_cpus::get().saturating_add_signed(x).max(1);
975 persist_config.isolated_runtime_worker_threads = threads;
976 }
977 Some(x @ 1..) => {
978 let threads = usize::try_from(x).expect("pattern matched a positive value");
979 persist_config.isolated_runtime_worker_threads = threads;
980 }
981 };
982
983 let _server = runtime.spawn_named(
984 || "persist::rpc::server",
985 async move {
986 info!(
987 "listening for Persist PubSub connections on {}",
988 args.internal_persist_pubsub_listen_addr
989 );
990 let res = persist_pubsub_server
993 .serve(args.internal_persist_pubsub_listen_addr)
994 .await;
995 error!("Persist Pubsub server exited {:?}", res);
996 }
997 .instrument(tracing::info_span!("persist::rpc::server")),
998 );
999
1000 let persist_clients = {
1001 let _tokio_guard = runtime.enter();
1003 PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
1004 let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
1005 cfg,
1006 persist_pubsub_client.sender,
1007 metrics,
1008 ));
1009 PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
1010 })
1011 };
1012
1013 let consensus_uri = args.persist_consensus_url.unwrap_or_else(|| {
1014 args.metadata_backend_url
1015 .as_ref()
1016 .map(|metadata_backend_url| {
1017 SensitiveUrl(
1018 Url::parse_with_params(
1019 metadata_backend_url.0.as_ref(),
1020 &[("options", "--search_path=consensus")],
1021 )
1022 .unwrap(),
1023 )
1024 })
1025 .expect("either --persist-consensus-url or --metadata-backend-url must be provided")
1026 });
1027 let timestamp_oracle_url = args.timestamp_oracle_url.or_else(|| {
1028 args.metadata_backend_url
1029 .as_ref()
1030 .map(|metadata_backend_url| {
1031 SensitiveUrl(
1032 Url::parse_with_params(
1033 metadata_backend_url.0.as_ref(),
1034 &[("options", "--search_path=tsoracle")],
1035 )
1036 .unwrap(),
1037 )
1038 })
1039 });
1040
1041 let persist_clients = Arc::new(persist_clients);
1042 let connection_context = ConnectionContext::from_cli_args(
1043 args.environment_id.to_string(),
1044 &args.tracing.startup_log_filter,
1045 args.aws_external_id_prefix,
1046 args.aws_connection_role_arn,
1047 secrets_reader,
1048 cloud_resource_reader,
1049 );
1050 let orchestrator = Arc::new(TracingOrchestrator::new(orchestrator, args.tracing.clone()));
1051 let controller = ControllerConfig {
1052 build_info: &BUILD_INFO,
1053 orchestrator,
1054 persist_location: PersistLocation {
1055 blob_uri: args.persist_blob_url,
1056 consensus_uri,
1057 },
1058 persist_clients: Arc::clone(&persist_clients),
1059 clusterd_image: args.clusterd_image.expect("clap enforced"),
1060 init_container_image: args.orchestrator_kubernetes_init_container_image,
1061 deploy_generation: args.deploy_generation,
1062 now: SYSTEM_TIME.clone(),
1063 metrics_registry: metrics_registry.clone(),
1064 persist_pubsub_url: args.persist_pubsub_url,
1065 connection_context,
1066 secrets_args: SecretsReaderCliArgs {
1069 secrets_reader: args.secrets_controller,
1070 secrets_reader_local_file_dir: args.orchestrator_process_secrets_directory,
1071 secrets_reader_kubernetes_context: Some(args.orchestrator_kubernetes_context),
1072 secrets_reader_aws_prefix: Some(aws_secrets_controller_prefix(&args.environment_id)),
1073 secrets_reader_name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
1074 },
1075 };
1076
1077 let cluster_replica_sizes = ClusterReplicaSizeMap::parse_from_str(
1078 &args.cluster_replica_sizes,
1079 !license_key.allow_credit_consumption_override,
1080 )
1081 .context("parsing replica size map")?;
1082
1083 emit_boot_diagnostics!(&BUILD_INFO);
1084 sys::adjust_rlimits();
1085
1086 info!(
1087 "startup: envd init: preamble complete in {:?}",
1088 envd_start.elapsed()
1089 );
1090
1091 let serve_start = Instant::now();
1092 info!("startup: envd init: serving beginning");
1093 let server = runtime.block_on(async {
1094 let listeners = Listeners::bind(ListenersConfig {
1095 sql_listen_addr: args.sql_listen_addr,
1096 http_listen_addr: args.http_listen_addr,
1097 internal_sql_listen_addr: args.internal_sql_listen_addr,
1098 internal_http_listen_addr: args.internal_http_listen_addr,
1099 })
1100 .await?;
1101 let catalog_config = CatalogConfig {
1102 persist_clients,
1103 metrics: Arc::new(mz_catalog::durable::Metrics::new(&metrics_registry)),
1104 };
1105 let server = listeners
1106 .serve(crate::Config {
1107 unsafe_mode: args.unsafe_mode,
1109 all_features: args.all_features,
1110 tls,
1112 tls_reload_certs: mz_server_core::default_cert_reload_ticker(),
1113 frontegg,
1114 cors_allowed_origin,
1115 egress_addresses: args.announce_egress_address,
1116 http_host_name: args.http_host_name,
1117 internal_console_redirect_url: args.internal_console_redirect_url,
1118 self_hosted_auth: args.enable_self_hosted_auth,
1119 self_hosted_auth_internal: args.enable_self_hosted_auth_internal,
1120 controller,
1122 secrets_controller,
1123 cloud_resource_controller,
1124 storage_usage_collection_interval: args.storage_usage_collection_interval_sec,
1126 storage_usage_retention_period: args.storage_usage_retention_period,
1127 catalog_config,
1129 availability_zones: args.availability_zone,
1130 cluster_replica_sizes,
1131 timestamp_oracle_url,
1132 segment_api_key: args.segment_api_key,
1133 segment_client_side: args.segment_client_side,
1134 launchdarkly_sdk_key: args.launchdarkly_sdk_key,
1135 launchdarkly_key_map: args
1136 .launchdarkly_key_map
1137 .into_iter()
1138 .map(|kv| (kv.key, kv.value))
1139 .collect(),
1140 config_sync_timeout: args.config_sync_timeout,
1141 config_sync_loop_interval: args.config_sync_loop_interval,
1142 environment_id: args.environment_id,
1144 bootstrap_role: args.bootstrap_role,
1145 bootstrap_default_cluster_replica_size: args.bootstrap_default_cluster_replica_size,
1146 bootstrap_default_cluster_replication_factor: args
1147 .bootstrap_default_cluster_replication_factor,
1148 bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
1149 size: args.bootstrap_builtin_system_cluster_replica_size,
1150 replication_factor: args.bootstrap_builtin_system_cluster_replication_factor,
1151 },
1152 bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
1153 size: args.bootstrap_builtin_catalog_server_cluster_replica_size,
1154 replication_factor: args
1155 .bootstrap_builtin_catalog_server_cluster_replication_factor,
1156 },
1157 bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
1158 size: args.bootstrap_builtin_probe_cluster_replica_size,
1159 replication_factor: args.bootstrap_builtin_probe_cluster_replication_factor,
1160 },
1161 bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
1162 size: args.bootstrap_builtin_support_cluster_replica_size,
1163 replication_factor: args.bootstrap_builtin_support_cluster_replication_factor,
1164 },
1165 bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
1166 size: args.bootstrap_builtin_analytics_cluster_replica_size,
1167 replication_factor: args.bootstrap_builtin_analytics_cluster_replication_factor,
1168 },
1169 system_parameter_defaults: args
1170 .system_parameter_default
1171 .into_iter()
1172 .map(|kv| (kv.key, kv.value))
1173 .collect(),
1174 helm_chart_version: args.helm_chart_version.clone(),
1175 license_key,
1176 aws_account_id: args.aws_account_id,
1178 aws_privatelink_availability_zones: args.aws_privatelink_availability_zones,
1179 metrics_registry,
1181 tracing_handle,
1182 now,
1184 })
1185 .await
1186 .maybe_terminate("booting server")?;
1187 Ok::<_, anyhow::Error>(server)
1188 })?;
1189 info!(
1190 "startup: envd init: serving complete in {:?}",
1191 serve_start.elapsed()
1192 );
1193
1194 let start_duration = envd_start.elapsed();
1195 metrics
1196 .start_time_environmentd
1197 .set(start_duration.as_millis().try_into().expect("must fit"));
1198 let span = span.exit();
1199 let id = span.context().span().span_context().trace_id();
1200 drop(span);
1201
1202 info!("startup: envd init: complete in {start_duration:?}");
1203
1204 println!(
1205 "environmentd {} listening...",
1206 BUILD_INFO.human_version(args.helm_chart_version)
1207 );
1208 println!(" SQL address: {}", server.sql_local_addr());
1209 println!(" HTTP address: {}", server.http_local_addr());
1210 println!(
1211 " Internal SQL address: {}",
1212 server.internal_sql_local_addr()
1213 );
1214 println!(
1215 " Internal HTTP address: {}",
1216 server.internal_http_local_addr()
1217 );
1218 println!(
1219 " Internal Persist PubSub address: {}",
1220 args.internal_persist_pubsub_listen_addr
1221 );
1222
1223 println!(" Root trace ID: {id}");
1224
1225 loop {
1227 thread::park();
1228 }
1229}
1230
1231fn build_info() -> Vec<String> {
1232 let openssl_version =
1233 unsafe { CStr::from_ptr(openssl_sys::OpenSSL_version(openssl_sys::OPENSSL_VERSION)) };
1234 let rdkafka_version = unsafe { CStr::from_ptr(rdkafka_sys::bindings::rd_kafka_version_str()) };
1235 vec![
1236 openssl_version.to_string_lossy().into_owned(),
1237 format!("librdkafka v{}", rdkafka_version.to_string_lossy()),
1238 ]
1239}
1240
1241#[derive(Debug, Clone)]
1242struct Metrics {
1243 pub start_time_environmentd: IntGauge,
1244}
1245
1246impl Metrics {
1247 pub fn register_into(registry: &MetricsRegistry, build_info: BuildInfo) -> Metrics {
1248 Metrics {
1249 start_time_environmentd: registry.register(metric!(
1250 name: "mz_start_time_environmentd",
1251 help: "Time in milliseconds from environmentd start until the adapter is ready.",
1252 const_labels: {
1253 "version" => build_info.version,
1254 "build_type" => if cfg!(release) { "release" } else { "debug" }
1255 },
1256 )),
1257 }
1258 }
1259}