1use std::ffi::CStr;
16use std::fs::File;
17use std::net::{IpAddr, SocketAddr};
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::LazyLock;
21use std::sync::atomic::{AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use std::{cmp, env, iter, thread};
24
25use anyhow::{Context, bail};
26use clap::{ArgAction, Parser, ValueEnum};
27use fail::FailScenario;
28use http::header::HeaderValue;
29use ipnet::IpNet;
30use itertools::Itertools;
31use mz_adapter::ResultExt;
32use mz_adapter_types::bootstrap_builtin_cluster_config::{
33 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
34 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, DEFAULT_REPLICATION_FACTOR,
35 PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR, SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
36 SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
37};
38use mz_auth::password::Password;
39use mz_aws_secrets_controller::AwsSecretsController;
40use mz_build_info::BuildInfo;
41use mz_catalog::builtin::{
42 UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE,
43 UnsafeBuiltinTableFingerprintWhitespace,
44};
45use mz_catalog::config::ClusterReplicaSizeMap;
46use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController};
47use mz_controller::ControllerConfig;
48use mz_frontegg_auth::{Authenticator as FronteggAuthenticator, FronteggCliArgs};
49use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
50use mz_orchestrator::Orchestrator;
51use mz_orchestrator_kubernetes::{
52 KubernetesImagePullPolicy, KubernetesOrchestrator, KubernetesOrchestratorConfig,
53};
54use mz_orchestrator_process::{
55 ProcessOrchestrator, ProcessOrchestratorConfig, ProcessOrchestratorTcpProxyConfig,
56};
57use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs, TracingOrchestrator};
58use mz_ore::cli::{self, CliConfig, KeyValueArg};
59use mz_ore::error::ErrorExt;
60use mz_ore::metric;
61use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
62use mz_ore::now::SYSTEM_TIME;
63use mz_ore::task::RuntimeExt;
64use mz_ore::url::SensitiveUrl;
65use mz_persist_client::PersistLocation;
66use mz_persist_client::cache::PersistClientCache;
67use mz_persist_client::cfg::PersistConfig;
68use mz_persist_client::rpc::{
69 MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
70};
71use mz_secrets::SecretsController;
72use mz_server_core::TlsCliArgs;
73use mz_service::emit_boot_diagnostics;
74use mz_service::secrets::{SecretsControllerKind, SecretsReaderCliArgs};
75use mz_sql::catalog::EnvironmentId;
76use mz_storage_types::connections::ConnectionContext;
77use opentelemetry::trace::TraceContextExt;
78use prometheus::IntGauge;
79use tracing::{Instrument, error, info, info_span, warn};
80use tracing_opentelemetry::OpenTelemetrySpanExt;
81use url::Url;
82
83use crate::environmentd::sys;
84use crate::{BUILD_INFO, CatalogConfig, ListenerConfig, Listeners, ListenersConfig};
85
86static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
87static LONG_VERSION: LazyLock<String> = LazyLock::new(|| {
88 iter::once(BUILD_INFO.human_version(None))
89 .chain(build_info())
90 .join("\n")
91});
92
93#[derive(Parser, Debug)]
95#[clap(
96 name = "environmentd",
97 next_line_help = true,
98 version = VERSION.as_str(),
99 long_version = LONG_VERSION.as_str(),
100)]
101pub struct Args {
102 #[clap(long, env = "UNSAFE_MODE")]
106 unsafe_mode: bool,
107 #[clap(long, env = "ALL_FEATURES")]
110 all_features: bool,
111
112 #[clap(
116 long,
117 env = "LISTENERS_CONFIG_PATH",
118 value_name = "PATH",
119 action = ArgAction::Set,
120 )]
121 listeners_config_path: PathBuf,
122 #[clap(
124 long,
125 env = "EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM",
126 action = ArgAction::Set,
127 )]
128 external_login_password_mz_system: Option<Password>,
129 #[clap(
135 long,
136 value_name = "HOST:PORT",
137 env = "INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR",
138 default_value = "127.0.0.1:6879",
139 action = ArgAction::Set,
140 )]
141 internal_persist_pubsub_listen_addr: SocketAddr,
142 #[structopt(long, env = "CORS_ALLOWED_ORIGIN")]
151 cors_allowed_origin: Vec<HeaderValue>,
152 #[clap(
155 long,
156 env = "ANNOUNCE_EGRESS_ADDRESS",
157 action = ArgAction::Append,
158 use_value_delimiter = true
159 )]
160 announce_egress_address: Vec<IpNet>,
161 #[clap(long, env = "HTTP_HOST_NAME")]
167 http_host_name: Option<String>,
168 #[clap(long, env = "INTERNAL_CONSOLE_REDIRECT_URL")]
171 internal_console_redirect_url: Option<String>,
172 #[clap(flatten)]
174 tls: TlsCliArgs,
175 #[clap(flatten)]
177 frontegg: FronteggCliArgs,
178 #[structopt(long, value_enum, env = "ORCHESTRATOR")]
181 orchestrator: OrchestratorKind,
182 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SCHEDULER_NAME")]
184 orchestrator_kubernetes_scheduler_name: Option<String>,
185 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ANNOTATION")]
188 orchestrator_kubernetes_service_annotation: Vec<KeyValueArg<String, String>>,
189 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_LABEL")]
192 orchestrator_kubernetes_service_label: Vec<KeyValueArg<String, String>>,
193 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_NODE_SELECTOR")]
196 orchestrator_kubernetes_service_node_selector: Vec<KeyValueArg<String, String>>,
197 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_AFFINITY")]
200 orchestrator_kubernetes_service_affinity: Option<String>,
201 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_TOLERATIONS")]
204 orchestrator_kubernetes_service_tolerations: Option<String>,
205 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ACCOUNT")]
208 orchestrator_kubernetes_service_account: Option<String>,
209 #[structopt(
214 long,
215 env = "ORCHESTRATOR_KUBERNETES_CONTEXT",
216 default_value = "minikube"
217 )]
218 orchestrator_kubernetes_context: String,
219 #[structopt(
222 long,
223 env = "ORCHESTRATOR_KUBERNETES_IMAGE_PULL_POLICY",
224 default_value = "always",
225 value_enum
226 )]
227 orchestrator_kubernetes_image_pull_policy: KubernetesImagePullPolicy,
228 #[clap(long, env = "ORCHESTRATOR_KUBERNETES_INIT_CONTAINER_IMAGE")]
230 orchestrator_kubernetes_init_container_image: Option<String>,
231 #[clap(long, env = "ORCHESTRATOR_KUBERNETES_EPHEMERAL_VOLUME_CLASS")]
237 orchestrator_kubernetes_ephemeral_volume_class: Option<String>,
238 #[clap(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_FS_GROUP")]
240 orchestrator_kubernetes_service_fs_group: Option<i64>,
241 #[clap(long, env = "ORCHESTRATOR_KUBERNETES_NAME_PREFIX")]
243 orchestrator_kubernetes_name_prefix: Option<String>,
244 #[clap(long, env = "ORCHESTRATOR_KUBERNETES_DISABLE_POD_METRICS_COLLECTION")]
246 orchestrator_kubernetes_disable_pod_metrics_collection: bool,
247 #[clap(
249 long,
250 env = "ORCHESTRATOR_KUBERNETES_ENABLE_PROMETHEUS_SCRAPE_ANNOTATIONS"
251 )]
252 orchestrator_kubernetes_enable_prometheus_scrape_annotations: bool,
253 #[clap(long, env = "ORCHESTRATOR_PROCESS_WRAPPER")]
254 orchestrator_process_wrapper: Option<String>,
255 #[clap(
257 long,
258 env = "ORCHESTRATOR_PROCESS_SECRETS_DIRECTORY",
259 value_name = "PATH",
260 required_if_eq("orchestrator", "process")
261 )]
262 orchestrator_process_secrets_directory: Option<PathBuf>,
263 #[clap(long, env = "ORCHESTRATOR_PROCESS_PROPAGATE_CRASHES")]
266 orchestrator_process_propagate_crashes: bool,
267 #[clap(long, env = "ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR")]
279 orchestrator_process_tcp_proxy_listen_addr: Option<IpAddr>,
280 #[clap(
293 long,
294 env = "ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY"
295 )]
296 orchestrator_process_prometheus_service_discovery_directory: Option<PathBuf>,
297 #[clap(
299 long,
300 env = "ORCHESTRATOR_PROCESS_SCRATCH_DIRECTORY",
301 value_name = "PATH"
302 )]
303 orchestrator_process_scratch_directory: Option<PathBuf>,
304 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_COVERAGE")]
307 orchestrator_kubernetes_coverage: bool,
308 #[structopt(
310 long,
311 value_enum,
312 env = "SECRETS_CONTROLLER",
313 default_value_ifs([
314 ("orchestrator", "kubernetes", Some("kubernetes")),
315 ("orchestrator", "process", Some("local-file"))
316 ]),
317 default_value("kubernetes"), )]
319 secrets_controller: SecretsControllerKind,
320 #[clap(
323 long,
324 env = "AWS_SECRETS_CONTROLLER_TAGS",
325 action = ArgAction::Append,
326 value_delimiter = ';',
327 required_if_eq("secrets_controller", "aws-secrets-manager")
328 )]
329 aws_secrets_controller_tags: Vec<KeyValueArg<String, String>>,
330 #[structopt(
332 long,
333 env = "CLUSTERD_IMAGE",
334 required_if_eq("orchestrator", "kubernetes"),
335 default_value_if("orchestrator", "process", Some("clusterd"))
336 )]
337 clusterd_image: Option<String>,
338 #[clap(long, env = "DEPLOY_GENERATION", default_value = "0")]
343 deploy_generation: u64,
344
345 #[clap(
348 long,
349 env = "METADATA_BACKEND_URL",
350 conflicts_with_all = &[
351 "persist_consensus_url",
352 "timestamp_oracle_url",
353 ],
354 )]
355 metadata_backend_url: Option<SensitiveUrl>,
356
357 #[clap(long, env = "HELM_CHART_VERSION")]
362 helm_chart_version: Option<String>,
363
364 #[clap(long, env = "PERSIST_BLOB_URL")]
367 persist_blob_url: SensitiveUrl,
368 #[clap(long, env = "PERSIST_CONSENSUS_URL")]
370 persist_consensus_url: Option<SensitiveUrl>,
371 #[clap(
375 long,
376 env = "PERSIST_PUBSUB_URL",
377 default_value = "http://localhost:6879"
378 )]
379 persist_pubsub_url: String,
380 #[clap(long, env = "PERSIST_ISOLATED_RUNTIME_THREADS")]
384 persist_isolated_runtime_threads: Option<isize>,
385 #[clap(
387 long,
388 env = "STORAGE_USAGE_COLLECTION_INTERVAL",
389 value_parser = humantime::parse_duration,
390 default_value = "3600s"
391 )]
392 storage_usage_collection_interval_sec: Duration,
393 #[clap(long, env = "STORAGE_USAGE_RETENTION_PERIOD", value_parser = humantime::parse_duration)]
397 storage_usage_retention_period: Option<Duration>,
398
399 #[clap(long, env = "TIMESTAMP_ORACLE_URL", value_name = "POSTGRES_URL")]
402 timestamp_oracle_url: Option<SensitiveUrl>,
403 #[clap(long, env = "AVAILABILITY_ZONE", use_value_delimiter = true)]
406 availability_zone: Vec<String>,
407 #[clap(
409 long,
410 env = "CLUSTER_REPLICA_SIZES",
411 requires = "bootstrap_default_cluster_replica_size"
412 )]
413 cluster_replica_sizes: String,
414 #[clap(long, env = "SEGMENT_API_KEY")]
416 segment_api_key: Option<String>,
417 #[clap(long, env = "SEGMENT_CLIENT_SIDE")]
423 segment_client_side: bool,
424 #[clap(long, env = "TEST_ONLY_DUMMY_SEGMENT_CLIENT")]
427 test_only_dummy_segment_client: bool,
428 #[clap(long, env = "LAUNCHDARKLY_SDK_KEY")]
434 launchdarkly_sdk_key: Option<String>,
435 #[clap(
442 long,
443 env = "LAUNCHDARKLY_KEY_MAP",
444 action = ArgAction::Append,
445 value_delimiter = ';'
446 )]
447 launchdarkly_key_map: Vec<KeyValueArg<String, String>>,
448 #[clap(
450 long,
451 env = "CONFIG_SYNC_TIMEOUT",
452 value_parser = humantime::parse_duration,
453 default_value = "30s"
454 )]
455 config_sync_timeout: Duration,
456 #[clap(
462 long,
463 env = "CONFIG_SYNC_LOOP_INTERVAL",
464 value_parser = humantime::parse_duration,
465 )]
466 config_sync_loop_interval: Option<Duration>,
467 #[clap(long, env = "CONFIG_SYNC_FILE_PATH", value_name = "PATH")]
470 config_sync_file_path: Option<PathBuf>,
471
472 #[clap(
474 long,
475 env = "ENVIRONMENT_ID",
476 value_name = "<CLOUD>-<REGION>-<ORG-ID>-<ORDINAL>"
477 )]
478 environment_id: EnvironmentId,
479 #[clap(long, env = "BOOTSTRAP_ROLE")]
488 bootstrap_role: Option<String>,
489 #[clap(
491 long,
492 env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE",
493 default_value = "scale=1,workers=1"
494 )]
495 bootstrap_default_cluster_replica_size: String,
496 #[clap(
498 long,
499 env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE",
500 default_value = "scale=1,workers=1"
501 )]
502 bootstrap_builtin_system_cluster_replica_size: String,
503 #[clap(
505 long,
506 env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE",
507 default_value = "scale=1,workers=1"
508 )]
509 bootstrap_builtin_catalog_server_cluster_replica_size: String,
510 #[clap(
512 long,
513 env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE",
514 default_value = "scale=1,workers=1"
515 )]
516 bootstrap_builtin_probe_cluster_replica_size: String,
517 #[clap(
519 long,
520 env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE",
521 default_value = "scale=1,workers=1"
522 )]
523 bootstrap_builtin_support_cluster_replica_size: String,
524 #[clap(
526 long,
527 env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE",
528 default_value = "scale=1,workers=1"
529 )]
530 bootstrap_builtin_analytics_cluster_replica_size: String,
531 #[clap(
532 long,
533 env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR",
534 default_value = DEFAULT_REPLICATION_FACTOR.to_string(),
535 value_parser = clap::value_parser!(u32).range(0..=2)
536 )]
537 bootstrap_default_cluster_replication_factor: u32,
538 #[clap(
540 long,
541 env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR",
542 default_value = SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
543 value_parser = clap::value_parser!(u32).range(0..=2)
544 )]
545 bootstrap_builtin_system_cluster_replication_factor: u32,
546 #[clap(
548 long,
549 env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICATION_FACTOR",
550 default_value = CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
551 value_parser = clap::value_parser!(u32).range(0..=2)
552 )]
553 bootstrap_builtin_catalog_server_cluster_replication_factor: u32,
554 #[clap(
556 long,
557 env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR",
558 default_value = PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
559 value_parser = clap::value_parser!(u32).range(0..=2)
560 )]
561 bootstrap_builtin_probe_cluster_replication_factor: u32,
562 #[clap(
564 long,
565 env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICATION_FACTOR",
566 default_value = SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
567 value_parser = clap::value_parser!(u32).range(0..=2)
568 )]
569 bootstrap_builtin_support_cluster_replication_factor: u32,
570 #[clap(
572 long,
573 env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICATION_FACTOR",
574 default_value = ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
575 value_parser = clap::value_parser!(u32).range(0..=2)
576 )]
577 bootstrap_builtin_analytics_cluster_replication_factor: u32,
578 #[clap(
581 long,
582 env = "SYSTEM_PARAMETER_DEFAULT",
583 action = ArgAction::Append,
584 value_delimiter = ';'
585 )]
586 system_parameter_default: Vec<KeyValueArg<String, String>>,
587 #[clap(long, env = "LICENSE_KEY")]
589 license_key: Option<String>,
590
591 #[clap(long, env = "AWS_ACCOUNT_ID")]
595 aws_account_id: Option<String>,
596 #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
599 aws_connection_role_arn: Option<String>,
600 #[clap(long, env = "AWS_EXTERNAL_ID_PREFIX", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
604 aws_external_id_prefix: Option<AwsExternalIdPrefix>,
605 #[clap(
608 long,
609 env = "AWS_PRIVATELINK_AVAILABILITY_ZONES",
610 action = ArgAction::Append,
611 use_value_delimiter = true
612 )]
613 aws_privatelink_availability_zones: Option<Vec<String>>,
614
615 #[clap(flatten)]
617 tracing: TracingCliArgs,
618
619 #[clap(long, value_enum, requires = "unsafe_mode")]
627 unsafe_builtin_table_fingerprint_whitespace: Option<UnsafeBuiltinTableFingerprintWhitespace>,
628 #[clap(long, requires = "unsafe_mode", default_value = "1")]
634 unsafe_builtin_table_fingerprint_whitespace_version: usize,
635}
636
/// Selects which orchestrator `run` constructs: `Kubernetes` builds a
/// `KubernetesOrchestrator` (which also acts as the cloud resource
/// controller), `Process` builds a `ProcessOrchestrator`.
// The variants intentionally carry no doc comments: `ValueEnum` would surface
// them as help text for the possible values.
#[derive(ValueEnum, Debug, Clone)]
enum OrchestratorKind {
    Kubernetes,
    Process,
}
642
643fn aws_secrets_controller_prefix(env_id: &EnvironmentId) -> String {
646 format!("/user-managed/{}/", env_id)
647}
648fn aws_secrets_controller_key_alias(env_id: &EnvironmentId) -> String {
649 format!("alias/customer_key_{}", env_id)
652}
653
654pub fn main() {
655 let args = cli::parse_args(CliConfig {
656 env_prefix: Some("MZ_"),
657 enable_version_flag: true,
658 });
659 if let Err(err) = run(args) {
660 panic!("environmentd: fatal: {}", err.display_with_causes());
661 }
662}
663
664fn run(mut args: Args) -> Result<(), anyhow::Error> {
665 mz_ore::panic::install_enhanced_handler();
666 let envd_start = Instant::now();
667
668 sys::enable_sigusr2_coverage_dump()?;
671 sys::enable_termination_signal_cleanup()?;
672
673 let license_key = if let Some(license_key_file) = args.license_key {
674 let license_key_text = std::fs::read_to_string(&license_key_file)
675 .context("failed to open license key file")?;
676 let license_key = mz_license_keys::validate(
677 license_key_text.trim(),
678 &args.environment_id.organization_id().to_string(),
679 )
680 .context("failed to validate license key file")?;
681 if license_key.expired {
682 let message = format!(
683 "The license key provided at {license_key_file} is expired! Please contact Materialize for assistance."
684 );
685 match license_key.expiration_behavior {
686 ExpirationBehavior::Warn | ExpirationBehavior::DisableClusterCreation => {
687 warn!("{message}");
688 }
689 ExpirationBehavior::Disable => bail!("{message}"),
690 }
691 }
692 license_key
693 } else if matches!(args.orchestrator, OrchestratorKind::Kubernetes) {
694 bail!("--license-key is required when running in Kubernetes");
695 } else {
696 ValidatedLicenseKey::disabled()
698 };
699
700 if let Some(fingerprint_whitespace) = args.unsafe_builtin_table_fingerprint_whitespace {
702 assert!(args.unsafe_mode);
703 let whitespace = "\n".repeat(args.unsafe_builtin_table_fingerprint_whitespace_version);
704 *UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE
705 .lock()
706 .expect("lock poisoned") = Some((fingerprint_whitespace, whitespace));
707 }
708
709 let ncpus_useful = usize::max(1, cmp::min(num_cpus::get(), num_cpus::get_physical()));
712 let runtime = Arc::new(
713 tokio::runtime::Builder::new_multi_thread()
714 .worker_threads(ncpus_useful)
715 .thread_name_fn(|| {
718 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
719 let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
720 format!("tokio:work-{}", id)
721 })
722 .enable_all()
723 .build()?,
724 );
725
726 args.tracing.log_prefix = if matches!(args.orchestrator, OrchestratorKind::Process) {
730 Some("environmentd".to_string())
731 } else {
732 None
733 };
734
735 let metrics_registry = MetricsRegistry::new();
736 let (tracing_handle, _tracing_guard) = runtime.block_on(args.tracing.configure_tracing(
737 StaticTracingConfig {
738 service_name: "environmentd",
739 build_info: BUILD_INFO,
740 },
741 metrics_registry.clone(),
742 ))?;
743 register_runtime_metrics("main", runtime.metrics(), &metrics_registry);
744
745 let span = tracing::info_span!("environmentd::run").entered();
746
747 info!("startup: envd init: beginning");
748 info!("startup: envd init: preamble beginning");
749
750 let metrics = Metrics::register_into(&metrics_registry, BUILD_INFO);
751
752 runtime.block_on(mz_alloc::register_metrics_into(&metrics_registry));
753 runtime.block_on(mz_metrics::register_metrics_into(
754 &metrics_registry,
755 mz_dyncfgs::all_dyncfgs(),
756 ));
757
758 let _failpoint_scenario = FailScenario::setup();
760
761 let tls = args.tls.into_config()?;
763 let frontegg = FronteggAuthenticator::from_args(args.frontegg, &metrics_registry)?;
764 let listeners_config: ListenersConfig = {
765 let f = File::open(args.listeners_config_path)?;
766 serde_json::from_reader(f)?
767 };
768
769 for (_, listener) in &listeners_config.sql {
770 listener
771 .validate()
772 .map_err(|e| anyhow::anyhow!("invalid SQL listener: {}", e))?;
773 }
774
775 for (_, listener) in &listeners_config.http {
776 listener
777 .validate()
778 .map_err(|e| anyhow::anyhow!("invalid HTTP listener: {}", e))?;
779 }
780
781 let allowed_origins = if !args.cors_allowed_origin.is_empty() {
783 args.cors_allowed_origin
784 } else {
785 let mut allowed_origins = Vec::with_capacity(listeners_config.http.len() * 6);
786 for (_, listener) in &listeners_config.http {
787 let port = listener.addr().port();
788 allowed_origins.extend([
789 HeaderValue::from_str(&format!("http://localhost:{}", port)).unwrap(),
790 HeaderValue::from_str(&format!("http://127.0.0.1:{}", port)).unwrap(),
791 HeaderValue::from_str(&format!("http://[::1]:{}", port)).unwrap(),
792 HeaderValue::from_str(&format!("https://localhost:{}", port)).unwrap(),
793 HeaderValue::from_str(&format!("https://127.0.0.1:{}", port)).unwrap(),
794 HeaderValue::from_str(&format!("https://[::1]:{}", port)).unwrap(),
795 ])
796 }
797 allowed_origins
798 };
799 let cors_allowed_origin = mz_http_util::build_cors_allowed_origin(&allowed_origins);
800
801 let entered = info_span!("environmentd::configure_controller").entered();
803 let (orchestrator, secrets_controller, cloud_resource_controller): (
804 Arc<dyn Orchestrator>,
805 Arc<dyn SecretsController>,
806 Option<Arc<dyn CloudResourceController>>,
807 ) = match args.orchestrator {
808 OrchestratorKind::Kubernetes => {
809 if args.orchestrator_process_scratch_directory.is_some() {
810 bail!(
811 "--orchestrator-process-scratch-directory is \
812 not currently usable with the kubernetes orchestrator"
813 );
814 }
815
816 let orchestrator = Arc::new(
817 runtime
818 .block_on(KubernetesOrchestrator::new(KubernetesOrchestratorConfig {
819 context: args.orchestrator_kubernetes_context.clone(),
820 scheduler_name: args.orchestrator_kubernetes_scheduler_name,
821 service_annotations: args
822 .orchestrator_kubernetes_service_annotation
823 .into_iter()
824 .map(|l| (l.key, l.value))
825 .collect(),
826 service_labels: args
827 .orchestrator_kubernetes_service_label
828 .into_iter()
829 .map(|l| (l.key, l.value))
830 .collect(),
831 service_node_selector: args
832 .orchestrator_kubernetes_service_node_selector
833 .into_iter()
834 .map(|l| (l.key, l.value))
835 .collect(),
836 service_affinity: args.orchestrator_kubernetes_service_affinity,
837 service_tolerations: args.orchestrator_kubernetes_service_tolerations,
838 service_account: args.orchestrator_kubernetes_service_account,
839 image_pull_policy: args.orchestrator_kubernetes_image_pull_policy,
840 aws_external_id_prefix: args.aws_external_id_prefix.clone(),
841 coverage: args.orchestrator_kubernetes_coverage,
842 ephemeral_volume_storage_class: args
843 .orchestrator_kubernetes_ephemeral_volume_class
844 .clone(),
845 service_fs_group: args.orchestrator_kubernetes_service_fs_group.clone(),
846 name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
847 collect_pod_metrics: !args
848 .orchestrator_kubernetes_disable_pod_metrics_collection,
849 enable_prometheus_scrape_annotations: args
850 .orchestrator_kubernetes_enable_prometheus_scrape_annotations,
851 }))
852 .context("creating kubernetes orchestrator")?,
853 );
854 let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
855 SecretsControllerKind::Kubernetes => {
856 let sc = Arc::clone(&orchestrator);
857 let sc: Arc<dyn SecretsController> = sc;
858 sc
859 }
860 SecretsControllerKind::AwsSecretsManager => {
861 Arc::new(
862 runtime.block_on(AwsSecretsController::new(
863 &aws_secrets_controller_prefix(&args.environment_id),
866 &aws_secrets_controller_key_alias(&args.environment_id),
867 args.aws_secrets_controller_tags
868 .into_iter()
869 .map(|tag| (tag.key, tag.value))
870 .collect(),
871 )),
872 )
873 }
874 SecretsControllerKind::LocalFile => bail!(
875 "SecretsControllerKind::LocalFile is not compatible with Orchestrator::Kubernetes."
876 ),
877 };
878 let cloud_resource_controller = Arc::clone(&orchestrator);
879 (
880 orchestrator,
881 secrets_controller,
882 Some(cloud_resource_controller),
883 )
884 }
885 OrchestratorKind::Process => {
886 if args
887 .orchestrator_kubernetes_ephemeral_volume_class
888 .is_some()
889 {
890 bail!(
891 "--orchestrator-kubernetes-ephemeral-volume-class is \
892 not usable with the process orchestrator"
893 );
894 }
895 let orchestrator = Arc::new(
896 runtime
897 .block_on(ProcessOrchestrator::new(ProcessOrchestratorConfig {
898 image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
904 suppress_output: false,
905 environment_id: args.environment_id.to_string(),
906 secrets_dir: args
907 .orchestrator_process_secrets_directory
908 .clone()
909 .expect("clap enforced"),
910 command_wrapper: args
911 .orchestrator_process_wrapper
912 .map_or(Ok(vec![]), |s| shell_words::split(&s))?,
913 propagate_crashes: args.orchestrator_process_propagate_crashes,
914 tcp_proxy: args.orchestrator_process_tcp_proxy_listen_addr.map(
915 |listen_addr| ProcessOrchestratorTcpProxyConfig {
916 listen_addr,
917 prometheus_service_discovery_dir: args
918 .orchestrator_process_prometheus_service_discovery_directory,
919 },
920 ),
921 scratch_directory: args
922 .orchestrator_process_scratch_directory
923 .expect("process orchestrator requires scratch directory"),
924 }))
925 .context("creating process orchestrator")?,
926 );
927 let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
928 SecretsControllerKind::Kubernetes => bail!(
929 "SecretsControllerKind::Kubernetes is not compatible with Orchestrator::Process."
930 ),
931 SecretsControllerKind::AwsSecretsManager => Arc::new(
932 runtime.block_on(AwsSecretsController::new(
933 &aws_secrets_controller_prefix(&args.environment_id),
934 &aws_secrets_controller_key_alias(&args.environment_id),
935 args.aws_secrets_controller_tags
936 .into_iter()
937 .map(|tag| (tag.key, tag.value))
938 .collect(),
939 )),
940 ),
941 SecretsControllerKind::LocalFile => {
942 let sc = Arc::clone(&orchestrator);
943 let sc: Arc<dyn SecretsController> = sc;
944 sc
945 }
946 };
947 (orchestrator, secrets_controller, None)
948 }
949 };
950 drop(entered);
951 let cloud_resource_reader = cloud_resource_controller.as_ref().map(|c| c.reader());
952 let secrets_reader = secrets_controller.reader();
953 let now = SYSTEM_TIME.clone();
954
955 let mut persist_config =
956 PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs());
957 persist_config.disable_compaction();
959
960 let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
961 let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
962
963 match args.persist_isolated_runtime_threads {
964 None | Some(0) => (),
966 Some(x @ ..=-1) => {
967 let threads = num_cpus::get().saturating_add_signed(x).max(1);
968 persist_config.isolated_runtime_worker_threads = threads;
969 }
970 Some(x @ 1..) => {
971 let threads = usize::try_from(x).expect("pattern matched a positive value");
972 persist_config.isolated_runtime_worker_threads = threads;
973 }
974 };
975
976 let _server = runtime.spawn_named(
977 || "persist::rpc::server",
978 async move {
979 info!(
980 "listening for Persist PubSub connections on {}",
981 args.internal_persist_pubsub_listen_addr
982 );
983 let res = persist_pubsub_server
986 .serve(args.internal_persist_pubsub_listen_addr)
987 .await;
988 error!("Persist Pubsub server exited {:?}", res);
989 }
990 .instrument(tracing::info_span!("persist::rpc::server")),
991 );
992
993 let persist_clients = {
994 let _tokio_guard = runtime.enter();
996 PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
997 let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
998 cfg,
999 persist_pubsub_client.sender,
1000 metrics,
1001 ));
1002 PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
1003 })
1004 };
1005
1006 let consensus_uri = args.persist_consensus_url.unwrap_or_else(|| {
1007 args.metadata_backend_url
1008 .as_ref()
1009 .map(|metadata_backend_url| {
1010 SensitiveUrl(
1011 Url::parse_with_params(
1012 metadata_backend_url.0.as_ref(),
1013 &[("options", "--search_path=consensus")],
1014 )
1015 .unwrap(),
1016 )
1017 })
1018 .expect("either --persist-consensus-url or --metadata-backend-url must be provided")
1019 });
1020 let timestamp_oracle_url = args.timestamp_oracle_url.or_else(|| {
1021 args.metadata_backend_url
1022 .as_ref()
1023 .map(|metadata_backend_url| {
1024 SensitiveUrl(
1025 Url::parse_with_params(
1026 metadata_backend_url.0.as_ref(),
1027 &[("options", "--search_path=tsoracle")],
1028 )
1029 .unwrap(),
1030 )
1031 })
1032 });
1033
1034 let persist_clients = Arc::new(persist_clients);
1035 let connection_context = ConnectionContext::from_cli_args(
1036 args.environment_id.to_string(),
1037 &args.tracing.startup_log_filter,
1038 args.aws_external_id_prefix,
1039 args.aws_connection_role_arn,
1040 secrets_reader,
1041 cloud_resource_reader,
1042 );
1043 let orchestrator = Arc::new(TracingOrchestrator::new(orchestrator, args.tracing.clone()));
1044 let controller = ControllerConfig {
1045 build_info: &BUILD_INFO,
1046 orchestrator,
1047 persist_location: PersistLocation {
1048 blob_uri: args.persist_blob_url,
1049 consensus_uri,
1050 },
1051 persist_clients: Arc::clone(&persist_clients),
1052 clusterd_image: args.clusterd_image.expect("clap enforced"),
1053 init_container_image: args.orchestrator_kubernetes_init_container_image,
1054 deploy_generation: args.deploy_generation,
1055 now: SYSTEM_TIME.clone(),
1056 metrics_registry: metrics_registry.clone(),
1057 persist_pubsub_url: args.persist_pubsub_url,
1058 connection_context,
1059 secrets_args: SecretsReaderCliArgs {
1062 secrets_reader: args.secrets_controller,
1063 secrets_reader_local_file_dir: args.orchestrator_process_secrets_directory,
1064 secrets_reader_kubernetes_context: Some(args.orchestrator_kubernetes_context),
1065 secrets_reader_aws_prefix: Some(aws_secrets_controller_prefix(&args.environment_id)),
1066 secrets_reader_name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
1067 },
1068 };
1069
1070 let cluster_replica_sizes = ClusterReplicaSizeMap::parse_from_str(
1071 &args.cluster_replica_sizes,
1072 !license_key.allow_credit_consumption_override,
1073 )
1074 .context("parsing replica size map")?;
1075
1076 emit_boot_diagnostics!(&BUILD_INFO);
1077 sys::adjust_rlimits();
1078
1079 info!(
1080 "startup: envd init: preamble complete in {:?}",
1081 envd_start.elapsed()
1082 );
1083
1084 let serve_start = Instant::now();
1085 info!("startup: envd init: serving beginning");
1086 let server = runtime.block_on(async {
1087 let listeners = Listeners::bind(listeners_config).await?;
1088 let catalog_config = CatalogConfig {
1089 persist_clients,
1090 metrics: Arc::new(mz_catalog::durable::Metrics::new(&metrics_registry)),
1091 };
1092 let server = listeners
1093 .serve(crate::Config {
1094 unsafe_mode: args.unsafe_mode,
1096 all_features: args.all_features,
1097 tls,
1099 tls_reload_certs: mz_server_core::default_cert_reload_ticker(),
1100 external_login_password_mz_system: args.external_login_password_mz_system,
1101 frontegg,
1102 cors_allowed_origin,
1103 egress_addresses: args.announce_egress_address,
1104 http_host_name: args.http_host_name,
1105 internal_console_redirect_url: args.internal_console_redirect_url,
1106 controller,
1108 secrets_controller,
1109 cloud_resource_controller,
1110 storage_usage_collection_interval: args.storage_usage_collection_interval_sec,
1112 storage_usage_retention_period: args.storage_usage_retention_period,
1113 catalog_config,
1115 availability_zones: args.availability_zone,
1116 cluster_replica_sizes,
1117 timestamp_oracle_url,
1118 segment_api_key: args.segment_api_key,
1119 segment_client_side: args.segment_client_side,
1120 test_only_dummy_segment_client: args.test_only_dummy_segment_client,
1121 launchdarkly_sdk_key: args.launchdarkly_sdk_key,
1122 launchdarkly_key_map: args
1123 .launchdarkly_key_map
1124 .into_iter()
1125 .map(|kv| (kv.key, kv.value))
1126 .collect(),
1127 config_sync_timeout: args.config_sync_timeout,
1128 config_sync_loop_interval: args.config_sync_loop_interval,
1129 config_sync_file_path: args.config_sync_file_path,
1130
1131 environment_id: args.environment_id,
1133 bootstrap_role: args.bootstrap_role,
1134 bootstrap_default_cluster_replica_size: args.bootstrap_default_cluster_replica_size,
1135 bootstrap_default_cluster_replication_factor: args
1136 .bootstrap_default_cluster_replication_factor,
1137 bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
1138 size: args.bootstrap_builtin_system_cluster_replica_size,
1139 replication_factor: args.bootstrap_builtin_system_cluster_replication_factor,
1140 },
1141 bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
1142 size: args.bootstrap_builtin_catalog_server_cluster_replica_size,
1143 replication_factor: args
1144 .bootstrap_builtin_catalog_server_cluster_replication_factor,
1145 },
1146 bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
1147 size: args.bootstrap_builtin_probe_cluster_replica_size,
1148 replication_factor: args.bootstrap_builtin_probe_cluster_replication_factor,
1149 },
1150 bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
1151 size: args.bootstrap_builtin_support_cluster_replica_size,
1152 replication_factor: args.bootstrap_builtin_support_cluster_replication_factor,
1153 },
1154 bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
1155 size: args.bootstrap_builtin_analytics_cluster_replica_size,
1156 replication_factor: args.bootstrap_builtin_analytics_cluster_replication_factor,
1157 },
1158 system_parameter_defaults: args
1159 .system_parameter_default
1160 .into_iter()
1161 .map(|kv| (kv.key, kv.value))
1162 .collect(),
1163 helm_chart_version: args.helm_chart_version.clone(),
1164 license_key,
1165 aws_account_id: args.aws_account_id,
1167 aws_privatelink_availability_zones: args.aws_privatelink_availability_zones,
1168 metrics_registry,
1170 tracing_handle,
1171 now,
1173 })
1174 .await
1175 .maybe_terminate("booting server")?;
1176 Ok::<_, anyhow::Error>(server)
1177 })?;
1178 info!(
1179 "startup: envd init: serving complete in {:?}",
1180 serve_start.elapsed()
1181 );
1182
1183 let start_duration = envd_start.elapsed();
1184 metrics
1185 .start_time_environmentd
1186 .set(start_duration.as_millis().try_into().expect("must fit"));
1187 let span = span.exit();
1188 let id = span.context().span().span_context().trace_id();
1189 drop(span);
1190
1191 info!("startup: envd init: complete in {start_duration:?}");
1192
1193 println!(
1194 "environmentd {} listening...",
1195 BUILD_INFO.human_version(args.helm_chart_version)
1196 );
1197 for (name, handle) in &server.sql_listener_handles {
1198 println!("{} SQL address: {}", name, handle.local_addr);
1199 }
1200 for (name, handle) in &server.http_listener_handles {
1201 println!("{} HTTP address: {}", name, handle.local_addr);
1202 }
1203 println!(
1205 " Internal Persist PubSub address: {}",
1206 args.internal_persist_pubsub_listen_addr
1207 );
1208
1209 println!(" Root trace ID: {id}");
1210
1211 loop {
1213 thread::park();
1214 }
1215}
1216
1217fn build_info() -> Vec<String> {
1218 let openssl_version =
1219 unsafe { CStr::from_ptr(openssl_sys::OpenSSL_version(openssl_sys::OPENSSL_VERSION)) };
1220 let rdkafka_version = unsafe { CStr::from_ptr(rdkafka_sys::bindings::rd_kafka_version_str()) };
1221 vec![
1222 openssl_version.to_string_lossy().into_owned(),
1223 format!("librdkafka v{}", rdkafka_version.to_string_lossy()),
1224 ]
1225}
1226
/// Metrics recorded by the `environmentd` startup code.
#[derive(Debug, Clone)]
struct Metrics {
    /// Gauge that `run` sets, once the server is up, to the number of
    /// milliseconds elapsed since `run` started.
    pub start_time_environmentd: IntGauge,
}
1231
1232impl Metrics {
1233 pub fn register_into(registry: &MetricsRegistry, build_info: BuildInfo) -> Metrics {
1234 Metrics {
1235 start_time_environmentd: registry.register(metric!(
1236 name: "mz_start_time_environmentd",
1237 help: "Time in milliseconds from environmentd start until the adapter is ready.",
1238 const_labels: {
1239 "version" => build_info.version,
1240 "build_type" => if cfg!(release) { "release" } else { "debug" }
1241 },
1242 )),
1243 }
1244 }
1245}