1use std::ffi::CStr;
16use std::fs::File;
17use std::net::{IpAddr, SocketAddr};
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::LazyLock;
21use std::sync::atomic::{AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use std::{cmp, env, iter, thread};
24
25use anyhow::{Context, bail};
26use clap::{ArgAction, Parser, ValueEnum};
27use fail::FailScenario;
28use http::header::HeaderValue;
29use ipnet::IpNet;
30use itertools::Itertools;
31use mz_adapter::ResultExt;
32use mz_adapter_types::bootstrap_builtin_cluster_config::{
33 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
34 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, DEFAULT_REPLICATION_FACTOR,
35 PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR, SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
36 SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
37};
38use mz_auth::password::Password;
39use mz_aws_secrets_controller::AwsSecretsController;
40use mz_build_info::BuildInfo;
41use mz_catalog::config::ClusterReplicaSizeMap;
42use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController};
43use mz_controller::{ControllerConfig, ReplicaHttpLocator};
44use mz_frontegg_auth::{Authenticator as FronteggAuthenticator, FronteggCliArgs};
45use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
46use mz_orchestrator::Orchestrator;
47use mz_orchestrator_kubernetes::{
48 KubernetesImagePullPolicy, KubernetesOrchestrator, KubernetesOrchestratorConfig,
49};
50use mz_orchestrator_process::{
51 ProcessOrchestrator, ProcessOrchestratorConfig, ProcessOrchestratorTcpProxyConfig,
52};
53use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs, TracingOrchestrator};
54use mz_ore::cli::{self, CliConfig, KeyValueArg};
55use mz_ore::error::ErrorExt;
56use mz_ore::metric;
57use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
58use mz_ore::now::SYSTEM_TIME;
59use mz_ore::task::RuntimeExt;
60use mz_ore::url::SensitiveUrl;
61use mz_persist_client::PersistLocation;
62use mz_persist_client::cache::PersistClientCache;
63use mz_persist_client::cfg::PersistConfig;
64use mz_persist_client::rpc::{
65 MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
66};
67use mz_secrets::SecretsController;
68use mz_server_core::TlsCliArgs;
69use mz_service::emit_boot_diagnostics;
70use mz_service::secrets::{SecretsControllerKind, SecretsReaderCliArgs};
71use mz_sql::catalog::EnvironmentId;
72use mz_storage_types::connections::ConnectionContext;
73use opentelemetry::trace::TraceContextExt;
74use prometheus::IntGauge;
75use tracing::{Instrument, error, info, info_span, warn};
76use tracing_opentelemetry::OpenTelemetrySpanExt;
77use url::Url;
78
79use crate::environmentd::sys;
80use crate::{BUILD_INFO, CatalogConfig, ListenerConfig, Listeners, ListenersConfig};
81
82static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
83static LONG_VERSION: LazyLock<String> = LazyLock::new(|| {
84 iter::once(BUILD_INFO.human_version(None))
85 .chain(build_info())
86 .join("\n")
87});
88
// Command-line arguments for `environmentd`.
//
// Every flag can also be supplied through the environment variable named in its `env`
// attribute. `main` passes `env_prefix: Some("MZ_")` to `cli::parse_args`, presumably so the
// variables are read as `MZ_<NAME>` — confirm against `mz_ore::cli`.
//
// NOTE: plain `//` comments are used on purpose: `///` doc comments on a clap derive struct
// become `--help` text and would change the program's user-facing output.
#[derive(Parser, Debug)]
#[clap(
    name = "environmentd",
    next_line_help = true,
    version = VERSION.as_str(),
    long_version = LONG_VERSION.as_str(),
)]
pub struct Args {
    // ---- Feature gates ----

    // Forwarded to the server config as `unsafe_mode`; also a prerequisite for
    // `--unsafe-force-builtin-schema-migration`.
    #[clap(long, env = "UNSAFE_MODE")]
    unsafe_mode: bool,
    // Forwarded to the server config as `all_features`.
    #[clap(long, env = "ALL_FEATURES")]
    all_features: bool,

    // ---- Connection handling ----

    // Path to a JSON file that `run` deserializes into `ListenersConfig`. Every SQL and HTTP
    // listener in it is validated before startup continues.
    #[clap(
        long,
        env = "LISTENERS_CONFIG_PATH",
        value_name = "PATH",
        action = ArgAction::Set,
    )]
    listeners_config_path: PathBuf,
    // Forwarded unchanged to the server config.
    #[clap(
        long,
        env = "EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM",
        action = ArgAction::Set,
    )]
    external_login_password_mz_system: Option<Password>,
    // Address the in-process persist PubSub gRPC server listens on. The default port matches
    // the default of `--persist-pubsub-url`.
    #[clap(
        long,
        value_name = "HOST:PORT",
        env = "INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR",
        default_value = "127.0.0.1:6879",
        action = ArgAction::Set,
    )]
    internal_persist_pubsub_listen_addr: SocketAddr,
    // Allowed CORS origins. When none are given, `run` allows the `localhost`, `127.0.0.1`
    // and `[::1]` origins, over both HTTP and HTTPS, for each configured HTTP listener port.
    #[structopt(long, env = "CORS_ALLOWED_ORIGIN")]
    cors_allowed_origin: Vec<HeaderValue>,
    // Egress addresses to announce; forwarded to the server config as `egress_addresses`.
    #[clap(
        long,
        env = "ANNOUNCE_EGRESS_ADDRESS",
        action = ArgAction::Append,
        use_value_delimiter = true
    )]
    announce_egress_address: Vec<IpNet>,
    // Forwarded unchanged to the server config.
    #[clap(long, env = "HTTP_HOST_NAME")]
    http_host_name: Option<String>,
    // Forwarded unchanged to the server config.
    #[clap(long, env = "INTERNAL_CONSOLE_REDIRECT_URL")]
    internal_console_redirect_url: Option<String>,
    // TLS settings, converted with `TlsCliArgs::into_config`.
    #[clap(flatten)]
    tls: TlsCliArgs,
    // Frontegg settings, used to build the `FronteggAuthenticator`.
    #[clap(flatten)]
    frontegg: FronteggCliArgs,

    // ---- Orchestrator selection and Kubernetes orchestrator options ----

    // Which orchestrator implementation `run` constructs.
    #[structopt(long, value_enum, env = "ORCHESTRATOR")]
    orchestrator: OrchestratorKind,
    // The `orchestrator_kubernetes_*` fields below populate `KubernetesOrchestratorConfig`.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SCHEDULER_NAME")]
    orchestrator_kubernetes_scheduler_name: Option<String>,
    // Key/value pairs collected into `service_annotations`.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ANNOTATION")]
    orchestrator_kubernetes_service_annotation: Vec<KeyValueArg<String, String>>,
    // Key/value pairs collected into `service_labels`.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_LABEL")]
    orchestrator_kubernetes_service_label: Vec<KeyValueArg<String, String>>,
    // Key/value pairs collected into `service_node_selector`.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_NODE_SELECTOR")]
    orchestrator_kubernetes_service_node_selector: Vec<KeyValueArg<String, String>>,
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_AFFINITY")]
    orchestrator_kubernetes_service_affinity: Option<String>,
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_TOLERATIONS")]
    orchestrator_kubernetes_service_tolerations: Option<String>,
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ACCOUNT")]
    orchestrator_kubernetes_service_account: Option<String>,
    // Kubernetes context. It is also passed to the secrets reader
    // (`secrets_reader_kubernetes_context`).
    #[structopt(
        long,
        env = "ORCHESTRATOR_KUBERNETES_CONTEXT",
        default_value = "minikube"
    )]
    orchestrator_kubernetes_context: String,
    #[structopt(
        long,
        env = "ORCHESTRATOR_KUBERNETES_IMAGE_PULL_POLICY",
        default_value = "always",
        value_enum
    )]
    orchestrator_kubernetes_image_pull_policy: KubernetesImagePullPolicy,
    // Forwarded to `ControllerConfig::init_container_image` for either orchestrator.
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_INIT_CONTAINER_IMAGE")]
    orchestrator_kubernetes_init_container_image: Option<String>,
    // `run` rejects this flag when the process orchestrator is selected.
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_EPHEMERAL_VOLUME_CLASS")]
    orchestrator_kubernetes_ephemeral_volume_class: Option<String>,
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_FS_GROUP")]
    orchestrator_kubernetes_service_fs_group: Option<i64>,
    // Also passed to the secrets reader (`secrets_reader_name_prefix`).
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_NAME_PREFIX")]
    orchestrator_kubernetes_name_prefix: Option<String>,
    // Inverted by `run` into `collect_pod_metrics`.
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_DISABLE_POD_METRICS_COLLECTION")]
    orchestrator_kubernetes_disable_pod_metrics_collection: bool,
    #[clap(
        long,
        env = "ORCHESTRATOR_KUBERNETES_ENABLE_PROMETHEUS_SCRAPE_ANNOTATIONS"
    )]
    orchestrator_kubernetes_enable_prometheus_scrape_annotations: bool,

    // ---- Process orchestrator options ----

    // Split with `shell_words::split` into the process orchestrator's `command_wrapper`.
    #[clap(long, env = "ORCHESTRATOR_PROCESS_WRAPPER")]
    orchestrator_process_wrapper: Option<String>,
    // Required when `--orchestrator=process`. It is used as the orchestrator's `secrets_dir`
    // and as the local-file secrets reader directory.
    #[clap(
        long,
        env = "ORCHESTRATOR_PROCESS_SECRETS_DIRECTORY",
        value_name = "PATH",
        required_if_eq("orchestrator", "process")
    )]
    orchestrator_process_secrets_directory: Option<PathBuf>,
    #[clap(long, env = "ORCHESTRATOR_PROCESS_PROPAGATE_CRASHES")]
    orchestrator_process_propagate_crashes: bool,
    // When set, `run` enables the process orchestrator's TCP proxy on this address.
    #[clap(long, env = "ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR")]
    orchestrator_process_tcp_proxy_listen_addr: Option<IpAddr>,
    // Only used when `--orchestrator-process-tcp-proxy-listen-addr` is set.
    #[clap(
        long,
        env = "ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY"
    )]
    orchestrator_process_prometheus_service_discovery_directory: Option<PathBuf>,
    // `run` rejects this with the Kubernetes orchestrator and requires it with the process
    // orchestrator. Clap does not enforce that requirement.
    #[clap(
        long,
        env = "ORCHESTRATOR_PROCESS_SCRATCH_DIRECTORY",
        value_name = "PATH"
    )]
    orchestrator_process_scratch_directory: Option<PathBuf>,
    // Forwarded as `coverage` to the Kubernetes orchestrator config.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_COVERAGE")]
    orchestrator_kubernetes_coverage: bool,

    // ---- Secrets and cluster images ----

    // The default follows the orchestrator: `kubernetes` -> `kubernetes`,
    // `process` -> `local-file`. `run` rejects the mismatched combinations.
    #[structopt(
        long,
        value_enum,
        env = "SECRETS_CONTROLLER",
        default_value_ifs([
            ("orchestrator", "kubernetes", Some("kubernetes")),
            ("orchestrator", "process", Some("local-file"))
        ]),
        default_value("kubernetes"),
    )]
    secrets_controller: SecretsControllerKind,
    // `;`-delimited key/value tags passed to `AwsSecretsController::new`. Required when the
    // secrets controller is `aws-secrets-manager`.
    #[clap(
        long,
        env = "AWS_SECRETS_CONTROLLER_TAGS",
        action = ArgAction::Append,
        value_delimiter = ';',
        required_if_eq("secrets_controller", "aws-secrets-manager")
    )]
    aws_secrets_controller_tags: Vec<KeyValueArg<String, String>>,
    // Required with the Kubernetes orchestrator; defaults to `clusterd` with the process
    // orchestrator. `run` relies on one of these always being in effect.
    #[structopt(
        long,
        env = "CLUSTERD_IMAGE",
        required_if_eq("orchestrator", "kubernetes"),
        default_value_if("orchestrator", "process", Some("clusterd"))
    )]
    clusterd_image: Option<String>,
    #[clap(long, env = "DEPLOY_GENERATION", default_value = "0")]
    deploy_generation: u64,

    // ---- Persist / metadata storage ----

    // Shared metadata backend. When the dedicated URLs are absent, `run` derives the persist
    // consensus URL and the timestamp oracle URL from it by appending an `options` query
    // parameter that sets `search_path` to `consensus` and `tsoracle` respectively.
    #[clap(
        long,
        env = "METADATA_BACKEND_URL",
        conflicts_with_all = &[
            "persist_consensus_url",
            "timestamp_oracle_url",
        ],
    )]
    metadata_backend_url: Option<SensitiveUrl>,

    // Included in the printed startup version line and forwarded to the server config.
    #[clap(long, env = "HELM_CHART_VERSION")]
    helm_chart_version: Option<String>,

    #[clap(long, env = "PERSIST_BLOB_URL")]
    persist_blob_url: SensitiveUrl,
    // `run` requires this or `--metadata-backend-url`.
    #[clap(long, env = "PERSIST_CONSENSUS_URL")]
    persist_consensus_url: Option<SensitiveUrl>,
    // Forwarded to the controller config as `persist_pubsub_url`.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,
    // Worker threads for persist's isolated runtime:
    // - unset or 0: keep the default;
    // - negative N: the logical CPU count plus N, but at least 1;
    // - positive N: exactly N.
    #[clap(long, env = "PERSIST_ISOLATED_RUNTIME_THREADS")]
    persist_isolated_runtime_threads: Option<isize>,
    // Parsed with `humantime`, so this is a duration despite the `_sec` suffix.
    #[clap(
        long,
        env = "STORAGE_USAGE_COLLECTION_INTERVAL",
        value_parser = humantime::parse_duration,
        default_value = "3600s"
    )]
    storage_usage_collection_interval_sec: Duration,
    #[clap(long, env = "STORAGE_USAGE_RETENTION_PERIOD", value_parser = humantime::parse_duration)]
    storage_usage_retention_period: Option<Duration>,

    #[clap(long, env = "TIMESTAMP_ORACLE_URL", value_name = "POSTGRES_URL")]
    timestamp_oracle_url: Option<SensitiveUrl>,
    #[clap(long, env = "AVAILABILITY_ZONE", use_value_delimiter = true)]
    availability_zone: Vec<String>,
    // Parsed by `ClusterReplicaSizeMap::parse_from_str` in `run`.
    #[clap(
        long,
        env = "CLUSTER_REPLICA_SIZES",
        requires = "bootstrap_default_cluster_replica_size"
    )]
    cluster_replica_sizes: String,

    // ---- Analytics and feature-flag sync ----

    #[clap(long, env = "SEGMENT_API_KEY")]
    segment_api_key: Option<String>,
    #[clap(long, env = "SEGMENT_CLIENT_SIDE")]
    segment_client_side: bool,
    #[clap(long, env = "TEST_ONLY_DUMMY_SEGMENT_CLIENT")]
    test_only_dummy_segment_client: bool,
    #[clap(long, env = "LAUNCHDARKLY_SDK_KEY")]
    launchdarkly_sdk_key: Option<String>,
    // `;`-delimited key/value pairs, collected into the server config's `launchdarkly_key_map`.
    #[clap(
        long,
        env = "LAUNCHDARKLY_KEY_MAP",
        action = ArgAction::Append,
        value_delimiter = ';'
    )]
    launchdarkly_key_map: Vec<KeyValueArg<String, String>>,
    #[clap(
        long,
        env = "CONFIG_SYNC_TIMEOUT",
        value_parser = humantime::parse_duration,
        default_value = "30s"
    )]
    config_sync_timeout: Duration,
    #[clap(
        long,
        env = "CONFIG_SYNC_LOOP_INTERVAL",
        value_parser = humantime::parse_duration,
    )]
    config_sync_loop_interval: Option<Duration>,
    #[clap(long, env = "CONFIG_SYNC_FILE_PATH", value_name = "PATH")]
    config_sync_file_path: Option<PathBuf>,

    // ---- Bootstrap configuration ----

    // Also used to derive the AWS secrets prefix and key alias, and as the process
    // orchestrator's `environment_id`.
    #[clap(
        long,
        env = "ENVIRONMENT_ID",
        value_name = "<CLOUD>-<REGION>-<ORG-ID>-<ORDINAL>"
    )]
    environment_id: EnvironmentId,
    #[clap(long, env = "BOOTSTRAP_ROLE")]
    bootstrap_role: Option<String>,
    // Replica sizes for the default cluster and each builtin cluster.
    #[clap(
        long,
        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE",
        default_value = "scale=1,workers=1"
    )]
    bootstrap_default_cluster_replica_size: String,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE",
        default_value = "scale=1,workers=1"
    )]
    bootstrap_builtin_system_cluster_replica_size: String,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE",
        default_value = "scale=1,workers=1"
    )]
    bootstrap_builtin_catalog_server_cluster_replica_size: String,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE",
        default_value = "scale=1,workers=1"
    )]
    bootstrap_builtin_probe_cluster_replica_size: String,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE",
        default_value = "scale=1,workers=1"
    )]
    bootstrap_builtin_support_cluster_replica_size: String,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE",
        default_value = "scale=1,workers=1"
    )]
    bootstrap_builtin_analytics_cluster_replica_size: String,
    // Replication factors for the default and builtin clusters. Clap restricts each one to
    // the range 0..=2.
    #[clap(
        long,
        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR",
        default_value = DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_default_cluster_replication_factor: u32,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR",
        default_value = SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_system_cluster_replication_factor: u32,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICATION_FACTOR",
        default_value = CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_catalog_server_cluster_replication_factor: u32,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR",
        default_value = PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_probe_cluster_replication_factor: u32,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICATION_FACTOR",
        default_value = SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_support_cluster_replication_factor: u32,
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICATION_FACTOR",
        default_value = ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_analytics_cluster_replication_factor: u32,
    // `;`-delimited key/value pairs, collected into the server config's
    // `system_parameter_defaults`.
    #[clap(
        long,
        env = "SYSTEM_PARAMETER_DEFAULT",
        action = ArgAction::Append,
        value_delimiter = ';'
    )]
    system_parameter_default: Vec<KeyValueArg<String, String>>,
    // Path to a license key file. `run` reads and validates it, and requires it with the
    // Kubernetes orchestrator.
    #[clap(long, env = "LICENSE_KEY")]
    license_key: Option<String>,

    // ---- AWS ----

    #[clap(long, env = "AWS_ACCOUNT_ID")]
    aws_account_id: Option<String>,
    // Passed to `ConnectionContext::from_cli_args`.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,
    // Passed to both the Kubernetes orchestrator config and `ConnectionContext::from_cli_args`.
    #[clap(long, env = "AWS_EXTERNAL_ID_PREFIX", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    #[clap(
        long,
        env = "AWS_PRIVATELINK_AVAILABILITY_ZONES",
        action = ArgAction::Append,
        use_value_delimiter = true
    )]
    aws_privatelink_availability_zones: Option<Vec<String>>,

    // ---- Tracing ----

    #[clap(flatten)]
    tracing: TracingCliArgs,

    // ---- Unsafe options ----

    // Passed to the server config as `force_builtin_schema_migration`. It is only accepted
    // together with `--unsafe-mode`; clap enforces this, and `run` asserts it again.
    // NOTE(review): `value_enum` is applied to a plain `String`. clap documents `value_enum`
    // as parsing via the `ValueEnum` trait, which `String` does not implement — confirm this
    // attribute is intended.
    #[clap(long, value_enum, requires = "unsafe_mode")]
    unsafe_force_builtin_schema_migration: Option<String>,
}
627
// Selects which orchestrator implementation `run` constructs.
//
// Plain `//` comments are used because `///` comments on `ValueEnum` variants become clap
// help text.
#[derive(ValueEnum, Debug, Clone)]
enum OrchestratorKind {
    // Builds a `KubernetesOrchestrator`; `run` refuses to start without `--license-key`.
    Kubernetes,
    // Builds a `ProcessOrchestrator`; clap requires `--orchestrator-process-secrets-directory`.
    Process,
}
633
634fn aws_secrets_controller_prefix(env_id: &EnvironmentId) -> String {
637 format!("/user-managed/{}/", env_id)
638}
639fn aws_secrets_controller_key_alias(env_id: &EnvironmentId) -> String {
640 format!("alias/customer_key_{}", env_id)
643}
644
645pub fn main() {
646 let args = cli::parse_args(CliConfig {
647 env_prefix: Some("MZ_"),
648 enable_version_flag: true,
649 });
650 if let Err(err) = run(args) {
651 panic!("environmentd: fatal: {}", err.display_with_causes());
652 }
653}
654
655fn run(mut args: Args) -> Result<(), anyhow::Error> {
656 mz_ore::panic::install_enhanced_handler();
657 let envd_start = Instant::now();
658
659 sys::enable_sigusr2_coverage_dump()?;
662 sys::enable_termination_signal_cleanup()?;
663
664 let license_key = if let Some(license_key_file) = args.license_key {
665 let license_key_text = std::fs::read_to_string(&license_key_file)
666 .context("failed to open license key file")?;
667 let license_key = mz_license_keys::validate(license_key_text.trim())
668 .context("failed to validate license key file")?;
669 if license_key.expired {
670 let message = format!(
671 "The license key provided at {license_key_file} is expired! Please contact Materialize for assistance."
672 );
673 match license_key.expiration_behavior {
674 ExpirationBehavior::Warn | ExpirationBehavior::DisableClusterCreation => {
675 warn!("{message}");
676 }
677 ExpirationBehavior::Disable => bail!("{message}"),
678 }
679 }
680 license_key
681 } else if matches!(args.orchestrator, OrchestratorKind::Kubernetes) {
682 bail!("--license-key is required when running in Kubernetes");
683 } else {
684 ValidatedLicenseKey::disabled()
686 };
687
688 let force_builtin_schema_migration = args
690 .unsafe_force_builtin_schema_migration
691 .inspect(|_mechanism| assert!(args.unsafe_mode));
692
693 let ncpus_useful = usize::max(1, cmp::min(num_cpus::get(), num_cpus::get_physical()));
696 let runtime = Arc::new(
697 tokio::runtime::Builder::new_multi_thread()
698 .worker_threads(ncpus_useful)
699 .thread_stack_size(3 * 1024 * 1024) .thread_name_fn(|| {
703 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
704 let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
705 format!("tokio:work-{}", id)
706 })
707 .enable_all()
708 .build()?,
709 );
710
711 args.tracing.log_prefix = if matches!(args.orchestrator, OrchestratorKind::Process) {
715 Some("environmentd".to_string())
716 } else {
717 None
718 };
719
720 let metrics_registry = MetricsRegistry::new();
721 let tracing_handle = runtime.block_on(args.tracing.configure_tracing(
722 StaticTracingConfig {
723 service_name: "environmentd",
724 build_info: BUILD_INFO,
725 },
726 metrics_registry.clone(),
727 ))?;
728 register_runtime_metrics("main", runtime.metrics(), &metrics_registry);
729
730 let span = tracing::info_span!("environmentd::run").entered();
731
732 info!("startup: envd init: beginning");
733 info!("startup: envd init: preamble beginning");
734
735 let metrics = Metrics::register_into(&metrics_registry, BUILD_INFO);
736
737 runtime.block_on(mz_alloc::register_metrics_into(&metrics_registry));
738 runtime.block_on(mz_metrics::register_metrics_into(
739 &metrics_registry,
740 mz_dyncfgs::all_dyncfgs(),
741 ));
742
743 let _failpoint_scenario = FailScenario::setup();
745
746 let tls = args.tls.into_config()?;
748 let frontegg = FronteggAuthenticator::from_args(args.frontegg, &metrics_registry)?;
749 let listeners_config: ListenersConfig = {
750 let f = File::open(args.listeners_config_path)?;
751 serde_json::from_reader(f)?
752 };
753
754 for (_, listener) in &listeners_config.sql {
755 listener
756 .validate()
757 .map_err(|e| anyhow::anyhow!("invalid SQL listener: {}", e))?;
758 }
759
760 for (_, listener) in &listeners_config.http {
761 listener
762 .validate()
763 .map_err(|e| anyhow::anyhow!("invalid HTTP listener: {}", e))?;
764 }
765
766 let allowed_origins = if !args.cors_allowed_origin.is_empty() {
768 args.cors_allowed_origin
769 } else {
770 let mut allowed_origins = Vec::with_capacity(listeners_config.http.len() * 6);
771 for (_, listener) in &listeners_config.http {
772 let port = listener.addr().port();
773 allowed_origins.extend([
774 HeaderValue::from_str(&format!("http://localhost:{}", port)).unwrap(),
775 HeaderValue::from_str(&format!("http://127.0.0.1:{}", port)).unwrap(),
776 HeaderValue::from_str(&format!("http://[::1]:{}", port)).unwrap(),
777 HeaderValue::from_str(&format!("https://localhost:{}", port)).unwrap(),
778 HeaderValue::from_str(&format!("https://127.0.0.1:{}", port)).unwrap(),
779 HeaderValue::from_str(&format!("https://[::1]:{}", port)).unwrap(),
780 ])
781 }
782 allowed_origins
783 };
784 let cors_allowed_origin = mz_http_util::build_cors_allowed_origin(&allowed_origins);
785 let cors_allowed_origin_list = allowed_origins.clone();
786
787 let entered = info_span!("environmentd::configure_controller").entered();
789 let (orchestrator, secrets_controller, cloud_resource_controller): (
790 Arc<dyn Orchestrator>,
791 Arc<dyn SecretsController>,
792 Option<Arc<dyn CloudResourceController>>,
793 ) = match args.orchestrator {
794 OrchestratorKind::Kubernetes => {
795 if args.orchestrator_process_scratch_directory.is_some() {
796 bail!(
797 "--orchestrator-process-scratch-directory is \
798 not currently usable with the kubernetes orchestrator"
799 );
800 }
801
802 let orchestrator = Arc::new(
803 runtime
804 .block_on(KubernetesOrchestrator::new(KubernetesOrchestratorConfig {
805 context: args.orchestrator_kubernetes_context.clone(),
806 scheduler_name: args.orchestrator_kubernetes_scheduler_name,
807 service_annotations: args
808 .orchestrator_kubernetes_service_annotation
809 .into_iter()
810 .map(|l| (l.key, l.value))
811 .collect(),
812 service_labels: args
813 .orchestrator_kubernetes_service_label
814 .into_iter()
815 .map(|l| (l.key, l.value))
816 .collect(),
817 service_node_selector: args
818 .orchestrator_kubernetes_service_node_selector
819 .into_iter()
820 .map(|l| (l.key, l.value))
821 .collect(),
822 service_affinity: args.orchestrator_kubernetes_service_affinity,
823 service_tolerations: args.orchestrator_kubernetes_service_tolerations,
824 service_account: args.orchestrator_kubernetes_service_account,
825 image_pull_policy: args.orchestrator_kubernetes_image_pull_policy,
826 aws_external_id_prefix: args.aws_external_id_prefix.clone(),
827 coverage: args.orchestrator_kubernetes_coverage,
828 ephemeral_volume_storage_class: args
829 .orchestrator_kubernetes_ephemeral_volume_class
830 .clone(),
831 service_fs_group: args.orchestrator_kubernetes_service_fs_group.clone(),
832 name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
833 collect_pod_metrics: !args
834 .orchestrator_kubernetes_disable_pod_metrics_collection,
835 enable_prometheus_scrape_annotations: args
836 .orchestrator_kubernetes_enable_prometheus_scrape_annotations,
837 }))
838 .context("creating kubernetes orchestrator")?,
839 );
840 let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
841 SecretsControllerKind::Kubernetes => {
842 let sc = Arc::clone(&orchestrator);
843 let sc: Arc<dyn SecretsController> = sc;
844 sc
845 }
846 SecretsControllerKind::AwsSecretsManager => {
847 Arc::new(
848 runtime.block_on(AwsSecretsController::new(
849 &aws_secrets_controller_prefix(&args.environment_id),
852 &aws_secrets_controller_key_alias(&args.environment_id),
853 args.aws_secrets_controller_tags
854 .into_iter()
855 .map(|tag| (tag.key, tag.value))
856 .collect(),
857 )),
858 )
859 }
860 SecretsControllerKind::LocalFile => bail!(
861 "SecretsControllerKind::LocalFile is not compatible with Orchestrator::Kubernetes."
862 ),
863 };
864 let cloud_resource_controller = Arc::clone(&orchestrator);
865 (
866 orchestrator,
867 secrets_controller,
868 Some(cloud_resource_controller),
869 )
870 }
871 OrchestratorKind::Process => {
872 if args
873 .orchestrator_kubernetes_ephemeral_volume_class
874 .is_some()
875 {
876 bail!(
877 "--orchestrator-kubernetes-ephemeral-volume-class is \
878 not usable with the process orchestrator"
879 );
880 }
881 let orchestrator = Arc::new(
882 runtime
883 .block_on(ProcessOrchestrator::new(ProcessOrchestratorConfig {
884 image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
890 suppress_output: false,
891 environment_id: args.environment_id.to_string(),
892 secrets_dir: args
893 .orchestrator_process_secrets_directory
894 .clone()
895 .expect("clap enforced"),
896 command_wrapper: args
897 .orchestrator_process_wrapper
898 .map_or(Ok(vec![]), |s| shell_words::split(&s))?,
899 propagate_crashes: args.orchestrator_process_propagate_crashes,
900 tcp_proxy: args.orchestrator_process_tcp_proxy_listen_addr.map(
901 |listen_addr| ProcessOrchestratorTcpProxyConfig {
902 listen_addr,
903 prometheus_service_discovery_dir: args
904 .orchestrator_process_prometheus_service_discovery_directory,
905 },
906 ),
907 scratch_directory: args
908 .orchestrator_process_scratch_directory
909 .expect("process orchestrator requires scratch directory"),
910 }))
911 .context("creating process orchestrator")?,
912 );
913 let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
914 SecretsControllerKind::Kubernetes => bail!(
915 "SecretsControllerKind::Kubernetes is not compatible with Orchestrator::Process."
916 ),
917 SecretsControllerKind::AwsSecretsManager => Arc::new(
918 runtime.block_on(AwsSecretsController::new(
919 &aws_secrets_controller_prefix(&args.environment_id),
920 &aws_secrets_controller_key_alias(&args.environment_id),
921 args.aws_secrets_controller_tags
922 .into_iter()
923 .map(|tag| (tag.key, tag.value))
924 .collect(),
925 )),
926 ),
927 SecretsControllerKind::LocalFile => {
928 let sc = Arc::clone(&orchestrator);
929 let sc: Arc<dyn SecretsController> = sc;
930 sc
931 }
932 };
933 (orchestrator, secrets_controller, None)
934 }
935 };
936 drop(entered);
937 let cloud_resource_reader = cloud_resource_controller.as_ref().map(|c| c.reader());
938 let secrets_reader = secrets_controller.reader();
939 let now = SYSTEM_TIME.clone();
940
941 let mut persist_config =
942 PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs());
943 persist_config.disable_compaction();
945
946 let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
947 let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
948
949 match args.persist_isolated_runtime_threads {
950 None | Some(0) => (),
952 Some(x @ ..=-1) => {
953 let threads = num_cpus::get().saturating_add_signed(x).max(1);
954 persist_config.isolated_runtime_worker_threads = threads;
955 }
956 Some(x @ 1..) => {
957 let threads = usize::try_from(x).expect("pattern matched a positive value");
958 persist_config.isolated_runtime_worker_threads = threads;
959 }
960 };
961
962 let _server = runtime.spawn_named(
963 || "persist::rpc::server",
964 async move {
965 info!(
966 "listening for Persist PubSub connections on {}",
967 args.internal_persist_pubsub_listen_addr
968 );
969 let res = persist_pubsub_server
972 .serve(args.internal_persist_pubsub_listen_addr)
973 .await;
974 error!("Persist Pubsub server exited {:?}", res);
975 }
976 .instrument(tracing::info_span!("persist::rpc::server")),
977 );
978
979 let persist_clients = {
980 let _tokio_guard = runtime.enter();
982 PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
983 let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
984 cfg,
985 persist_pubsub_client.sender,
986 metrics,
987 ));
988 PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
989 })
990 };
991
992 let consensus_uri = args.persist_consensus_url.unwrap_or_else(|| {
993 args.metadata_backend_url
994 .as_ref()
995 .map(|metadata_backend_url| {
996 SensitiveUrl(
997 Url::parse_with_params(
998 metadata_backend_url.0.as_ref(),
999 &[("options", "--search_path=consensus")],
1000 )
1001 .unwrap(),
1002 )
1003 })
1004 .expect("either --persist-consensus-url or --metadata-backend-url must be provided")
1005 });
1006 let timestamp_oracle_url = args.timestamp_oracle_url.or_else(|| {
1007 args.metadata_backend_url
1008 .as_ref()
1009 .map(|metadata_backend_url| {
1010 SensitiveUrl(
1011 Url::parse_with_params(
1012 metadata_backend_url.0.as_ref(),
1013 &[("options", "--search_path=tsoracle")],
1014 )
1015 .unwrap(),
1016 )
1017 })
1018 });
1019
1020 let persist_clients = Arc::new(persist_clients);
1021 let connection_context = ConnectionContext::from_cli_args(
1022 args.environment_id.to_string(),
1023 &args.tracing.startup_log_filter,
1024 args.aws_external_id_prefix,
1025 args.aws_connection_role_arn,
1026 secrets_reader,
1027 cloud_resource_reader,
1028 );
1029 let orchestrator = Arc::new(TracingOrchestrator::new(orchestrator, args.tracing.clone()));
1030 let replica_http_locator = Arc::new(ReplicaHttpLocator::default());
1031 let controller = ControllerConfig {
1032 build_info: &BUILD_INFO,
1033 orchestrator,
1034 persist_location: PersistLocation {
1035 blob_uri: args.persist_blob_url,
1036 consensus_uri,
1037 },
1038 persist_clients: Arc::clone(&persist_clients),
1039 clusterd_image: args.clusterd_image.expect("clap enforced"),
1040 init_container_image: args.orchestrator_kubernetes_init_container_image,
1041 deploy_generation: args.deploy_generation,
1042 now: SYSTEM_TIME.clone(),
1043 metrics_registry: metrics_registry.clone(),
1044 persist_pubsub_url: args.persist_pubsub_url,
1045 connection_context,
1046 secrets_args: SecretsReaderCliArgs {
1049 secrets_reader: args.secrets_controller,
1050 secrets_reader_local_file_dir: args.orchestrator_process_secrets_directory,
1051 secrets_reader_kubernetes_context: Some(args.orchestrator_kubernetes_context),
1052 secrets_reader_aws_prefix: Some(aws_secrets_controller_prefix(&args.environment_id)),
1053 secrets_reader_name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
1054 },
1055 replica_http_locator: Arc::clone(&replica_http_locator),
1056 };
1057
1058 let cluster_replica_sizes = ClusterReplicaSizeMap::parse_from_str(
1059 &args.cluster_replica_sizes,
1060 !license_key.allow_credit_consumption_override,
1061 )
1062 .context("parsing replica size map")?;
1063
1064 emit_boot_diagnostics!(&BUILD_INFO);
1065 sys::adjust_rlimits();
1066
1067 info!(
1068 "startup: envd init: preamble complete in {:?}",
1069 envd_start.elapsed()
1070 );
1071
1072 let serve_start = Instant::now();
1073 info!("startup: envd init: serving beginning");
1074 let server = runtime.block_on(async {
1075 let listeners = Listeners::bind(listeners_config).await?;
1076 let catalog_config = CatalogConfig {
1077 persist_clients,
1078 metrics: Arc::new(mz_catalog::durable::Metrics::new(&metrics_registry)),
1079 };
1080 let server = listeners
1081 .serve(crate::Config {
1082 unsafe_mode: args.unsafe_mode,
1084 all_features: args.all_features,
1085 tls,
1087 tls_reload_certs: mz_server_core::default_cert_reload_ticker(),
1088 external_login_password_mz_system: args.external_login_password_mz_system,
1089 frontegg,
1090 cors_allowed_origin,
1091 cors_allowed_origin_list,
1092 egress_addresses: args.announce_egress_address,
1093 http_host_name: args.http_host_name,
1094 internal_console_redirect_url: args.internal_console_redirect_url,
1095 controller,
1097 secrets_controller,
1098 cloud_resource_controller,
1099 storage_usage_collection_interval: args.storage_usage_collection_interval_sec,
1101 storage_usage_retention_period: args.storage_usage_retention_period,
1102 catalog_config,
1104 availability_zones: args.availability_zone,
1105 cluster_replica_sizes,
1106 timestamp_oracle_url,
1107 segment_api_key: args.segment_api_key,
1108 segment_client_side: args.segment_client_side,
1109 test_only_dummy_segment_client: args.test_only_dummy_segment_client,
1110 launchdarkly_sdk_key: args.launchdarkly_sdk_key,
1111 launchdarkly_key_map: args
1112 .launchdarkly_key_map
1113 .into_iter()
1114 .map(|kv| (kv.key, kv.value))
1115 .collect(),
1116 config_sync_timeout: args.config_sync_timeout,
1117 config_sync_loop_interval: args.config_sync_loop_interval,
1118 config_sync_file_path: args.config_sync_file_path,
1119
1120 environment_id: args.environment_id,
1122 bootstrap_role: args.bootstrap_role,
1123 bootstrap_default_cluster_replica_size: args.bootstrap_default_cluster_replica_size,
1124 bootstrap_default_cluster_replication_factor: args
1125 .bootstrap_default_cluster_replication_factor,
1126 bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
1127 size: args.bootstrap_builtin_system_cluster_replica_size,
1128 replication_factor: args.bootstrap_builtin_system_cluster_replication_factor,
1129 },
1130 bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
1131 size: args.bootstrap_builtin_catalog_server_cluster_replica_size,
1132 replication_factor: args
1133 .bootstrap_builtin_catalog_server_cluster_replication_factor,
1134 },
1135 bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
1136 size: args.bootstrap_builtin_probe_cluster_replica_size,
1137 replication_factor: args.bootstrap_builtin_probe_cluster_replication_factor,
1138 },
1139 bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
1140 size: args.bootstrap_builtin_support_cluster_replica_size,
1141 replication_factor: args.bootstrap_builtin_support_cluster_replication_factor,
1142 },
1143 bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
1144 size: args.bootstrap_builtin_analytics_cluster_replica_size,
1145 replication_factor: args.bootstrap_builtin_analytics_cluster_replication_factor,
1146 },
1147 system_parameter_defaults: args
1148 .system_parameter_default
1149 .into_iter()
1150 .map(|kv| (kv.key, kv.value))
1151 .collect(),
1152 helm_chart_version: args.helm_chart_version.clone(),
1153 license_key,
1154 aws_account_id: args.aws_account_id,
1156 aws_privatelink_availability_zones: args.aws_privatelink_availability_zones,
1157 metrics_registry,
1159 tracing_handle,
1160 now,
1162 force_builtin_schema_migration,
1163 })
1164 .await
1165 .maybe_terminate("booting server")?;
1166 Ok::<_, anyhow::Error>(server)
1167 })?;
1168 info!(
1169 "startup: envd init: serving complete in {:?}",
1170 serve_start.elapsed()
1171 );
1172
1173 let start_duration = envd_start.elapsed();
1174 metrics
1175 .start_time_environmentd
1176 .set(start_duration.as_millis().try_into().expect("must fit"));
1177 let span = span.exit();
1178 let id = span.context().span().span_context().trace_id();
1179 drop(span);
1180
1181 info!("startup: envd init: complete in {start_duration:?}");
1182
1183 println!(
1184 "environmentd {} listening...",
1185 BUILD_INFO.human_version(args.helm_chart_version)
1186 );
1187 for (name, handle) in &server.sql_listener_handles {
1188 println!("{} SQL address: {}", name, handle.local_addr);
1189 }
1190 for (name, handle) in &server.http_listener_handles {
1191 println!("{} HTTP address: {}", name, handle.local_addr);
1192 }
1193 println!(
1195 " Internal Persist PubSub address: {}",
1196 args.internal_persist_pubsub_listen_addr
1197 );
1198
1199 println!(" Root trace ID: {id}");
1200
1201 loop {
1203 thread::park();
1204 }
1205}
1206
1207fn build_info() -> Vec<String> {
1208 let openssl_version =
1209 unsafe { CStr::from_ptr(openssl_sys::OpenSSL_version(openssl_sys::OPENSSL_VERSION)) };
1210 let rdkafka_version = unsafe { CStr::from_ptr(rdkafka_sys::bindings::rd_kafka_version_str()) };
1211 vec![
1212 openssl_version.to_string_lossy().into_owned(),
1213 format!("librdkafka v{}", rdkafka_version.to_string_lossy()),
1214 ]
1215}
1216
1217#[derive(Debug, Clone)]
1218struct Metrics {
1219 pub start_time_environmentd: IntGauge,
1220}
1221
1222impl Metrics {
1223 pub fn register_into(registry: &MetricsRegistry, build_info: BuildInfo) -> Metrics {
1224 Metrics {
1225 start_time_environmentd: registry.register(metric!(
1226 name: "mz_start_time_environmentd",
1227 help: "Time in milliseconds from environmentd start until the adapter is ready.",
1228 const_labels: {
1229 "version" => build_info.version,
1230 "build_type" => if cfg!(release) { "release" } else { "debug" }
1231 },
1232 )),
1233 }
1234 }
1235}