mz_environmentd/environmentd/
main.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Manages a single Materialize environment.
//!
//! It listens for SQL connections on port 6875 (MTRL) and for HTTP connections
//! on port 6876.
14
15use std::ffi::CStr;
16use std::fs::File;
17use std::net::{IpAddr, SocketAddr};
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::LazyLock;
21use std::sync::atomic::{AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use std::{cmp, env, iter, thread};
24
25use anyhow::{Context, bail};
26use clap::{ArgAction, Parser, ValueEnum};
27use fail::FailScenario;
28use http::header::HeaderValue;
29use ipnet::IpNet;
30use itertools::Itertools;
31use mz_adapter::ResultExt;
32use mz_adapter_types::bootstrap_builtin_cluster_config::{
33    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
34    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, DEFAULT_REPLICATION_FACTOR,
35    PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR, SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
36    SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
37};
38use mz_auth::password::Password;
39use mz_aws_secrets_controller::AwsSecretsController;
40use mz_build_info::BuildInfo;
41use mz_catalog::config::ClusterReplicaSizeMap;
42use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController};
43use mz_controller::ControllerConfig;
44use mz_frontegg_auth::{Authenticator as FronteggAuthenticator, FronteggCliArgs};
45use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
46use mz_orchestrator::Orchestrator;
47use mz_orchestrator_kubernetes::{
48    KubernetesImagePullPolicy, KubernetesOrchestrator, KubernetesOrchestratorConfig,
49};
50use mz_orchestrator_process::{
51    ProcessOrchestrator, ProcessOrchestratorConfig, ProcessOrchestratorTcpProxyConfig,
52};
53use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs, TracingOrchestrator};
54use mz_ore::cli::{self, CliConfig, KeyValueArg};
55use mz_ore::error::ErrorExt;
56use mz_ore::metric;
57use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
58use mz_ore::now::SYSTEM_TIME;
59use mz_ore::task::RuntimeExt;
60use mz_ore::url::SensitiveUrl;
61use mz_persist_client::PersistLocation;
62use mz_persist_client::cache::PersistClientCache;
63use mz_persist_client::cfg::PersistConfig;
64use mz_persist_client::rpc::{
65    MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
66};
67use mz_secrets::SecretsController;
68use mz_server_core::TlsCliArgs;
69use mz_service::emit_boot_diagnostics;
70use mz_service::secrets::{SecretsControllerKind, SecretsReaderCliArgs};
71use mz_sql::catalog::EnvironmentId;
72use mz_storage_types::connections::ConnectionContext;
73use opentelemetry::trace::TraceContextExt;
74use prometheus::IntGauge;
75use tracing::{Instrument, error, info, info_span, warn};
76use tracing_opentelemetry::OpenTelemetrySpanExt;
77use url::Url;
78
79use crate::environmentd::sys;
80use crate::{BUILD_INFO, CatalogConfig, ListenerConfig, Listeners, ListenersConfig};
81
82static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
83static LONG_VERSION: LazyLock<String> = LazyLock::new(|| {
84    iter::once(BUILD_INFO.human_version(None))
85        .chain(build_info())
86        .join("\n")
87});
88
89/// Manages a single Materialize environment.
90#[derive(Parser, Debug)]
91#[clap(
92    name = "environmentd",
93    next_line_help = true,
94    version = VERSION.as_str(),
95    long_version = LONG_VERSION.as_str(),
96)]
97pub struct Args {
98    // === Special modes. ===
99    /// Enable unsafe features. Unsafe features are those that should never run
100    /// in production but are appropriate for testing/local development.
101    #[clap(long, env = "UNSAFE_MODE")]
102    unsafe_mode: bool,
103    /// Enables all feature flags, meant only as a tool for local development;
104    /// this should never be enabled in CI.
105    #[clap(long, env = "ALL_FEATURES")]
106    all_features: bool,
107
108    // === Connection options. ===
109    /// Path to a file containing the json-formatted configuration of our
110    /// metrics, HTTP, and sql listeners.
111    #[clap(
112        long,
113        env = "LISTENERS_CONFIG_PATH",
114        value_name = "PATH",
115        action = ArgAction::Set,
116    )]
117    listeners_config_path: PathBuf,
118    /// Password for the mz_system user.
119    #[clap(
120        long,
121        env = "EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM",
122        action = ArgAction::Set,
123    )]
124    external_login_password_mz_system: Option<Password>,
125    /// The address on which to listen for Persist PubSub connections.
126    ///
127    /// Connections to this address are not subject to encryption, authentication,
128    /// or access control. Care should be taken to not expose the listen address
129    /// to the public internet or other unauthorized parties.
130    #[clap(
131        long,
132        value_name = "HOST:PORT",
133        env = "INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR",
134        default_value = "127.0.0.1:6879",
135        action = ArgAction::Set,
136    )]
137    internal_persist_pubsub_listen_addr: SocketAddr,
138    /// Enable cross-origin resource sharing (CORS) for HTTP requests from the
139    /// specified origin.
140    ///
141    /// The default allows all local connections.
142    /// "*" allows all.
143    /// "*.domain.com" allows connections from any matching subdomain.
144    ///
145    /// Wildcards in other positions (e.g., "https://*.foo.com" or "https://foo.*.com") have no effect.
146    #[structopt(long, env = "CORS_ALLOWED_ORIGIN")]
147    cors_allowed_origin: Vec<HeaderValue>,
148    /// Public CIDR which the cloud environment has configured for
149    /// egress.
150    #[clap(
151        long,
152        env = "ANNOUNCE_EGRESS_ADDRESS",
153        action = ArgAction::Append,
154        use_value_delimiter = true
155    )]
156    announce_egress_address: Vec<IpNet>,
157    /// The external host name to connect to the HTTP server of this
158    /// environment.
159    ///
160    /// Presently used to render webhook URLs for end users in notices and the
161    /// system catalog. Not used to establish connections directly.
162    #[clap(long, env = "HTTP_HOST_NAME")]
163    http_host_name: Option<String>,
164    /// The URL of the Materialize console to proxy from the /internal-console
165    /// endpoint on the internal HTTP server.
166    #[clap(long, env = "INTERNAL_CONSOLE_REDIRECT_URL")]
167    internal_console_redirect_url: Option<String>,
168    /// TLS arguments.
169    #[clap(flatten)]
170    tls: TlsCliArgs,
171    /// Frontegg arguments.
172    #[clap(flatten)]
173    frontegg: FronteggCliArgs,
174    // === Orchestrator options. ===
175    /// The service orchestrator implementation to use.
176    #[structopt(long, value_enum, env = "ORCHESTRATOR")]
177    orchestrator: OrchestratorKind,
178    /// Name of a non-default Kubernetes scheduler, if any.
179    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SCHEDULER_NAME")]
180    orchestrator_kubernetes_scheduler_name: Option<String>,
181    /// Annotations to apply to all services created by the Kubernetes orchestrator
182    /// in the form `KEY=VALUE`.
183    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ANNOTATION")]
184    orchestrator_kubernetes_service_annotation: Vec<KeyValueArg<String, String>>,
185    /// Labels to apply to all services created by the Kubernetes orchestrator
186    /// in the form `KEY=VALUE`.
187    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_LABEL")]
188    orchestrator_kubernetes_service_label: Vec<KeyValueArg<String, String>>,
189    /// Node selector to apply to all services created by the Kubernetes
190    /// orchestrator in the form `KEY=VALUE`.
191    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_NODE_SELECTOR")]
192    orchestrator_kubernetes_service_node_selector: Vec<KeyValueArg<String, String>>,
193    /// Affinity to apply to all services created by the Kubernetes
194    /// orchestrator as a JSON string.
195    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_AFFINITY")]
196    orchestrator_kubernetes_service_affinity: Option<String>,
197    /// Tolerations to apply to all services created by the Kubernetes
198    /// orchestrator as a JSON string.
199    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_TOLERATIONS")]
200    orchestrator_kubernetes_service_tolerations: Option<String>,
201    /// The name of a service account to apply to all services created by the
202    /// Kubernetes orchestrator.
203    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ACCOUNT")]
204    orchestrator_kubernetes_service_account: Option<String>,
205    /// The Kubernetes context to use with the Kubernetes orchestrator.
206    ///
207    /// This defaults to `minikube` to prevent disaster (e.g., connecting to a
208    /// production cluster that happens to be the active Kubernetes context.)
209    #[structopt(
210        long,
211        env = "ORCHESTRATOR_KUBERNETES_CONTEXT",
212        default_value = "minikube"
213    )]
214    orchestrator_kubernetes_context: String,
215    /// The image pull policy to use for services created by the Kubernetes
216    /// orchestrator.
217    #[structopt(
218        long,
219        env = "ORCHESTRATOR_KUBERNETES_IMAGE_PULL_POLICY",
220        default_value = "always",
221        value_enum
222    )]
223    orchestrator_kubernetes_image_pull_policy: KubernetesImagePullPolicy,
224    /// The init container for services created by the Kubernetes orchestrator.
225    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_INIT_CONTAINER_IMAGE")]
226    orchestrator_kubernetes_init_container_image: Option<String>,
227    /// The Kubernetes StorageClass to use for the ephemeral volume attached to
228    /// services that request disk.
229    ///
230    /// If unspecified, the Kubernetes orchestrator will refuse to create
231    /// services that request disk.
232    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_EPHEMERAL_VOLUME_CLASS")]
233    orchestrator_kubernetes_ephemeral_volume_class: Option<String>,
234    /// The optional fs group for service's pods' `securityContext`.
235    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_FS_GROUP")]
236    orchestrator_kubernetes_service_fs_group: Option<i64>,
237    /// The prefix to prepend to all kubernetes object names.
238    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_NAME_PREFIX")]
239    orchestrator_kubernetes_name_prefix: Option<String>,
240    /// Whether to enable pod metrics collection.
241    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_DISABLE_POD_METRICS_COLLECTION")]
242    orchestrator_kubernetes_disable_pod_metrics_collection: bool,
243    /// Whether to annotate pods for prometheus service discovery.
244    #[clap(
245        long,
246        env = "ORCHESTRATOR_KUBERNETES_ENABLE_PROMETHEUS_SCRAPE_ANNOTATIONS"
247    )]
248    orchestrator_kubernetes_enable_prometheus_scrape_annotations: bool,
249    #[clap(long, env = "ORCHESTRATOR_PROCESS_WRAPPER")]
250    orchestrator_process_wrapper: Option<String>,
251    /// Where the process orchestrator should store secrets.
252    #[clap(
253        long,
254        env = "ORCHESTRATOR_PROCESS_SECRETS_DIRECTORY",
255        value_name = "PATH",
256        required_if_eq("orchestrator", "process")
257    )]
258    orchestrator_process_secrets_directory: Option<PathBuf>,
259    /// Whether the process orchestrator should handle crashes in child
260    /// processes by crashing the parent process.
261    #[clap(long, env = "ORCHESTRATOR_PROCESS_PROPAGATE_CRASHES")]
262    orchestrator_process_propagate_crashes: bool,
263    /// An IP address on which the process orchestrator should bind TCP proxies
264    /// for Unix domain sockets.
265    ///
266    /// When specified, for each named port of each created service, the process
267    /// orchestrator will bind a TCP listener to the specified address that
268    /// proxies incoming connections to the underlying Unix domain socket. The
269    /// allocated TCP port will be emitted as a tracing event.
270    ///
271    /// The primary use is live debugging the running child services via tools
272    /// that do not support Unix domain sockets (e.g., Prometheus, web
273    /// browsers).
274    #[clap(long, env = "ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR")]
275    orchestrator_process_tcp_proxy_listen_addr: Option<IpAddr>,
276    /// A directory in which the process orchestrator should write Prometheus
277    /// scrape targets, for use with Prometheus's file-based service discovery.
278    ///
279    /// Each namespaced orchestrator will maintain a single JSON file into the
280    /// directory named `NAMESPACE.json` containing the scrape targets for all
281    /// extant services. The scrape targets will use the TCP proxy address, as
282    /// Prometheus does not support scraping over Unix domain sockets.
283    ///
284    /// This option is ignored unless
285    /// `--orchestrator-process-tcp-proxy-listen-addr` is set.
286    ///
287    /// See also: <https://prometheus.io/docs/guides/file-sd/>
288    #[clap(
289        long,
290        env = "ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY"
291    )]
292    orchestrator_process_prometheus_service_discovery_directory: Option<PathBuf>,
293    /// A scratch directory that orchestrated processes can use for ephemeral storage.
294    #[clap(
295        long,
296        env = "ORCHESTRATOR_PROCESS_SCRATCH_DIRECTORY",
297        value_name = "PATH"
298    )]
299    orchestrator_process_scratch_directory: Option<PathBuf>,
300    /// Whether to use coverage build and collect coverage information. Not to be used for
301    /// production, only testing.
302    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_COVERAGE")]
303    orchestrator_kubernetes_coverage: bool,
304    /// The secrets controller implementation to use.
305    #[structopt(
306        long,
307        value_enum,
308        env = "SECRETS_CONTROLLER",
309        default_value_ifs([
310            ("orchestrator", "kubernetes", Some("kubernetes")),
311            ("orchestrator", "process", Some("local-file"))
312        ]),
313        default_value("kubernetes"), // This shouldn't be possible, but it makes clap happy.
314    )]
315    secrets_controller: SecretsControllerKind,
316    /// The list of tags to be set on AWS Secrets Manager secrets created by the
317    /// AWS secrets controller.
318    #[clap(
319        long,
320        env = "AWS_SECRETS_CONTROLLER_TAGS",
321        action = ArgAction::Append,
322        value_delimiter = ';',
323        required_if_eq("secrets_controller", "aws-secrets-manager")
324    )]
325    aws_secrets_controller_tags: Vec<KeyValueArg<String, String>>,
326    /// The clusterd image reference to use.
327    #[structopt(
328        long,
329        env = "CLUSTERD_IMAGE",
330        required_if_eq("orchestrator", "kubernetes"),
331        default_value_if("orchestrator", "process", Some("clusterd"))
332    )]
333    clusterd_image: Option<String>,
334    /// A number representing the environment's generation.
335    ///
336    /// This is incremented to request that the new process perform a graceful
337    /// transition of power from the prior generation.
338    #[clap(long, env = "DEPLOY_GENERATION", default_value = "0")]
339    deploy_generation: u64,
340
341    /// Can be provided in place of both persist_consensus_url and
342    /// timestamp_oracle_url in order to point both at the same backend
343    #[clap(
344        long,
345        env = "METADATA_BACKEND_URL",
346        conflicts_with_all = &[
347            "persist_consensus_url",
348            "timestamp_oracle_url",
349        ],
350    )]
351    metadata_backend_url: Option<SensitiveUrl>,
352
353    /// Helm chart version for self-hosted Materialize. This version does not correspond to the
354    /// Materialize (core) version (v0.125.0), but is time-based for our twice-a-year helm chart
355    /// releases: v25.1.Z, v25.2.Z in 2025, then v26.1.Z, v26.2.Z in 2026, and so on. This version
356    /// is displayed in addition in `SELECT mz_version()` if set.
357    #[clap(long, env = "HELM_CHART_VERSION")]
358    helm_chart_version: Option<String>,
359
360    // === Storage options. ===
361    /// Where the persist library should store its blob data.
362    #[clap(long, env = "PERSIST_BLOB_URL")]
363    persist_blob_url: SensitiveUrl,
364    /// Where the persist library should perform consensus.
365    #[clap(long, env = "PERSIST_CONSENSUS_URL")]
366    persist_consensus_url: Option<SensitiveUrl>,
367    /// The Persist PubSub URL.
368    ///
369    /// This URL is passed to `clusterd` for discovery of the Persist PubSub service.
370    #[clap(
371        long,
372        env = "PERSIST_PUBSUB_URL",
373        default_value = "http://localhost:6879"
374    )]
375    persist_pubsub_url: String,
376    /// The number of worker threads created for the IsolatedRuntime used for
377    /// storage related tasks. A negative value will subtract from the number
378    /// of threads returned by [`num_cpus::get`].
379    #[clap(long, env = "PERSIST_ISOLATED_RUNTIME_THREADS")]
380    persist_isolated_runtime_threads: Option<isize>,
381    /// The interval in seconds at which to collect storage usage information.
382    #[clap(
383        long,
384        env = "STORAGE_USAGE_COLLECTION_INTERVAL",
385        value_parser = humantime::parse_duration,
386        default_value = "3600s"
387    )]
388    storage_usage_collection_interval_sec: Duration,
389    /// The period for which to retain usage records. Note that the retention
390    /// period is only evaluated at server start time, so rebooting the server
391    /// is required to discard old records.
392    #[clap(long, env = "STORAGE_USAGE_RETENTION_PERIOD", value_parser = humantime::parse_duration)]
393    storage_usage_retention_period: Option<Duration>,
394
395    // === Adapter options. ===
396    /// The PostgreSQL URL for the Postgres-backed timestamp oracle.
397    #[clap(long, env = "TIMESTAMP_ORACLE_URL", value_name = "POSTGRES_URL")]
398    timestamp_oracle_url: Option<SensitiveUrl>,
399    /// Availability zones in which storage and compute resources may be
400    /// deployed.
401    #[clap(long, env = "AVAILABILITY_ZONE", use_value_delimiter = true)]
402    availability_zone: Vec<String>,
403    /// A map from size name to resource allocations for cluster replicas.
404    #[clap(
405        long,
406        env = "CLUSTER_REPLICA_SIZES",
407        requires = "bootstrap_default_cluster_replica_size"
408    )]
409    cluster_replica_sizes: String,
410    /// An API key for Segment. Enables export of audit events to Segment.
411    #[clap(long, env = "SEGMENT_API_KEY")]
412    segment_api_key: Option<String>,
413    /// Whether the Segment client is being used on the client side
414    /// (rather than the server side).
415    ///
416    /// Enabling this causes the Segment server to record the IP address from
417    /// which the event was sent.
418    #[clap(long, env = "SEGMENT_CLIENT_SIDE")]
419    segment_client_side: bool,
420    /// Only create a dummy segment client when no segment api key is provided, only to get more
421    /// testing coverage.
422    #[clap(long, env = "TEST_ONLY_DUMMY_SEGMENT_CLIENT")]
423    test_only_dummy_segment_client: bool,
424    /// An SDK key for LaunchDarkly.
425    ///
426    /// Setting this in combination with [`Self::config_sync_loop_interval`]
427    /// will enable synchronization of LaunchDarkly features with system
428    /// configuration parameters.
429    #[clap(long, env = "LAUNCHDARKLY_SDK_KEY")]
430    launchdarkly_sdk_key: Option<String>,
431    /// A list of PARAM_NAME=KEY_NAME pairs from system parameter names to
432    /// LaunchDarkly feature keys.
433    ///
434    /// This is used (so far only for testing purposes) when propagating values
435    /// from the latter to the former. The identity map is assumed for absent
436    /// parameter names.
437    #[clap(
438        long,
439        env = "LAUNCHDARKLY_KEY_MAP",
440        action = ArgAction::Append,
441        value_delimiter = ';'
442    )]
443    launchdarkly_key_map: Vec<KeyValueArg<String, String>>,
444    /// The duration at which the system parameter synchronization times out during startup.
445    #[clap(
446        long,
447        env = "CONFIG_SYNC_TIMEOUT",
448        value_parser = humantime::parse_duration,
449        default_value = "30s"
450    )]
451    config_sync_timeout: Duration,
452    /// The interval in seconds at which to synchronize system parameter values.
453    ///
454    /// If this is not explicitly set, the loop that synchronizes LaunchDarkly
455    /// features with system configuration parameters will not run _even if
456    /// [`Self::launchdarkly_sdk_key`] is present_.
457    #[clap(
458        long,
459        env = "CONFIG_SYNC_LOOP_INTERVAL",
460        value_parser = humantime::parse_duration,
461    )]
462    config_sync_loop_interval: Option<Duration>,
463    /// Path to a JSON file containing system parameter values.
464    /// If specified, this file will be used instead of LaunchDarkly for configuration.
465    #[clap(long, env = "CONFIG_SYNC_FILE_PATH", value_name = "PATH")]
466    config_sync_file_path: Option<PathBuf>,
467
468    // === Bootstrap options. ===
469    #[clap(
470        long,
471        env = "ENVIRONMENT_ID",
472        value_name = "<CLOUD>-<REGION>-<ORG-ID>-<ORDINAL>"
473    )]
474    environment_id: EnvironmentId,
475    /// If set, a role with the provided name will be created with `CREATEDB`
476    /// and `CREATECLUSTER` attributes. It will also have `CREATE` privileges on
477    /// the `materialize` database, `materialize.public` schema, and
478    /// `quickstart` cluster.
479    ///
480    /// This option is meant for local development and testing to simplify the
481    /// initial process of granting attributes and privileges to some default
482    /// role.
483    #[clap(long, env = "BOOTSTRAP_ROLE")]
484    bootstrap_role: Option<String>,
485    /// The size of the default cluster replica if bootstrapping.
486    #[clap(
487        long,
488        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE",
489        default_value = "scale=1,workers=1"
490    )]
491    bootstrap_default_cluster_replica_size: String,
492    /// The size of the builtin system cluster replicas if bootstrapping.
493    #[clap(
494        long,
495        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE",
496        default_value = "scale=1,workers=1"
497    )]
498    bootstrap_builtin_system_cluster_replica_size: String,
499    /// The size of the builtin catalog server cluster replicas if bootstrapping.
500    #[clap(
501        long,
502        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE",
503        default_value = "scale=1,workers=1"
504    )]
505    bootstrap_builtin_catalog_server_cluster_replica_size: String,
506    /// The size of the builtin probe cluster replicas if bootstrapping.
507    #[clap(
508        long,
509        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE",
510        default_value = "scale=1,workers=1"
511    )]
512    bootstrap_builtin_probe_cluster_replica_size: String,
513    /// The size of the builtin support cluster replicas if bootstrapping.
514    #[clap(
515        long,
516        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE",
517        default_value = "scale=1,workers=1"
518    )]
519    bootstrap_builtin_support_cluster_replica_size: String,
520    /// The size of the builtin analytics cluster replicas if bootstrapping.
521    #[clap(
522        long,
523        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE",
524        default_value = "scale=1,workers=1"
525    )]
526    bootstrap_builtin_analytics_cluster_replica_size: String,
527    #[clap(
528        long,
529        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR",
530        default_value = DEFAULT_REPLICATION_FACTOR.to_string(),
531        value_parser = clap::value_parser!(u32).range(0..=2)
532    )]
533    bootstrap_default_cluster_replication_factor: u32,
534    /// The replication factor of the builtin system cluster replicas if bootstrapping.
535    #[clap(
536        long,
537        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR",
538        default_value = SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
539        value_parser = clap::value_parser!(u32).range(0..=2)
540    )]
541    bootstrap_builtin_system_cluster_replication_factor: u32,
542    /// The replication factor of the builtin catalog server cluster replicas if bootstrapping.
543    #[clap(
544        long,
545        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICATION_FACTOR",
546        default_value = CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
547        value_parser = clap::value_parser!(u32).range(0..=2)
548    )]
549    bootstrap_builtin_catalog_server_cluster_replication_factor: u32,
550    /// The replication factor of the builtin probe cluster replicas if bootstrapping.
551    #[clap(
552        long,
553        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR",
554        default_value = PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
555        value_parser = clap::value_parser!(u32).range(0..=2)
556    )]
557    bootstrap_builtin_probe_cluster_replication_factor: u32,
558    /// The replication factor of the builtin support cluster replicas if bootstrapping.
559    #[clap(
560        long,
561        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICATION_FACTOR",
562        default_value = SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
563        value_parser = clap::value_parser!(u32).range(0..=2)
564    )]
565    bootstrap_builtin_support_cluster_replication_factor: u32,
566    /// The replication factor of the builtin analytics cluster replicas if bootstrapping.
567    #[clap(
568        long,
569        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICATION_FACTOR",
570        default_value = ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
571        value_parser = clap::value_parser!(u32).range(0..=2)
572    )]
573    bootstrap_builtin_analytics_cluster_replication_factor: u32,
574    /// An list of NAME=VALUE pairs used to override static defaults
575    /// for system parameters.
576    #[clap(
577        long,
578        env = "SYSTEM_PARAMETER_DEFAULT",
579        action = ArgAction::Append,
580        value_delimiter = ';'
581    )]
582    system_parameter_default: Vec<KeyValueArg<String, String>>,
583    /// File containing a valid Materialize license key.
584    #[clap(long, env = "LICENSE_KEY")]
585    license_key: Option<String>,
586
587    // === AWS options. ===
588    /// The AWS account ID, which will be used to generate ARNs for
589    /// Materialize-controlled AWS resources.
590    #[clap(long, env = "AWS_ACCOUNT_ID")]
591    aws_account_id: Option<String>,
592    /// The ARN for a Materialize-controlled role to assume before assuming
593    /// a customer's requested role for an AWS connection.
594    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
595    aws_connection_role_arn: Option<String>,
596    /// Prefix for an external ID to be supplied to all AWS AssumeRole operations.
597    ///
598    /// Details: <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>
599    #[clap(long, env = "AWS_EXTERNAL_ID_PREFIX", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
600    aws_external_id_prefix: Option<AwsExternalIdPrefix>,
601    /// The list of supported AWS PrivateLink availability zone ids.
602    /// Must be zone IDs, of format e.g. "use-az1".
603    #[clap(
604        long,
605        env = "AWS_PRIVATELINK_AVAILABILITY_ZONES",
606        action = ArgAction::Append,
607        use_value_delimiter = true
608    )]
609    aws_privatelink_availability_zones: Option<Vec<String>>,
610
611    // === Tracing options. ===
612    #[clap(flatten)]
613    tracing: TracingCliArgs,
614
615    // === Testing options. ===
616    /// Forces the migration of all builtin storage collections using the
617    /// specified migration mechanism (either "evolution" or "replacement").
618    ///
619    /// This argument is meant for testing only and as the names suggests
620    /// should not be set in production.
621    #[clap(long, value_enum, requires = "unsafe_mode")]
622    unsafe_force_builtin_schema_migration: Option<String>,
623}
624
625#[derive(ValueEnum, Debug, Clone)]
626enum OrchestratorKind {
627    Kubernetes,
628    Process,
629}
630
631// TODO [Alex Hunt] move this to a shared function that can be imported by the
632// region-controller.
633fn aws_secrets_controller_prefix(env_id: &EnvironmentId) -> String {
634    format!("/user-managed/{}/", env_id)
635}
636fn aws_secrets_controller_key_alias(env_id: &EnvironmentId) -> String {
637    // TODO [Alex Hunt] move this to a shared function that can be imported by the
638    // region-controller.
639    format!("alias/customer_key_{}", env_id)
640}
641
642pub fn main() {
643    let args = cli::parse_args(CliConfig {
644        env_prefix: Some("MZ_"),
645        enable_version_flag: true,
646    });
647    if let Err(err) = run(args) {
648        panic!("environmentd: fatal: {}", err.display_with_causes());
649    }
650}
651
652fn run(mut args: Args) -> Result<(), anyhow::Error> {
653    mz_ore::panic::install_enhanced_handler();
654    let envd_start = Instant::now();
655
656    // Configure signal handling as soon as possible. We want signals to be
657    // handled to our liking ASAP.
658    sys::enable_sigusr2_coverage_dump()?;
659    sys::enable_termination_signal_cleanup()?;
660
661    let license_key = if let Some(license_key_file) = args.license_key {
662        let license_key_text = std::fs::read_to_string(&license_key_file)
663            .context("failed to open license key file")?;
664        let license_key = mz_license_keys::validate(license_key_text.trim())
665            .context("failed to validate license key file")?;
666        if license_key.expired {
667            let message = format!(
668                "The license key provided at {license_key_file} is expired! Please contact Materialize for assistance."
669            );
670            match license_key.expiration_behavior {
671                ExpirationBehavior::Warn | ExpirationBehavior::DisableClusterCreation => {
672                    warn!("{message}");
673                }
674                ExpirationBehavior::Disable => bail!("{message}"),
675            }
676        }
677        license_key
678    } else if matches!(args.orchestrator, OrchestratorKind::Kubernetes) {
679        bail!("--license-key is required when running in Kubernetes");
680    } else {
681        // license key checks are optional for the emulator
682        ValidatedLicenseKey::disabled()
683    };
684
685    // Configure testing options.
686    let force_builtin_schema_migration = args
687        .unsafe_force_builtin_schema_migration
688        .inspect(|_mechanism| assert!(args.unsafe_mode));
689
690    // Start Tokio runtime.
691
692    let ncpus_useful = usize::max(1, cmp::min(num_cpus::get(), num_cpus::get_physical()));
693    let runtime = Arc::new(
694        tokio::runtime::Builder::new_multi_thread()
695            .worker_threads(ncpus_useful)
696            .thread_stack_size(3 * 1024 * 1024) // 3 MiB
697            // The default thread name exceeds the Linux limit on thread name
698            // length, so pick something shorter.
699            .thread_name_fn(|| {
700                static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
701                let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
702                format!("tokio:work-{}", id)
703            })
704            .enable_all()
705            .build()?,
706    );
707
708    // Configure tracing to log the service name when using the process
709    // orchestrator, which intermingles log output from multiple services. Other
710    // orchestrators separate log output from different services.
711    args.tracing.log_prefix = if matches!(args.orchestrator, OrchestratorKind::Process) {
712        Some("environmentd".to_string())
713    } else {
714        None
715    };
716
717    let metrics_registry = MetricsRegistry::new();
718    let tracing_handle = runtime.block_on(args.tracing.configure_tracing(
719        StaticTracingConfig {
720            service_name: "environmentd",
721            build_info: BUILD_INFO,
722        },
723        metrics_registry.clone(),
724    ))?;
725    register_runtime_metrics("main", runtime.metrics(), &metrics_registry);
726
727    let span = tracing::info_span!("environmentd::run").entered();
728
729    info!("startup: envd init: beginning");
730    info!("startup: envd init: preamble beginning");
731
732    let metrics = Metrics::register_into(&metrics_registry, BUILD_INFO);
733
734    runtime.block_on(mz_alloc::register_metrics_into(&metrics_registry));
735    runtime.block_on(mz_metrics::register_metrics_into(
736        &metrics_registry,
737        mz_dyncfgs::all_dyncfgs(),
738    ));
739
740    // Initialize fail crate for failpoint support
741    let _failpoint_scenario = FailScenario::setup();
742
743    // Configure connections.
744    let tls = args.tls.into_config()?;
745    let frontegg = FronteggAuthenticator::from_args(args.frontegg, &metrics_registry)?;
746    let listeners_config: ListenersConfig = {
747        let f = File::open(args.listeners_config_path)?;
748        serde_json::from_reader(f)?
749    };
750
751    for (_, listener) in &listeners_config.sql {
752        listener
753            .validate()
754            .map_err(|e| anyhow::anyhow!("invalid SQL listener: {}", e))?;
755    }
756
757    for (_, listener) in &listeners_config.http {
758        listener
759            .validate()
760            .map_err(|e| anyhow::anyhow!("invalid HTTP listener: {}", e))?;
761    }
762
763    // Configure CORS.
764    let allowed_origins = if !args.cors_allowed_origin.is_empty() {
765        args.cors_allowed_origin
766    } else {
767        let mut allowed_origins = Vec::with_capacity(listeners_config.http.len() * 6);
768        for (_, listener) in &listeners_config.http {
769            let port = listener.addr().port();
770            allowed_origins.extend([
771                HeaderValue::from_str(&format!("http://localhost:{}", port)).unwrap(),
772                HeaderValue::from_str(&format!("http://127.0.0.1:{}", port)).unwrap(),
773                HeaderValue::from_str(&format!("http://[::1]:{}", port)).unwrap(),
774                HeaderValue::from_str(&format!("https://localhost:{}", port)).unwrap(),
775                HeaderValue::from_str(&format!("https://127.0.0.1:{}", port)).unwrap(),
776                HeaderValue::from_str(&format!("https://[::1]:{}", port)).unwrap(),
777            ])
778        }
779        allowed_origins
780    };
781    let cors_allowed_origin = mz_http_util::build_cors_allowed_origin(&allowed_origins);
782
783    // Configure controller.
784    let entered = info_span!("environmentd::configure_controller").entered();
785    let (orchestrator, secrets_controller, cloud_resource_controller): (
786        Arc<dyn Orchestrator>,
787        Arc<dyn SecretsController>,
788        Option<Arc<dyn CloudResourceController>>,
789    ) = match args.orchestrator {
790        OrchestratorKind::Kubernetes => {
791            if args.orchestrator_process_scratch_directory.is_some() {
792                bail!(
793                    "--orchestrator-process-scratch-directory is \
794                      not currently usable with the kubernetes orchestrator"
795                );
796            }
797
798            let orchestrator = Arc::new(
799                runtime
800                    .block_on(KubernetesOrchestrator::new(KubernetesOrchestratorConfig {
801                        context: args.orchestrator_kubernetes_context.clone(),
802                        scheduler_name: args.orchestrator_kubernetes_scheduler_name,
803                        service_annotations: args
804                            .orchestrator_kubernetes_service_annotation
805                            .into_iter()
806                            .map(|l| (l.key, l.value))
807                            .collect(),
808                        service_labels: args
809                            .orchestrator_kubernetes_service_label
810                            .into_iter()
811                            .map(|l| (l.key, l.value))
812                            .collect(),
813                        service_node_selector: args
814                            .orchestrator_kubernetes_service_node_selector
815                            .into_iter()
816                            .map(|l| (l.key, l.value))
817                            .collect(),
818                        service_affinity: args.orchestrator_kubernetes_service_affinity,
819                        service_tolerations: args.orchestrator_kubernetes_service_tolerations,
820                        service_account: args.orchestrator_kubernetes_service_account,
821                        image_pull_policy: args.orchestrator_kubernetes_image_pull_policy,
822                        aws_external_id_prefix: args.aws_external_id_prefix.clone(),
823                        coverage: args.orchestrator_kubernetes_coverage,
824                        ephemeral_volume_storage_class: args
825                            .orchestrator_kubernetes_ephemeral_volume_class
826                            .clone(),
827                        service_fs_group: args.orchestrator_kubernetes_service_fs_group.clone(),
828                        name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
829                        collect_pod_metrics: !args
830                            .orchestrator_kubernetes_disable_pod_metrics_collection,
831                        enable_prometheus_scrape_annotations: args
832                            .orchestrator_kubernetes_enable_prometheus_scrape_annotations,
833                    }))
834                    .context("creating kubernetes orchestrator")?,
835            );
836            let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
837                SecretsControllerKind::Kubernetes => {
838                    let sc = Arc::clone(&orchestrator);
839                    let sc: Arc<dyn SecretsController> = sc;
840                    sc
841                }
842                SecretsControllerKind::AwsSecretsManager => {
843                    Arc::new(
844                        runtime.block_on(AwsSecretsController::new(
845                            // TODO [Alex Hunt] move this to a shared function that can be imported by the
846                            // region-controller.
847                            &aws_secrets_controller_prefix(&args.environment_id),
848                            &aws_secrets_controller_key_alias(&args.environment_id),
849                            args.aws_secrets_controller_tags
850                                .into_iter()
851                                .map(|tag| (tag.key, tag.value))
852                                .collect(),
853                        )),
854                    )
855                }
856                SecretsControllerKind::LocalFile => bail!(
857                    "SecretsControllerKind::LocalFile is not compatible with Orchestrator::Kubernetes."
858                ),
859            };
860            let cloud_resource_controller = Arc::clone(&orchestrator);
861            (
862                orchestrator,
863                secrets_controller,
864                Some(cloud_resource_controller),
865            )
866        }
867        OrchestratorKind::Process => {
868            if args
869                .orchestrator_kubernetes_ephemeral_volume_class
870                .is_some()
871            {
872                bail!(
873                    "--orchestrator-kubernetes-ephemeral-volume-class is \
874                      not usable with the process orchestrator"
875                );
876            }
877            let orchestrator = Arc::new(
878                runtime
879                    .block_on(ProcessOrchestrator::new(ProcessOrchestratorConfig {
880                        // Look for binaries in the same directory as the
881                        // running binary. When running via `cargo run`, this
882                        // means that debug binaries look for other debug
883                        // binaries and release binaries look for other release
884                        // binaries.
885                        image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
886                        suppress_output: false,
887                        environment_id: args.environment_id.to_string(),
888                        secrets_dir: args
889                            .orchestrator_process_secrets_directory
890                            .clone()
891                            .expect("clap enforced"),
892                        command_wrapper: args
893                            .orchestrator_process_wrapper
894                            .map_or(Ok(vec![]), |s| shell_words::split(&s))?,
895                        propagate_crashes: args.orchestrator_process_propagate_crashes,
896                        tcp_proxy: args.orchestrator_process_tcp_proxy_listen_addr.map(
897                            |listen_addr| ProcessOrchestratorTcpProxyConfig {
898                                listen_addr,
899                                prometheus_service_discovery_dir: args
900                                    .orchestrator_process_prometheus_service_discovery_directory,
901                            },
902                        ),
903                        scratch_directory: args
904                            .orchestrator_process_scratch_directory
905                            .expect("process orchestrator requires scratch directory"),
906                    }))
907                    .context("creating process orchestrator")?,
908            );
909            let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
910                SecretsControllerKind::Kubernetes => bail!(
911                    "SecretsControllerKind::Kubernetes is not compatible with Orchestrator::Process."
912                ),
913                SecretsControllerKind::AwsSecretsManager => Arc::new(
914                    runtime.block_on(AwsSecretsController::new(
915                        &aws_secrets_controller_prefix(&args.environment_id),
916                        &aws_secrets_controller_key_alias(&args.environment_id),
917                        args.aws_secrets_controller_tags
918                            .into_iter()
919                            .map(|tag| (tag.key, tag.value))
920                            .collect(),
921                    )),
922                ),
923                SecretsControllerKind::LocalFile => {
924                    let sc = Arc::clone(&orchestrator);
925                    let sc: Arc<dyn SecretsController> = sc;
926                    sc
927                }
928            };
929            (orchestrator, secrets_controller, None)
930        }
931    };
932    drop(entered);
933    let cloud_resource_reader = cloud_resource_controller.as_ref().map(|c| c.reader());
934    let secrets_reader = secrets_controller.reader();
935    let now = SYSTEM_TIME.clone();
936
937    let mut persist_config =
938        PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs());
939    // Start with compaction disabled, later enable it if we're not in read-only mode.
940    persist_config.disable_compaction();
941
942    let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
943    let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
944
945    match args.persist_isolated_runtime_threads {
946        // Use the default.
947        None | Some(0) => (),
948        Some(x @ ..=-1) => {
949            let threads = num_cpus::get().saturating_add_signed(x).max(1);
950            persist_config.isolated_runtime_worker_threads = threads;
951        }
952        Some(x @ 1..) => {
953            let threads = usize::try_from(x).expect("pattern matched a positive value");
954            persist_config.isolated_runtime_worker_threads = threads;
955        }
956    };
957
958    let _server = runtime.spawn_named(
959        || "persist::rpc::server",
960        async move {
961            info!(
962                "listening for Persist PubSub connections on {}",
963                args.internal_persist_pubsub_listen_addr
964            );
965            // Intentionally do not bubble up errors here, we don't want to take
966            // down environmentd if there are any issues with the pubsub server.
967            let res = persist_pubsub_server
968                .serve(args.internal_persist_pubsub_listen_addr)
969                .await;
970            error!("Persist Pubsub server exited {:?}", res);
971        }
972        .instrument(tracing::info_span!("persist::rpc::server")),
973    );
974
975    let persist_clients = {
976        // PersistClientCache may spawn tasks, so run within a tokio runtime context
977        let _tokio_guard = runtime.enter();
978        PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
979            let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
980                cfg,
981                persist_pubsub_client.sender,
982                metrics,
983            ));
984            PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
985        })
986    };
987
988    let consensus_uri = args.persist_consensus_url.unwrap_or_else(|| {
989        args.metadata_backend_url
990            .as_ref()
991            .map(|metadata_backend_url| {
992                SensitiveUrl(
993                    Url::parse_with_params(
994                        metadata_backend_url.0.as_ref(),
995                        &[("options", "--search_path=consensus")],
996                    )
997                    .unwrap(),
998                )
999            })
1000            .expect("either --persist-consensus-url or --metadata-backend-url must be provided")
1001    });
1002    let timestamp_oracle_url = args.timestamp_oracle_url.or_else(|| {
1003        args.metadata_backend_url
1004            .as_ref()
1005            .map(|metadata_backend_url| {
1006                SensitiveUrl(
1007                    Url::parse_with_params(
1008                        metadata_backend_url.0.as_ref(),
1009                        &[("options", "--search_path=tsoracle")],
1010                    )
1011                    .unwrap(),
1012                )
1013            })
1014    });
1015
1016    let persist_clients = Arc::new(persist_clients);
1017    let connection_context = ConnectionContext::from_cli_args(
1018        args.environment_id.to_string(),
1019        &args.tracing.startup_log_filter,
1020        args.aws_external_id_prefix,
1021        args.aws_connection_role_arn,
1022        secrets_reader,
1023        cloud_resource_reader,
1024    );
1025    let orchestrator = Arc::new(TracingOrchestrator::new(orchestrator, args.tracing.clone()));
1026    let controller = ControllerConfig {
1027        build_info: &BUILD_INFO,
1028        orchestrator,
1029        persist_location: PersistLocation {
1030            blob_uri: args.persist_blob_url,
1031            consensus_uri,
1032        },
1033        persist_clients: Arc::clone(&persist_clients),
1034        clusterd_image: args.clusterd_image.expect("clap enforced"),
1035        init_container_image: args.orchestrator_kubernetes_init_container_image,
1036        deploy_generation: args.deploy_generation,
1037        now: SYSTEM_TIME.clone(),
1038        metrics_registry: metrics_registry.clone(),
1039        persist_pubsub_url: args.persist_pubsub_url,
1040        connection_context,
1041        // When serialized to args in the controller, only the relevant flags will be passed
1042        // through, so we just set all of them
1043        secrets_args: SecretsReaderCliArgs {
1044            secrets_reader: args.secrets_controller,
1045            secrets_reader_local_file_dir: args.orchestrator_process_secrets_directory,
1046            secrets_reader_kubernetes_context: Some(args.orchestrator_kubernetes_context),
1047            secrets_reader_aws_prefix: Some(aws_secrets_controller_prefix(&args.environment_id)),
1048            secrets_reader_name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
1049        },
1050    };
1051
1052    let cluster_replica_sizes = ClusterReplicaSizeMap::parse_from_str(
1053        &args.cluster_replica_sizes,
1054        !license_key.allow_credit_consumption_override,
1055    )
1056    .context("parsing replica size map")?;
1057
1058    emit_boot_diagnostics!(&BUILD_INFO);
1059    sys::adjust_rlimits();
1060
1061    info!(
1062        "startup: envd init: preamble complete in {:?}",
1063        envd_start.elapsed()
1064    );
1065
1066    let serve_start = Instant::now();
1067    info!("startup: envd init: serving beginning");
1068    let server = runtime.block_on(async {
1069        let listeners = Listeners::bind(listeners_config).await?;
1070        let catalog_config = CatalogConfig {
1071            persist_clients,
1072            metrics: Arc::new(mz_catalog::durable::Metrics::new(&metrics_registry)),
1073        };
1074        let server = listeners
1075            .serve(crate::Config {
1076                // Special modes.
1077                unsafe_mode: args.unsafe_mode,
1078                all_features: args.all_features,
1079                // Connection options.
1080                tls,
1081                tls_reload_certs: mz_server_core::default_cert_reload_ticker(),
1082                external_login_password_mz_system: args.external_login_password_mz_system,
1083                frontegg,
1084                cors_allowed_origin,
1085                egress_addresses: args.announce_egress_address,
1086                http_host_name: args.http_host_name,
1087                internal_console_redirect_url: args.internal_console_redirect_url,
1088                // Controller options.
1089                controller,
1090                secrets_controller,
1091                cloud_resource_controller,
1092                // Storage options.
1093                storage_usage_collection_interval: args.storage_usage_collection_interval_sec,
1094                storage_usage_retention_period: args.storage_usage_retention_period,
1095                // Adapter options.
1096                catalog_config,
1097                availability_zones: args.availability_zone,
1098                cluster_replica_sizes,
1099                timestamp_oracle_url,
1100                segment_api_key: args.segment_api_key,
1101                segment_client_side: args.segment_client_side,
1102                test_only_dummy_segment_client: args.test_only_dummy_segment_client,
1103                launchdarkly_sdk_key: args.launchdarkly_sdk_key,
1104                launchdarkly_key_map: args
1105                    .launchdarkly_key_map
1106                    .into_iter()
1107                    .map(|kv| (kv.key, kv.value))
1108                    .collect(),
1109                config_sync_timeout: args.config_sync_timeout,
1110                config_sync_loop_interval: args.config_sync_loop_interval,
1111                config_sync_file_path: args.config_sync_file_path,
1112
1113                // Bootstrap options.
1114                environment_id: args.environment_id,
1115                bootstrap_role: args.bootstrap_role,
1116                bootstrap_default_cluster_replica_size: args.bootstrap_default_cluster_replica_size,
1117                bootstrap_default_cluster_replication_factor: args
1118                    .bootstrap_default_cluster_replication_factor,
1119                bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
1120                    size: args.bootstrap_builtin_system_cluster_replica_size,
1121                    replication_factor: args.bootstrap_builtin_system_cluster_replication_factor,
1122                },
1123                bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
1124                    size: args.bootstrap_builtin_catalog_server_cluster_replica_size,
1125                    replication_factor: args
1126                        .bootstrap_builtin_catalog_server_cluster_replication_factor,
1127                },
1128                bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
1129                    size: args.bootstrap_builtin_probe_cluster_replica_size,
1130                    replication_factor: args.bootstrap_builtin_probe_cluster_replication_factor,
1131                },
1132                bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
1133                    size: args.bootstrap_builtin_support_cluster_replica_size,
1134                    replication_factor: args.bootstrap_builtin_support_cluster_replication_factor,
1135                },
1136                bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
1137                    size: args.bootstrap_builtin_analytics_cluster_replica_size,
1138                    replication_factor: args.bootstrap_builtin_analytics_cluster_replication_factor,
1139                },
1140                system_parameter_defaults: args
1141                    .system_parameter_default
1142                    .into_iter()
1143                    .map(|kv| (kv.key, kv.value))
1144                    .collect(),
1145                helm_chart_version: args.helm_chart_version.clone(),
1146                license_key,
1147                // AWS options.
1148                aws_account_id: args.aws_account_id,
1149                aws_privatelink_availability_zones: args.aws_privatelink_availability_zones,
1150                // Observability options.
1151                metrics_registry,
1152                tracing_handle,
1153                // Testing options.
1154                now,
1155                force_builtin_schema_migration,
1156            })
1157            .await
1158            .maybe_terminate("booting server")?;
1159        Ok::<_, anyhow::Error>(server)
1160    })?;
1161    info!(
1162        "startup: envd init: serving complete in {:?}",
1163        serve_start.elapsed()
1164    );
1165
1166    let start_duration = envd_start.elapsed();
1167    metrics
1168        .start_time_environmentd
1169        .set(start_duration.as_millis().try_into().expect("must fit"));
1170    let span = span.exit();
1171    let id = span.context().span().span_context().trace_id();
1172    drop(span);
1173
1174    info!("startup: envd init: complete in {start_duration:?}");
1175
1176    println!(
1177        "environmentd {} listening...",
1178        BUILD_INFO.human_version(args.helm_chart_version)
1179    );
1180    for (name, handle) in &server.sql_listener_handles {
1181        println!("{} SQL address: {}", name, handle.local_addr);
1182    }
1183    for (name, handle) in &server.http_listener_handles {
1184        println!("{} HTTP address: {}", name, handle.local_addr);
1185    }
1186    // TODO move persist pubsub address like metrics address?
1187    println!(
1188        " Internal Persist PubSub address: {}",
1189        args.internal_persist_pubsub_listen_addr
1190    );
1191
1192    println!(" Root trace ID: {id}");
1193
1194    // Block forever.
1195    loop {
1196        thread::park();
1197    }
1198}
1199
1200fn build_info() -> Vec<String> {
1201    let openssl_version =
1202        unsafe { CStr::from_ptr(openssl_sys::OpenSSL_version(openssl_sys::OPENSSL_VERSION)) };
1203    let rdkafka_version = unsafe { CStr::from_ptr(rdkafka_sys::bindings::rd_kafka_version_str()) };
1204    vec![
1205        openssl_version.to_string_lossy().into_owned(),
1206        format!("librdkafka v{}", rdkafka_version.to_string_lossy()),
1207    ]
1208}
1209
1210#[derive(Debug, Clone)]
1211struct Metrics {
1212    pub start_time_environmentd: IntGauge,
1213}
1214
1215impl Metrics {
1216    pub fn register_into(registry: &MetricsRegistry, build_info: BuildInfo) -> Metrics {
1217        Metrics {
1218            start_time_environmentd: registry.register(metric!(
1219                name: "mz_start_time_environmentd",
1220                help: "Time in milliseconds from environmentd start until the adapter is ready.",
1221                const_labels: {
1222                    "version" => build_info.version,
1223                    "build_type" => if cfg!(release) { "release" } else { "debug" }
1224                },
1225            )),
1226        }
1227    }
1228}