mz_environmentd/environmentd/
main.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Manages a single Materialize environment.
11//!
12//! It listens for SQL connections on port 6875 (MTRL) and for HTTP connections
13//! on port 6876.
14
15use std::ffi::CStr;
16use std::fs::File;
17use std::net::{IpAddr, SocketAddr};
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::LazyLock;
21use std::sync::atomic::{AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use std::{cmp, env, iter, thread};
24
25use anyhow::{Context, bail};
26use clap::{ArgAction, Parser, ValueEnum};
27use fail::FailScenario;
28use http::header::HeaderValue;
29use ipnet::IpNet;
30use itertools::Itertools;
31use mz_adapter::ResultExt;
32use mz_adapter_types::bootstrap_builtin_cluster_config::{
33    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
34    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, DEFAULT_REPLICATION_FACTOR,
35    PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR, SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
36    SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
37};
38use mz_auth::password::Password;
39use mz_aws_secrets_controller::AwsSecretsController;
40use mz_build_info::BuildInfo;
41use mz_catalog::builtin::{
42    UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE,
43    UnsafeBuiltinTableFingerprintWhitespace,
44};
45use mz_catalog::config::ClusterReplicaSizeMap;
46use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController};
47use mz_controller::ControllerConfig;
48use mz_frontegg_auth::{Authenticator as FronteggAuthenticator, FronteggCliArgs};
49use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
50use mz_orchestrator::Orchestrator;
51use mz_orchestrator_kubernetes::{
52    KubernetesImagePullPolicy, KubernetesOrchestrator, KubernetesOrchestratorConfig,
53};
54use mz_orchestrator_process::{
55    ProcessOrchestrator, ProcessOrchestratorConfig, ProcessOrchestratorTcpProxyConfig,
56};
57use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs, TracingOrchestrator};
58use mz_ore::cli::{self, CliConfig, KeyValueArg};
59use mz_ore::error::ErrorExt;
60use mz_ore::metric;
61use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
62use mz_ore::now::SYSTEM_TIME;
63use mz_ore::task::RuntimeExt;
64use mz_ore::url::SensitiveUrl;
65use mz_persist_client::PersistLocation;
66use mz_persist_client::cache::PersistClientCache;
67use mz_persist_client::cfg::PersistConfig;
68use mz_persist_client::rpc::{
69    MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
70};
71use mz_secrets::SecretsController;
72use mz_server_core::TlsCliArgs;
73use mz_service::emit_boot_diagnostics;
74use mz_service::secrets::{SecretsControllerKind, SecretsReaderCliArgs};
75use mz_sql::catalog::EnvironmentId;
76use mz_storage_types::connections::ConnectionContext;
77use opentelemetry::trace::TraceContextExt;
78use prometheus::IntGauge;
79use tracing::{Instrument, error, info, info_span, warn};
80use tracing_opentelemetry::OpenTelemetrySpanExt;
81use url::Url;
82
83use crate::environmentd::sys;
84use crate::{BUILD_INFO, CatalogConfig, ListenerConfig, Listeners, ListenersConfig};
85
86static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
87static LONG_VERSION: LazyLock<String> = LazyLock::new(|| {
88    iter::once(BUILD_INFO.human_version(None))
89        .chain(build_info())
90        .join("\n")
91});
92
93/// Manages a single Materialize environment.
94#[derive(Parser, Debug)]
95#[clap(
96    name = "environmentd",
97    next_line_help = true,
98    version = VERSION.as_str(),
99    long_version = LONG_VERSION.as_str(),
100)]
101pub struct Args {
102    // === Special modes. ===
103    /// Enable unsafe features. Unsafe features are those that should never run
104    /// in production but are appropriate for testing/local development.
105    #[clap(long, env = "UNSAFE_MODE")]
106    unsafe_mode: bool,
107    /// Enables all feature flags, meant only as a tool for local development;
108    /// this should never be enabled in CI.
109    #[clap(long, env = "ALL_FEATURES")]
110    all_features: bool,
111
112    // === Connection options. ===
113    /// Path to a file containing the json-formatted configuration of our
114    /// metrics, HTTP, and sql listeners.
115    #[clap(
116        long,
117        env = "LISTENERS_CONFIG_PATH",
118        value_name = "PATH",
119        action = ArgAction::Set,
120    )]
121    listeners_config_path: PathBuf,
122    /// Password for the mz_system user.
123    #[clap(
124        long,
125        env = "EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM",
126        action = ArgAction::Set,
127    )]
128    external_login_password_mz_system: Option<Password>,
129    /// The address on which to listen for Persist PubSub connections.
130    ///
131    /// Connections to this address are not subject to encryption, authentication,
132    /// or access control. Care should be taken to not expose the listen address
133    /// to the public internet or other unauthorized parties.
134    #[clap(
135        long,
136        value_name = "HOST:PORT",
137        env = "INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR",
138        default_value = "127.0.0.1:6879",
139        action = ArgAction::Set,
140    )]
141    internal_persist_pubsub_listen_addr: SocketAddr,
142    /// Enable cross-origin resource sharing (CORS) for HTTP requests from the
143    /// specified origin.
144    ///
145    /// The default allows all local connections.
146    /// "*" allows all.
147    /// "*.domain.com" allows connections from any matching subdomain.
148    ///
149    /// Wildcards in other positions (e.g., "https://*.foo.com" or "https://foo.*.com") have no effect.
150    #[structopt(long, env = "CORS_ALLOWED_ORIGIN")]
151    cors_allowed_origin: Vec<HeaderValue>,
152    /// Public CIDR which the cloud environment has configured for
153    /// egress.
154    #[clap(
155        long,
156        env = "ANNOUNCE_EGRESS_ADDRESS",
157        action = ArgAction::Append,
158        use_value_delimiter = true
159    )]
160    announce_egress_address: Vec<IpNet>,
161    /// The external host name to connect to the HTTP server of this
162    /// environment.
163    ///
164    /// Presently used to render webhook URLs for end users in notices and the
165    /// system catalog. Not used to establish connections directly.
166    #[clap(long, env = "HTTP_HOST_NAME")]
167    http_host_name: Option<String>,
168    /// The URL of the Materialize console to proxy from the /internal-console
169    /// endpoint on the internal HTTP server.
170    #[clap(long, env = "INTERNAL_CONSOLE_REDIRECT_URL")]
171    internal_console_redirect_url: Option<String>,
172    /// TLS arguments.
173    #[clap(flatten)]
174    tls: TlsCliArgs,
175    /// Frontegg arguments.
176    #[clap(flatten)]
177    frontegg: FronteggCliArgs,
178    // === Orchestrator options. ===
179    /// The service orchestrator implementation to use.
180    #[structopt(long, value_enum, env = "ORCHESTRATOR")]
181    orchestrator: OrchestratorKind,
182    /// Name of a non-default Kubernetes scheduler, if any.
183    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SCHEDULER_NAME")]
184    orchestrator_kubernetes_scheduler_name: Option<String>,
185    /// Annotations to apply to all services created by the Kubernetes orchestrator
186    /// in the form `KEY=VALUE`.
187    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ANNOTATION")]
188    orchestrator_kubernetes_service_annotation: Vec<KeyValueArg<String, String>>,
189    /// Labels to apply to all services created by the Kubernetes orchestrator
190    /// in the form `KEY=VALUE`.
191    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_LABEL")]
192    orchestrator_kubernetes_service_label: Vec<KeyValueArg<String, String>>,
193    /// Node selector to apply to all services created by the Kubernetes
194    /// orchestrator in the form `KEY=VALUE`.
195    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_NODE_SELECTOR")]
196    orchestrator_kubernetes_service_node_selector: Vec<KeyValueArg<String, String>>,
197    /// Affinity to apply to all services created by the Kubernetes
198    /// orchestrator as a JSON string.
199    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_AFFINITY")]
200    orchestrator_kubernetes_service_affinity: Option<String>,
201    /// Tolerations to apply to all services created by the Kubernetes
202    /// orchestrator as a JSON string.
203    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_TOLERATIONS")]
204    orchestrator_kubernetes_service_tolerations: Option<String>,
205    /// The name of a service account to apply to all services created by the
206    /// Kubernetes orchestrator.
207    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ACCOUNT")]
208    orchestrator_kubernetes_service_account: Option<String>,
209    /// The Kubernetes context to use with the Kubernetes orchestrator.
210    ///
211    /// This defaults to `minikube` to prevent disaster (e.g., connecting to a
212    /// production cluster that happens to be the active Kubernetes context.)
213    #[structopt(
214        long,
215        env = "ORCHESTRATOR_KUBERNETES_CONTEXT",
216        default_value = "minikube"
217    )]
218    orchestrator_kubernetes_context: String,
219    /// The image pull policy to use for services created by the Kubernetes
220    /// orchestrator.
221    #[structopt(
222        long,
223        env = "ORCHESTRATOR_KUBERNETES_IMAGE_PULL_POLICY",
224        default_value = "always",
225        value_enum
226    )]
227    orchestrator_kubernetes_image_pull_policy: KubernetesImagePullPolicy,
228    /// The init container for services created by the Kubernetes orchestrator.
229    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_INIT_CONTAINER_IMAGE")]
230    orchestrator_kubernetes_init_container_image: Option<String>,
231    /// The Kubernetes StorageClass to use for the ephemeral volume attached to
232    /// services that request disk.
233    ///
234    /// If unspecified, the Kubernetes orchestrator will refuse to create
235    /// services that request disk.
236    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_EPHEMERAL_VOLUME_CLASS")]
237    orchestrator_kubernetes_ephemeral_volume_class: Option<String>,
238    /// The optional fs group for service's pods' `securityContext`.
239    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_FS_GROUP")]
240    orchestrator_kubernetes_service_fs_group: Option<i64>,
241    /// The prefix to prepend to all kubernetes object names.
242    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_NAME_PREFIX")]
243    orchestrator_kubernetes_name_prefix: Option<String>,
244    /// Whether to enable pod metrics collection.
245    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_DISABLE_POD_METRICS_COLLECTION")]
246    orchestrator_kubernetes_disable_pod_metrics_collection: bool,
247    /// Whether to annotate pods for prometheus service discovery.
248    #[clap(
249        long,
250        env = "ORCHESTRATOR_KUBERNETES_ENABLE_PROMETHEUS_SCRAPE_ANNOTATIONS"
251    )]
252    orchestrator_kubernetes_enable_prometheus_scrape_annotations: bool,
253    #[clap(long, env = "ORCHESTRATOR_PROCESS_WRAPPER")]
254    orchestrator_process_wrapper: Option<String>,
255    /// Where the process orchestrator should store secrets.
256    #[clap(
257        long,
258        env = "ORCHESTRATOR_PROCESS_SECRETS_DIRECTORY",
259        value_name = "PATH",
260        required_if_eq("orchestrator", "process")
261    )]
262    orchestrator_process_secrets_directory: Option<PathBuf>,
263    /// Whether the process orchestrator should handle crashes in child
264    /// processes by crashing the parent process.
265    #[clap(long, env = "ORCHESTRATOR_PROCESS_PROPAGATE_CRASHES")]
266    orchestrator_process_propagate_crashes: bool,
267    /// An IP address on which the process orchestrator should bind TCP proxies
268    /// for Unix domain sockets.
269    ///
270    /// When specified, for each named port of each created service, the process
271    /// orchestrator will bind a TCP listener to the specified address that
272    /// proxies incoming connections to the underlying Unix domain socket. The
273    /// allocated TCP port will be emitted as a tracing event.
274    ///
275    /// The primary use is live debugging the running child services via tools
276    /// that do not support Unix domain sockets (e.g., Prometheus, web
277    /// browsers).
278    #[clap(long, env = "ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR")]
279    orchestrator_process_tcp_proxy_listen_addr: Option<IpAddr>,
280    /// A directory in which the process orchestrator should write Prometheus
281    /// scrape targets, for use with Prometheus's file-based service discovery.
282    ///
283    /// Each namespaced orchestrator will maintain a single JSON file into the
284    /// directory named `NAMESPACE.json` containing the scrape targets for all
285    /// extant services. The scrape targets will use the TCP proxy address, as
286    /// Prometheus does not support scraping over Unix domain sockets.
287    ///
288    /// This option is ignored unless
289    /// `--orchestrator-process-tcp-proxy-listen-addr` is set.
290    ///
291    /// See also: <https://prometheus.io/docs/guides/file-sd/>
292    #[clap(
293        long,
294        env = "ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY"
295    )]
296    orchestrator_process_prometheus_service_discovery_directory: Option<PathBuf>,
297    /// A scratch directory that orchestrated processes can use for ephemeral storage.
298    #[clap(
299        long,
300        env = "ORCHESTRATOR_PROCESS_SCRATCH_DIRECTORY",
301        value_name = "PATH"
302    )]
303    orchestrator_process_scratch_directory: Option<PathBuf>,
304    /// Whether to use coverage build and collect coverage information. Not to be used for
305    /// production, only testing.
306    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_COVERAGE")]
307    orchestrator_kubernetes_coverage: bool,
308    /// The secrets controller implementation to use.
309    #[structopt(
310        long,
311        value_enum,
312        env = "SECRETS_CONTROLLER",
313        default_value_ifs([
314            ("orchestrator", "kubernetes", Some("kubernetes")),
315            ("orchestrator", "process", Some("local-file"))
316        ]),
317        default_value("kubernetes"), // This shouldn't be possible, but it makes clap happy.
318    )]
319    secrets_controller: SecretsControllerKind,
320    /// The list of tags to be set on AWS Secrets Manager secrets created by the
321    /// AWS secrets controller.
322    #[clap(
323        long,
324        env = "AWS_SECRETS_CONTROLLER_TAGS",
325        action = ArgAction::Append,
326        value_delimiter = ';',
327        required_if_eq("secrets_controller", "aws-secrets-manager")
328    )]
329    aws_secrets_controller_tags: Vec<KeyValueArg<String, String>>,
330    /// The clusterd image reference to use.
331    #[structopt(
332        long,
333        env = "CLUSTERD_IMAGE",
334        required_if_eq("orchestrator", "kubernetes"),
335        default_value_if("orchestrator", "process", Some("clusterd"))
336    )]
337    clusterd_image: Option<String>,
338    /// A number representing the environment's generation.
339    ///
340    /// This is incremented to request that the new process perform a graceful
341    /// transition of power from the prior generation.
342    #[clap(long, env = "DEPLOY_GENERATION", default_value = "0")]
343    deploy_generation: u64,
344
345    /// Can be provided in place of both persist_consensus_url and
346    /// timestamp_oracle_url in order to point both at the same backend
347    #[clap(
348        long,
349        env = "METADATA_BACKEND_URL",
350        conflicts_with_all = &[
351            "persist_consensus_url",
352            "timestamp_oracle_url",
353        ],
354    )]
355    metadata_backend_url: Option<SensitiveUrl>,
356
357    /// Helm chart version for self-hosted Materialize. This version does not correspond to the
358    /// Materialize (core) version (v0.125.0), but is time-based for our twice-a-year helm chart
359    /// releases: v25.1.Z, v25.2.Z in 2025, then v26.1.Z, v26.2.Z in 2026, and so on. This version
360    /// is displayed in addition in `SELECT mz_version()` if set.
361    #[clap(long, env = "HELM_CHART_VERSION")]
362    helm_chart_version: Option<String>,
363
364    // === Storage options. ===
365    /// Where the persist library should store its blob data.
366    #[clap(long, env = "PERSIST_BLOB_URL")]
367    persist_blob_url: SensitiveUrl,
368    /// Where the persist library should perform consensus.
369    #[clap(long, env = "PERSIST_CONSENSUS_URL")]
370    persist_consensus_url: Option<SensitiveUrl>,
371    /// The Persist PubSub URL.
372    ///
373    /// This URL is passed to `clusterd` for discovery of the Persist PubSub service.
374    #[clap(
375        long,
376        env = "PERSIST_PUBSUB_URL",
377        default_value = "http://localhost:6879"
378    )]
379    persist_pubsub_url: String,
380    /// The number of worker threads created for the IsolatedRuntime used for
381    /// storage related tasks. A negative value will subtract from the number
382    /// of threads returned by [`num_cpus::get`].
383    #[clap(long, env = "PERSIST_ISOLATED_RUNTIME_THREADS")]
384    persist_isolated_runtime_threads: Option<isize>,
385    /// The interval in seconds at which to collect storage usage information.
386    #[clap(
387        long,
388        env = "STORAGE_USAGE_COLLECTION_INTERVAL",
389        value_parser = humantime::parse_duration,
390        default_value = "3600s"
391    )]
392    storage_usage_collection_interval_sec: Duration,
393    /// The period for which to retain usage records. Note that the retention
394    /// period is only evaluated at server start time, so rebooting the server
395    /// is required to discard old records.
396    #[clap(long, env = "STORAGE_USAGE_RETENTION_PERIOD", value_parser = humantime::parse_duration)]
397    storage_usage_retention_period: Option<Duration>,
398
399    // === Adapter options. ===
400    /// The PostgreSQL URL for the Postgres-backed timestamp oracle.
401    #[clap(long, env = "TIMESTAMP_ORACLE_URL", value_name = "POSTGRES_URL")]
402    timestamp_oracle_url: Option<SensitiveUrl>,
403    /// Availability zones in which storage and compute resources may be
404    /// deployed.
405    #[clap(long, env = "AVAILABILITY_ZONE", use_value_delimiter = true)]
406    availability_zone: Vec<String>,
407    /// A map from size name to resource allocations for cluster replicas.
408    #[clap(
409        long,
410        env = "CLUSTER_REPLICA_SIZES",
411        requires = "bootstrap_default_cluster_replica_size"
412    )]
413    cluster_replica_sizes: String,
414    /// An API key for Segment. Enables export of audit events to Segment.
415    #[clap(long, env = "SEGMENT_API_KEY")]
416    segment_api_key: Option<String>,
417    /// Whether the Segment client is being used on the client side
418    /// (rather than the server side).
419    ///
420    /// Enabling this causes the Segment server to record the IP address from
421    /// which the event was sent.
422    #[clap(long, env = "SEGMENT_CLIENT_SIDE")]
423    segment_client_side: bool,
424    /// Only create a dummy segment client when no segment api key is provided, only to get more
425    /// testing coverage.
426    #[clap(long, env = "TEST_ONLY_DUMMY_SEGMENT_CLIENT")]
427    test_only_dummy_segment_client: bool,
428    /// An SDK key for LaunchDarkly.
429    ///
430    /// Setting this in combination with [`Self::config_sync_loop_interval`]
431    /// will enable synchronization of LaunchDarkly features with system
432    /// configuration parameters.
433    #[clap(long, env = "LAUNCHDARKLY_SDK_KEY")]
434    launchdarkly_sdk_key: Option<String>,
435    /// A list of PARAM_NAME=KEY_NAME pairs from system parameter names to
436    /// LaunchDarkly feature keys.
437    ///
438    /// This is used (so far only for testing purposes) when propagating values
439    /// from the latter to the former. The identity map is assumed for absent
440    /// parameter names.
441    #[clap(
442        long,
443        env = "LAUNCHDARKLY_KEY_MAP",
444        action = ArgAction::Append,
445        value_delimiter = ';'
446    )]
447    launchdarkly_key_map: Vec<KeyValueArg<String, String>>,
448    /// The duration at which the system parameter synchronization times out during startup.
449    #[clap(
450        long,
451        env = "CONFIG_SYNC_TIMEOUT",
452        value_parser = humantime::parse_duration,
453        default_value = "30s"
454    )]
455    config_sync_timeout: Duration,
456    /// The interval in seconds at which to synchronize system parameter values.
457    ///
458    /// If this is not explicitly set, the loop that synchronizes LaunchDarkly
459    /// features with system configuration parameters will not run _even if
460    /// [`Self::launchdarkly_sdk_key`] is present_.
461    #[clap(
462        long,
463        env = "CONFIG_SYNC_LOOP_INTERVAL",
464        value_parser = humantime::parse_duration,
465    )]
466    config_sync_loop_interval: Option<Duration>,
467    /// Path to a JSON file containing system parameter values.
468    /// If specified, this file will be used instead of LaunchDarkly for configuration.
469    #[clap(long, env = "CONFIG_SYNC_FILE_PATH", value_name = "PATH")]
470    config_sync_file_path: Option<PathBuf>,
471
472    // === Bootstrap options. ===
473    #[clap(
474        long,
475        env = "ENVIRONMENT_ID",
476        value_name = "<CLOUD>-<REGION>-<ORG-ID>-<ORDINAL>"
477    )]
478    environment_id: EnvironmentId,
479    /// If set, a role with the provided name will be created with `CREATEDB`
480    /// and `CREATECLUSTER` attributes. It will also have `CREATE` privileges on
481    /// the `materialize` database, `materialize.public` schema, and
482    /// `quickstart` cluster.
483    ///
484    /// This option is meant for local development and testing to simplify the
485    /// initial process of granting attributes and privileges to some default
486    /// role.
487    #[clap(long, env = "BOOTSTRAP_ROLE")]
488    bootstrap_role: Option<String>,
489    /// The size of the default cluster replica if bootstrapping.
490    #[clap(
491        long,
492        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE",
493        default_value = "scale=1,workers=1"
494    )]
495    bootstrap_default_cluster_replica_size: String,
496    /// The size of the builtin system cluster replicas if bootstrapping.
497    #[clap(
498        long,
499        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE",
500        default_value = "scale=1,workers=1"
501    )]
502    bootstrap_builtin_system_cluster_replica_size: String,
503    /// The size of the builtin catalog server cluster replicas if bootstrapping.
504    #[clap(
505        long,
506        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE",
507        default_value = "scale=1,workers=1"
508    )]
509    bootstrap_builtin_catalog_server_cluster_replica_size: String,
510    /// The size of the builtin probe cluster replicas if bootstrapping.
511    #[clap(
512        long,
513        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE",
514        default_value = "scale=1,workers=1"
515    )]
516    bootstrap_builtin_probe_cluster_replica_size: String,
517    /// The size of the builtin support cluster replicas if bootstrapping.
518    #[clap(
519        long,
520        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE",
521        default_value = "scale=1,workers=1"
522    )]
523    bootstrap_builtin_support_cluster_replica_size: String,
524    /// The size of the builtin analytics cluster replicas if bootstrapping.
525    #[clap(
526        long,
527        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE",
528        default_value = "scale=1,workers=1"
529    )]
530    bootstrap_builtin_analytics_cluster_replica_size: String,
531    #[clap(
532        long,
533        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR",
534        default_value = DEFAULT_REPLICATION_FACTOR.to_string(),
535        value_parser = clap::value_parser!(u32).range(0..=2)
536    )]
537    bootstrap_default_cluster_replication_factor: u32,
538    /// The replication factor of the builtin system cluster replicas if bootstrapping.
539    #[clap(
540        long,
541        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR",
542        default_value = SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
543        value_parser = clap::value_parser!(u32).range(0..=2)
544    )]
545    bootstrap_builtin_system_cluster_replication_factor: u32,
546    /// The replication factor of the builtin catalog server cluster replicas if bootstrapping.
547    #[clap(
548        long,
549        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICATION_FACTOR",
550        default_value = CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
551        value_parser = clap::value_parser!(u32).range(0..=2)
552    )]
553    bootstrap_builtin_catalog_server_cluster_replication_factor: u32,
554    /// The replication factor of the builtin probe cluster replicas if bootstrapping.
555    #[clap(
556        long,
557        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR",
558        default_value = PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
559        value_parser = clap::value_parser!(u32).range(0..=2)
560    )]
561    bootstrap_builtin_probe_cluster_replication_factor: u32,
562    /// The replication factor of the builtin support cluster replicas if bootstrapping.
563    #[clap(
564        long,
565        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICATION_FACTOR",
566        default_value = SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
567        value_parser = clap::value_parser!(u32).range(0..=2)
568    )]
569    bootstrap_builtin_support_cluster_replication_factor: u32,
570    /// The replication factor of the builtin analytics cluster replicas if bootstrapping.
571    #[clap(
572        long,
573        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICATION_FACTOR",
574        default_value = ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
575        value_parser = clap::value_parser!(u32).range(0..=2)
576    )]
577    bootstrap_builtin_analytics_cluster_replication_factor: u32,
578    /// An list of NAME=VALUE pairs used to override static defaults
579    /// for system parameters.
580    #[clap(
581        long,
582        env = "SYSTEM_PARAMETER_DEFAULT",
583        action = ArgAction::Append,
584        value_delimiter = ';'
585    )]
586    system_parameter_default: Vec<KeyValueArg<String, String>>,
587    /// File containing a valid Materialize license key.
588    #[clap(long, env = "LICENSE_KEY")]
589    license_key: Option<String>,
590
591    // === AWS options. ===
592    /// The AWS account ID, which will be used to generate ARNs for
593    /// Materialize-controlled AWS resources.
594    #[clap(long, env = "AWS_ACCOUNT_ID")]
595    aws_account_id: Option<String>,
596    /// The ARN for a Materialize-controlled role to assume before assuming
597    /// a customer's requested role for an AWS connection.
598    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
599    aws_connection_role_arn: Option<String>,
600    /// Prefix for an external ID to be supplied to all AWS AssumeRole operations.
601    ///
602    /// Details: <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>
603    #[clap(long, env = "AWS_EXTERNAL_ID_PREFIX", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
604    aws_external_id_prefix: Option<AwsExternalIdPrefix>,
605    /// The list of supported AWS PrivateLink availability zone ids.
606    /// Must be zone IDs, of format e.g. "use-az1".
607    #[clap(
608        long,
609        env = "AWS_PRIVATELINK_AVAILABILITY_ZONES",
610        action = ArgAction::Append,
611        use_value_delimiter = true
612    )]
613    aws_privatelink_availability_zones: Option<Vec<String>>,
614
615    // === Tracing options. ===
616    #[clap(flatten)]
617    tracing: TracingCliArgs,
618
619    // === Testing options. ===
620    /// Injects arbitrary whitespace into builtin table fingerprints, which can
621    /// trigger builtin item migrations. The amount of whitespace is determined
622    /// by
623    /// `unsafe_builtin_table_fingerprint_whitespace_version`.
624    /// This argument is meant for testing only and as the names suggests
625    /// should not be set in production.
626    #[clap(long, value_enum, requires = "unsafe_mode")]
627    unsafe_builtin_table_fingerprint_whitespace: Option<UnsafeBuiltinTableFingerprintWhitespace>,
628    /// Controls the amount of whitespace injected by
629    /// `unsafe_builtin_table_fingerprint_whitespace`.
630    /// Incrementing this value can allow triggering multiple builtin
631    /// migrations from a single test. This argument is meant for testing only
632    /// and as the names suggests should not be set in production.
633    #[clap(long, requires = "unsafe_mode", default_value = "1")]
634    unsafe_builtin_table_fingerprint_whitespace_version: usize,
635}
636
637#[derive(ValueEnum, Debug, Clone)]
638enum OrchestratorKind {
639    Kubernetes,
640    Process,
641}
642
643// TODO [Alex Hunt] move this to a shared function that can be imported by the
644// region-controller.
645fn aws_secrets_controller_prefix(env_id: &EnvironmentId) -> String {
646    format!("/user-managed/{}/", env_id)
647}
648fn aws_secrets_controller_key_alias(env_id: &EnvironmentId) -> String {
649    // TODO [Alex Hunt] move this to a shared function that can be imported by the
650    // region-controller.
651    format!("alias/customer_key_{}", env_id)
652}
653
654pub fn main() {
655    let args = cli::parse_args(CliConfig {
656        env_prefix: Some("MZ_"),
657        enable_version_flag: true,
658    });
659    if let Err(err) = run(args) {
660        panic!("environmentd: fatal: {}", err.display_with_causes());
661    }
662}
663
/// Boots environmentd from the parsed command-line `args`, then blocks the
/// calling thread forever.
///
/// Startup proceeds in this order:
/// 1. Install the enhanced panic handler and the signal handlers.
/// 2. Load and validate the license key, then apply testing knobs.
/// 3. Build the Tokio runtime and configure tracing and metrics.
/// 4. Build the orchestrator, the secrets controller and, with the Kubernetes
///    orchestrator only, the cloud resource controller.
/// 5. Start the persist PubSub server, build the persist client cache and
///    assemble the controller configuration.
/// 6. Bind the configured listeners and boot the server.
/// 7. Print the bound addresses and the root trace ID to stdout.
///
/// # Errors
///
/// Returns an error if any startup step fails. Examples:
/// - the license key file cannot be read or validated;
/// - the license key is expired with `ExpirationBehavior::Disable`;
/// - no license key is given with the Kubernetes orchestrator;
/// - incompatible orchestrator / secrets-controller options are combined;
/// - the listeners config file cannot be opened or parsed;
/// - booting the server fails.
///
/// On success this function never returns.
///
/// # Panics
///
/// Panics if `args.unsafe_builtin_table_fingerprint_whitespace` is set while
/// `args.unsafe_mode` is false. Also panics if neither
/// `--persist-consensus-url` nor `--metadata-backend-url` is provided.
fn run(mut args: Args) -> Result<(), anyhow::Error> {
    mz_ore::panic::install_enhanced_handler();
    // Used to report preamble time and total startup time later in `run`.
    let envd_start = Instant::now();

    // Configure signal handling as soon as possible. We want signals to be
    // handled to our liking ASAP.
    sys::enable_sigusr2_coverage_dump()?;
    sys::enable_termination_signal_cleanup()?;

    // Load the license key. An expired key is rejected here only when its
    // expiration behavior is `Disable`; the other behaviors just log a warning.
    // A key is mandatory with the Kubernetes orchestrator. Otherwise the
    // default key is used when none is given.
    let license_key = if let Some(license_key_file) = args.license_key {
        let license_key_text = std::fs::read_to_string(&license_key_file)
            .context("failed to open license key file")?;
        let license_key = mz_license_keys::validate(
            license_key_text.trim(),
            &args.environment_id.organization_id().to_string(),
        )
        .context("failed to validate license key file")?;
        if license_key.expired {
            let message = format!(
                "The license key provided at {license_key_file} is expired! Please contact Materialize for assistance."
            );
            match license_key.expiration_behavior {
                ExpirationBehavior::Warn | ExpirationBehavior::DisableClusterCreation => {
                    warn!("{message}");
                }
                ExpirationBehavior::Disable => bail!("{message}"),
            }
        }
        license_key
    } else if matches!(args.orchestrator, OrchestratorKind::Kubernetes) {
        bail!("--license-key is required when running in Kubernetes");
    } else {
        ValidatedLicenseKey::default()
    };

    // Configure testing options.
    if let Some(fingerprint_whitespace) = args.unsafe_builtin_table_fingerprint_whitespace {
        // This testing knob is only permitted in unsafe mode; otherwise panic.
        assert!(args.unsafe_mode);
        let whitespace = "\n".repeat(args.unsafe_builtin_table_fingerprint_whitespace_version);
        *UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE
            .lock()
            .expect("lock poisoned") = Some((fingerprint_whitespace, whitespace));
    }

    // Start Tokio runtime.

    // One worker per CPU, using the smaller of the logical and physical CPU
    // counts, with a floor of one.
    let ncpus_useful = usize::max(1, cmp::min(num_cpus::get(), num_cpus::get_physical()));
    let runtime = Arc::new(
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(ncpus_useful)
            // The default thread name exceeds the Linux limit on thread name
            // length, so pick something shorter.
            .thread_name_fn(|| {
                static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
                let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
                format!("tokio:work-{}", id)
            })
            .enable_all()
            .build()?,
    );

    // Configure tracing to log the service name when using the process
    // orchestrator, which intermingles log output from multiple services. Other
    // orchestrators separate log output from different services.
    args.tracing.log_prefix = if matches!(args.orchestrator, OrchestratorKind::Process) {
        Some("environmentd".to_string())
    } else {
        None
    };

    let metrics_registry = MetricsRegistry::new();
    // `_tracing_guard` is a named binding rather than `_`. It is therefore held
    // until `run` exits instead of being dropped at the end of this statement.
    let (tracing_handle, _tracing_guard) = runtime.block_on(args.tracing.configure_tracing(
        StaticTracingConfig {
            service_name: "environmentd",
            build_info: BUILD_INFO,
        },
        metrics_registry.clone(),
    ))?;
    register_runtime_metrics("main", runtime.metrics(), &metrics_registry);

    // Root span covering startup. It is exited after the server has booted, and
    // its trace ID is printed at the end of `run`.
    let span = tracing::info_span!("environmentd::run").entered();

    info!("startup: envd init: beginning");
    info!("startup: envd init: preamble beginning");

    let metrics = Metrics::register_into(&metrics_registry, BUILD_INFO);

    runtime.block_on(mz_alloc::register_metrics_into(&metrics_registry));
    runtime.block_on(mz_metrics::register_metrics_into(
        &metrics_registry,
        mz_dyncfgs::all_dyncfgs(),
    ));

    // Initialize fail crate for failpoint support
    let _failpoint_scenario = FailScenario::setup();

    // Configure connections.
    let tls = args.tls.into_config()?;
    let frontegg = FronteggAuthenticator::from_args(args.frontegg, &metrics_registry)?;
    // The listeners config file is JSON, deserialized into `ListenersConfig`.
    let listeners_config: ListenersConfig = {
        let f = File::open(args.listeners_config_path)?;
        serde_json::from_reader(f)?
    };

    // Configure CORS.
    // Explicitly configured origins take precedence. Otherwise, for each HTTP
    // listener's port, allow http and https origins on localhost, 127.0.0.1
    // and [::1] (six origins per listener).
    let allowed_origins = if !args.cors_allowed_origin.is_empty() {
        args.cors_allowed_origin
    } else {
        let mut allowed_origins = Vec::with_capacity(listeners_config.http.len() * 6);
        for (_, listener) in &listeners_config.http {
            let port = listener.addr().port();
            // These strings contain only plain ASCII (scheme, host and a
            // numeric port), so `from_str` is not expected to fail.
            allowed_origins.extend([
                HeaderValue::from_str(&format!("http://localhost:{}", port)).unwrap(),
                HeaderValue::from_str(&format!("http://127.0.0.1:{}", port)).unwrap(),
                HeaderValue::from_str(&format!("http://[::1]:{}", port)).unwrap(),
                HeaderValue::from_str(&format!("https://localhost:{}", port)).unwrap(),
                HeaderValue::from_str(&format!("https://127.0.0.1:{}", port)).unwrap(),
                HeaderValue::from_str(&format!("https://[::1]:{}", port)).unwrap(),
            ])
        }
        allowed_origins
    };
    let cors_allowed_origin = mz_http_util::build_cors_allowed_origin(&allowed_origins);

    // Configure controller.
    // Each orchestrator accepts only certain secrets controllers, and only the
    // Kubernetes orchestrator provides a cloud resource controller.
    let entered = info_span!("environmentd::configure_controller").entered();
    let (orchestrator, secrets_controller, cloud_resource_controller): (
        Arc<dyn Orchestrator>,
        Arc<dyn SecretsController>,
        Option<Arc<dyn CloudResourceController>>,
    ) = match args.orchestrator {
        OrchestratorKind::Kubernetes => {
            if args.orchestrator_process_scratch_directory.is_some() {
                bail!(
                    "--orchestrator-process-scratch-directory is \
                      not currently usable with the kubernetes orchestrator"
                );
            }

            let orchestrator = Arc::new(
                runtime
                    .block_on(KubernetesOrchestrator::new(KubernetesOrchestratorConfig {
                        context: args.orchestrator_kubernetes_context.clone(),
                        scheduler_name: args.orchestrator_kubernetes_scheduler_name,
                        service_annotations: args
                            .orchestrator_kubernetes_service_annotation
                            .into_iter()
                            .map(|l| (l.key, l.value))
                            .collect(),
                        service_labels: args
                            .orchestrator_kubernetes_service_label
                            .into_iter()
                            .map(|l| (l.key, l.value))
                            .collect(),
                        service_node_selector: args
                            .orchestrator_kubernetes_service_node_selector
                            .into_iter()
                            .map(|l| (l.key, l.value))
                            .collect(),
                        service_affinity: args.orchestrator_kubernetes_service_affinity,
                        service_tolerations: args.orchestrator_kubernetes_service_tolerations,
                        service_account: args.orchestrator_kubernetes_service_account,
                        image_pull_policy: args.orchestrator_kubernetes_image_pull_policy,
                        aws_external_id_prefix: args.aws_external_id_prefix.clone(),
                        coverage: args.orchestrator_kubernetes_coverage,
                        ephemeral_volume_storage_class: args
                            .orchestrator_kubernetes_ephemeral_volume_class
                            .clone(),
                        service_fs_group: args.orchestrator_kubernetes_service_fs_group.clone(),
                        name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
                        collect_pod_metrics: !args
                            .orchestrator_kubernetes_disable_pod_metrics_collection,
                        enable_prometheus_scrape_annotations: args
                            .orchestrator_kubernetes_enable_prometheus_scrape_annotations,
                    }))
                    .context("creating kubernetes orchestrator")?,
            );
            let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
                SecretsControllerKind::Kubernetes => {
                    // The Kubernetes orchestrator itself implements
                    // `SecretsController`; upcast a shared handle to it.
                    let sc = Arc::clone(&orchestrator);
                    let sc: Arc<dyn SecretsController> = sc;
                    sc
                }
                SecretsControllerKind::AwsSecretsManager => {
                    Arc::new(
                        runtime.block_on(AwsSecretsController::new(
                            // TODO [Alex Hunt] move this to a shared function that can be imported by the
                            // region-controller.
                            &aws_secrets_controller_prefix(&args.environment_id),
                            &aws_secrets_controller_key_alias(&args.environment_id),
                            args.aws_secrets_controller_tags
                                .into_iter()
                                .map(|tag| (tag.key, tag.value))
                                .collect(),
                        )),
                    )
                }
                SecretsControllerKind::LocalFile => bail!(
                    "SecretsControllerKind::LocalFile is not compatible with Orchestrator::Kubernetes."
                ),
            };
            // The Kubernetes orchestrator also serves as the cloud resource
            // controller.
            let cloud_resource_controller = Arc::clone(&orchestrator);
            (
                orchestrator,
                secrets_controller,
                Some(cloud_resource_controller),
            )
        }
        OrchestratorKind::Process => {
            if args
                .orchestrator_kubernetes_ephemeral_volume_class
                .is_some()
            {
                bail!(
                    "--orchestrator-kubernetes-ephemeral-volume-class is \
                      not usable with the process orchestrator"
                );
            }
            let orchestrator = Arc::new(
                runtime
                    .block_on(ProcessOrchestrator::new(ProcessOrchestratorConfig {
                        // Look for binaries in the same directory as the
                        // running binary. When running via `cargo run`, this
                        // means that debug binaries look for other debug
                        // binaries and release binaries look for other release
                        // binaries.
                        image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
                        suppress_output: false,
                        environment_id: args.environment_id.to_string(),
                        secrets_dir: args
                            .orchestrator_process_secrets_directory
                            .clone()
                            .expect("clap enforced"),
                        command_wrapper: args
                            .orchestrator_process_wrapper
                            .map_or(Ok(vec![]), |s| shell_words::split(&s))?,
                        propagate_crashes: args.orchestrator_process_propagate_crashes,
                        tcp_proxy: args.orchestrator_process_tcp_proxy_listen_addr.map(
                            |listen_addr| ProcessOrchestratorTcpProxyConfig {
                                listen_addr,
                                prometheus_service_discovery_dir: args
                                    .orchestrator_process_prometheus_service_discovery_directory,
                            },
                        ),
                        scratch_directory: args
                            .orchestrator_process_scratch_directory
                            .expect("process orchestrator requires scratch directory"),
                    }))
                    .context("creating process orchestrator")?,
            );
            let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
                SecretsControllerKind::Kubernetes => bail!(
                    "SecretsControllerKind::Kubernetes is not compatible with Orchestrator::Process."
                ),
                SecretsControllerKind::AwsSecretsManager => Arc::new(
                    runtime.block_on(AwsSecretsController::new(
                        &aws_secrets_controller_prefix(&args.environment_id),
                        &aws_secrets_controller_key_alias(&args.environment_id),
                        args.aws_secrets_controller_tags
                            .into_iter()
                            .map(|tag| (tag.key, tag.value))
                            .collect(),
                    )),
                ),
                SecretsControllerKind::LocalFile => {
                    // The process orchestrator itself implements
                    // `SecretsController` for `LocalFile` secrets.
                    let sc = Arc::clone(&orchestrator);
                    let sc: Arc<dyn SecretsController> = sc;
                    sc
                }
            };
            // There is no cloud resource controller with the process
            // orchestrator.
            (orchestrator, secrets_controller, None)
        }
    };
    drop(entered);
    let cloud_resource_reader = cloud_resource_controller.as_ref().map(|c| c.reader());
    let secrets_reader = secrets_controller.reader();
    let now = SYSTEM_TIME.clone();

    let mut persist_config =
        PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs());
    // Start with compaction disabled, later enable it if we're not in read-only mode.
    persist_config.disable_compaction();

    let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
    let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();

    // Translate `persist_isolated_runtime_threads` into a worker-thread count:
    // - `None` or 0 keeps persist's default;
    // - a negative value is added to `num_cpus::get()` (saturating, with a
    //   floor of one);
    // - a positive value is used as-is.
    match args.persist_isolated_runtime_threads {
        // Use the default.
        None | Some(0) => (),
        Some(x @ ..=-1) => {
            let threads = num_cpus::get().saturating_add_signed(x).max(1);
            persist_config.isolated_runtime_worker_threads = threads;
        }
        Some(x @ 1..) => {
            let threads = usize::try_from(x).expect("pattern matched a positive value");
            persist_config.isolated_runtime_worker_threads = threads;
        }
    };

    // Serve persist PubSub in a background task on the runtime. The join
    // handle is held in `_server` but never awaited.
    let _server = runtime.spawn_named(
        || "persist::rpc::server",
        async move {
            info!(
                "listening for Persist PubSub connections on {}",
                args.internal_persist_pubsub_listen_addr
            );
            // Intentionally do not bubble up errors here, we don't want to take
            // down environmentd if there are any issues with the pubsub server.
            let res = persist_pubsub_server
                .serve(args.internal_persist_pubsub_listen_addr)
                .await;
            error!("Persist Pubsub server exited {:?}", res);
        }
        .instrument(tracing::info_span!("persist::rpc::server")),
    );

    let persist_clients = {
        // PersistClientCache may spawn tasks, so run within a tokio runtime context
        let _tokio_guard = runtime.enter();
        PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
            let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
                cfg,
                persist_pubsub_client.sender,
                metrics,
            ));
            PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
        })
    };

    // Unless given explicitly, the consensus URL is derived from the metadata
    // backend URL with `search_path=consensus`. Panics if neither URL is set.
    let consensus_uri = args.persist_consensus_url.unwrap_or_else(|| {
        args.metadata_backend_url
            .as_ref()
            .map(|metadata_backend_url| {
                SensitiveUrl(
                    Url::parse_with_params(
                        metadata_backend_url.0.as_ref(),
                        &[("options", "--search_path=consensus")],
                    )
                    .unwrap(),
                )
            })
            .expect("either --persist-consensus-url or --metadata-backend-url must be provided")
    });
    // Likewise derive the timestamp oracle URL with `search_path=tsoracle`.
    // Unlike the consensus URL, it stays `None` if neither URL is set.
    let timestamp_oracle_url = args.timestamp_oracle_url.or_else(|| {
        args.metadata_backend_url
            .as_ref()
            .map(|metadata_backend_url| {
                SensitiveUrl(
                    Url::parse_with_params(
                        metadata_backend_url.0.as_ref(),
                        &[("options", "--search_path=tsoracle")],
                    )
                    .unwrap(),
                )
            })
    });

    let persist_clients = Arc::new(persist_clients);
    let connection_context = ConnectionContext::from_cli_args(
        args.environment_id.to_string(),
        &args.tracing.startup_log_filter,
        args.aws_external_id_prefix,
        args.aws_connection_role_arn,
        secrets_reader,
        cloud_resource_reader,
    );
    // Wrap the orchestrator so that it receives a copy of the tracing CLI args.
    let orchestrator = Arc::new(TracingOrchestrator::new(orchestrator, args.tracing.clone()));
    let controller = ControllerConfig {
        build_info: &BUILD_INFO,
        orchestrator,
        persist_location: PersistLocation {
            blob_uri: args.persist_blob_url,
            consensus_uri,
        },
        persist_clients: Arc::clone(&persist_clients),
        clusterd_image: args.clusterd_image.expect("clap enforced"),
        init_container_image: args.orchestrator_kubernetes_init_container_image,
        deploy_generation: args.deploy_generation,
        now: SYSTEM_TIME.clone(),
        metrics_registry: metrics_registry.clone(),
        persist_pubsub_url: args.persist_pubsub_url,
        connection_context,
        // When serialized to args in the controller, only the relevant flags will be passed
        // through, so we just set all of them
        secrets_args: SecretsReaderCliArgs {
            secrets_reader: args.secrets_controller,
            secrets_reader_local_file_dir: args.orchestrator_process_secrets_directory,
            secrets_reader_kubernetes_context: Some(args.orchestrator_kubernetes_context),
            secrets_reader_aws_prefix: Some(aws_secrets_controller_prefix(&args.environment_id)),
            secrets_reader_name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
        },
    };

    // NOTE(review): the boolean argument is the negation of the license key's
    // `allow_credit_consumption_override`; confirm its meaning in
    // `ClusterReplicaSizeMap::parse_from_str`.
    let cluster_replica_sizes = ClusterReplicaSizeMap::parse_from_str(
        &args.cluster_replica_sizes,
        !license_key.allow_credit_consumption_override,
    )
    .context("parsing replica size map")?;

    emit_boot_diagnostics!(&BUILD_INFO);
    sys::adjust_rlimits();

    info!(
        "startup: envd init: preamble complete in {:?}",
        envd_start.elapsed()
    );

    // Bind every configured listener and boot the server on the Tokio runtime.
    let serve_start = Instant::now();
    info!("startup: envd init: serving beginning");
    let server = runtime.block_on(async {
        let listeners = Listeners::bind(listeners_config).await?;
        let catalog_config = CatalogConfig {
            persist_clients,
            metrics: Arc::new(mz_catalog::durable::Metrics::new(&metrics_registry)),
        };
        let server = listeners
            .serve(crate::Config {
                // Special modes.
                unsafe_mode: args.unsafe_mode,
                all_features: args.all_features,
                // Connection options.
                tls,
                tls_reload_certs: mz_server_core::default_cert_reload_ticker(),
                external_login_password_mz_system: args.external_login_password_mz_system,
                frontegg,
                cors_allowed_origin,
                egress_addresses: args.announce_egress_address,
                http_host_name: args.http_host_name,
                internal_console_redirect_url: args.internal_console_redirect_url,
                // Controller options.
                controller,
                secrets_controller,
                cloud_resource_controller,
                // Storage options.
                storage_usage_collection_interval: args.storage_usage_collection_interval_sec,
                storage_usage_retention_period: args.storage_usage_retention_period,
                // Adapter options.
                catalog_config,
                availability_zones: args.availability_zone,
                cluster_replica_sizes,
                timestamp_oracle_url,
                segment_api_key: args.segment_api_key,
                segment_client_side: args.segment_client_side,
                test_only_dummy_segment_client: args.test_only_dummy_segment_client,
                launchdarkly_sdk_key: args.launchdarkly_sdk_key,
                launchdarkly_key_map: args
                    .launchdarkly_key_map
                    .into_iter()
                    .map(|kv| (kv.key, kv.value))
                    .collect(),
                config_sync_timeout: args.config_sync_timeout,
                config_sync_loop_interval: args.config_sync_loop_interval,
                config_sync_file_path: args.config_sync_file_path,

                // Bootstrap options.
                environment_id: args.environment_id,
                bootstrap_role: args.bootstrap_role,
                bootstrap_default_cluster_replica_size: args.bootstrap_default_cluster_replica_size,
                bootstrap_default_cluster_replication_factor: args
                    .bootstrap_default_cluster_replication_factor,
                bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_system_cluster_replica_size,
                    replication_factor: args.bootstrap_builtin_system_cluster_replication_factor,
                },
                bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_catalog_server_cluster_replica_size,
                    replication_factor: args
                        .bootstrap_builtin_catalog_server_cluster_replication_factor,
                },
                bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_probe_cluster_replica_size,
                    replication_factor: args.bootstrap_builtin_probe_cluster_replication_factor,
                },
                bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_support_cluster_replica_size,
                    replication_factor: args.bootstrap_builtin_support_cluster_replication_factor,
                },
                bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
                    size: args.bootstrap_builtin_analytics_cluster_replica_size,
                    replication_factor: args.bootstrap_builtin_analytics_cluster_replication_factor,
                },
                system_parameter_defaults: args
                    .system_parameter_default
                    .into_iter()
                    .map(|kv| (kv.key, kv.value))
                    .collect(),
                helm_chart_version: args.helm_chart_version.clone(),
                license_key,
                // AWS options.
                aws_account_id: args.aws_account_id,
                aws_privatelink_availability_zones: args.aws_privatelink_availability_zones,
                // Observability options.
                metrics_registry,
                tracing_handle,
                // Testing options.
                now,
            })
            .await
            .maybe_terminate("booting server")?;
        Ok::<_, anyhow::Error>(server)
    })?;
    info!(
        "startup: envd init: serving complete in {:?}",
        serve_start.elapsed()
    );

    // Record total startup time, in milliseconds, in the
    // `mz_start_time_environmentd` gauge.
    let start_duration = envd_start.elapsed();
    metrics
        .start_time_environmentd
        .set(start_duration.as_millis().try_into().expect("must fit"));
    // Exit the root startup span and capture its trace ID for printing below.
    let span = span.exit();
    let id = span.context().span().span_context().trace_id();
    drop(span);

    info!("startup: envd init: complete in {start_duration:?}");

    println!(
        "environmentd {} listening...",
        BUILD_INFO.human_version(args.helm_chart_version)
    );
    for (name, handle) in &server.sql_listener_handles {
        println!("{} SQL address: {}", name, handle.local_addr);
    }
    for (name, handle) in &server.http_listener_handles {
        println!("{} HTTP address: {}", name, handle.local_addr);
    }
    // TODO move persist pubsub address like metrics address?
    println!(
        " Internal Persist PubSub address: {}",
        args.internal_persist_pubsub_listen_addr
    );

    println!(" Root trace ID: {id}");

    // Block forever.
    loop {
        thread::park();
    }
}
1203
1204fn build_info() -> Vec<String> {
1205    let openssl_version =
1206        unsafe { CStr::from_ptr(openssl_sys::OpenSSL_version(openssl_sys::OPENSSL_VERSION)) };
1207    let rdkafka_version = unsafe { CStr::from_ptr(rdkafka_sys::bindings::rd_kafka_version_str()) };
1208    vec![
1209        openssl_version.to_string_lossy().into_owned(),
1210        format!("librdkafka v{}", rdkafka_version.to_string_lossy()),
1211    ]
1212}
1213
1214#[derive(Debug, Clone)]
1215struct Metrics {
1216    pub start_time_environmentd: IntGauge,
1217}
1218
1219impl Metrics {
1220    pub fn register_into(registry: &MetricsRegistry, build_info: BuildInfo) -> Metrics {
1221        Metrics {
1222            start_time_environmentd: registry.register(metric!(
1223                name: "mz_start_time_environmentd",
1224                help: "Time in milliseconds from environmentd start until the adapter is ready.",
1225                const_labels: {
1226                    "version" => build_info.version,
1227                    "build_type" => if cfg!(release) { "release" } else { "debug" }
1228                },
1229            )),
1230        }
1231    }
1232}