Skip to main content

mz_environmentd/environmentd/
main.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Manages a single Materialize environment.
11//!
12//! It listens for SQL connections on port 6875 (MTRL) and for HTTP connections
13//! on port 6876.
14
15use std::ffi::CStr;
16use std::fs::File;
17use std::net::{IpAddr, SocketAddr};
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::LazyLock;
21use std::sync::atomic::{AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use std::{cmp, env, iter, thread};
24
25use anyhow::{Context, bail};
26use clap::{ArgAction, Parser, ValueEnum};
27use fail::FailScenario;
28use http::header::HeaderValue;
29use ipnet::IpNet;
30use itertools::Itertools;
31use mz_adapter::ResultExt;
32use mz_adapter_types::bootstrap_builtin_cluster_config::{
33    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
34    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, DEFAULT_REPLICATION_FACTOR,
35    PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR, SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
36    SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
37};
38use mz_auth::password::Password;
39use mz_aws_secrets_controller::AwsSecretsController;
40use mz_build_info::BuildInfo;
41use mz_catalog::config::ClusterReplicaSizeMap;
42use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController};
43use mz_controller::{ControllerConfig, ReplicaHttpLocator};
44use mz_frontegg_auth::{Authenticator as FronteggAuthenticator, FronteggCliArgs};
45use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
46use mz_orchestrator::Orchestrator;
47use mz_orchestrator_kubernetes::{
48    KubernetesImagePullPolicy, KubernetesOrchestrator, KubernetesOrchestratorConfig,
49};
50use mz_orchestrator_process::{
51    ProcessOrchestrator, ProcessOrchestratorConfig, ProcessOrchestratorTcpProxyConfig,
52};
53use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs, TracingOrchestrator};
54use mz_ore::cli::{self, CliConfig, KeyValueArg};
55use mz_ore::error::ErrorExt;
56use mz_ore::metric;
57use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
58use mz_ore::now::SYSTEM_TIME;
59use mz_ore::task::RuntimeExt;
60use mz_ore::url::SensitiveUrl;
61use mz_persist_client::PersistLocation;
62use mz_persist_client::cache::PersistClientCache;
63use mz_persist_client::cfg::PersistConfig;
64use mz_persist_client::rpc::{
65    MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
66};
67use mz_secrets::SecretsController;
68use mz_server_core::TlsCliArgs;
69use mz_service::emit_boot_diagnostics;
70use mz_service::secrets::{SecretsControllerKind, SecretsReaderCliArgs};
71use mz_sql::catalog::EnvironmentId;
72use mz_storage_types::connections::ConnectionContext;
73use opentelemetry::trace::TraceContextExt;
74use prometheus::IntGauge;
75use tracing::{Instrument, error, info, info_span, warn};
76use tracing_opentelemetry::OpenTelemetrySpanExt;
77use url::Url;
78
79use crate::environmentd::sys;
80use crate::{BUILD_INFO, CatalogConfig, ListenerConfig, Listeners, ListenersConfig};
81
82static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
83static LONG_VERSION: LazyLock<String> = LazyLock::new(|| {
84    iter::once(BUILD_INFO.human_version(None))
85        .chain(build_info())
86        .join("\n")
87});
88
89/// Manages a single Materialize environment.
90#[derive(Parser, Debug)]
91#[clap(
92    name = "environmentd",
93    next_line_help = true,
94    version = VERSION.as_str(),
95    long_version = LONG_VERSION.as_str(),
96)]
97pub struct Args {
98    // === Special modes. ===
99    /// Enable unsafe features. Unsafe features are those that should never run
100    /// in production but are appropriate for testing/local development.
101    #[clap(long, env = "UNSAFE_MODE")]
102    unsafe_mode: bool,
103    /// Enables all feature flags, meant only as a tool for local development;
104    /// this should never be enabled in CI.
105    #[clap(long, env = "ALL_FEATURES")]
106    all_features: bool,
107
108    // === Connection options. ===
109    /// Path to a file containing the json-formatted configuration of our
110    /// metrics, HTTP, and sql listeners.
111    #[clap(
112        long,
113        env = "LISTENERS_CONFIG_PATH",
114        value_name = "PATH",
115        action = ArgAction::Set,
116    )]
117    listeners_config_path: PathBuf,
118    /// Password for the mz_system user.
119    #[clap(
120        long,
121        env = "EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM",
122        action = ArgAction::Set,
123    )]
124    external_login_password_mz_system: Option<Password>,
125    /// The address on which to listen for Persist PubSub connections.
126    ///
127    /// Connections to this address are not subject to encryption, authentication,
128    /// or access control. Care should be taken to not expose the listen address
129    /// to the public internet or other unauthorized parties.
130    #[clap(
131        long,
132        value_name = "HOST:PORT",
133        env = "INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR",
134        default_value = "127.0.0.1:6879",
135        action = ArgAction::Set,
136    )]
137    internal_persist_pubsub_listen_addr: SocketAddr,
138    /// Enable cross-origin resource sharing (CORS) for HTTP requests from the
139    /// specified origin.
140    ///
141    /// The default allows all local connections.
142    /// "*" allows all.
143    /// "*.domain.com" allows connections from any matching subdomain.
144    ///
145    /// Wildcards in other positions (e.g., "https://*.foo.com" or "https://foo.*.com") have no effect.
146    #[structopt(long, env = "CORS_ALLOWED_ORIGIN")]
147    cors_allowed_origin: Vec<HeaderValue>,
148    /// Public CIDR which the cloud environment has configured for
149    /// egress.
150    #[clap(
151        long,
152        env = "ANNOUNCE_EGRESS_ADDRESS",
153        action = ArgAction::Append,
154        use_value_delimiter = true
155    )]
156    announce_egress_address: Vec<IpNet>,
157    /// The external host name to connect to the HTTP server of this
158    /// environment.
159    ///
160    /// Used to render absolute URLs the server publishes: webhook URLs in
161    /// notices and the system catalog, and the OAuth Protected Resource
162    /// Metadata `resource`/`resource_metadata` URLs (RFC 9728). Not used to
163    /// establish connections directly. When unset, those URLs fall back to
164    /// the request's `Host` header.
165    #[clap(long, env = "HTTP_HOST_NAME")]
166    http_host_name: Option<String>,
167    /// The URL of the Materialize console to proxy from the /internal-console
168    /// endpoint on the internal HTTP server.
169    #[clap(long, env = "INTERNAL_CONSOLE_REDIRECT_URL")]
170    internal_console_redirect_url: Option<String>,
171    /// TLS arguments.
172    #[clap(flatten)]
173    tls: TlsCliArgs,
174    /// Frontegg arguments.
175    #[clap(flatten)]
176    frontegg: FronteggCliArgs,
177    // === Orchestrator options. ===
178    /// The service orchestrator implementation to use.
179    #[structopt(long, value_enum, env = "ORCHESTRATOR")]
180    orchestrator: OrchestratorKind,
181    /// Name of a non-default Kubernetes scheduler, if any.
182    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SCHEDULER_NAME")]
183    orchestrator_kubernetes_scheduler_name: Option<String>,
184    /// Annotations to apply to all services created by the Kubernetes orchestrator
185    /// in the form `KEY=VALUE`.
186    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ANNOTATION")]
187    orchestrator_kubernetes_service_annotation: Vec<KeyValueArg<String, String>>,
188    /// Labels to apply to all services created by the Kubernetes orchestrator
189    /// in the form `KEY=VALUE`.
190    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_LABEL")]
191    orchestrator_kubernetes_service_label: Vec<KeyValueArg<String, String>>,
192    /// Node selector to apply to all services created by the Kubernetes
193    /// orchestrator in the form `KEY=VALUE`.
194    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_NODE_SELECTOR")]
195    orchestrator_kubernetes_service_node_selector: Vec<KeyValueArg<String, String>>,
196    /// Affinity to apply to all services created by the Kubernetes
197    /// orchestrator as a JSON string.
198    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_AFFINITY")]
199    orchestrator_kubernetes_service_affinity: Option<String>,
200    /// Tolerations to apply to all services created by the Kubernetes
201    /// orchestrator as a JSON string.
202    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_TOLERATIONS")]
203    orchestrator_kubernetes_service_tolerations: Option<String>,
204    /// The name of a service account to apply to all services created by the
205    /// Kubernetes orchestrator.
206    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ACCOUNT")]
207    orchestrator_kubernetes_service_account: Option<String>,
208    /// The Kubernetes context to use with the Kubernetes orchestrator.
209    ///
210    /// This defaults to `minikube` to prevent disaster (e.g., connecting to a
211    /// production cluster that happens to be the active Kubernetes context.)
212    #[structopt(
213        long,
214        env = "ORCHESTRATOR_KUBERNETES_CONTEXT",
215        default_value = "minikube"
216    )]
217    orchestrator_kubernetes_context: String,
218    /// The image pull policy to use for services created by the Kubernetes
219    /// orchestrator.
220    #[structopt(
221        long,
222        env = "ORCHESTRATOR_KUBERNETES_IMAGE_PULL_POLICY",
223        default_value = "always",
224        value_enum
225    )]
226    orchestrator_kubernetes_image_pull_policy: KubernetesImagePullPolicy,
227    /// The init container for services created by the Kubernetes orchestrator.
228    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_INIT_CONTAINER_IMAGE")]
229    orchestrator_kubernetes_init_container_image: Option<String>,
230    /// The Kubernetes StorageClass to use for the ephemeral volume attached to
231    /// services that request disk.
232    ///
233    /// If unspecified, the Kubernetes orchestrator will refuse to create
234    /// services that request disk.
235    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_EPHEMERAL_VOLUME_CLASS")]
236    orchestrator_kubernetes_ephemeral_volume_class: Option<String>,
237    /// The optional fs group for service's pods' `securityContext`.
238    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_FS_GROUP")]
239    orchestrator_kubernetes_service_fs_group: Option<i64>,
240    /// The prefix to prepend to all kubernetes object names.
241    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_NAME_PREFIX")]
242    orchestrator_kubernetes_name_prefix: Option<String>,
243    /// Whether to enable pod metrics collection.
244    ///
245    /// Required for resource usage graphs in the console.
246    /// Requires metrics-server to be installed.
247    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_DISABLE_POD_METRICS_COLLECTION")]
248    orchestrator_kubernetes_disable_pod_metrics_collection: bool,
249    /// Whether to annotate pods for prometheus service discovery.
250    #[clap(
251        long,
252        env = "ORCHESTRATOR_KUBERNETES_ENABLE_PROMETHEUS_SCRAPE_ANNOTATIONS"
253    )]
254    orchestrator_kubernetes_enable_prometheus_scrape_annotations: bool,
255    #[clap(long, env = "ORCHESTRATOR_PROCESS_WRAPPER")]
256    orchestrator_process_wrapper: Option<String>,
257    /// Where the process orchestrator should store secrets.
258    #[clap(
259        long,
260        env = "ORCHESTRATOR_PROCESS_SECRETS_DIRECTORY",
261        value_name = "PATH",
262        required_if_eq("orchestrator", "process")
263    )]
264    orchestrator_process_secrets_directory: Option<PathBuf>,
265    /// Whether the process orchestrator should handle crashes in child
266    /// processes by crashing the parent process.
267    #[clap(long, env = "ORCHESTRATOR_PROCESS_PROPAGATE_CRASHES")]
268    orchestrator_process_propagate_crashes: bool,
269    /// An IP address on which the process orchestrator should bind TCP proxies
270    /// for Unix domain sockets.
271    ///
272    /// When specified, for each named port of each created service, the process
273    /// orchestrator will bind a TCP listener to the specified address that
274    /// proxies incoming connections to the underlying Unix domain socket. The
275    /// allocated TCP port will be emitted as a tracing event.
276    ///
277    /// The primary use is live debugging the running child services via tools
278    /// that do not support Unix domain sockets (e.g., Prometheus, web
279    /// browsers).
280    #[clap(long, env = "ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR")]
281    orchestrator_process_tcp_proxy_listen_addr: Option<IpAddr>,
282    /// A directory in which the process orchestrator should write Prometheus
283    /// scrape targets, for use with Prometheus's file-based service discovery.
284    ///
285    /// Each namespaced orchestrator will maintain a single JSON file into the
286    /// directory named `NAMESPACE.json` containing the scrape targets for all
287    /// extant services. The scrape targets will use the TCP proxy address, as
288    /// Prometheus does not support scraping over Unix domain sockets.
289    ///
290    /// This option is ignored unless
291    /// `--orchestrator-process-tcp-proxy-listen-addr` is set.
292    ///
293    /// See also: <https://prometheus.io/docs/guides/file-sd/>
294    #[clap(
295        long,
296        env = "ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY"
297    )]
298    orchestrator_process_prometheus_service_discovery_directory: Option<PathBuf>,
299    /// A scratch directory that orchestrated processes can use for ephemeral storage.
300    #[clap(
301        long,
302        env = "ORCHESTRATOR_PROCESS_SCRATCH_DIRECTORY",
303        value_name = "PATH"
304    )]
305    orchestrator_process_scratch_directory: Option<PathBuf>,
306    /// Whether to use coverage build and collect coverage information. Not to be used for
307    /// production, only testing.
308    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_COVERAGE")]
309    orchestrator_kubernetes_coverage: bool,
310    /// The secrets controller implementation to use.
311    #[structopt(
312        long,
313        value_enum,
314        env = "SECRETS_CONTROLLER",
315        default_value_ifs([
316            ("orchestrator", "kubernetes", Some("kubernetes")),
317            ("orchestrator", "process", Some("local-file"))
318        ]),
319        default_value("kubernetes"), // This shouldn't be possible, but it makes clap happy.
320    )]
321    secrets_controller: SecretsControllerKind,
322    /// The list of tags to be set on AWS Secrets Manager secrets created by the
323    /// AWS secrets controller.
324    #[clap(
325        long,
326        env = "AWS_SECRETS_CONTROLLER_TAGS",
327        action = ArgAction::Append,
328        value_delimiter = ';',
329        required_if_eq("secrets_controller", "aws-secrets-manager")
330    )]
331    aws_secrets_controller_tags: Vec<KeyValueArg<String, String>>,
332    /// The clusterd image reference to use.
333    #[structopt(
334        long,
335        env = "CLUSTERD_IMAGE",
336        required_if_eq("orchestrator", "kubernetes"),
337        default_value_if("orchestrator", "process", Some("clusterd"))
338    )]
339    clusterd_image: Option<String>,
340    /// A number representing the environment's generation.
341    ///
342    /// This is incremented to request that the new process perform a graceful
343    /// transition of power from the prior generation.
344    #[clap(long, env = "DEPLOY_GENERATION", default_value = "0")]
345    deploy_generation: u64,
346
347    /// Can be provided in place of both persist_consensus_url and
348    /// timestamp_oracle_url in order to point both at the same backend
349    #[clap(
350        long,
351        env = "METADATA_BACKEND_URL",
352        conflicts_with_all = &[
353            "persist_consensus_url",
354            "timestamp_oracle_url",
355        ],
356    )]
357    metadata_backend_url: Option<SensitiveUrl>,
358
359    /// Helm chart version for self-hosted Materialize. This version is supposed to correspond to
360    /// the Materialize (core) version. This version is displayed in addition in `SELECT
361    /// mz_version()` if set and if it differs from the Materialize (core) version (which it should
362    /// not!).
363    #[clap(long, env = "HELM_CHART_VERSION")]
364    helm_chart_version: Option<String>,
365
366    // === Storage options. ===
367    /// Where the persist library should store its blob data.
368    #[clap(long, env = "PERSIST_BLOB_URL")]
369    persist_blob_url: SensitiveUrl,
370    /// Where the persist library should perform consensus.
371    #[clap(long, env = "PERSIST_CONSENSUS_URL")]
372    persist_consensus_url: Option<SensitiveUrl>,
373    /// The Persist PubSub URL.
374    ///
375    /// This URL is passed to `clusterd` for discovery of the Persist PubSub service.
376    #[clap(
377        long,
378        env = "PERSIST_PUBSUB_URL",
379        default_value = "http://localhost:6879"
380    )]
381    persist_pubsub_url: String,
382    /// The number of worker threads created for the IsolatedRuntime used for
383    /// storage related tasks. A negative value will subtract from the number
384    /// of threads returned by [`num_cpus::get`].
385    #[clap(long, env = "PERSIST_ISOLATED_RUNTIME_THREADS")]
386    persist_isolated_runtime_threads: Option<isize>,
387    /// The interval in seconds at which to collect storage usage information.
388    #[clap(
389        long,
390        env = "STORAGE_USAGE_COLLECTION_INTERVAL",
391        value_parser = humantime::parse_duration,
392        default_value = "3600s"
393    )]
394    storage_usage_collection_interval_sec: Duration,
395    /// The period for which to retain usage records. Note that the retention
396    /// period is only evaluated at server start time, so rebooting the server
397    /// is required to discard old records.
398    #[clap(long, env = "STORAGE_USAGE_RETENTION_PERIOD", value_parser = humantime::parse_duration)]
399    storage_usage_retention_period: Option<Duration>,
400
401    // === Adapter options. ===
402    /// The PostgreSQL URL for the Postgres-backed timestamp oracle.
403    #[clap(long, env = "TIMESTAMP_ORACLE_URL", value_name = "POSTGRES_URL")]
404    timestamp_oracle_url: Option<SensitiveUrl>,
405    /// Availability zones in which storage and compute resources may be
406    /// deployed.
407    #[clap(long, env = "AVAILABILITY_ZONE", use_value_delimiter = true)]
408    availability_zone: Vec<String>,
409    /// A map from size name to resource allocations for cluster replicas.
410    #[clap(
411        long,
412        env = "CLUSTER_REPLICA_SIZES",
413        requires = "bootstrap_default_cluster_replica_size"
414    )]
415    cluster_replica_sizes: String,
416    /// An API key for Segment. Enables export of audit events to Segment.
417    #[clap(long, env = "SEGMENT_API_KEY")]
418    segment_api_key: Option<String>,
419    /// Whether the Segment client is being used on the client side
420    /// (rather than the server side).
421    ///
422    /// Enabling this causes the Segment server to record the IP address from
423    /// which the event was sent.
424    #[clap(long, env = "SEGMENT_CLIENT_SIDE")]
425    segment_client_side: bool,
426    /// Only create a dummy segment client when no segment api key is provided, only to get more
427    /// testing coverage.
428    #[clap(long, env = "TEST_ONLY_DUMMY_SEGMENT_CLIENT")]
429    test_only_dummy_segment_client: bool,
430    /// An SDK key for LaunchDarkly.
431    ///
432    /// Setting this in combination with [`Self::config_sync_loop_interval`]
433    /// will enable synchronization of LaunchDarkly features with system
434    /// configuration parameters.
435    #[clap(long, env = "LAUNCHDARKLY_SDK_KEY")]
436    launchdarkly_sdk_key: Option<String>,
437    /// A list of PARAM_NAME=KEY_NAME pairs from system parameter names to
438    /// LaunchDarkly feature keys.
439    ///
440    /// This is used (so far only for testing purposes) when propagating values
441    /// from the latter to the former. The identity map is assumed for absent
442    /// parameter names.
443    #[clap(
444        long,
445        env = "LAUNCHDARKLY_KEY_MAP",
446        action = ArgAction::Append,
447        value_delimiter = ';'
448    )]
449    launchdarkly_key_map: Vec<KeyValueArg<String, String>>,
450    /// The duration at which the system parameter synchronization times out during startup.
451    #[clap(
452        long,
453        env = "CONFIG_SYNC_TIMEOUT",
454        value_parser = humantime::parse_duration,
455        default_value = "30s"
456    )]
457    config_sync_timeout: Duration,
458    /// The interval in seconds at which to synchronize system parameter values.
459    ///
460    /// If this is not explicitly set, the loop that synchronizes LaunchDarkly
461    /// features with system configuration parameters will not run _even if
462    /// [`Self::launchdarkly_sdk_key`] is present_.
463    #[clap(
464        long,
465        env = "CONFIG_SYNC_LOOP_INTERVAL",
466        value_parser = humantime::parse_duration,
467    )]
468    config_sync_loop_interval: Option<Duration>,
469    /// Path to a JSON file containing system parameter values.
470    /// If specified, this file will be used instead of LaunchDarkly for configuration.
471    #[clap(long, env = "CONFIG_SYNC_FILE_PATH", value_name = "PATH")]
472    config_sync_file_path: Option<PathBuf>,
473
474    // === Bootstrap options. ===
475    #[clap(
476        long,
477        env = "ENVIRONMENT_ID",
478        value_name = "<CLOUD>-<REGION>-<ORG-ID>-<ORDINAL>"
479    )]
480    environment_id: EnvironmentId,
481    /// If set, a role with the provided name will be created with `CREATEDB`
482    /// and `CREATECLUSTER` attributes. It will also have `CREATE` privileges on
483    /// the `materialize` database, `materialize.public` schema, and
484    /// `quickstart` cluster.
485    ///
486    /// This option is meant for local development and testing to simplify the
487    /// initial process of granting attributes and privileges to some default
488    /// role.
489    #[clap(long, env = "BOOTSTRAP_ROLE")]
490    bootstrap_role: Option<String>,
491    /// The size of the default cluster replica if bootstrapping.
492    #[clap(
493        long,
494        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE",
495        default_value = "scale=1,workers=1"
496    )]
497    bootstrap_default_cluster_replica_size: String,
498    /// The size of the builtin system cluster replicas if bootstrapping.
499    #[clap(
500        long,
501        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE",
502        default_value = "scale=1,workers=1"
503    )]
504    bootstrap_builtin_system_cluster_replica_size: String,
505    /// The size of the builtin catalog server cluster replicas if bootstrapping.
506    #[clap(
507        long,
508        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE",
509        default_value = "scale=1,workers=1"
510    )]
511    bootstrap_builtin_catalog_server_cluster_replica_size: String,
512    /// The size of the builtin probe cluster replicas if bootstrapping.
513    #[clap(
514        long,
515        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE",
516        default_value = "scale=1,workers=1"
517    )]
518    bootstrap_builtin_probe_cluster_replica_size: String,
519    /// The size of the builtin support cluster replicas if bootstrapping.
520    #[clap(
521        long,
522        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE",
523        default_value = "scale=1,workers=1"
524    )]
525    bootstrap_builtin_support_cluster_replica_size: String,
526    /// The size of the builtin analytics cluster replicas if bootstrapping.
527    #[clap(
528        long,
529        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE",
530        default_value = "scale=1,workers=1"
531    )]
532    bootstrap_builtin_analytics_cluster_replica_size: String,
533    #[clap(
534        long,
535        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR",
536        default_value = DEFAULT_REPLICATION_FACTOR.to_string(),
537        value_parser = clap::value_parser!(u32).range(0..=2)
538    )]
539    bootstrap_default_cluster_replication_factor: u32,
540    /// The replication factor of the builtin system cluster replicas if bootstrapping.
541    #[clap(
542        long,
543        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR",
544        default_value = SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
545        value_parser = clap::value_parser!(u32).range(0..=2)
546    )]
547    bootstrap_builtin_system_cluster_replication_factor: u32,
548    /// The replication factor of the builtin catalog server cluster replicas if bootstrapping.
549    #[clap(
550        long,
551        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICATION_FACTOR",
552        default_value = CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
553        value_parser = clap::value_parser!(u32).range(0..=2)
554    )]
555    bootstrap_builtin_catalog_server_cluster_replication_factor: u32,
556    /// The replication factor of the builtin probe cluster replicas if bootstrapping.
557    #[clap(
558        long,
559        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR",
560        default_value = PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
561        value_parser = clap::value_parser!(u32).range(0..=2)
562    )]
563    bootstrap_builtin_probe_cluster_replication_factor: u32,
564    /// The replication factor of the builtin support cluster replicas if bootstrapping.
565    #[clap(
566        long,
567        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICATION_FACTOR",
568        default_value = SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
569        value_parser = clap::value_parser!(u32).range(0..=2)
570    )]
571    bootstrap_builtin_support_cluster_replication_factor: u32,
572    /// The replication factor of the builtin analytics cluster replicas if bootstrapping.
573    #[clap(
574        long,
575        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICATION_FACTOR",
576        default_value = ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
577        value_parser = clap::value_parser!(u32).range(0..=2)
578    )]
579    bootstrap_builtin_analytics_cluster_replication_factor: u32,
580    /// An list of NAME=VALUE pairs used to override static defaults
581    /// for system parameters.
582    #[clap(
583        long,
584        env = "SYSTEM_PARAMETER_DEFAULT",
585        action = ArgAction::Append,
586        value_delimiter = ';'
587    )]
588    system_parameter_default: Vec<KeyValueArg<String, String>>,
589    /// File containing a valid Materialize license key.
590    #[clap(long, env = "LICENSE_KEY")]
591    license_key: Option<String>,
592
593    // === AWS options. ===
594    /// The AWS account ID, which will be used to generate ARNs for
595    /// Materialize-controlled AWS resources.
596    #[clap(long, env = "AWS_ACCOUNT_ID")]
597    aws_account_id: Option<String>,
598    /// The ARN for a Materialize-controlled role to assume before assuming
599    /// a customer's requested role for an AWS connection.
600    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
601    aws_connection_role_arn: Option<String>,
602    /// Prefix for an external ID to be supplied to all AWS AssumeRole operations.
603    ///
604    /// Details: <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>
605    #[clap(long, env = "AWS_EXTERNAL_ID_PREFIX", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
606    aws_external_id_prefix: Option<AwsExternalIdPrefix>,
607    /// The list of supported AWS PrivateLink availability zone ids.
608    /// Must be zone IDs, of format e.g. "use-az1".
609    #[clap(
610        long,
611        env = "AWS_PRIVATELINK_AVAILABILITY_ZONES",
612        action = ArgAction::Append,
613        use_value_delimiter = true
614    )]
615    aws_privatelink_availability_zones: Option<Vec<String>>,
616
617    // === Tracing options. ===
618    #[clap(flatten)]
619    tracing: TracingCliArgs,
620
621    // === Testing options. ===
622    /// Forces the migration of all builtin storage collections using the
623    /// specified migration mechanism (either "evolution" or "replacement").
624    ///
625    /// This argument is meant for testing only and as the names suggests
626    /// should not be set in production.
627    #[clap(long, value_enum, requires = "unsafe_mode")]
628    unsafe_force_builtin_schema_migration: Option<String>,
629}
630
631#[derive(ValueEnum, Debug, Clone)]
632enum OrchestratorKind {
633    Kubernetes,
634    Process,
635}
636
637// TODO [Alex Hunt] move this to a shared function that can be imported by the
638// region-controller.
639fn aws_secrets_controller_prefix(env_id: &EnvironmentId) -> String {
640    format!("/user-managed/{}/", env_id)
641}
642fn aws_secrets_controller_key_alias(env_id: &EnvironmentId) -> String {
643    // TODO [Alex Hunt] move this to a shared function that can be imported by the
644    // region-controller.
645    format!("alias/customer_key_{}", env_id)
646}
647
648pub fn main() {
649    let args = cli::parse_args(CliConfig {
650        env_prefix: Some("MZ_"),
651        enable_version_flag: true,
652    });
653    if let Err(err) = run(args) {
654        panic!("environmentd: fatal: {}", err.display_with_causes());
655    }
656}
657
658fn run(mut args: Args) -> Result<(), anyhow::Error> {
659    mz_ore::panic::install_enhanced_handler();
660    let envd_start = Instant::now();
661
662    // Configure signal handling as soon as possible. We want signals to be
663    // handled to our liking ASAP.
664    sys::enable_sigusr2_coverage_dump()?;
665    sys::enable_termination_signal_cleanup()?;
666
667    let license_key = if let Some(license_key_file) = args.license_key {
668        let license_key_text = std::fs::read_to_string(&license_key_file)
669            .context("failed to open license key file")?;
670        let license_key = mz_license_keys::validate(license_key_text.trim())
671            .context("failed to validate license key file")?;
672        if license_key.expired {
673            let message = format!(
674                "The license key provided at {license_key_file} is expired! Please contact Materialize for assistance."
675            );
676            match license_key.expiration_behavior {
677                ExpirationBehavior::Warn | ExpirationBehavior::DisableClusterCreation => {
678                    warn!("{message}");
679                }
680                ExpirationBehavior::Disable => bail!("{message}"),
681            }
682        }
683        license_key
684    } else if matches!(args.orchestrator, OrchestratorKind::Kubernetes) {
685        bail!("--license-key is required when running in Kubernetes");
686    } else {
687        // license key checks are optional for the emulator
688        ValidatedLicenseKey::disabled()
689    };
690
691    // Configure testing options.
692    let force_builtin_schema_migration = args
693        .unsafe_force_builtin_schema_migration
694        .inspect(|_mechanism| assert!(args.unsafe_mode));
695
696    // Start Tokio runtime.
697
698    let ncpus_useful = usize::max(1, cmp::min(num_cpus::get(), num_cpus::get_physical()));
699    let runtime = Arc::new(
700        tokio::runtime::Builder::new_multi_thread()
701            .worker_threads(ncpus_useful)
702            .thread_stack_size(3 * 1024 * 1024) // 3 MiB
703            // The default thread name exceeds the Linux limit on thread name
704            // length, so pick something shorter.
705            .thread_name_fn(|| {
706                static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
707                let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
708                format!("tokio:work-{}", id)
709            })
710            .enable_all()
711            .build()?,
712    );
713
714    // Configure tracing to log the service name when using the process
715    // orchestrator, which intermingles log output from multiple services. Other
716    // orchestrators separate log output from different services.
717    args.tracing.log_prefix = if matches!(args.orchestrator, OrchestratorKind::Process) {
718        Some("environmentd".to_string())
719    } else {
720        None
721    };
722
723    let metrics_registry = MetricsRegistry::new();
724    let tracing_handle = runtime.block_on(args.tracing.configure_tracing(
725        StaticTracingConfig {
726            service_name: "environmentd",
727            build_info: BUILD_INFO,
728        },
729        metrics_registry.clone(),
730    ))?;
731    register_runtime_metrics("main", runtime.metrics(), &metrics_registry);
732
733    let span = tracing::info_span!("environmentd::run").entered();
734
735    info!("startup: envd init: beginning");
736    info!("startup: envd init: preamble beginning");
737
738    let metrics = Metrics::register_into(&metrics_registry, BUILD_INFO);
739
740    runtime.block_on(mz_alloc::register_metrics_into(&metrics_registry));
741    runtime.block_on(mz_metrics::register_metrics_into(
742        &metrics_registry,
743        mz_dyncfgs::all_dyncfgs(),
744    ));
745
746    // Initialize fail crate for failpoint support
747    let _failpoint_scenario = FailScenario::setup();
748
749    // Configure connections.
750    let tls = args.tls.into_config()?;
751    let frontegg_oauth_issuer_url = args.frontegg.oauth_issuer_url().map(str::to_string);
752    let frontegg = FronteggAuthenticator::from_args(args.frontegg, &metrics_registry)?;
753    let listeners_config: ListenersConfig = {
754        let f = File::open(args.listeners_config_path)?;
755        serde_json::from_reader(f)?
756    };
757
758    for (_, listener) in &listeners_config.sql {
759        listener
760            .validate()
761            .map_err(|e| anyhow::anyhow!("invalid SQL listener: {}", e))?;
762    }
763
764    for (_, listener) in &listeners_config.http {
765        listener
766            .validate()
767            .map_err(|e| anyhow::anyhow!("invalid HTTP listener: {}", e))?;
768    }
769
770    // Configure CORS.
771    let allowed_origins = if !args.cors_allowed_origin.is_empty() {
772        args.cors_allowed_origin
773    } else {
774        let mut allowed_origins = Vec::with_capacity(listeners_config.http.len() * 6);
775        for (_, listener) in &listeners_config.http {
776            let port = listener.addr().port();
777            allowed_origins.extend([
778                HeaderValue::from_str(&format!("http://localhost:{}", port)).unwrap(),
779                HeaderValue::from_str(&format!("http://127.0.0.1:{}", port)).unwrap(),
780                HeaderValue::from_str(&format!("http://[::1]:{}", port)).unwrap(),
781                HeaderValue::from_str(&format!("https://localhost:{}", port)).unwrap(),
782                HeaderValue::from_str(&format!("https://127.0.0.1:{}", port)).unwrap(),
783                HeaderValue::from_str(&format!("https://[::1]:{}", port)).unwrap(),
784            ])
785        }
786        allowed_origins
787    };
788    let cors_allowed_origin = mz_http_util::build_cors_allowed_origin(&allowed_origins);
789    let cors_allowed_origin_list = allowed_origins.clone();
790
791    // Configure controller.
792    let entered = info_span!("environmentd::configure_controller").entered();
793    let (orchestrator, secrets_controller, cloud_resource_controller): (
794        Arc<dyn Orchestrator>,
795        Arc<dyn SecretsController>,
796        Option<Arc<dyn CloudResourceController>>,
797    ) = match args.orchestrator {
798        OrchestratorKind::Kubernetes => {
799            if args.orchestrator_process_scratch_directory.is_some() {
800                bail!(
801                    "--orchestrator-process-scratch-directory is \
802                      not currently usable with the kubernetes orchestrator"
803                );
804            }
805
806            let orchestrator = Arc::new(
807                runtime
808                    .block_on(KubernetesOrchestrator::new(KubernetesOrchestratorConfig {
809                        context: args.orchestrator_kubernetes_context.clone(),
810                        scheduler_name: args.orchestrator_kubernetes_scheduler_name,
811                        service_annotations: args
812                            .orchestrator_kubernetes_service_annotation
813                            .into_iter()
814                            .map(|l| (l.key, l.value))
815                            .collect(),
816                        service_labels: args
817                            .orchestrator_kubernetes_service_label
818                            .into_iter()
819                            .map(|l| (l.key, l.value))
820                            .collect(),
821                        service_node_selector: args
822                            .orchestrator_kubernetes_service_node_selector
823                            .into_iter()
824                            .map(|l| (l.key, l.value))
825                            .collect(),
826                        service_affinity: args.orchestrator_kubernetes_service_affinity,
827                        service_tolerations: args.orchestrator_kubernetes_service_tolerations,
828                        service_account: args.orchestrator_kubernetes_service_account,
829                        image_pull_policy: args.orchestrator_kubernetes_image_pull_policy,
830                        aws_external_id_prefix: args.aws_external_id_prefix.clone(),
831                        coverage: args.orchestrator_kubernetes_coverage,
832                        ephemeral_volume_storage_class: args
833                            .orchestrator_kubernetes_ephemeral_volume_class
834                            .clone(),
835                        service_fs_group: args.orchestrator_kubernetes_service_fs_group.clone(),
836                        name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
837                        collect_pod_metrics: !args
838                            .orchestrator_kubernetes_disable_pod_metrics_collection,
839                        enable_prometheus_scrape_annotations: args
840                            .orchestrator_kubernetes_enable_prometheus_scrape_annotations,
841                    }))
842                    .context("creating kubernetes orchestrator")?,
843            );
844            let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
845                SecretsControllerKind::Kubernetes => {
846                    let sc = Arc::clone(&orchestrator);
847                    let sc: Arc<dyn SecretsController> = sc;
848                    sc
849                }
850                SecretsControllerKind::AwsSecretsManager => {
851                    Arc::new(
852                        runtime.block_on(AwsSecretsController::new(
853                            // TODO [Alex Hunt] move this to a shared function that can be imported by the
854                            // region-controller.
855                            &aws_secrets_controller_prefix(&args.environment_id),
856                            &aws_secrets_controller_key_alias(&args.environment_id),
857                            args.aws_secrets_controller_tags
858                                .into_iter()
859                                .map(|tag| (tag.key, tag.value))
860                                .collect(),
861                        )),
862                    )
863                }
864                SecretsControllerKind::LocalFile => bail!(
865                    "SecretsControllerKind::LocalFile is not compatible with Orchestrator::Kubernetes."
866                ),
867            };
868            let cloud_resource_controller = Arc::clone(&orchestrator);
869            (
870                orchestrator,
871                secrets_controller,
872                Some(cloud_resource_controller),
873            )
874        }
875        OrchestratorKind::Process => {
876            if args
877                .orchestrator_kubernetes_ephemeral_volume_class
878                .is_some()
879            {
880                bail!(
881                    "--orchestrator-kubernetes-ephemeral-volume-class is \
882                      not usable with the process orchestrator"
883                );
884            }
885            let orchestrator = Arc::new(
886                runtime
887                    .block_on(ProcessOrchestrator::new(ProcessOrchestratorConfig {
888                        // Look for binaries in the same directory as the
889                        // running binary. When running via `cargo run`, this
890                        // means that debug binaries look for other debug
891                        // binaries and release binaries look for other release
892                        // binaries.
893                        image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
894                        suppress_output: false,
895                        environment_id: args.environment_id.to_string(),
896                        secrets_dir: args
897                            .orchestrator_process_secrets_directory
898                            .clone()
899                            .expect("clap enforced"),
900                        command_wrapper: args
901                            .orchestrator_process_wrapper
902                            .map_or(Ok(vec![]), |s| shell_words::split(&s))?,
903                        propagate_crashes: args.orchestrator_process_propagate_crashes,
904                        tcp_proxy: args.orchestrator_process_tcp_proxy_listen_addr.map(
905                            |listen_addr| ProcessOrchestratorTcpProxyConfig {
906                                listen_addr,
907                                prometheus_service_discovery_dir: args
908                                    .orchestrator_process_prometheus_service_discovery_directory,
909                            },
910                        ),
911                        scratch_directory: args
912                            .orchestrator_process_scratch_directory
913                            .expect("process orchestrator requires scratch directory"),
914                    }))
915                    .context("creating process orchestrator")?,
916            );
917            let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
918                SecretsControllerKind::Kubernetes => bail!(
919                    "SecretsControllerKind::Kubernetes is not compatible with Orchestrator::Process."
920                ),
921                SecretsControllerKind::AwsSecretsManager => Arc::new(
922                    runtime.block_on(AwsSecretsController::new(
923                        &aws_secrets_controller_prefix(&args.environment_id),
924                        &aws_secrets_controller_key_alias(&args.environment_id),
925                        args.aws_secrets_controller_tags
926                            .into_iter()
927                            .map(|tag| (tag.key, tag.value))
928                            .collect(),
929                    )),
930                ),
931                SecretsControllerKind::LocalFile => {
932                    let sc = Arc::clone(&orchestrator);
933                    let sc: Arc<dyn SecretsController> = sc;
934                    sc
935                }
936            };
937            (orchestrator, secrets_controller, None)
938        }
939    };
940    drop(entered);
941    let cloud_resource_reader = cloud_resource_controller.as_ref().map(|c| c.reader());
942    let secrets_reader = secrets_controller.reader();
943    let now = SYSTEM_TIME.clone();
944
945    let mut persist_config =
946        PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs());
947    // Start with compaction disabled, later enable it if we're not in read-only mode.
948    persist_config.disable_compaction();
949
950    let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
951    let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
952
953    match args.persist_isolated_runtime_threads {
954        // Use the default.
955        None | Some(0) => (),
956        Some(x @ ..=-1) => {
957            let threads = num_cpus::get().saturating_add_signed(x).max(1);
958            persist_config.isolated_runtime_worker_threads = threads;
959        }
960        Some(x @ 1..) => {
961            let threads = usize::try_from(x).expect("pattern matched a positive value");
962            persist_config.isolated_runtime_worker_threads = threads;
963        }
964    };
965
966    let _server = runtime.spawn_named(
967        || "persist::rpc::server",
968        async move {
969            info!(
970                "listening for Persist PubSub connections on {}",
971                args.internal_persist_pubsub_listen_addr
972            );
973            // Intentionally do not bubble up errors here, we don't want to take
974            // down environmentd if there are any issues with the pubsub server.
975            let res = persist_pubsub_server
976                .serve(args.internal_persist_pubsub_listen_addr)
977                .await;
978            error!("Persist Pubsub server exited {:?}", res);
979        }
980        .instrument(tracing::info_span!("persist::rpc::server")),
981    );
982
983    let persist_clients = {
984        // PersistClientCache may spawn tasks, so run within a tokio runtime context
985        let _tokio_guard = runtime.enter();
986        PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
987            let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
988                cfg,
989                persist_pubsub_client.sender,
990                metrics,
991            ));
992            PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
993        })
994    };
995
996    let consensus_uri = args.persist_consensus_url.unwrap_or_else(|| {
997        args.metadata_backend_url
998            .as_ref()
999            .map(|metadata_backend_url| {
1000                SensitiveUrl(
1001                    Url::parse_with_params(
1002                        metadata_backend_url.0.as_ref(),
1003                        &[("options", "--search_path=consensus")],
1004                    )
1005                    .unwrap(),
1006                )
1007            })
1008            .expect("either --persist-consensus-url or --metadata-backend-url must be provided")
1009    });
1010    let timestamp_oracle_url = args.timestamp_oracle_url.or_else(|| {
1011        args.metadata_backend_url
1012            .as_ref()
1013            .map(|metadata_backend_url| {
1014                SensitiveUrl(
1015                    Url::parse_with_params(
1016                        metadata_backend_url.0.as_ref(),
1017                        &[("options", "--search_path=tsoracle")],
1018                    )
1019                    .unwrap(),
1020                )
1021            })
1022    });
1023
1024    let persist_clients = Arc::new(persist_clients);
1025    let connection_context = ConnectionContext::from_cli_args(
1026        args.environment_id.to_string(),
1027        &args.tracing.startup_log_filter,
1028        args.aws_external_id_prefix,
1029        args.aws_connection_role_arn,
1030        secrets_reader,
1031        cloud_resource_reader,
1032    );
1033    let orchestrator = Arc::new(TracingOrchestrator::new(orchestrator, args.tracing.clone()));
1034    let replica_http_locator = Arc::new(ReplicaHttpLocator::default());
1035    let controller = ControllerConfig {
1036        build_info: &BUILD_INFO,
1037        orchestrator,
1038        persist_location: PersistLocation {
1039            blob_uri: args.persist_blob_url,
1040            consensus_uri,
1041        },
1042        persist_clients: Arc::clone(&persist_clients),
1043        clusterd_image: args.clusterd_image.expect("clap enforced"),
1044        init_container_image: args.orchestrator_kubernetes_init_container_image,
1045        deploy_generation: args.deploy_generation,
1046        now: SYSTEM_TIME.clone(),
1047        metrics_registry: metrics_registry.clone(),
1048        persist_pubsub_url: args.persist_pubsub_url,
1049        connection_context,
1050        // When serialized to args in the controller, only the relevant flags will be passed
1051        // through, so we just set all of them
1052        secrets_args: SecretsReaderCliArgs {
1053            secrets_reader: args.secrets_controller,
1054            secrets_reader_local_file_dir: args.orchestrator_process_secrets_directory,
1055            secrets_reader_kubernetes_context: Some(args.orchestrator_kubernetes_context),
1056            secrets_reader_aws_prefix: Some(aws_secrets_controller_prefix(&args.environment_id)),
1057            secrets_reader_name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
1058        },
1059        replica_http_locator: Arc::clone(&replica_http_locator),
1060    };
1061
1062    let cluster_replica_sizes = ClusterReplicaSizeMap::parse_from_str(
1063        &args.cluster_replica_sizes,
1064        !license_key.allow_credit_consumption_override,
1065    )
1066    .context("parsing replica size map")?;
1067
1068    emit_boot_diagnostics!(&BUILD_INFO);
1069    sys::adjust_rlimits();
1070
1071    info!(
1072        "startup: envd init: preamble complete in {:?}",
1073        envd_start.elapsed()
1074    );
1075
1076    let serve_start = Instant::now();
1077    info!("startup: envd init: serving beginning");
1078    let server = runtime.block_on(async {
1079        let listeners = Listeners::bind(listeners_config).await?;
1080        let catalog_config = CatalogConfig {
1081            persist_clients,
1082            metrics: Arc::new(mz_catalog::durable::Metrics::new(&metrics_registry)),
1083        };
1084        let server = listeners
1085            .serve(crate::Config {
1086                // Special modes.
1087                unsafe_mode: args.unsafe_mode,
1088                all_features: args.all_features,
1089                // Connection options.
1090                tls,
1091                tls_reload_certs: mz_server_core::default_cert_reload_ticker(),
1092                external_login_password_mz_system: args.external_login_password_mz_system,
1093                frontegg,
1094                frontegg_oauth_issuer_url,
1095                cors_allowed_origin,
1096                cors_allowed_origin_list,
1097                egress_addresses: args.announce_egress_address,
1098                http_host_name: args.http_host_name,
1099                internal_console_redirect_url: args.internal_console_redirect_url,
1100                // Controller options.
1101                controller,
1102                secrets_controller,
1103                cloud_resource_controller,
1104                // Storage options.
1105                storage_usage_collection_interval: args.storage_usage_collection_interval_sec,
1106                storage_usage_retention_period: args.storage_usage_retention_period,
1107                // Adapter options.
1108                catalog_config,
1109                availability_zones: args.availability_zone,
1110                cluster_replica_sizes,
1111                timestamp_oracle_url,
1112                segment_api_key: args.segment_api_key,
1113                segment_client_side: args.segment_client_side,
1114                test_only_dummy_segment_client: args.test_only_dummy_segment_client,
1115                launchdarkly_sdk_key: args.launchdarkly_sdk_key,
1116                launchdarkly_key_map: args
1117                    .launchdarkly_key_map
1118                    .into_iter()
1119                    .map(|kv| (kv.key, kv.value))
1120                    .collect(),
1121                config_sync_timeout: args.config_sync_timeout,
1122                config_sync_loop_interval: args.config_sync_loop_interval,
1123                config_sync_file_path: args.config_sync_file_path,
1124
1125                // Bootstrap options.
1126                environment_id: args.environment_id,
1127                bootstrap_role: args.bootstrap_role,
1128                bootstrap_default_cluster_replica_size: args.bootstrap_default_cluster_replica_size,
1129                bootstrap_default_cluster_replication_factor: args
1130                    .bootstrap_default_cluster_replication_factor,
1131                bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
1132                    size: args.bootstrap_builtin_system_cluster_replica_size,
1133                    replication_factor: args.bootstrap_builtin_system_cluster_replication_factor,
1134                },
1135                bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
1136                    size: args.bootstrap_builtin_catalog_server_cluster_replica_size,
1137                    replication_factor: args
1138                        .bootstrap_builtin_catalog_server_cluster_replication_factor,
1139                },
1140                bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
1141                    size: args.bootstrap_builtin_probe_cluster_replica_size,
1142                    replication_factor: args.bootstrap_builtin_probe_cluster_replication_factor,
1143                },
1144                bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
1145                    size: args.bootstrap_builtin_support_cluster_replica_size,
1146                    replication_factor: args.bootstrap_builtin_support_cluster_replication_factor,
1147                },
1148                bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
1149                    size: args.bootstrap_builtin_analytics_cluster_replica_size,
1150                    replication_factor: args.bootstrap_builtin_analytics_cluster_replication_factor,
1151                },
1152                system_parameter_defaults: args
1153                    .system_parameter_default
1154                    .into_iter()
1155                    .map(|kv| (kv.key, kv.value))
1156                    .collect(),
1157                helm_chart_version: args.helm_chart_version.clone(),
1158                license_key,
1159                // AWS options.
1160                aws_account_id: args.aws_account_id,
1161                aws_privatelink_availability_zones: args.aws_privatelink_availability_zones,
1162                // Observability options.
1163                metrics_registry,
1164                tracing_handle,
1165                // Testing options.
1166                now,
1167                force_builtin_schema_migration,
1168            })
1169            .await
1170            .maybe_terminate("booting server")?;
1171        Ok::<_, anyhow::Error>(server)
1172    })?;
1173    info!(
1174        "startup: envd init: serving complete in {:?}",
1175        serve_start.elapsed()
1176    );
1177
1178    let start_duration = envd_start.elapsed();
1179    metrics
1180        .start_time_environmentd
1181        .set(start_duration.as_millis().try_into().expect("must fit"));
1182    let span = span.exit();
1183    let id = span.context().span().span_context().trace_id();
1184    drop(span);
1185
1186    info!("startup: envd init: complete in {start_duration:?}");
1187
1188    println!(
1189        "environmentd {} listening...",
1190        BUILD_INFO.human_version(args.helm_chart_version)
1191    );
1192    for (name, handle) in &server.sql_listener_handles {
1193        println!("{} SQL address: {}", name, handle.local_addr);
1194    }
1195    for (name, handle) in &server.http_listener_handles {
1196        println!("{} HTTP address: {}", name, handle.local_addr);
1197    }
1198    // TODO move persist pubsub address like metrics address?
1199    println!(
1200        " Internal Persist PubSub address: {}",
1201        args.internal_persist_pubsub_listen_addr
1202    );
1203
1204    println!(" Root trace ID: {id}");
1205
1206    // Block forever.
1207    loop {
1208        thread::park();
1209    }
1210}
1211
1212fn build_info() -> Vec<String> {
1213    let openssl_version =
1214        unsafe { CStr::from_ptr(openssl_sys::OpenSSL_version(openssl_sys::OPENSSL_VERSION)) };
1215    let rdkafka_version = unsafe { CStr::from_ptr(rdkafka_sys::bindings::rd_kafka_version_str()) };
1216    vec![
1217        openssl_version.to_string_lossy().into_owned(),
1218        format!("librdkafka v{}", rdkafka_version.to_string_lossy()),
1219    ]
1220}
1221
/// Metrics reported by the `environmentd` process about its own startup.
#[derive(Debug, Clone)]
struct Metrics {
    /// Gauge that `run` sets, once the server has booted, to the time elapsed
    /// since `run` began, in milliseconds.
    pub start_time_environmentd: IntGauge,
}
1226
1227impl Metrics {
1228    pub fn register_into(registry: &MetricsRegistry, build_info: BuildInfo) -> Metrics {
1229        Metrics {
1230            start_time_environmentd: registry.register(metric!(
1231                name: "mz_start_time_environmentd",
1232                help: "Time in milliseconds from environmentd start until the adapter is ready.",
1233                const_labels: {
1234                    "version" => build_info.version,
1235                    "build_type" => if cfg!(release) { "release" } else { "debug" }
1236                },
1237            )),
1238        }
1239    }
1240}