1use std::ffi::CStr;
16use std::fs::File;
17use std::net::{IpAddr, SocketAddr};
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::LazyLock;
21use std::sync::atomic::{AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use std::{cmp, env, iter, thread};
24
25use anyhow::{Context, bail};
26use clap::{ArgAction, Parser, ValueEnum};
27use fail::FailScenario;
28use http::header::HeaderValue;
29use ipnet::IpNet;
30use itertools::Itertools;
31use mz_adapter::ResultExt;
32use mz_adapter_types::bootstrap_builtin_cluster_config::{
33 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
34 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, DEFAULT_REPLICATION_FACTOR,
35 PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR, SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
36 SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
37};
38use mz_auth::password::Password;
39use mz_aws_secrets_controller::AwsSecretsController;
40use mz_build_info::BuildInfo;
41use mz_catalog::config::ClusterReplicaSizeMap;
42use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController};
43use mz_controller::{ControllerConfig, ReplicaHttpLocator};
44use mz_frontegg_auth::{Authenticator as FronteggAuthenticator, FronteggCliArgs};
45use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
46use mz_orchestrator::Orchestrator;
47use mz_orchestrator_kubernetes::{
48 KubernetesImagePullPolicy, KubernetesOrchestrator, KubernetesOrchestratorConfig,
49};
50use mz_orchestrator_process::{
51 ProcessOrchestrator, ProcessOrchestratorConfig, ProcessOrchestratorTcpProxyConfig,
52};
53use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs, TracingOrchestrator};
54use mz_ore::cli::{self, CliConfig, KeyValueArg};
55use mz_ore::error::ErrorExt;
56use mz_ore::metric;
57use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
58use mz_ore::now::SYSTEM_TIME;
59use mz_ore::task::RuntimeExt;
60use mz_ore::url::SensitiveUrl;
61use mz_persist_client::PersistLocation;
62use mz_persist_client::cache::PersistClientCache;
63use mz_persist_client::cfg::PersistConfig;
64use mz_persist_client::rpc::{
65 MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
66};
67use mz_secrets::SecretsController;
68use mz_server_core::TlsCliArgs;
69use mz_service::emit_boot_diagnostics;
70use mz_service::secrets::{SecretsControllerKind, SecretsReaderCliArgs};
71use mz_sql::catalog::EnvironmentId;
72use mz_storage_types::connections::ConnectionContext;
73use opentelemetry::trace::TraceContextExt;
74use prometheus::IntGauge;
75use tracing::{Instrument, error, info, info_span, warn};
76use tracing_opentelemetry::OpenTelemetrySpanExt;
77use url::Url;
78
79use crate::environmentd::sys;
80use crate::{BUILD_INFO, CatalogConfig, ListenerConfig, Listeners, ListenersConfig};
81
82static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
83static LONG_VERSION: LazyLock<String> = LazyLock::new(|| {
84 iter::once(BUILD_INFO.human_version(None))
85 .chain(build_info())
86 .join("\n")
87});
88
89#[derive(Parser, Debug)]
91#[clap(
92 name = "environmentd",
93 next_line_help = true,
94 version = VERSION.as_str(),
95 long_version = LONG_VERSION.as_str(),
96)]
97pub struct Args {
98 #[clap(long, env = "UNSAFE_MODE")]
102 unsafe_mode: bool,
103 #[clap(long, env = "ALL_FEATURES")]
106 all_features: bool,
107
108 #[clap(
112 long,
113 env = "LISTENERS_CONFIG_PATH",
114 value_name = "PATH",
115 action = ArgAction::Set,
116 )]
117 listeners_config_path: PathBuf,
118 #[clap(
120 long,
121 env = "EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM",
122 action = ArgAction::Set,
123 )]
124 external_login_password_mz_system: Option<Password>,
125 #[clap(
131 long,
132 value_name = "HOST:PORT",
133 env = "INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR",
134 default_value = "127.0.0.1:6879",
135 action = ArgAction::Set,
136 )]
137 internal_persist_pubsub_listen_addr: SocketAddr,
138 #[structopt(long, env = "CORS_ALLOWED_ORIGIN")]
147 cors_allowed_origin: Vec<HeaderValue>,
148 #[clap(
151 long,
152 env = "ANNOUNCE_EGRESS_ADDRESS",
153 action = ArgAction::Append,
154 use_value_delimiter = true
155 )]
156 announce_egress_address: Vec<IpNet>,
157 #[clap(long, env = "HTTP_HOST_NAME")]
166 http_host_name: Option<String>,
167 #[clap(long, env = "INTERNAL_CONSOLE_REDIRECT_URL")]
170 internal_console_redirect_url: Option<String>,
171 #[clap(flatten)]
173 tls: TlsCliArgs,
174 #[clap(flatten)]
176 frontegg: FronteggCliArgs,
177 #[structopt(long, value_enum, env = "ORCHESTRATOR")]
180 orchestrator: OrchestratorKind,
181 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SCHEDULER_NAME")]
183 orchestrator_kubernetes_scheduler_name: Option<String>,
184 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ANNOTATION")]
187 orchestrator_kubernetes_service_annotation: Vec<KeyValueArg<String, String>>,
188 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_LABEL")]
191 orchestrator_kubernetes_service_label: Vec<KeyValueArg<String, String>>,
192 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_NODE_SELECTOR")]
195 orchestrator_kubernetes_service_node_selector: Vec<KeyValueArg<String, String>>,
196 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_AFFINITY")]
199 orchestrator_kubernetes_service_affinity: Option<String>,
200 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_TOLERATIONS")]
203 orchestrator_kubernetes_service_tolerations: Option<String>,
204 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ACCOUNT")]
207 orchestrator_kubernetes_service_account: Option<String>,
208 #[structopt(
213 long,
214 env = "ORCHESTRATOR_KUBERNETES_CONTEXT",
215 default_value = "minikube"
216 )]
217 orchestrator_kubernetes_context: String,
218 #[structopt(
221 long,
222 env = "ORCHESTRATOR_KUBERNETES_IMAGE_PULL_POLICY",
223 default_value = "always",
224 value_enum
225 )]
226 orchestrator_kubernetes_image_pull_policy: KubernetesImagePullPolicy,
227 #[clap(long, env = "ORCHESTRATOR_KUBERNETES_INIT_CONTAINER_IMAGE")]
229 orchestrator_kubernetes_init_container_image: Option<String>,
230 #[clap(long, env = "ORCHESTRATOR_KUBERNETES_EPHEMERAL_VOLUME_CLASS")]
236 orchestrator_kubernetes_ephemeral_volume_class: Option<String>,
237 #[clap(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_FS_GROUP")]
239 orchestrator_kubernetes_service_fs_group: Option<i64>,
240 #[clap(long, env = "ORCHESTRATOR_KUBERNETES_NAME_PREFIX")]
242 orchestrator_kubernetes_name_prefix: Option<String>,
243 #[clap(long, env = "ORCHESTRATOR_KUBERNETES_DISABLE_POD_METRICS_COLLECTION")]
248 orchestrator_kubernetes_disable_pod_metrics_collection: bool,
249 #[clap(
251 long,
252 env = "ORCHESTRATOR_KUBERNETES_ENABLE_PROMETHEUS_SCRAPE_ANNOTATIONS"
253 )]
254 orchestrator_kubernetes_enable_prometheus_scrape_annotations: bool,
255 #[clap(long, env = "ORCHESTRATOR_PROCESS_WRAPPER")]
256 orchestrator_process_wrapper: Option<String>,
257 #[clap(
259 long,
260 env = "ORCHESTRATOR_PROCESS_SECRETS_DIRECTORY",
261 value_name = "PATH",
262 required_if_eq("orchestrator", "process")
263 )]
264 orchestrator_process_secrets_directory: Option<PathBuf>,
265 #[clap(long, env = "ORCHESTRATOR_PROCESS_PROPAGATE_CRASHES")]
268 orchestrator_process_propagate_crashes: bool,
269 #[clap(long, env = "ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR")]
281 orchestrator_process_tcp_proxy_listen_addr: Option<IpAddr>,
282 #[clap(
295 long,
296 env = "ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY"
297 )]
298 orchestrator_process_prometheus_service_discovery_directory: Option<PathBuf>,
299 #[clap(
301 long,
302 env = "ORCHESTRATOR_PROCESS_SCRATCH_DIRECTORY",
303 value_name = "PATH"
304 )]
305 orchestrator_process_scratch_directory: Option<PathBuf>,
306 #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_COVERAGE")]
309 orchestrator_kubernetes_coverage: bool,
310 #[structopt(
312 long,
313 value_enum,
314 env = "SECRETS_CONTROLLER",
315 default_value_ifs([
316 ("orchestrator", "kubernetes", Some("kubernetes")),
317 ("orchestrator", "process", Some("local-file"))
318 ]),
319 default_value("kubernetes"), )]
321 secrets_controller: SecretsControllerKind,
322 #[clap(
325 long,
326 env = "AWS_SECRETS_CONTROLLER_TAGS",
327 action = ArgAction::Append,
328 value_delimiter = ';',
329 required_if_eq("secrets_controller", "aws-secrets-manager")
330 )]
331 aws_secrets_controller_tags: Vec<KeyValueArg<String, String>>,
332 #[structopt(
334 long,
335 env = "CLUSTERD_IMAGE",
336 required_if_eq("orchestrator", "kubernetes"),
337 default_value_if("orchestrator", "process", Some("clusterd"))
338 )]
339 clusterd_image: Option<String>,
340 #[clap(long, env = "DEPLOY_GENERATION", default_value = "0")]
345 deploy_generation: u64,
346
347 #[clap(
350 long,
351 env = "METADATA_BACKEND_URL",
352 conflicts_with_all = &[
353 "persist_consensus_url",
354 "timestamp_oracle_url",
355 ],
356 )]
357 metadata_backend_url: Option<SensitiveUrl>,
358
359 #[clap(long, env = "HELM_CHART_VERSION")]
364 helm_chart_version: Option<String>,
365
366 #[clap(long, env = "PERSIST_BLOB_URL")]
369 persist_blob_url: SensitiveUrl,
370 #[clap(long, env = "PERSIST_CONSENSUS_URL")]
372 persist_consensus_url: Option<SensitiveUrl>,
373 #[clap(
377 long,
378 env = "PERSIST_PUBSUB_URL",
379 default_value = "http://localhost:6879"
380 )]
381 persist_pubsub_url: String,
382 #[clap(long, env = "PERSIST_ISOLATED_RUNTIME_THREADS")]
386 persist_isolated_runtime_threads: Option<isize>,
387 #[clap(
389 long,
390 env = "STORAGE_USAGE_COLLECTION_INTERVAL",
391 value_parser = humantime::parse_duration,
392 default_value = "3600s"
393 )]
394 storage_usage_collection_interval_sec: Duration,
395 #[clap(long, env = "STORAGE_USAGE_RETENTION_PERIOD", value_parser = humantime::parse_duration)]
399 storage_usage_retention_period: Option<Duration>,
400
401 #[clap(long, env = "TIMESTAMP_ORACLE_URL", value_name = "POSTGRES_URL")]
404 timestamp_oracle_url: Option<SensitiveUrl>,
405 #[clap(long, env = "AVAILABILITY_ZONE", use_value_delimiter = true)]
408 availability_zone: Vec<String>,
409 #[clap(
411 long,
412 env = "CLUSTER_REPLICA_SIZES",
413 requires = "bootstrap_default_cluster_replica_size"
414 )]
415 cluster_replica_sizes: String,
416 #[clap(long, env = "SEGMENT_API_KEY")]
418 segment_api_key: Option<String>,
419 #[clap(long, env = "SEGMENT_CLIENT_SIDE")]
425 segment_client_side: bool,
426 #[clap(long, env = "TEST_ONLY_DUMMY_SEGMENT_CLIENT")]
429 test_only_dummy_segment_client: bool,
430 #[clap(long, env = "LAUNCHDARKLY_SDK_KEY")]
436 launchdarkly_sdk_key: Option<String>,
437 #[clap(
444 long,
445 env = "LAUNCHDARKLY_KEY_MAP",
446 action = ArgAction::Append,
447 value_delimiter = ';'
448 )]
449 launchdarkly_key_map: Vec<KeyValueArg<String, String>>,
450 #[clap(
452 long,
453 env = "CONFIG_SYNC_TIMEOUT",
454 value_parser = humantime::parse_duration,
455 default_value = "30s"
456 )]
457 config_sync_timeout: Duration,
458 #[clap(
464 long,
465 env = "CONFIG_SYNC_LOOP_INTERVAL",
466 value_parser = humantime::parse_duration,
467 )]
468 config_sync_loop_interval: Option<Duration>,
469 #[clap(long, env = "CONFIG_SYNC_FILE_PATH", value_name = "PATH")]
472 config_sync_file_path: Option<PathBuf>,
473
474 #[clap(
476 long,
477 env = "ENVIRONMENT_ID",
478 value_name = "<CLOUD>-<REGION>-<ORG-ID>-<ORDINAL>"
479 )]
480 environment_id: EnvironmentId,
481 #[clap(long, env = "BOOTSTRAP_ROLE")]
490 bootstrap_role: Option<String>,
491 #[clap(
493 long,
494 env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE",
495 default_value = "scale=1,workers=1"
496 )]
497 bootstrap_default_cluster_replica_size: String,
498 #[clap(
500 long,
501 env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE",
502 default_value = "scale=1,workers=1"
503 )]
504 bootstrap_builtin_system_cluster_replica_size: String,
505 #[clap(
507 long,
508 env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE",
509 default_value = "scale=1,workers=1"
510 )]
511 bootstrap_builtin_catalog_server_cluster_replica_size: String,
512 #[clap(
514 long,
515 env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE",
516 default_value = "scale=1,workers=1"
517 )]
518 bootstrap_builtin_probe_cluster_replica_size: String,
519 #[clap(
521 long,
522 env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE",
523 default_value = "scale=1,workers=1"
524 )]
525 bootstrap_builtin_support_cluster_replica_size: String,
526 #[clap(
528 long,
529 env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE",
530 default_value = "scale=1,workers=1"
531 )]
532 bootstrap_builtin_analytics_cluster_replica_size: String,
533 #[clap(
534 long,
535 env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR",
536 default_value = DEFAULT_REPLICATION_FACTOR.to_string(),
537 value_parser = clap::value_parser!(u32).range(0..=2)
538 )]
539 bootstrap_default_cluster_replication_factor: u32,
540 #[clap(
542 long,
543 env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR",
544 default_value = SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
545 value_parser = clap::value_parser!(u32).range(0..=2)
546 )]
547 bootstrap_builtin_system_cluster_replication_factor: u32,
548 #[clap(
550 long,
551 env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICATION_FACTOR",
552 default_value = CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
553 value_parser = clap::value_parser!(u32).range(0..=2)
554 )]
555 bootstrap_builtin_catalog_server_cluster_replication_factor: u32,
556 #[clap(
558 long,
559 env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR",
560 default_value = PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
561 value_parser = clap::value_parser!(u32).range(0..=2)
562 )]
563 bootstrap_builtin_probe_cluster_replication_factor: u32,
564 #[clap(
566 long,
567 env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICATION_FACTOR",
568 default_value = SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
569 value_parser = clap::value_parser!(u32).range(0..=2)
570 )]
571 bootstrap_builtin_support_cluster_replication_factor: u32,
572 #[clap(
574 long,
575 env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICATION_FACTOR",
576 default_value = ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
577 value_parser = clap::value_parser!(u32).range(0..=2)
578 )]
579 bootstrap_builtin_analytics_cluster_replication_factor: u32,
580 #[clap(
583 long,
584 env = "SYSTEM_PARAMETER_DEFAULT",
585 action = ArgAction::Append,
586 value_delimiter = ';'
587 )]
588 system_parameter_default: Vec<KeyValueArg<String, String>>,
589 #[clap(long, env = "LICENSE_KEY")]
591 license_key: Option<String>,
592
593 #[clap(long, env = "AWS_ACCOUNT_ID")]
597 aws_account_id: Option<String>,
598 #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
601 aws_connection_role_arn: Option<String>,
602 #[clap(long, env = "AWS_EXTERNAL_ID_PREFIX", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
606 aws_external_id_prefix: Option<AwsExternalIdPrefix>,
607 #[clap(
610 long,
611 env = "AWS_PRIVATELINK_AVAILABILITY_ZONES",
612 action = ArgAction::Append,
613 use_value_delimiter = true
614 )]
615 aws_privatelink_availability_zones: Option<Vec<String>>,
616
617 #[clap(flatten)]
619 tracing: TracingCliArgs,
620
621 #[clap(long, value_enum, requires = "unsafe_mode")]
628 unsafe_force_builtin_schema_migration: Option<String>,
629}
630
/// The orchestrator selected with `--orchestrator`.
///
/// `run` matches on this to decide which orchestrator and which secrets
/// controller to construct.
// Variants carry plain `//` comments on purpose: `ValueEnum` turns variant
// doc comments into help text for the possible values.
#[derive(ValueEnum, Debug, Clone)]
enum OrchestratorKind {
    // Use `KubernetesOrchestrator`; selected on the command line as
    // `kubernetes`.
    Kubernetes,
    // Use `ProcessOrchestrator`; selected on the command line as `process`.
    Process,
}
636
637fn aws_secrets_controller_prefix(env_id: &EnvironmentId) -> String {
640 format!("/user-managed/{}/", env_id)
641}
642fn aws_secrets_controller_key_alias(env_id: &EnvironmentId) -> String {
643 format!("alias/customer_key_{}", env_id)
646}
647
648pub fn main() {
649 let args = cli::parse_args(CliConfig {
650 env_prefix: Some("MZ_"),
651 enable_version_flag: true,
652 });
653 if let Err(err) = run(args) {
654 panic!("environmentd: fatal: {}", err.display_with_causes());
655 }
656}
657
/// Boots `environmentd` from parsed command-line arguments and then parks the
/// calling thread forever.
///
/// The steps, in order:
///
/// 1. Install the panic handler and signal handling.
/// 2. Load and validate the license key.
/// 3. Start the Tokio runtime, then configure tracing and metrics.
/// 4. Read and validate the listeners configuration and compute the CORS
///    allow-list.
/// 5. Build the orchestrator, secrets controller and (Kubernetes only) the
///    cloud resource controller.
/// 6. Set up Persist: configuration, PubSub server and client cache.
/// 7. Assemble the controller configuration, bind the listeners and serve.
/// 8. Record the startup time, print the listening addresses, and block.
///
/// On success this function never returns; `Ok(())` exists only to satisfy
/// the signature.
///
/// # Errors
///
/// Returns an error if any fallible startup step fails, including: signal
/// setup, reading or validating the license key, an expired license key whose
/// expiration behavior is `Disable`, a missing `--license-key` under the
/// Kubernetes orchestrator, building the runtime, configuring tracing or TLS,
/// reading or validating the listeners configuration, an incompatible
/// orchestrator/secrets-controller combination, creating the orchestrator,
/// parsing the replica size map, or binding and serving the listeners.
///
/// # Panics
///
/// Panics if:
///
/// * `--unsafe-force-builtin-schema-migration` is set without unsafe mode
///   (defensive assert; clap is expected to reject this first).
/// * The process orchestrator is selected without a scratch directory.
/// * Neither `--persist-consensus-url` nor `--metadata-backend-url` is given.
/// * A value that clap is expected to guarantee is missing (the
///   `"clap enforced"` expectations).
/// * The startup duration in milliseconds does not fit the gauge's type.
fn run(mut args: Args) -> Result<(), anyhow::Error> {
    mz_ore::panic::install_enhanced_handler();
    // Startup timing is measured from here and reported at the end.
    let envd_start = Instant::now();

    // Set up signal handling before doing any other work.
    sys::enable_sigusr2_coverage_dump()?;
    sys::enable_termination_signal_cleanup()?;

    // Resolve the license key. `--license-key` is a path to a file holding
    // the key text.
    let license_key = if let Some(license_key_file) = args.license_key {
        let license_key_text = std::fs::read_to_string(&license_key_file)
            .context("failed to open license key file")?;
        // Surrounding whitespace (e.g. a trailing newline) is trimmed before
        // validation.
        let license_key = mz_license_keys::validate(license_key_text.trim())
            .context("failed to validate license key file")?;
        if license_key.expired {
            let message = format!(
                "The license key provided at {license_key_file} is expired! Please contact Materialize for assistance."
            );
            // An expired key is either only logged or fatal, depending on
            // the behavior carried by the key itself.
            match license_key.expiration_behavior {
                ExpirationBehavior::Warn | ExpirationBehavior::DisableClusterCreation => {
                    warn!("{message}");
                }
                ExpirationBehavior::Disable => bail!("{message}"),
            }
        }
        license_key
    } else if matches!(args.orchestrator, OrchestratorKind::Kubernetes) {
        bail!("--license-key is required when running in Kubernetes");
    } else {
        // No key file and not running under Kubernetes: continue with a
        // disabled license key.
        ValidatedLicenseKey::disabled()
    };

    let force_builtin_schema_migration = args
        .unsafe_force_builtin_schema_migration
        // Defensive check: the flag declares `requires = "unsafe_mode"`, so
        // clap should already have rejected it without unsafe mode.
        .inspect(|_mechanism| assert!(args.unsafe_mode));

    // Start the Tokio runtime. The worker count is the smaller of the logical
    // and physical CPU counts, but never less than one.
    let ncpus_useful = usize::max(1, cmp::min(num_cpus::get(), num_cpus::get_physical()));
    let runtime = Arc::new(
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(ncpus_useful)
            // 3 MiB of stack per worker thread.
            .thread_stack_size(3 * 1024 * 1024)
            // Name workers `tokio:work-<n>`, numbered by a process-wide
            // counter.
            .thread_name_fn(|| {
                static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
                let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
                format!("tokio:work-{}", id)
            })
            .enable_all()
            .build()?,
    );

    // Only the process orchestrator gets a log prefix; any value already in
    // `args.tracing.log_prefix` is overwritten.
    // NOTE(review): presumably because the process orchestrator interleaves
    // output from several services - confirm.
    args.tracing.log_prefix = if matches!(args.orchestrator, OrchestratorKind::Process) {
        Some("environmentd".to_string())
    } else {
        None
    };

    let metrics_registry = MetricsRegistry::new();
    let tracing_handle = runtime.block_on(args.tracing.configure_tracing(
        StaticTracingConfig {
            service_name: "environmentd",
            build_info: BUILD_INFO,
        },
        metrics_registry.clone(),
    ))?;
    register_runtime_metrics("main", runtime.metrics(), &metrics_registry);

    // Root span covering startup; it is exited once serving has begun and its
    // trace ID is printed at the end.
    let span = tracing::info_span!("environmentd::run").entered();

    info!("startup: envd init: beginning");
    info!("startup: envd init: preamble beginning");

    let metrics = Metrics::register_into(&metrics_registry, BUILD_INFO);

    runtime.block_on(mz_alloc::register_metrics_into(&metrics_registry));
    runtime.block_on(mz_metrics::register_metrics_into(
        &metrics_registry,
        mz_dyncfgs::all_dyncfgs(),
    ));

    // Set up the `fail` crate's scenario. The value is bound to a named
    // variable so that it lives until this function exits.
    let _failpoint_scenario = FailScenario::setup();

    // Configure connections: TLS, Frontegg, and the listeners file.
    let tls = args.tls.into_config()?;
    // Copy the issuer URL out first; `args.frontegg` is moved on the next
    // line.
    let frontegg_oauth_issuer_url = args.frontegg.oauth_issuer_url().map(str::to_string);
    let frontegg = FronteggAuthenticator::from_args(args.frontegg, &metrics_registry)?;
    // The listeners configuration is a JSON file.
    let listeners_config: ListenersConfig = {
        let f = File::open(args.listeners_config_path)?;
        serde_json::from_reader(f)?
    };

    // Reject invalid listener definitions before binding anything.
    for (_, listener) in &listeners_config.sql {
        listener
            .validate()
            .map_err(|e| anyhow::anyhow!("invalid SQL listener: {}", e))?;
    }

    for (_, listener) in &listeners_config.http {
        listener
            .validate()
            .map_err(|e| anyhow::anyhow!("invalid HTTP listener: {}", e))?;
    }

    // Configure CORS. Explicitly provided origins win; otherwise allow the
    // loopback origins, over both http and https, on each HTTP listener's
    // port.
    let allowed_origins = if !args.cors_allowed_origin.is_empty() {
        args.cors_allowed_origin
    } else {
        // Six origins are generated per HTTP listener.
        let mut allowed_origins = Vec::with_capacity(listeners_config.http.len() * 6);
        for (_, listener) in &listeners_config.http {
            let port = listener.addr().port();
            allowed_origins.extend([
                HeaderValue::from_str(&format!("http://localhost:{}", port)).unwrap(),
                HeaderValue::from_str(&format!("http://127.0.0.1:{}", port)).unwrap(),
                HeaderValue::from_str(&format!("http://[::1]:{}", port)).unwrap(),
                HeaderValue::from_str(&format!("https://localhost:{}", port)).unwrap(),
                HeaderValue::from_str(&format!("https://127.0.0.1:{}", port)).unwrap(),
                HeaderValue::from_str(&format!("https://[::1]:{}", port)).unwrap(),
            ])
        }
        allowed_origins
    };
    let cors_allowed_origin = mz_http_util::build_cors_allowed_origin(&allowed_origins);
    // The raw list is handed to the server alongside the built value.
    let cors_allowed_origin_list = allowed_origins.clone();

    // Configure the controller: pick the orchestrator, the secrets
    // controller, and (Kubernetes only) the cloud resource controller.
    let entered = info_span!("environmentd::configure_controller").entered();
    let (orchestrator, secrets_controller, cloud_resource_controller): (
        Arc<dyn Orchestrator>,
        Arc<dyn SecretsController>,
        Option<Arc<dyn CloudResourceController>>,
    ) = match args.orchestrator {
        OrchestratorKind::Kubernetes => {
            // Process-orchestrator-only flag; reject it rather than silently
            // ignoring it.
            if args.orchestrator_process_scratch_directory.is_some() {
                bail!(
                    "--orchestrator-process-scratch-directory is \
                      not currently usable with the kubernetes orchestrator"
                );
            }

            let orchestrator = Arc::new(
                runtime
                    .block_on(KubernetesOrchestrator::new(KubernetesOrchestratorConfig {
                        // Cloned: the context is needed again below for the
                        // secrets reader arguments.
                        context: args.orchestrator_kubernetes_context.clone(),
                        scheduler_name: args.orchestrator_kubernetes_scheduler_name,
                        service_annotations: args
                            .orchestrator_kubernetes_service_annotation
                            .into_iter()
                            .map(|l| (l.key, l.value))
                            .collect(),
                        service_labels: args
                            .orchestrator_kubernetes_service_label
                            .into_iter()
                            .map(|l| (l.key, l.value))
                            .collect(),
                        service_node_selector: args
                            .orchestrator_kubernetes_service_node_selector
                            .into_iter()
                            .map(|l| (l.key, l.value))
                            .collect(),
                        service_affinity: args.orchestrator_kubernetes_service_affinity,
                        service_tolerations: args.orchestrator_kubernetes_service_tolerations,
                        service_account: args.orchestrator_kubernetes_service_account,
                        image_pull_policy: args.orchestrator_kubernetes_image_pull_policy,
                        aws_external_id_prefix: args.aws_external_id_prefix.clone(),
                        coverage: args.orchestrator_kubernetes_coverage,
                        ephemeral_volume_storage_class: args
                            .orchestrator_kubernetes_ephemeral_volume_class
                            .clone(),
                        service_fs_group: args.orchestrator_kubernetes_service_fs_group.clone(),
                        name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
                        // The flag disables collection; the config field
                        // enables it, hence the negation.
                        collect_pod_metrics: !args
                            .orchestrator_kubernetes_disable_pod_metrics_collection,
                        enable_prometheus_scrape_annotations: args
                            .orchestrator_kubernetes_enable_prometheus_scrape_annotations,
                    }))
                    .context("creating kubernetes orchestrator")?,
            );
            let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
                SecretsControllerKind::Kubernetes => {
                    // The Kubernetes orchestrator doubles as the secrets
                    // controller.
                    let sc = Arc::clone(&orchestrator);
                    let sc: Arc<dyn SecretsController> = sc;
                    sc
                }
                SecretsControllerKind::AwsSecretsManager => {
                    Arc::new(
                        runtime.block_on(AwsSecretsController::new(
                            // Prefix and key alias are both derived from the
                            // environment ID.
                            &aws_secrets_controller_prefix(&args.environment_id),
                            &aws_secrets_controller_key_alias(&args.environment_id),
                            args.aws_secrets_controller_tags
                                .into_iter()
                                .map(|tag| (tag.key, tag.value))
                                .collect(),
                        )),
                    )
                }
                SecretsControllerKind::LocalFile => bail!(
                    "SecretsControllerKind::LocalFile is not compatible with Orchestrator::Kubernetes."
                ),
            };
            // The Kubernetes orchestrator also serves as the cloud resource
            // controller.
            let cloud_resource_controller = Arc::clone(&orchestrator);
            (
                orchestrator,
                secrets_controller,
                Some(cloud_resource_controller),
            )
        }
        OrchestratorKind::Process => {
            // Kubernetes-only flag; reject it rather than silently ignoring
            // it.
            if args
                .orchestrator_kubernetes_ephemeral_volume_class
                .is_some()
            {
                bail!(
                    "--orchestrator-kubernetes-ephemeral-volume-class is \
                      not usable with the process orchestrator"
                );
            }
            let orchestrator = Arc::new(
                runtime
                    .block_on(ProcessOrchestrator::new(ProcessOrchestratorConfig {
                        // Use the directory containing the current
                        // executable as the image directory.
                        image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
                        suppress_output: false,
                        environment_id: args.environment_id.to_string(),
                        // Declared `required_if_eq("orchestrator",
                        // "process")`, so clap guarantees it is present.
                        secrets_dir: args
                            .orchestrator_process_secrets_directory
                            .clone()
                            .expect("clap enforced"),
                        // Shell-split the wrapper into words; no wrapper
                        // means an empty command prefix.
                        command_wrapper: args
                            .orchestrator_process_wrapper
                            .map_or(Ok(vec![]), |s| shell_words::split(&s))?,
                        propagate_crashes: args.orchestrator_process_propagate_crashes,
                        // The TCP proxy is configured only when a listen
                        // address was given.
                        tcp_proxy: args.orchestrator_process_tcp_proxy_listen_addr.map(
                            |listen_addr| ProcessOrchestratorTcpProxyConfig {
                                listen_addr,
                                prometheus_service_discovery_dir: args
                                    .orchestrator_process_prometheus_service_discovery_directory,
                            },
                        ),
                        // Not enforced by clap: a missing scratch directory
                        // panics here.
                        scratch_directory: args
                            .orchestrator_process_scratch_directory
                            .expect("process orchestrator requires scratch directory"),
                    }))
                    .context("creating process orchestrator")?,
            );
            let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
                SecretsControllerKind::Kubernetes => bail!(
                    "SecretsControllerKind::Kubernetes is not compatible with Orchestrator::Process."
                ),
                SecretsControllerKind::AwsSecretsManager => Arc::new(
                    runtime.block_on(AwsSecretsController::new(
                        &aws_secrets_controller_prefix(&args.environment_id),
                        &aws_secrets_controller_key_alias(&args.environment_id),
                        args.aws_secrets_controller_tags
                            .into_iter()
                            .map(|tag| (tag.key, tag.value))
                            .collect(),
                    )),
                ),
                SecretsControllerKind::LocalFile => {
                    // The process orchestrator doubles as the secrets
                    // controller.
                    let sc = Arc::clone(&orchestrator);
                    let sc: Arc<dyn SecretsController> = sc;
                    sc
                }
            };
            // The process orchestrator provides no cloud resource controller.
            (orchestrator, secrets_controller, None)
        }
    };
    drop(entered);
    let cloud_resource_reader = cloud_resource_controller.as_ref().map(|c| c.reader());
    let secrets_reader = secrets_controller.reader();
    let now = SYSTEM_TIME.clone();

    let mut persist_config =
        PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs());
    // Compaction is disabled on this configuration before any Persist client
    // is created from it.
    persist_config.disable_compaction();

    let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
    // In-process connection to the PubSub server, used by the client cache
    // below.
    let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();

    // Size Persist's isolated runtime from the flag.
    match args.persist_isolated_runtime_threads {
        // Unset or zero: keep whatever `PersistConfig::new` chose.
        None | Some(0) => (),
        // Negative: number of CPUs plus the (negative) value, but at least
        // one thread.
        Some(x @ ..=-1) => {
            let threads = num_cpus::get().saturating_add_signed(x).max(1);
            persist_config.isolated_runtime_worker_threads = threads;
        }
        // Positive: use the value as the thread count.
        Some(x @ 1..) => {
            let threads = usize::try_from(x).expect("pattern matched a positive value");
            persist_config.isolated_runtime_worker_threads = threads;
        }
    };

    // Run the PubSub server in the background. The task is not awaited; if
    // the server ever returns, that is only logged.
    let _server = runtime.spawn_named(
        || "persist::rpc::server",
        async move {
            info!(
                "listening for Persist PubSub connections on {}",
                args.internal_persist_pubsub_listen_addr
            );
            let res = persist_pubsub_server
                .serve(args.internal_persist_pubsub_listen_addr)
                .await;
            error!("Persist Pubsub server exited {:?}", res);
        }
        .instrument(tracing::info_span!("persist::rpc::server")),
    );

    let persist_clients = {
        // Enter the runtime for the duration of the constructor.
        // NOTE(review): presumably `PersistClientCache::new` spawns tasks -
        // confirm.
        let _tokio_guard = runtime.enter();
        PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
            let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
                cfg,
                persist_pubsub_client.sender,
                metrics,
            ));
            PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
        })
    };

    // Persist consensus URL: the explicit flag if given, otherwise the
    // metadata backend URL with `options=--search_path=consensus` appended.
    // Panics if neither flag was provided.
    let consensus_uri = args.persist_consensus_url.unwrap_or_else(|| {
        args.metadata_backend_url
            .as_ref()
            .map(|metadata_backend_url| {
                SensitiveUrl(
                    Url::parse_with_params(
                        metadata_backend_url.0.as_ref(),
                        &[("options", "--search_path=consensus")],
                    )
                    .unwrap(),
                )
            })
            .expect("either --persist-consensus-url or --metadata-backend-url must be provided")
    });
    // Timestamp oracle URL: the explicit flag if given, otherwise the
    // metadata backend URL with `options=--search_path=tsoracle` appended.
    // Unlike the consensus URL, this may remain `None`.
    let timestamp_oracle_url = args.timestamp_oracle_url.or_else(|| {
        args.metadata_backend_url
            .as_ref()
            .map(|metadata_backend_url| {
                SensitiveUrl(
                    Url::parse_with_params(
                        metadata_backend_url.0.as_ref(),
                        &[("options", "--search_path=tsoracle")],
                    )
                    .unwrap(),
                )
            })
    });

    let persist_clients = Arc::new(persist_clients);
    let connection_context = ConnectionContext::from_cli_args(
        args.environment_id.to_string(),
        &args.tracing.startup_log_filter,
        args.aws_external_id_prefix,
        args.aws_connection_role_arn,
        secrets_reader,
        cloud_resource_reader,
    );
    // Wrap the orchestrator so that it carries the tracing arguments.
    let orchestrator = Arc::new(TracingOrchestrator::new(orchestrator, args.tracing.clone()));
    // Shared between the controller configuration and the server
    // configuration.
    let replica_http_locator = Arc::new(ReplicaHttpLocator::default());
    let controller = ControllerConfig {
        build_info: &BUILD_INFO,
        orchestrator,
        persist_location: PersistLocation {
            blob_uri: args.persist_blob_url,
            consensus_uri,
        },
        persist_clients: Arc::clone(&persist_clients),
        // Required under Kubernetes and defaulted under the process
        // orchestrator, so clap guarantees a value.
        clusterd_image: args.clusterd_image.expect("clap enforced"),
        init_container_image: args.orchestrator_kubernetes_init_container_image,
        deploy_generation: args.deploy_generation,
        now: SYSTEM_TIME.clone(),
        metrics_registry: metrics_registry.clone(),
        persist_pubsub_url: args.persist_pubsub_url,
        connection_context,
        // Secrets reader arguments, built from the same settings this
        // process used for its own secrets controller.
        secrets_args: SecretsReaderCliArgs {
            secrets_reader: args.secrets_controller,
            secrets_reader_local_file_dir: args.orchestrator_process_secrets_directory,
            secrets_reader_kubernetes_context: Some(args.orchestrator_kubernetes_context),
            secrets_reader_aws_prefix: Some(aws_secrets_controller_prefix(&args.environment_id)),
            secrets_reader_name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
        },
        replica_http_locator: Arc::clone(&replica_http_locator),
    };

    // The second argument is the negation of the license key's
    // `allow_credit_consumption_override` flag.
    let cluster_replica_sizes = ClusterReplicaSizeMap::parse_from_str(
        &args.cluster_replica_sizes,
        !license_key.allow_credit_consumption_override,
    )
    .context("parsing replica size map")?;

    emit_boot_diagnostics!(&BUILD_INFO);
    sys::adjust_rlimits();

    info!(
        "startup: envd init: preamble complete in {:?}",
        envd_start.elapsed()
    );

    // Bind the listeners and start serving. The serving phase is timed
    // separately from the preamble.
    let serve_start = Instant::now();
    info!("startup: envd init: serving beginning");
    let server = runtime.block_on(async {
        let listeners = Listeners::bind(listeners_config).await?;
        let catalog_config = CatalogConfig {
            persist_clients,
            metrics: Arc::new(mz_catalog::durable::Metrics::new(&metrics_registry)),
        };
        let server = listeners
            .serve(crate::Config {
                // Special modes.
                unsafe_mode: args.unsafe_mode,
                all_features: args.all_features,
                // Connection options.
                tls,
                tls_reload_certs: mz_server_core::default_cert_reload_ticker(),
                external_login_password_mz_system: args.external_login_password_mz_system,
                frontegg,
                frontegg_oauth_issuer_url,
                cors_allowed_origin,
                cors_allowed_origin_list,
                egress_addresses: args.announce_egress_address,
                http_host_name: args.http_host_name,
                internal_console_redirect_url: args.internal_console_redirect_url,
                // Controller options.
                controller,
                secrets_controller,
                cloud_resource_controller,
                // Storage options.
                storage_usage_collection_interval: args.storage_usage_collection_interval_sec,
                storage_usage_retention_period: args.storage_usage_retention_period,
                // Adapter options.
                catalog_config,
                availability_zones: args.availability_zone,
                cluster_replica_sizes,
                timestamp_oracle_url,
                segment_api_key: args.segment_api_key,
                segment_client_side: args.segment_client_side,
                test_only_dummy_segment_client: args.test_only_dummy_segment_client,
                launchdarkly_sdk_key: args.launchdarkly_sdk_key,
                launchdarkly_key_map: args
                    .launchdarkly_key_map
                    .into_iter()
                    .map(|kv| (kv.key, kv.value))
                    .collect(),
                config_sync_timeout: args.config_sync_timeout,
                config_sync_loop_interval: args.config_sync_loop_interval,
                config_sync_file_path: args.config_sync_file_path,

                // Bootstrap options.
                environment_id: args.environment_id,
                bootstrap_role: args.bootstrap_role,
                bootstrap_default_cluster_replica_size: args.bootstrap_default_cluster_replica_size,
                bootstrap_default_cluster_replication_factor: args
                    .bootstrap_default_cluster_replication_factor,
                bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_system_cluster_replica_size,
                    replication_factor: args.bootstrap_builtin_system_cluster_replication_factor,
                },
                bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_catalog_server_cluster_replica_size,
                    replication_factor: args
                        .bootstrap_builtin_catalog_server_cluster_replication_factor,
                },
                bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_probe_cluster_replica_size,
                    replication_factor: args.bootstrap_builtin_probe_cluster_replication_factor,
                },
                bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_support_cluster_replica_size,
                    replication_factor: args.bootstrap_builtin_support_cluster_replication_factor,
                },
                bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_analytics_cluster_replica_size,
                    replication_factor: args.bootstrap_builtin_analytics_cluster_replication_factor,
                },
                system_parameter_defaults: args
                    .system_parameter_default
                    .into_iter()
                    .map(|kv| (kv.key, kv.value))
                    .collect(),
                // Cloned: the version is needed again for the startup banner
                // below.
                helm_chart_version: args.helm_chart_version.clone(),
                license_key,
                // AWS options.
                aws_account_id: args.aws_account_id,
                aws_privatelink_availability_zones: args.aws_privatelink_availability_zones,
                // Observability options.
                metrics_registry,
                tracing_handle,
                // Testing options.
                now,
                force_builtin_schema_migration,
            })
            .await
            .maybe_terminate("booting server")?;
        Ok::<_, anyhow::Error>(server)
    })?;
    info!(
        "startup: envd init: serving complete in {:?}",
        serve_start.elapsed()
    );

    // Record the total startup time, in milliseconds, in the gauge.
    let start_duration = envd_start.elapsed();
    metrics
        .start_time_environmentd
        .set(start_duration.as_millis().try_into().expect("must fit"));
    // Leave the startup span, keeping its trace ID for the banner below.
    let span = span.exit();
    let id = span.context().span().span_context().trace_id();
    drop(span);

    info!("startup: envd init: complete in {start_duration:?}");

    // Print the startup banner to stdout.
    println!(
        "environmentd {} listening...",
        BUILD_INFO.human_version(args.helm_chart_version)
    );
    for (name, handle) in &server.sql_listener_handles {
        println!("{} SQL address: {}", name, handle.local_addr);
    }
    for (name, handle) in &server.http_listener_handles {
        println!("{} HTTP address: {}", name, handle.local_addr);
    }
    // This prints the configured PubSub listen address, not an address
    // reported back by the PubSub server.
    println!(
        " Internal Persist PubSub address: {}",
        args.internal_persist_pubsub_listen_addr
    );

    println!(" Root trace ID: {id}");

    // Block this thread forever. `server` and `runtime` stay alive because
    // the function never returns. The loop re-parks after every wake-up.
    loop {
        thread::park();
    }
}
1211
1212fn build_info() -> Vec<String> {
1213 let openssl_version =
1214 unsafe { CStr::from_ptr(openssl_sys::OpenSSL_version(openssl_sys::OPENSSL_VERSION)) };
1215 let rdkafka_version = unsafe { CStr::from_ptr(rdkafka_sys::bindings::rd_kafka_version_str()) };
1216 vec![
1217 openssl_version.to_string_lossy().into_owned(),
1218 format!("librdkafka v{}", rdkafka_version.to_string_lossy()),
1219 ]
1220}
1221
/// Metrics owned by the `environmentd` entry point itself.
#[derive(Debug, Clone)]
struct Metrics {
    /// Gauge set once by `run`, after the server has been booted, to the
    /// elapsed time in milliseconds since `run` started.
    pub start_time_environmentd: IntGauge,
}
1226
1227impl Metrics {
1228 pub fn register_into(registry: &MetricsRegistry, build_info: BuildInfo) -> Metrics {
1229 Metrics {
1230 start_time_environmentd: registry.register(metric!(
1231 name: "mz_start_time_environmentd",
1232 help: "Time in milliseconds from environmentd start until the adapter is ready.",
1233 const_labels: {
1234 "version" => build_info.version,
1235 "build_type" => if cfg!(release) { "release" } else { "debug" }
1236 },
1237 )),
1238 }
1239 }
1240}