mz_environmentd/environmentd/
main.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Manages a single Materialize environment.
11//!
12//! It listens for SQL connections on port 6875 (MTRL) and for HTTP connections
13//! on port 6876.
14
15use std::ffi::CStr;
16use std::net::{IpAddr, SocketAddr};
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::sync::LazyLock;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::{Duration, Instant};
22use std::{cmp, env, iter, thread};
23
24use anyhow::{Context, bail};
25use clap::{ArgAction, Parser, ValueEnum};
26use fail::FailScenario;
27use http::header::HeaderValue;
28use ipnet::IpNet;
29use itertools::Itertools;
30use mz_adapter::ResultExt;
31use mz_adapter_types::bootstrap_builtin_cluster_config::{
32    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
33    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, DEFAULT_REPLICATION_FACTOR,
34    PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR, SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
35    SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
36};
37use mz_aws_secrets_controller::AwsSecretsController;
38use mz_build_info::BuildInfo;
39use mz_catalog::builtin::{
40    UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE,
41    UnsafeBuiltinTableFingerprintWhitespace,
42};
43use mz_catalog::config::ClusterReplicaSizeMap;
44use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController};
45use mz_controller::ControllerConfig;
46use mz_frontegg_auth::{Authenticator, FronteggCliArgs};
47use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
48use mz_orchestrator::Orchestrator;
49use mz_orchestrator_kubernetes::{
50    KubernetesImagePullPolicy, KubernetesOrchestrator, KubernetesOrchestratorConfig,
51};
52use mz_orchestrator_process::{
53    ProcessOrchestrator, ProcessOrchestratorConfig, ProcessOrchestratorTcpProxyConfig,
54};
55use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs, TracingOrchestrator};
56use mz_ore::cli::{self, CliConfig, KeyValueArg};
57use mz_ore::error::ErrorExt;
58use mz_ore::metric;
59use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
60use mz_ore::now::SYSTEM_TIME;
61use mz_ore::task::RuntimeExt;
62use mz_ore::url::SensitiveUrl;
63use mz_persist_client::PersistLocation;
64use mz_persist_client::cache::PersistClientCache;
65use mz_persist_client::cfg::PersistConfig;
66use mz_persist_client::rpc::{
67    MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
68};
69use mz_secrets::SecretsController;
70use mz_server_core::TlsCliArgs;
71use mz_service::emit_boot_diagnostics;
72use mz_service::secrets::{SecretsControllerKind, SecretsReaderCliArgs};
73use mz_sql::catalog::EnvironmentId;
74use mz_storage_types::connections::ConnectionContext;
75use opentelemetry::trace::TraceContextExt;
76use prometheus::IntGauge;
77use tracing::{Instrument, error, info, info_span, warn};
78use tracing_opentelemetry::OpenTelemetrySpanExt;
79use url::Url;
80
81use crate::environmentd::sys;
82use crate::{BUILD_INFO, CatalogConfig, Listeners, ListenersConfig};
83
84static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
85static LONG_VERSION: LazyLock<String> = LazyLock::new(|| {
86    iter::once(BUILD_INFO.human_version(None))
87        .chain(build_info())
88        .join("\n")
89});
90
// Command-line interface for `environmentd`.
//
// Each non-flattened field is exposed as a `--long` flag, and most also name
// an environment variable via `env = "..."`. NOTE(review): `main` passes
// `env_prefix: Some("MZ_")` to `cli::parse_args`, so the effective variable
// names are presumably `MZ_`-prefixed — confirm in `mz_ore::cli`.
/// Manages a single Materialize environment.
#[derive(Parser, Debug)]
#[clap(
    name = "environmentd",
    next_line_help = true,
    version = VERSION.as_str(),
    long_version = LONG_VERSION.as_str(),
)]
pub struct Args {
    // === Special modes. ===
    /// Enable unsafe features. Unsafe features are those that should never run
    /// in production but are appropriate for testing/local development.
    #[clap(long, env = "UNSAFE_MODE")]
    unsafe_mode: bool,
    /// Enables all feature flags, meant only as a tool for local development;
    /// this should never be enabled in CI.
    #[clap(long, env = "ALL_FEATURES")]
    all_features: bool,

    // === Connection options. ===
    /// The address on which to listen for untrusted SQL connections.
    ///
    /// Connections on this address are subject to encryption, authentication,
    /// and authorization as specified by the `--tls-mode` and `--frontegg-auth`
    /// options.
    #[clap(
        long,
        env = "SQL_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6875",
        action = ArgAction::Set,
    )]
    sql_listen_addr: SocketAddr,
    /// The address on which to listen for untrusted HTTP connections.
    ///
    /// Connections on this address are subject to encryption, authentication,
    /// and authorization as specified by the `--tls-mode` and `--frontegg-auth`
    /// options.
    #[clap(
        long,
        env = "HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6876",
        action = ArgAction::Set,
    )]
    http_listen_addr: SocketAddr,
    /// The address on which to listen for trusted SQL connections.
    ///
    /// Connections to this address are not subject to encryption, authentication,
    /// or access control. Care should be taken to not expose this address to the
    /// public internet or other unauthorized parties.
    #[clap(
        long,
        value_name = "HOST:PORT",
        env = "INTERNAL_SQL_LISTEN_ADDR",
        default_value = "127.0.0.1:6877",
        action = ArgAction::Set,
    )]
    internal_sql_listen_addr: SocketAddr,
    /// The address on which to listen for trusted HTTP connections.
    ///
    /// Connections to this address are not subject to encryption, authentication,
    /// or access control. Care should be taken to not expose the listen address
    /// to the public internet or other unauthorized parties.
    #[clap(
        long,
        value_name = "HOST:PORT",
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        default_value = "127.0.0.1:6878",
        action = ArgAction::Set,
    )]
    internal_http_listen_addr: SocketAddr,
    /// The address on which to listen for Persist PubSub connections.
    ///
    /// Connections to this address are not subject to encryption, authentication,
    /// or access control. Care should be taken to not expose the listen address
    /// to the public internet or other unauthorized parties.
    #[clap(
        long,
        value_name = "HOST:PORT",
        env = "INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR",
        default_value = "127.0.0.1:6879",
        action = ArgAction::Set,
    )]
    internal_persist_pubsub_listen_addr: SocketAddr,
    /// Enable cross-origin resource sharing (CORS) for HTTP requests from the
    /// specified origin.
    ///
    /// The default allows all local connections.
    /// "*" allows all.
    /// "*.domain.com" allows connections from any matching subdomain.
    ///
    /// Wildcards in other positions (e.g., "https://*.foo.com" or "https://foo.*.com") have no effect.
    #[structopt(long, env = "CORS_ALLOWED_ORIGIN")]
    cors_allowed_origin: Vec<HeaderValue>,
    /// Public CIDR which the cloud environment has configured for
    /// egress.
    #[clap(
        long,
        env = "ANNOUNCE_EGRESS_ADDRESS",
        action = ArgAction::Append,
        use_value_delimiter = true
    )]
    announce_egress_address: Vec<IpNet>,
    /// The external host name to connect to the HTTP server of this
    /// environment.
    ///
    /// Presently used to render webhook URLs for end users in notices and the
    /// system catalog. Not used to establish connections directly.
    #[clap(long, env = "HTTP_HOST_NAME")]
    http_host_name: Option<String>,
    /// The URL of the Materialize console to proxy from the /internal-console
    /// endpoint on the internal HTTP server.
    #[clap(long, env = "INTERNAL_CONSOLE_REDIRECT_URL")]
    internal_console_redirect_url: Option<String>,
    /// TLS arguments.
    #[clap(flatten)]
    tls: TlsCliArgs,
    /// Frontegg arguments.
    #[clap(flatten)]
    frontegg: FronteggCliArgs,
    // TODO(auth): we probably want to consolidate all these auth options
    // into something cleaner.
    /// Self hosted auth
    #[clap(long, env = "ENABLE_SELF_HOSTED_AUTH")]
    enable_self_hosted_auth: bool,
    /// Self hosted auth over internal port
    #[clap(long, env = "ENABLE_SELF_HOSTED_AUTH_INTERNAL")]
    enable_self_hosted_auth_internal: bool,
    // === Orchestrator options. ===
    /// The service orchestrator implementation to use.
    #[structopt(long, value_enum, env = "ORCHESTRATOR")]
    orchestrator: OrchestratorKind,
    /// Name of a non-default Kubernetes scheduler, if any.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SCHEDULER_NAME")]
    orchestrator_kubernetes_scheduler_name: Option<String>,
    /// Labels to apply to all services created by the Kubernetes orchestrator
    /// in the form `KEY=VALUE`.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_LABEL")]
    orchestrator_kubernetes_service_label: Vec<KeyValueArg<String, String>>,
    /// Node selector to apply to all services created by the Kubernetes
    /// orchestrator in the form `KEY=VALUE`.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_NODE_SELECTOR")]
    orchestrator_kubernetes_service_node_selector: Vec<KeyValueArg<String, String>>,
    /// The name of a service account to apply to all services created by the
    /// Kubernetes orchestrator.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ACCOUNT")]
    orchestrator_kubernetes_service_account: Option<String>,
    /// The Kubernetes context to use with the Kubernetes orchestrator.
    ///
    /// This defaults to `minikube` to prevent disaster (e.g., connecting to a
    /// production cluster that happens to be the active Kubernetes context.)
    #[structopt(
        long,
        env = "ORCHESTRATOR_KUBERNETES_CONTEXT",
        default_value = "minikube"
    )]
    orchestrator_kubernetes_context: String,
    /// The image pull policy to use for services created by the Kubernetes
    /// orchestrator.
    #[structopt(
        long,
        env = "ORCHESTRATOR_KUBERNETES_IMAGE_PULL_POLICY",
        default_value = "always",
        value_enum
    )]
    orchestrator_kubernetes_image_pull_policy: KubernetesImagePullPolicy,
    /// The init container for services created by the Kubernetes orchestrator.
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_INIT_CONTAINER_IMAGE")]
    orchestrator_kubernetes_init_container_image: Option<String>,
    /// The Kubernetes StorageClass to use for the ephemeral volume attached to
    /// services that request disk.
    ///
    /// If unspecified, the Kubernetes orchestrator will refuse to create
    /// services that request disk.
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_EPHEMERAL_VOLUME_CLASS")]
    orchestrator_kubernetes_ephemeral_volume_class: Option<String>,
    /// The optional fs group for service's pods' `securityContext`.
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_FS_GROUP")]
    orchestrator_kubernetes_service_fs_group: Option<i64>,
    /// The prefix to prepend to all kubernetes object names.
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_NAME_PREFIX")]
    orchestrator_kubernetes_name_prefix: Option<String>,
    /// Whether to disable pod metrics collection.
    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_DISABLE_POD_METRICS_COLLECTION")]
    orchestrator_kubernetes_disable_pod_metrics_collection: bool,
    /// Whether to annotate pods for prometheus service discovery.
    #[clap(
        long,
        env = "ORCHESTRATOR_KUBERNETES_ENABLE_PROMETHEUS_SCRAPE_ANNOTATIONS"
    )]
    orchestrator_kubernetes_enable_prometheus_scrape_annotations: bool,
    // NOTE(review): undocumented in this file; presumably a wrapper command
    // for processes launched by the process orchestrator — confirm against
    // `ProcessOrchestratorConfig`.
    #[clap(long, env = "ORCHESTRATOR_PROCESS_WRAPPER")]
    orchestrator_process_wrapper: Option<String>,
    /// Where the process orchestrator should store secrets.
    #[clap(
        long,
        env = "ORCHESTRATOR_PROCESS_SECRETS_DIRECTORY",
        value_name = "PATH",
        required_if_eq("orchestrator", "process")
    )]
    orchestrator_process_secrets_directory: Option<PathBuf>,
    /// Whether the process orchestrator should handle crashes in child
    /// processes by crashing the parent process.
    #[clap(long, env = "ORCHESTRATOR_PROCESS_PROPAGATE_CRASHES")]
    orchestrator_process_propagate_crashes: bool,
    /// An IP address on which the process orchestrator should bind TCP proxies
    /// for Unix domain sockets.
    ///
    /// When specified, for each named port of each created service, the process
    /// orchestrator will bind a TCP listener to the specified address that
    /// proxies incoming connections to the underlying Unix domain socket. The
    /// allocated TCP port will be emitted as a tracing event.
    ///
    /// The primary use is live debugging the running child services via tools
    /// that do not support Unix domain sockets (e.g., Prometheus, web
    /// browsers).
    #[clap(long, env = "ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR")]
    orchestrator_process_tcp_proxy_listen_addr: Option<IpAddr>,
    /// A directory in which the process orchestrator should write Prometheus
    /// scrape targets, for use with Prometheus's file-based service discovery.
    ///
    /// Each namespaced orchestrator will maintain a single JSON file into the
    /// directory named `NAMESPACE.json` containing the scrape targets for all
    /// extant services. The scrape targets will use the TCP proxy address, as
    /// Prometheus does not support scraping over Unix domain sockets.
    ///
    /// This option is ignored unless
    /// `--orchestrator-process-tcp-proxy-listen-addr` is set.
    ///
    /// See also: <https://prometheus.io/docs/guides/file-sd/>
    #[clap(
        long,
        env = "ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY"
    )]
    orchestrator_process_prometheus_service_discovery_directory: Option<PathBuf>,
    /// A scratch directory that orchestrated processes can use for ephemeral storage.
    #[clap(
        long,
        env = "ORCHESTRATOR_PROCESS_SCRATCH_DIRECTORY",
        value_name = "PATH"
    )]
    orchestrator_process_scratch_directory: Option<PathBuf>,
    /// Whether to use coverage build and collect coverage information. Not to be used for
    /// production, only testing.
    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_COVERAGE")]
    orchestrator_kubernetes_coverage: bool,
    /// The secrets controller implementation to use.
    //
    // The default follows the chosen orchestrator: `kubernetes` for the
    // Kubernetes orchestrator and `local-file` for the process orchestrator.
    #[structopt(
        long,
        value_enum,
        env = "SECRETS_CONTROLLER",
        default_value_ifs([
            ("orchestrator", "kubernetes", Some("kubernetes")),
            ("orchestrator", "process", Some("local-file"))
        ]),
        default_value("kubernetes"), // This shouldn't be possible, but it makes clap happy.
    )]
    secrets_controller: SecretsControllerKind,
    /// The list of tags to be set on AWS Secrets Manager secrets created by the
    /// AWS secrets controller.
    #[clap(
        long,
        env = "AWS_SECRETS_CONTROLLER_TAGS",
        action = ArgAction::Append,
        value_delimiter = ';',
        required_if_eq("secrets_controller", "aws-secrets-manager")
    )]
    aws_secrets_controller_tags: Vec<KeyValueArg<String, String>>,
    /// The clusterd image reference to use.
    //
    // Required with the Kubernetes orchestrator; defaults to `clusterd` with
    // the process orchestrator.
    #[structopt(
        long,
        env = "CLUSTERD_IMAGE",
        required_if_eq("orchestrator", "kubernetes"),
        default_value_if("orchestrator", "process", Some("clusterd"))
    )]
    clusterd_image: Option<String>,
    /// A number representing the environment's generation.
    ///
    /// This is incremented to request that the new process perform a graceful
    /// transition of power from the prior generation.
    #[clap(long, env = "DEPLOY_GENERATION", default_value = "0")]
    deploy_generation: u64,

    /// Can be provided in place of both persist_consensus_url and
    /// timestamp_oracle_url in order to point both at the same backend
    #[clap(
        long,
        env = "METADATA_BACKEND_URL",
        conflicts_with_all = &[
            "persist_consensus_url",
            "timestamp_oracle_url",
        ],
    )]
    metadata_backend_url: Option<SensitiveUrl>,

    /// Helm chart version for self-hosted Materialize. This version does not correspond to the
    /// Materialize (core) version (v0.125.0), but is time-based for our twice-a-year helm chart
    /// releases: v25.1.Z, v25.2.Z in 2025, then v26.1.Z, v26.2.Z in 2026, and so on. This version
    /// is displayed in addition in `SELECT mz_version()` if set.
    #[clap(long, env = "HELM_CHART_VERSION")]
    helm_chart_version: Option<String>,

    // === Storage options. ===
    /// Where the persist library should store its blob data.
    #[clap(long, env = "PERSIST_BLOB_URL")]
    persist_blob_url: SensitiveUrl,
    /// Where the persist library should perform consensus.
    #[clap(long, env = "PERSIST_CONSENSUS_URL")]
    persist_consensus_url: Option<SensitiveUrl>,
    /// The Persist PubSub URL.
    ///
    /// This URL is passed to `clusterd` for discovery of the Persist PubSub service.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,
    /// The number of worker threads created for the IsolatedRuntime used for
    /// storage related tasks. A negative value will subtract from the number
    /// of threads returned by [`num_cpus::get`].
    #[clap(long, env = "PERSIST_ISOLATED_RUNTIME_THREADS")]
    persist_isolated_runtime_threads: Option<isize>,
    /// The interval in seconds at which to collect storage usage information.
    //
    // Despite the `_sec` suffix, the value is parsed by
    // `humantime::parse_duration`, so it is written as a duration string such
    // as the default `3600s`.
    #[clap(
        long,
        env = "STORAGE_USAGE_COLLECTION_INTERVAL",
        value_parser = humantime::parse_duration,
        default_value = "3600s"
    )]
    storage_usage_collection_interval_sec: Duration,
    /// The period for which to retain usage records. Note that the retention
    /// period is only evaluated at server start time, so rebooting the server
    /// is required to discard old records.
    #[clap(long, env = "STORAGE_USAGE_RETENTION_PERIOD", value_parser = humantime::parse_duration)]
    storage_usage_retention_period: Option<Duration>,

    // === Adapter options. ===
    /// The PostgreSQL URL for the Postgres-backed timestamp oracle.
    #[clap(long, env = "TIMESTAMP_ORACLE_URL", value_name = "POSTGRES_URL")]
    timestamp_oracle_url: Option<SensitiveUrl>,
    /// Availability zones in which storage and compute resources may be
    /// deployed.
    #[clap(long, env = "AVAILABILITY_ZONE", use_value_delimiter = true)]
    availability_zone: Vec<String>,
    /// A map from size name to resource allocations for cluster replicas.
    #[clap(
        long,
        env = "CLUSTER_REPLICA_SIZES",
        requires = "bootstrap_default_cluster_replica_size"
    )]
    cluster_replica_sizes: String,
    /// An API key for Segment. Enables export of audit events to Segment.
    #[clap(long, env = "SEGMENT_API_KEY")]
    segment_api_key: Option<String>,
    /// Whether the Segment client is being used on the client side
    /// (rather than the server side).
    ///
    /// Enabling this causes the Segment server to record the IP address from
    /// which the event was sent.
    #[clap(long, env = "SEGMENT_CLIENT_SIDE")]
    segment_client_side: bool,
    /// An SDK key for LaunchDarkly.
    ///
    /// Setting this in combination with [`Self::config_sync_loop_interval`]
    /// will enable synchronization of LaunchDarkly features with system
    /// configuration parameters.
    #[clap(long, env = "LAUNCHDARKLY_SDK_KEY")]
    launchdarkly_sdk_key: Option<String>,
    /// A list of PARAM_NAME=KEY_NAME pairs from system parameter names to
    /// LaunchDarkly feature keys.
    ///
    /// This is used (so far only for testing purposes) when propagating values
    /// from the latter to the former. The identity map is assumed for absent
    /// parameter names.
    #[clap(
        long,
        env = "LAUNCHDARKLY_KEY_MAP",
        action = ArgAction::Append,
        value_delimiter = ';'
    )]
    launchdarkly_key_map: Vec<KeyValueArg<String, String>>,
    /// The duration at which the system parameter synchronization times out during startup.
    #[clap(
        long,
        env = "CONFIG_SYNC_TIMEOUT",
        value_parser = humantime::parse_duration,
        default_value = "30s"
    )]
    config_sync_timeout: Duration,
    /// The interval in seconds at which to synchronize system parameter values.
    ///
    /// If this is not explicitly set, the loop that synchronizes LaunchDarkly
    /// features with system configuration parameters will not run _even if
    /// [`Self::launchdarkly_sdk_key`] is present_.
    //
    // The value is parsed by `humantime::parse_duration`, so it is written as
    // a duration string (e.g. `30s`) rather than a bare number of seconds.
    #[clap(
        long,
        env = "CONFIG_SYNC_LOOP_INTERVAL",
        value_parser = humantime::parse_duration,
    )]
    config_sync_loop_interval: Option<Duration>,
    /// A scratch directory that can be used for ephemeral storage.
    //
    // NOTE(jkosh44): this argument is intentionally unused at present. It is
    // future proofing for a world where `environmentd` needs to spill
    // ephemeral state to disk.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

    // === Bootstrap options. ===
    /// The ID of this environment, in the form
    /// `<CLOUD>-<REGION>-<ORG-ID>-<ORDINAL>`.
    #[clap(
        long,
        env = "ENVIRONMENT_ID",
        value_name = "<CLOUD>-<REGION>-<ORG-ID>-<ORDINAL>"
    )]
    environment_id: EnvironmentId,
    /// If set, a role with the provided name will be created with `CREATEDB`
    /// and `CREATECLUSTER` attributes. It will also have `CREATE` privileges on
    /// the `materialize` database, `materialize.public` schema, and
    /// `quickstart` cluster.
    ///
    /// This option is meant for local development and testing to simplify the
    /// initial process of granting attributes and privileges to some default
    /// role.
    #[clap(long, env = "BOOTSTRAP_ROLE")]
    bootstrap_role: Option<String>,
    /// The size of the default cluster replica if bootstrapping.
    #[clap(
        long,
        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE",
        default_value = "1"
    )]
    bootstrap_default_cluster_replica_size: String,
    /// The size of the builtin system cluster replicas if bootstrapping.
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE",
        default_value = "1"
    )]
    bootstrap_builtin_system_cluster_replica_size: String,
    /// The size of the builtin catalog server cluster replicas if bootstrapping.
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE",
        default_value = "1"
    )]
    bootstrap_builtin_catalog_server_cluster_replica_size: String,
    /// The size of the builtin probe cluster replicas if bootstrapping.
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE",
        default_value = "1"
    )]
    bootstrap_builtin_probe_cluster_replica_size: String,
    /// The size of the builtin support cluster replicas if bootstrapping.
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE",
        default_value = "1"
    )]
    bootstrap_builtin_support_cluster_replica_size: String,
    /// The size of the builtin analytics cluster replicas if bootstrapping.
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE",
        default_value = "1"
    )]
    bootstrap_builtin_analytics_cluster_replica_size: String,
    /// The replication factor of the default cluster if bootstrapping.
    #[clap(
        long,
        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR",
        default_value = DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_default_cluster_replication_factor: u32,
    /// The replication factor of the builtin system cluster replicas if bootstrapping.
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR",
        default_value = SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_system_cluster_replication_factor: u32,
    /// The replication factor of the builtin catalog server cluster replicas if bootstrapping.
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICATION_FACTOR",
        default_value = CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_catalog_server_cluster_replication_factor: u32,
    /// The replication factor of the builtin probe cluster replicas if bootstrapping.
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR",
        default_value = PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_probe_cluster_replication_factor: u32,
    /// The replication factor of the builtin support cluster replicas if bootstrapping.
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICATION_FACTOR",
        default_value = SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_support_cluster_replication_factor: u32,
    /// The replication factor of the builtin analytics cluster replicas if bootstrapping.
    #[clap(
        long,
        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICATION_FACTOR",
        default_value = ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
        value_parser = clap::value_parser!(u32).range(0..=2)
    )]
    bootstrap_builtin_analytics_cluster_replication_factor: u32,
    /// A list of NAME=VALUE pairs used to override static defaults
    /// for system parameters.
    #[clap(
        long,
        env = "SYSTEM_PARAMETER_DEFAULT",
        action = ArgAction::Append,
        value_delimiter = ';'
    )]
    system_parameter_default: Vec<KeyValueArg<String, String>>,
    /// File containing a valid Materialize license key.
    #[clap(long, env = "LICENSE_KEY")]
    license_key: Option<String>,
    // TODO: temporary until we get the rest of the infrastructure in place
    //
    // Hidden from help output (`hide = true`). When set, `run` uses
    // `ValidatedLicenseKey::disabled()` instead of reading a license key file.
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,

    // === AWS options. ===
    /// The AWS account ID, which will be used to generate ARNs for
    /// Materialize-controlled AWS resources.
    #[clap(long, env = "AWS_ACCOUNT_ID")]
    aws_account_id: Option<String>,
    /// The ARN for a Materialize-controlled role to assume before assuming
    /// a customer's requested role for an AWS connection.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,
    /// Prefix for an external ID to be supplied to all AWS AssumeRole operations.
    ///
    /// Details: <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>
    #[clap(long, env = "AWS_EXTERNAL_ID_PREFIX", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// The list of supported AWS PrivateLink availability zone ids.
    /// Must be zone IDs, of format e.g. "use1-az1".
    #[clap(
        long,
        env = "AWS_PRIVATELINK_AVAILABILITY_ZONES",
        action = ArgAction::Append,
        use_value_delimiter = true
    )]
    aws_privatelink_availability_zones: Option<Vec<String>>,

    // === Tracing options. ===
    /// Tracing arguments.
    #[clap(flatten)]
    tracing: TracingCliArgs,

    // === Testing options. ===
    /// Injects arbitrary whitespace into builtin table fingerprints, which can
    /// trigger builtin item migrations. The amount of whitespace is determined
    /// by `unsafe_builtin_table_fingerprint_whitespace_version`.
    /// This argument is meant for testing only and as the name suggests
    /// should not be set in production.
    #[clap(long, value_enum, requires = "unsafe_mode")]
    unsafe_builtin_table_fingerprint_whitespace: Option<UnsafeBuiltinTableFingerprintWhitespace>,
    /// Controls the amount of whitespace injected by
    /// `unsafe_builtin_table_fingerprint_whitespace`.
    /// Incrementing this value can allow triggering multiple builtin
    /// migrations from a single test. This argument is meant for testing only
    /// and as the name suggests should not be set in production.
    #[clap(long, requires = "unsafe_mode", default_value = "1")]
    unsafe_builtin_table_fingerprint_whitespace_version: usize,
}
669
/// The service orchestrator implementations selectable through the
/// `orchestrator` argument of [`Args`].
///
/// Because the enum derives [`ValueEnum`], clap parses it from a string;
/// other arguments in [`Args`] refer to the two variants by the values
/// `"kubernetes"` and `"process"` when declaring conditional defaults and
/// requirements.
#[derive(ValueEnum, Debug, Clone)]
enum OrchestratorKind {
    Kubernetes,
    Process,
}
675
676// TODO [Alex Hunt] move this to a shared function that can be imported by the
677// region-controller.
678fn aws_secrets_controller_prefix(env_id: &EnvironmentId) -> String {
679    format!("/user-managed/{}/", env_id)
680}
681fn aws_secrets_controller_key_alias(env_id: &EnvironmentId) -> String {
682    // TODO [Alex Hunt] move this to a shared function that can be imported by the
683    // region-controller.
684    format!("alias/customer_key_{}", env_id)
685}
686
687pub fn main() {
688    let args = cli::parse_args(CliConfig {
689        env_prefix: Some("MZ_"),
690        enable_version_flag: true,
691    });
692    if let Err(err) = run(args) {
693        panic!("environmentd: fatal: {}", err.display_with_causes());
694    }
695}
696
/// Boots `environmentd` from the parsed command-line arguments and then parks
/// the calling thread forever.
///
/// In order, this function: installs the panic handler and signal handling,
/// resolves the license key, starts the Tokio runtime, configures tracing and
/// metrics, builds the orchestrator and secrets controller selected by
/// `args.orchestrator` and `args.secrets_controller`, sets up Persist (PubSub
/// server and client cache), binds the listeners, and starts serving.
///
/// On success this function never returns; it loops on `thread::park()`.
///
/// # Errors
///
/// Returns an error if any fallible setup step fails, for example: the license
/// key file cannot be read or validated, an expired license key carries
/// `ExpirationBehavior::Disable`, no license key is given with the Kubernetes
/// orchestrator, an incompatible orchestrator / secrets-controller combination
/// is requested, or binding or serving the listeners fails.
///
/// # Panics
///
/// Panics if neither `--persist-consensus-url` nor `--metadata-backend-url`
/// was provided, and at the other `expect`/`unwrap`/`assert!` sites in the
/// body if their invariants do not hold.
fn run(mut args: Args) -> Result<(), anyhow::Error> {
    mz_ore::panic::install_enhanced_handler();
    // Reference point for all startup timing reported below.
    let envd_start = Instant::now();

    // Configure signal handling as soon as possible. We want signals to be
    // handled to our liking ASAP.
    sys::enable_sigusr2_coverage_dump()?;
    sys::enable_termination_signal_cleanup()?;

    // Resolve the license key: checks explicitly disabled, a key file supplied,
    // or no key at all (an error with the Kubernetes orchestrator).
    let license_key = if args.disable_license_key_checks {
        ValidatedLicenseKey::disabled()
    } else if let Some(license_key_file) = args.license_key {
        let license_key_text = std::fs::read_to_string(&license_key_file)
            .context("failed to open license key file")?;
        let license_key = mz_license_keys::validate(
            license_key_text.trim(),
            &args.environment_id.organization_id().to_string(),
        )
        .context("failed to validate license key file")?;
        if license_key.expired {
            let message = format!(
                "The license key provided at {license_key_file} is expired! Please contact Materialize for assistance."
            );
            // An expired key either only logs a warning or aborts startup,
            // depending on the behavior carried by the key itself.
            match license_key.expiration_behavior {
                ExpirationBehavior::Warn | ExpirationBehavior::DisableClusterCreation => {
                    warn!("{message}");
                }
                ExpirationBehavior::Disable => bail!("{message}"),
            }
        }
        license_key
    } else if matches!(args.orchestrator, OrchestratorKind::Kubernetes) {
        bail!("--license-key is required when running in Kubernetes");
    } else {
        ValidatedLicenseKey::default()
    };

    // Configure testing options.
    if let Some(fingerprint_whitespace) = args.unsafe_builtin_table_fingerprint_whitespace {
        assert!(args.unsafe_mode);
        // The "version" is simply the number of newline characters injected.
        let whitespace = "\n".repeat(args.unsafe_builtin_table_fingerprint_whitespace_version);
        *UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE
            .lock()
            .expect("lock poisoned") = Some((fingerprint_whitespace, whitespace));
    }

    // Start Tokio runtime.

    // Use the smaller of the logical and physical CPU counts, but at least one
    // worker thread.
    let ncpus_useful = usize::max(1, cmp::min(num_cpus::get(), num_cpus::get_physical()));
    let runtime = Arc::new(
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(ncpus_useful)
            // The default thread name exceeds the Linux limit on thread name
            // length, so pick something shorter.
            .thread_name_fn(|| {
                static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
                let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
                format!("tokio:work-{}", id)
            })
            .enable_all()
            .build()?,
    );

    // Configure tracing to log the service name when using the process
    // orchestrator, which intermingles log output from multiple services. Other
    // orchestrators separate log output from different services.
    args.tracing.log_prefix = if matches!(args.orchestrator, OrchestratorKind::Process) {
        Some("environmentd".to_string())
    } else {
        None
    };

    let metrics_registry = MetricsRegistry::new();
    let (tracing_handle, _tracing_guard) = runtime.block_on(args.tracing.configure_tracing(
        StaticTracingConfig {
            service_name: "environmentd",
            build_info: BUILD_INFO,
        },
        metrics_registry.clone(),
    ))?;
    register_runtime_metrics("main", runtime.metrics(), &metrics_registry);

    // Span covering startup; it is exited once serving has begun and its trace
    // ID is printed as the "Root trace ID" at the end of this function.
    let span = tracing::info_span!("environmentd::run").entered();

    info!("startup: envd init: beginning");
    info!("startup: envd init: preamble beginning");

    let metrics = Metrics::register_into(&metrics_registry, BUILD_INFO);

    runtime.block_on(mz_alloc::register_metrics_into(&metrics_registry));
    runtime.block_on(mz_metrics::register_metrics_into(
        &metrics_registry,
        mz_dyncfgs::all_dyncfgs(),
    ));

    // Initialize fail crate for failpoint support
    let _failpoint_scenario = FailScenario::setup();

    // Configure connections.
    let tls = args.tls.into_config()?;
    let frontegg = Authenticator::from_args(args.frontegg, &metrics_registry)?;

    // Configure CORS.
    // When no origins are given explicitly, default to the loopback names and
    // addresses on the HTTP listen port, over both http and https.
    let allowed_origins = if !args.cors_allowed_origin.is_empty() {
        args.cors_allowed_origin
    } else {
        let port = args.http_listen_addr.port();
        vec![
            HeaderValue::from_str(&format!("http://localhost:{}", port)).unwrap(),
            HeaderValue::from_str(&format!("http://127.0.0.1:{}", port)).unwrap(),
            HeaderValue::from_str(&format!("http://[::1]:{}", port)).unwrap(),
            HeaderValue::from_str(&format!("https://localhost:{}", port)).unwrap(),
            HeaderValue::from_str(&format!("https://127.0.0.1:{}", port)).unwrap(),
            HeaderValue::from_str(&format!("https://[::1]:{}", port)).unwrap(),
        ]
    };
    let cors_allowed_origin = mz_http_util::build_cors_allowed_origin(&allowed_origins);

    // Configure controller.
    let entered = info_span!("environmentd::configure_controller").entered();
    // Build the orchestrator, the secrets controller, and (Kubernetes only) the
    // cloud resource controller, rejecting incompatible flag combinations.
    let (orchestrator, secrets_controller, cloud_resource_controller): (
        Arc<dyn Orchestrator>,
        Arc<dyn SecretsController>,
        Option<Arc<dyn CloudResourceController>>,
    ) = match args.orchestrator {
        OrchestratorKind::Kubernetes => {
            if args.orchestrator_process_scratch_directory.is_some() {
                bail!(
                    "--orchestrator-process-scratch-directory is \
                      not currently usable with the kubernetes orchestrator"
                );
            }

            let orchestrator = Arc::new(
                runtime
                    .block_on(KubernetesOrchestrator::new(KubernetesOrchestratorConfig {
                        context: args.orchestrator_kubernetes_context.clone(),
                        scheduler_name: args.orchestrator_kubernetes_scheduler_name,
                        service_labels: args
                            .orchestrator_kubernetes_service_label
                            .into_iter()
                            .map(|l| (l.key, l.value))
                            .collect(),
                        service_node_selector: args
                            .orchestrator_kubernetes_service_node_selector
                            .into_iter()
                            .map(|l| (l.key, l.value))
                            .collect(),
                        service_account: args.orchestrator_kubernetes_service_account,
                        image_pull_policy: args.orchestrator_kubernetes_image_pull_policy,
                        aws_external_id_prefix: args.aws_external_id_prefix.clone(),
                        coverage: args.orchestrator_kubernetes_coverage,
                        ephemeral_volume_storage_class: args
                            .orchestrator_kubernetes_ephemeral_volume_class
                            .clone(),
                        service_fs_group: args.orchestrator_kubernetes_service_fs_group.clone(),
                        name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
                        collect_pod_metrics: !args
                            .orchestrator_kubernetes_disable_pod_metrics_collection,
                        enable_prometheus_scrape_annotations: args
                            .orchestrator_kubernetes_enable_prometheus_scrape_annotations,
                    }))
                    .context("creating kubernetes orchestrator")?,
            );
            let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
                SecretsControllerKind::Kubernetes => {
                    // The Kubernetes orchestrator itself is used as the secrets
                    // controller.
                    let sc = Arc::clone(&orchestrator);
                    let sc: Arc<dyn SecretsController> = sc;
                    sc
                }
                SecretsControllerKind::AwsSecretsManager => {
                    Arc::new(
                        runtime.block_on(AwsSecretsController::new(
                            // TODO [Alex Hunt] move this to a shared function that can be imported by the
                            // region-controller.
                            &aws_secrets_controller_prefix(&args.environment_id),
                            &aws_secrets_controller_key_alias(&args.environment_id),
                            args.aws_secrets_controller_tags
                                .into_iter()
                                .map(|tag| (tag.key, tag.value))
                                .collect(),
                        )),
                    )
                }
                SecretsControllerKind::LocalFile => bail!(
                    "SecretsControllerKind::LocalFile is not compatible with Orchestrator::Kubernetes."
                ),
            };
            // The Kubernetes orchestrator also serves as the cloud resource
            // controller.
            let cloud_resource_controller = Arc::clone(&orchestrator);
            (
                orchestrator,
                secrets_controller,
                Some(cloud_resource_controller),
            )
        }
        OrchestratorKind::Process => {
            if args
                .orchestrator_kubernetes_ephemeral_volume_class
                .is_some()
            {
                bail!(
                    "--orchestrator-kubernetes-ephemeral-volume-class is \
                      not usable with the process orchestrator"
                );
            }
            let orchestrator = Arc::new(
                runtime
                    .block_on(ProcessOrchestrator::new(ProcessOrchestratorConfig {
                        // Look for binaries in the same directory as the
                        // running binary. When running via `cargo run`, this
                        // means that debug binaries look for other debug
                        // binaries and release binaries look for other release
                        // binaries.
                        image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
                        suppress_output: false,
                        environment_id: args.environment_id.to_string(),
                        secrets_dir: args
                            .orchestrator_process_secrets_directory
                            .clone()
                            .expect("clap enforced"),
                        // An absent wrapper means an empty command prefix;
                        // otherwise the wrapper string is split shell-style.
                        command_wrapper: args
                            .orchestrator_process_wrapper
                            .map_or(Ok(vec![]), |s| shell_words::split(&s))?,
                        propagate_crashes: args.orchestrator_process_propagate_crashes,
                        tcp_proxy: args.orchestrator_process_tcp_proxy_listen_addr.map(
                            |listen_addr| ProcessOrchestratorTcpProxyConfig {
                                listen_addr,
                                prometheus_service_discovery_dir: args
                                    .orchestrator_process_prometheus_service_discovery_directory,
                            },
                        ),
                        scratch_directory: args
                            .orchestrator_process_scratch_directory
                            .expect("process orchestrator requires scratch directory"),
                    }))
                    .context("creating process orchestrator")?,
            );
            let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
                SecretsControllerKind::Kubernetes => bail!(
                    "SecretsControllerKind::Kubernetes is not compatible with Orchestrator::Process."
                ),
                SecretsControllerKind::AwsSecretsManager => Arc::new(
                    runtime.block_on(AwsSecretsController::new(
                        &aws_secrets_controller_prefix(&args.environment_id),
                        &aws_secrets_controller_key_alias(&args.environment_id),
                        args.aws_secrets_controller_tags
                            .into_iter()
                            .map(|tag| (tag.key, tag.value))
                            .collect(),
                    )),
                ),
                SecretsControllerKind::LocalFile => {
                    // The process orchestrator itself is used as the secrets
                    // controller.
                    let sc = Arc::clone(&orchestrator);
                    let sc: Arc<dyn SecretsController> = sc;
                    sc
                }
            };
            (orchestrator, secrets_controller, None)
        }
    };
    drop(entered);
    let cloud_resource_reader = cloud_resource_controller.as_ref().map(|c| c.reader());
    let secrets_reader = secrets_controller.reader();
    let now = SYSTEM_TIME.clone();

    let mut persist_config =
        PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs());
    // Start with compaction disabled, later enable it if we're not in read-only mode.
    persist_config.disable_compaction();

    let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
    let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();

    // Interpret `--persist-isolated-runtime-threads`: absent or zero keeps the
    // default, a negative value is an offset from the CPU count (minimum one
    // thread), and a positive value is an absolute thread count.
    match args.persist_isolated_runtime_threads {
        // Use the default.
        None | Some(0) => (),
        Some(x @ ..=-1) => {
            let threads = num_cpus::get().saturating_add_signed(x).max(1);
            persist_config.isolated_runtime_worker_threads = threads;
        }
        Some(x @ 1..) => {
            let threads = usize::try_from(x).expect("pattern matched a positive value");
            persist_config.isolated_runtime_worker_threads = threads;
        }
    };

    // Run the Persist PubSub server as a background task on the runtime.
    let _server = runtime.spawn_named(
        || "persist::rpc::server",
        async move {
            info!(
                "listening for Persist PubSub connections on {}",
                args.internal_persist_pubsub_listen_addr
            );
            // Intentionally do not bubble up errors here, we don't want to take
            // down environmentd if there are any issues with the pubsub server.
            let res = persist_pubsub_server
                .serve(args.internal_persist_pubsub_listen_addr)
                .await;
            error!("Persist Pubsub server exited {:?}", res);
        }
        .instrument(tracing::info_span!("persist::rpc::server")),
    );

    let persist_clients = {
        // PersistClientCache may spawn tasks, so run within a tokio runtime context
        let _tokio_guard = runtime.enter();
        PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
            let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
                cfg,
                persist_pubsub_client.sender,
                metrics,
            ));
            PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
        })
    };

    // If no explicit consensus URL was given, derive one from the metadata
    // backend URL by adding an `options=--search_path=consensus` query
    // parameter. Panics if neither URL was provided.
    let consensus_uri = args.persist_consensus_url.unwrap_or_else(|| {
        args.metadata_backend_url
            .as_ref()
            .map(|metadata_backend_url| {
                SensitiveUrl(
                    Url::parse_with_params(
                        metadata_backend_url.0.as_ref(),
                        &[("options", "--search_path=consensus")],
                    )
                    .unwrap(),
                )
            })
            .expect("either --persist-consensus-url or --metadata-backend-url must be provided")
    });
    // Same derivation for the timestamp oracle, using
    // `--search_path=tsoracle`; unlike the consensus URL, this one stays `None`
    // when neither URL was provided.
    let timestamp_oracle_url = args.timestamp_oracle_url.or_else(|| {
        args.metadata_backend_url
            .as_ref()
            .map(|metadata_backend_url| {
                SensitiveUrl(
                    Url::parse_with_params(
                        metadata_backend_url.0.as_ref(),
                        &[("options", "--search_path=tsoracle")],
                    )
                    .unwrap(),
                )
            })
    });

    let persist_clients = Arc::new(persist_clients);
    let connection_context = ConnectionContext::from_cli_args(
        args.environment_id.to_string(),
        &args.tracing.startup_log_filter,
        args.aws_external_id_prefix,
        args.aws_connection_role_arn,
        secrets_reader,
        cloud_resource_reader,
    );
    // Wrap the orchestrator together with the tracing arguments.
    let orchestrator = Arc::new(TracingOrchestrator::new(orchestrator, args.tracing.clone()));
    let controller = ControllerConfig {
        build_info: &BUILD_INFO,
        orchestrator,
        persist_location: PersistLocation {
            blob_uri: args.persist_blob_url,
            consensus_uri,
        },
        persist_clients: Arc::clone(&persist_clients),
        clusterd_image: args.clusterd_image.expect("clap enforced"),
        init_container_image: args.orchestrator_kubernetes_init_container_image,
        deploy_generation: args.deploy_generation,
        now: SYSTEM_TIME.clone(),
        metrics_registry: metrics_registry.clone(),
        persist_pubsub_url: args.persist_pubsub_url,
        connection_context,
        // When serialized to args in the controller, only the relevant flags will be passed
        // through, so we just set all of them
        secrets_args: SecretsReaderCliArgs {
            secrets_reader: args.secrets_controller,
            secrets_reader_local_file_dir: args.orchestrator_process_secrets_directory,
            secrets_reader_kubernetes_context: Some(args.orchestrator_kubernetes_context),
            secrets_reader_aws_prefix: Some(aws_secrets_controller_prefix(&args.environment_id)),
            secrets_reader_name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
        },
    };

    let cluster_replica_sizes = ClusterReplicaSizeMap::parse_from_str(
        &args.cluster_replica_sizes,
        !license_key.allow_credit_consumption_override,
    )
    .context("parsing replica size map")?;

    emit_boot_diagnostics!(&BUILD_INFO);
    sys::adjust_rlimits();

    info!(
        "startup: envd init: preamble complete in {:?}",
        envd_start.elapsed()
    );

    let serve_start = Instant::now();
    info!("startup: envd init: serving beginning");
    // Bind all listeners and start serving; any failure aborts startup.
    let server = runtime.block_on(async {
        let listeners = Listeners::bind(ListenersConfig {
            sql_listen_addr: args.sql_listen_addr,
            http_listen_addr: args.http_listen_addr,
            internal_sql_listen_addr: args.internal_sql_listen_addr,
            internal_http_listen_addr: args.internal_http_listen_addr,
        })
        .await?;
        let catalog_config = CatalogConfig {
            persist_clients,
            metrics: Arc::new(mz_catalog::durable::Metrics::new(&metrics_registry)),
        };
        let server = listeners
            .serve(crate::Config {
                // Special modes.
                unsafe_mode: args.unsafe_mode,
                all_features: args.all_features,
                // Connection options.
                tls,
                tls_reload_certs: mz_server_core::default_cert_reload_ticker(),
                frontegg,
                cors_allowed_origin,
                egress_addresses: args.announce_egress_address,
                http_host_name: args.http_host_name,
                internal_console_redirect_url: args.internal_console_redirect_url,
                self_hosted_auth: args.enable_self_hosted_auth,
                self_hosted_auth_internal: args.enable_self_hosted_auth_internal,
                // Controller options.
                controller,
                secrets_controller,
                cloud_resource_controller,
                // Storage options.
                storage_usage_collection_interval: args.storage_usage_collection_interval_sec,
                storage_usage_retention_period: args.storage_usage_retention_period,
                // Adapter options.
                catalog_config,
                availability_zones: args.availability_zone,
                cluster_replica_sizes,
                timestamp_oracle_url,
                segment_api_key: args.segment_api_key,
                segment_client_side: args.segment_client_side,
                launchdarkly_sdk_key: args.launchdarkly_sdk_key,
                launchdarkly_key_map: args
                    .launchdarkly_key_map
                    .into_iter()
                    .map(|kv| (kv.key, kv.value))
                    .collect(),
                config_sync_timeout: args.config_sync_timeout,
                config_sync_loop_interval: args.config_sync_loop_interval,
                // Bootstrap options.
                environment_id: args.environment_id,
                bootstrap_role: args.bootstrap_role,
                bootstrap_default_cluster_replica_size: args.bootstrap_default_cluster_replica_size,
                bootstrap_default_cluster_replication_factor: args
                    .bootstrap_default_cluster_replication_factor,
                bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_system_cluster_replica_size,
                    replication_factor: args.bootstrap_builtin_system_cluster_replication_factor,
                },
                bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_catalog_server_cluster_replica_size,
                    replication_factor: args
                        .bootstrap_builtin_catalog_server_cluster_replication_factor,
                },
                bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_probe_cluster_replica_size,
                    replication_factor: args.bootstrap_builtin_probe_cluster_replication_factor,
                },
                bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_support_cluster_replica_size,
                    replication_factor: args.bootstrap_builtin_support_cluster_replication_factor,
                },
                bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_analytics_cluster_replica_size,
                    replication_factor: args.bootstrap_builtin_analytics_cluster_replication_factor,
                },
                system_parameter_defaults: args
                    .system_parameter_default
                    .into_iter()
                    .map(|kv| (kv.key, kv.value))
                    .collect(),
                // Cloned because the version is needed again for the startup
                // banner printed below.
                helm_chart_version: args.helm_chart_version.clone(),
                license_key,
                // AWS options.
                aws_account_id: args.aws_account_id,
                aws_privatelink_availability_zones: args.aws_privatelink_availability_zones,
                // Observability options.
                metrics_registry,
                tracing_handle,
                // Testing options.
                now,
            })
            .await
            .maybe_terminate("booting server")?;
        Ok::<_, anyhow::Error>(server)
    })?;
    info!(
        "startup: envd init: serving complete in {:?}",
        serve_start.elapsed()
    );

    // Record the total startup time, in milliseconds, in the startup gauge.
    let start_duration = envd_start.elapsed();
    metrics
        .start_time_environmentd
        .set(start_duration.as_millis().try_into().expect("must fit"));
    // Leave the startup span, keeping its trace ID for the banner below.
    let span = span.exit();
    let id = span.context().span().span_context().trace_id();
    drop(span);

    info!("startup: envd init: complete in {start_duration:?}");

    // Print the startup banner with the addresses actually bound.
    println!(
        "environmentd {} listening...",
        BUILD_INFO.human_version(args.helm_chart_version)
    );
    println!(" SQL address: {}", server.sql_local_addr());
    println!(" HTTP address: {}", server.http_local_addr());
    println!(
        " Internal SQL address: {}",
        server.internal_sql_local_addr()
    );
    println!(
        " Internal HTTP address: {}",
        server.internal_http_local_addr()
    );
    println!(
        " Internal Persist PubSub address: {}",
        args.internal_persist_pubsub_listen_addr
    );

    println!(" Root trace ID: {id}");

    // Block forever.
    loop {
        thread::park();
    }
}
1230
1231fn build_info() -> Vec<String> {
1232    let openssl_version =
1233        unsafe { CStr::from_ptr(openssl_sys::OpenSSL_version(openssl_sys::OPENSSL_VERSION)) };
1234    let rdkafka_version = unsafe { CStr::from_ptr(rdkafka_sys::bindings::rd_kafka_version_str()) };
1235    vec![
1236        openssl_version.to_string_lossy().into_owned(),
1237        format!("librdkafka v{}", rdkafka_version.to_string_lossy()),
1238    ]
1239}
1240
/// Metrics that `environmentd` records about its own startup.
#[derive(Debug, Clone)]
struct Metrics {
    /// Time in milliseconds from `environmentd` start until the adapter is
    /// ready; exported as `mz_start_time_environmentd`.
    pub start_time_environmentd: IntGauge,
}
1245
1246impl Metrics {
1247    pub fn register_into(registry: &MetricsRegistry, build_info: BuildInfo) -> Metrics {
1248        Metrics {
1249            start_time_environmentd: registry.register(metric!(
1250                name: "mz_start_time_environmentd",
1251                help: "Time in milliseconds from environmentd start until the adapter is ready.",
1252                const_labels: {
1253                    "version" => build_info.version,
1254                    "build_type" => if cfg!(release) { "release" } else { "debug" }
1255                },
1256            )),
1257        }
1258    }
1259}