// mz_environmentd/environmentd/main.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Manages a single Materialize environment.
11//!
12//! It listens for SQL connections on port 6875 (MTRL) and for HTTP connections
13//! on port 6876.
14
15use std::ffi::CStr;
16use std::fs::File;
17use std::net::{IpAddr, SocketAddr};
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::LazyLock;
21use std::sync::atomic::{AtomicUsize, Ordering};
22use std::time::{Duration, Instant};
23use std::{cmp, env, iter, thread};
24
25use anyhow::{Context, bail};
26use clap::{ArgAction, Parser, ValueEnum};
27use fail::FailScenario;
28use http::header::HeaderValue;
29use ipnet::IpNet;
30use itertools::Itertools;
31use mz_adapter::ResultExt;
32use mz_adapter_types::bootstrap_builtin_cluster_config::{
33    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
34    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, DEFAULT_REPLICATION_FACTOR,
35    PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR, SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
36    SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
37};
38use mz_auth::password::Password;
39use mz_aws_secrets_controller::AwsSecretsController;
40use mz_build_info::BuildInfo;
41use mz_catalog::config::ClusterReplicaSizeMap;
42use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceController};
43use mz_controller::{ControllerConfig, ReplicaHttpLocator};
44use mz_frontegg_auth::{Authenticator as FronteggAuthenticator, FronteggCliArgs};
45use mz_license_keys::{ExpirationBehavior, ValidatedLicenseKey};
46use mz_orchestrator::Orchestrator;
47use mz_orchestrator_kubernetes::{
48    KubernetesImagePullPolicy, KubernetesOrchestrator, KubernetesOrchestratorConfig,
49};
50use mz_orchestrator_process::{
51    ProcessOrchestrator, ProcessOrchestratorConfig, ProcessOrchestratorTcpProxyConfig,
52};
53use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs, TracingOrchestrator};
54use mz_ore::cli::{self, CliConfig, KeyValueArg};
55use mz_ore::error::ErrorExt;
56use mz_ore::metric;
57use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
58use mz_ore::now::SYSTEM_TIME;
59use mz_ore::task::RuntimeExt;
60use mz_ore::url::SensitiveUrl;
61use mz_persist_client::PersistLocation;
62use mz_persist_client::cache::PersistClientCache;
63use mz_persist_client::cfg::PersistConfig;
64use mz_persist_client::rpc::{
65    MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
66};
67use mz_secrets::SecretsController;
68use mz_server_core::TlsCliArgs;
69use mz_service::emit_boot_diagnostics;
70use mz_service::secrets::{SecretsControllerKind, SecretsReaderCliArgs};
71use mz_sql::catalog::EnvironmentId;
72use mz_storage_types::connections::ConnectionContext;
73use opentelemetry::trace::TraceContextExt;
74use prometheus::IntGauge;
75use tracing::{Instrument, error, info, info_span, warn};
76use tracing_opentelemetry::OpenTelemetrySpanExt;
77use url::Url;
78
79use crate::environmentd::sys;
80use crate::{BUILD_INFO, CatalogConfig, ListenerConfig, Listeners, ListenersConfig};
81
/// The version string reported by `--version` (wired up in the `clap`
/// attributes on [`Args`]).
static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
/// The extended version string reported by `--long-version`: the human
/// version followed by one line per entry yielded by `build_info()`
/// (defined elsewhere in this file; presumably extra build metadata —
/// TODO confirm).
static LONG_VERSION: LazyLock<String> = LazyLock::new(|| {
    iter::once(BUILD_INFO.human_version(None))
        .chain(build_info())
        .join("\n")
});
88
89/// Manages a single Materialize environment.
90#[derive(Parser, Debug)]
91#[clap(
92    name = "environmentd",
93    next_line_help = true,
94    version = VERSION.as_str(),
95    long_version = LONG_VERSION.as_str(),
96)]
97pub struct Args {
98    // === Special modes. ===
99    /// Enable unsafe features. Unsafe features are those that should never run
100    /// in production but are appropriate for testing/local development.
101    #[clap(long, env = "UNSAFE_MODE")]
102    unsafe_mode: bool,
103    /// Enables all feature flags, meant only as a tool for local development;
104    /// this should never be enabled in CI.
105    #[clap(long, env = "ALL_FEATURES")]
106    all_features: bool,
107
108    // === Connection options. ===
109    /// Path to a file containing the json-formatted configuration of our
110    /// metrics, HTTP, and sql listeners.
111    #[clap(
112        long,
113        env = "LISTENERS_CONFIG_PATH",
114        value_name = "PATH",
115        action = ArgAction::Set,
116    )]
117    listeners_config_path: PathBuf,
118    /// Password for the mz_system user.
119    #[clap(
120        long,
121        env = "EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM",
122        action = ArgAction::Set,
123    )]
124    external_login_password_mz_system: Option<Password>,
125    /// The address on which to listen for Persist PubSub connections.
126    ///
127    /// Connections to this address are not subject to encryption, authentication,
128    /// or access control. Care should be taken to not expose the listen address
129    /// to the public internet or other unauthorized parties.
130    #[clap(
131        long,
132        value_name = "HOST:PORT",
133        env = "INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR",
134        default_value = "127.0.0.1:6879",
135        action = ArgAction::Set,
136    )]
137    internal_persist_pubsub_listen_addr: SocketAddr,
138    /// Enable cross-origin resource sharing (CORS) for HTTP requests from the
139    /// specified origin.
140    ///
141    /// The default allows all local connections.
142    /// "*" allows all.
143    /// "*.domain.com" allows connections from any matching subdomain.
144    ///
145    /// Wildcards in other positions (e.g., "https://*.foo.com" or "https://foo.*.com") have no effect.
146    #[structopt(long, env = "CORS_ALLOWED_ORIGIN")]
147    cors_allowed_origin: Vec<HeaderValue>,
148    /// Public CIDR which the cloud environment has configured for
149    /// egress.
150    #[clap(
151        long,
152        env = "ANNOUNCE_EGRESS_ADDRESS",
153        action = ArgAction::Append,
154        use_value_delimiter = true
155    )]
156    announce_egress_address: Vec<IpNet>,
157    /// The external host name to connect to the HTTP server of this
158    /// environment.
159    ///
160    /// Presently used to render webhook URLs for end users in notices and the
161    /// system catalog. Not used to establish connections directly.
162    #[clap(long, env = "HTTP_HOST_NAME")]
163    http_host_name: Option<String>,
164    /// The URL of the Materialize console to proxy from the /internal-console
165    /// endpoint on the internal HTTP server.
166    #[clap(long, env = "INTERNAL_CONSOLE_REDIRECT_URL")]
167    internal_console_redirect_url: Option<String>,
168    /// TLS arguments.
169    #[clap(flatten)]
170    tls: TlsCliArgs,
171    /// Frontegg arguments.
172    #[clap(flatten)]
173    frontegg: FronteggCliArgs,
174    // === Orchestrator options. ===
175    /// The service orchestrator implementation to use.
176    #[structopt(long, value_enum, env = "ORCHESTRATOR")]
177    orchestrator: OrchestratorKind,
178    /// Name of a non-default Kubernetes scheduler, if any.
179    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SCHEDULER_NAME")]
180    orchestrator_kubernetes_scheduler_name: Option<String>,
181    /// Annotations to apply to all services created by the Kubernetes orchestrator
182    /// in the form `KEY=VALUE`.
183    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ANNOTATION")]
184    orchestrator_kubernetes_service_annotation: Vec<KeyValueArg<String, String>>,
185    /// Labels to apply to all services created by the Kubernetes orchestrator
186    /// in the form `KEY=VALUE`.
187    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_LABEL")]
188    orchestrator_kubernetes_service_label: Vec<KeyValueArg<String, String>>,
189    /// Node selector to apply to all services created by the Kubernetes
190    /// orchestrator in the form `KEY=VALUE`.
191    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_NODE_SELECTOR")]
192    orchestrator_kubernetes_service_node_selector: Vec<KeyValueArg<String, String>>,
193    /// Affinity to apply to all services created by the Kubernetes
194    /// orchestrator as a JSON string.
195    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_AFFINITY")]
196    orchestrator_kubernetes_service_affinity: Option<String>,
197    /// Tolerations to apply to all services created by the Kubernetes
198    /// orchestrator as a JSON string.
199    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_TOLERATIONS")]
200    orchestrator_kubernetes_service_tolerations: Option<String>,
201    /// The name of a service account to apply to all services created by the
202    /// Kubernetes orchestrator.
203    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_ACCOUNT")]
204    orchestrator_kubernetes_service_account: Option<String>,
205    /// The Kubernetes context to use with the Kubernetes orchestrator.
206    ///
207    /// This defaults to `minikube` to prevent disaster (e.g., connecting to a
208    /// production cluster that happens to be the active Kubernetes context.)
209    #[structopt(
210        long,
211        env = "ORCHESTRATOR_KUBERNETES_CONTEXT",
212        default_value = "minikube"
213    )]
214    orchestrator_kubernetes_context: String,
215    /// The image pull policy to use for services created by the Kubernetes
216    /// orchestrator.
217    #[structopt(
218        long,
219        env = "ORCHESTRATOR_KUBERNETES_IMAGE_PULL_POLICY",
220        default_value = "always",
221        value_enum
222    )]
223    orchestrator_kubernetes_image_pull_policy: KubernetesImagePullPolicy,
224    /// The init container for services created by the Kubernetes orchestrator.
225    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_INIT_CONTAINER_IMAGE")]
226    orchestrator_kubernetes_init_container_image: Option<String>,
227    /// The Kubernetes StorageClass to use for the ephemeral volume attached to
228    /// services that request disk.
229    ///
230    /// If unspecified, the Kubernetes orchestrator will refuse to create
231    /// services that request disk.
232    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_EPHEMERAL_VOLUME_CLASS")]
233    orchestrator_kubernetes_ephemeral_volume_class: Option<String>,
234    /// The optional fs group for service's pods' `securityContext`.
235    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_SERVICE_FS_GROUP")]
236    orchestrator_kubernetes_service_fs_group: Option<i64>,
237    /// The prefix to prepend to all kubernetes object names.
238    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_NAME_PREFIX")]
239    orchestrator_kubernetes_name_prefix: Option<String>,
240    /// Whether to enable pod metrics collection.
241    ///
242    /// Required for resource usage graphs in the console.
243    /// Requires metrics-server to be installed.
244    #[clap(long, env = "ORCHESTRATOR_KUBERNETES_DISABLE_POD_METRICS_COLLECTION")]
245    orchestrator_kubernetes_disable_pod_metrics_collection: bool,
246    /// Whether to annotate pods for prometheus service discovery.
247    #[clap(
248        long,
249        env = "ORCHESTRATOR_KUBERNETES_ENABLE_PROMETHEUS_SCRAPE_ANNOTATIONS"
250    )]
251    orchestrator_kubernetes_enable_prometheus_scrape_annotations: bool,
252    #[clap(long, env = "ORCHESTRATOR_PROCESS_WRAPPER")]
253    orchestrator_process_wrapper: Option<String>,
254    /// Where the process orchestrator should store secrets.
255    #[clap(
256        long,
257        env = "ORCHESTRATOR_PROCESS_SECRETS_DIRECTORY",
258        value_name = "PATH",
259        required_if_eq("orchestrator", "process")
260    )]
261    orchestrator_process_secrets_directory: Option<PathBuf>,
262    /// Whether the process orchestrator should handle crashes in child
263    /// processes by crashing the parent process.
264    #[clap(long, env = "ORCHESTRATOR_PROCESS_PROPAGATE_CRASHES")]
265    orchestrator_process_propagate_crashes: bool,
266    /// An IP address on which the process orchestrator should bind TCP proxies
267    /// for Unix domain sockets.
268    ///
269    /// When specified, for each named port of each created service, the process
270    /// orchestrator will bind a TCP listener to the specified address that
271    /// proxies incoming connections to the underlying Unix domain socket. The
272    /// allocated TCP port will be emitted as a tracing event.
273    ///
274    /// The primary use is live debugging the running child services via tools
275    /// that do not support Unix domain sockets (e.g., Prometheus, web
276    /// browsers).
277    #[clap(long, env = "ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR")]
278    orchestrator_process_tcp_proxy_listen_addr: Option<IpAddr>,
279    /// A directory in which the process orchestrator should write Prometheus
280    /// scrape targets, for use with Prometheus's file-based service discovery.
281    ///
282    /// Each namespaced orchestrator will maintain a single JSON file into the
283    /// directory named `NAMESPACE.json` containing the scrape targets for all
284    /// extant services. The scrape targets will use the TCP proxy address, as
285    /// Prometheus does not support scraping over Unix domain sockets.
286    ///
287    /// This option is ignored unless
288    /// `--orchestrator-process-tcp-proxy-listen-addr` is set.
289    ///
290    /// See also: <https://prometheus.io/docs/guides/file-sd/>
291    #[clap(
292        long,
293        env = "ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY"
294    )]
295    orchestrator_process_prometheus_service_discovery_directory: Option<PathBuf>,
296    /// A scratch directory that orchestrated processes can use for ephemeral storage.
297    #[clap(
298        long,
299        env = "ORCHESTRATOR_PROCESS_SCRATCH_DIRECTORY",
300        value_name = "PATH"
301    )]
302    orchestrator_process_scratch_directory: Option<PathBuf>,
303    /// Whether to use coverage build and collect coverage information. Not to be used for
304    /// production, only testing.
305    #[structopt(long, env = "ORCHESTRATOR_KUBERNETES_COVERAGE")]
306    orchestrator_kubernetes_coverage: bool,
307    /// The secrets controller implementation to use.
308    #[structopt(
309        long,
310        value_enum,
311        env = "SECRETS_CONTROLLER",
312        default_value_ifs([
313            ("orchestrator", "kubernetes", Some("kubernetes")),
314            ("orchestrator", "process", Some("local-file"))
315        ]),
316        default_value("kubernetes"), // This shouldn't be possible, but it makes clap happy.
317    )]
318    secrets_controller: SecretsControllerKind,
319    /// The list of tags to be set on AWS Secrets Manager secrets created by the
320    /// AWS secrets controller.
321    #[clap(
322        long,
323        env = "AWS_SECRETS_CONTROLLER_TAGS",
324        action = ArgAction::Append,
325        value_delimiter = ';',
326        required_if_eq("secrets_controller", "aws-secrets-manager")
327    )]
328    aws_secrets_controller_tags: Vec<KeyValueArg<String, String>>,
329    /// The clusterd image reference to use.
330    #[structopt(
331        long,
332        env = "CLUSTERD_IMAGE",
333        required_if_eq("orchestrator", "kubernetes"),
334        default_value_if("orchestrator", "process", Some("clusterd"))
335    )]
336    clusterd_image: Option<String>,
337    /// A number representing the environment's generation.
338    ///
339    /// This is incremented to request that the new process perform a graceful
340    /// transition of power from the prior generation.
341    #[clap(long, env = "DEPLOY_GENERATION", default_value = "0")]
342    deploy_generation: u64,
343
344    /// Can be provided in place of both persist_consensus_url and
345    /// timestamp_oracle_url in order to point both at the same backend
346    #[clap(
347        long,
348        env = "METADATA_BACKEND_URL",
349        conflicts_with_all = &[
350            "persist_consensus_url",
351            "timestamp_oracle_url",
352        ],
353    )]
354    metadata_backend_url: Option<SensitiveUrl>,
355
356    /// Helm chart version for self-hosted Materialize. This version is supposed to correspond to
357    /// the Materialize (core) version. This version is displayed in addition in `SELECT
358    /// mz_version()` if set and if it differs from the Materialize (core) version (which it should
359    /// not!).
360    #[clap(long, env = "HELM_CHART_VERSION")]
361    helm_chart_version: Option<String>,
362
363    // === Storage options. ===
364    /// Where the persist library should store its blob data.
365    #[clap(long, env = "PERSIST_BLOB_URL")]
366    persist_blob_url: SensitiveUrl,
367    /// Where the persist library should perform consensus.
368    #[clap(long, env = "PERSIST_CONSENSUS_URL")]
369    persist_consensus_url: Option<SensitiveUrl>,
370    /// The Persist PubSub URL.
371    ///
372    /// This URL is passed to `clusterd` for discovery of the Persist PubSub service.
373    #[clap(
374        long,
375        env = "PERSIST_PUBSUB_URL",
376        default_value = "http://localhost:6879"
377    )]
378    persist_pubsub_url: String,
379    /// The number of worker threads created for the IsolatedRuntime used for
380    /// storage related tasks. A negative value will subtract from the number
381    /// of threads returned by [`num_cpus::get`].
382    #[clap(long, env = "PERSIST_ISOLATED_RUNTIME_THREADS")]
383    persist_isolated_runtime_threads: Option<isize>,
384    /// The interval in seconds at which to collect storage usage information.
385    #[clap(
386        long,
387        env = "STORAGE_USAGE_COLLECTION_INTERVAL",
388        value_parser = humantime::parse_duration,
389        default_value = "3600s"
390    )]
391    storage_usage_collection_interval_sec: Duration,
392    /// The period for which to retain usage records. Note that the retention
393    /// period is only evaluated at server start time, so rebooting the server
394    /// is required to discard old records.
395    #[clap(long, env = "STORAGE_USAGE_RETENTION_PERIOD", value_parser = humantime::parse_duration)]
396    storage_usage_retention_period: Option<Duration>,
397
398    // === Adapter options. ===
399    /// The PostgreSQL URL for the Postgres-backed timestamp oracle.
400    #[clap(long, env = "TIMESTAMP_ORACLE_URL", value_name = "POSTGRES_URL")]
401    timestamp_oracle_url: Option<SensitiveUrl>,
402    /// Availability zones in which storage and compute resources may be
403    /// deployed.
404    #[clap(long, env = "AVAILABILITY_ZONE", use_value_delimiter = true)]
405    availability_zone: Vec<String>,
406    /// A map from size name to resource allocations for cluster replicas.
407    #[clap(
408        long,
409        env = "CLUSTER_REPLICA_SIZES",
410        requires = "bootstrap_default_cluster_replica_size"
411    )]
412    cluster_replica_sizes: String,
413    /// An API key for Segment. Enables export of audit events to Segment.
414    #[clap(long, env = "SEGMENT_API_KEY")]
415    segment_api_key: Option<String>,
416    /// Whether the Segment client is being used on the client side
417    /// (rather than the server side).
418    ///
419    /// Enabling this causes the Segment server to record the IP address from
420    /// which the event was sent.
421    #[clap(long, env = "SEGMENT_CLIENT_SIDE")]
422    segment_client_side: bool,
423    /// Only create a dummy segment client when no segment api key is provided, only to get more
424    /// testing coverage.
425    #[clap(long, env = "TEST_ONLY_DUMMY_SEGMENT_CLIENT")]
426    test_only_dummy_segment_client: bool,
427    /// An SDK key for LaunchDarkly.
428    ///
429    /// Setting this in combination with [`Self::config_sync_loop_interval`]
430    /// will enable synchronization of LaunchDarkly features with system
431    /// configuration parameters.
432    #[clap(long, env = "LAUNCHDARKLY_SDK_KEY")]
433    launchdarkly_sdk_key: Option<String>,
434    /// A list of PARAM_NAME=KEY_NAME pairs from system parameter names to
435    /// LaunchDarkly feature keys.
436    ///
437    /// This is used (so far only for testing purposes) when propagating values
438    /// from the latter to the former. The identity map is assumed for absent
439    /// parameter names.
440    #[clap(
441        long,
442        env = "LAUNCHDARKLY_KEY_MAP",
443        action = ArgAction::Append,
444        value_delimiter = ';'
445    )]
446    launchdarkly_key_map: Vec<KeyValueArg<String, String>>,
447    /// The duration at which the system parameter synchronization times out during startup.
448    #[clap(
449        long,
450        env = "CONFIG_SYNC_TIMEOUT",
451        value_parser = humantime::parse_duration,
452        default_value = "30s"
453    )]
454    config_sync_timeout: Duration,
455    /// The interval in seconds at which to synchronize system parameter values.
456    ///
457    /// If this is not explicitly set, the loop that synchronizes LaunchDarkly
458    /// features with system configuration parameters will not run _even if
459    /// [`Self::launchdarkly_sdk_key`] is present_.
460    #[clap(
461        long,
462        env = "CONFIG_SYNC_LOOP_INTERVAL",
463        value_parser = humantime::parse_duration,
464    )]
465    config_sync_loop_interval: Option<Duration>,
466    /// Path to a JSON file containing system parameter values.
467    /// If specified, this file will be used instead of LaunchDarkly for configuration.
468    #[clap(long, env = "CONFIG_SYNC_FILE_PATH", value_name = "PATH")]
469    config_sync_file_path: Option<PathBuf>,
470
471    // === Bootstrap options. ===
472    #[clap(
473        long,
474        env = "ENVIRONMENT_ID",
475        value_name = "<CLOUD>-<REGION>-<ORG-ID>-<ORDINAL>"
476    )]
477    environment_id: EnvironmentId,
478    /// If set, a role with the provided name will be created with `CREATEDB`
479    /// and `CREATECLUSTER` attributes. It will also have `CREATE` privileges on
480    /// the `materialize` database, `materialize.public` schema, and
481    /// `quickstart` cluster.
482    ///
483    /// This option is meant for local development and testing to simplify the
484    /// initial process of granting attributes and privileges to some default
485    /// role.
486    #[clap(long, env = "BOOTSTRAP_ROLE")]
487    bootstrap_role: Option<String>,
488    /// The size of the default cluster replica if bootstrapping.
489    #[clap(
490        long,
491        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE",
492        default_value = "scale=1,workers=1"
493    )]
494    bootstrap_default_cluster_replica_size: String,
495    /// The size of the builtin system cluster replicas if bootstrapping.
496    #[clap(
497        long,
498        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE",
499        default_value = "scale=1,workers=1"
500    )]
501    bootstrap_builtin_system_cluster_replica_size: String,
502    /// The size of the builtin catalog server cluster replicas if bootstrapping.
503    #[clap(
504        long,
505        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE",
506        default_value = "scale=1,workers=1"
507    )]
508    bootstrap_builtin_catalog_server_cluster_replica_size: String,
509    /// The size of the builtin probe cluster replicas if bootstrapping.
510    #[clap(
511        long,
512        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE",
513        default_value = "scale=1,workers=1"
514    )]
515    bootstrap_builtin_probe_cluster_replica_size: String,
516    /// The size of the builtin support cluster replicas if bootstrapping.
517    #[clap(
518        long,
519        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE",
520        default_value = "scale=1,workers=1"
521    )]
522    bootstrap_builtin_support_cluster_replica_size: String,
523    /// The size of the builtin analytics cluster replicas if bootstrapping.
524    #[clap(
525        long,
526        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE",
527        default_value = "scale=1,workers=1"
528    )]
529    bootstrap_builtin_analytics_cluster_replica_size: String,
530    #[clap(
531        long,
532        env = "BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR",
533        default_value = DEFAULT_REPLICATION_FACTOR.to_string(),
534        value_parser = clap::value_parser!(u32).range(0..=2)
535    )]
536    bootstrap_default_cluster_replication_factor: u32,
537    /// The replication factor of the builtin system cluster replicas if bootstrapping.
538    #[clap(
539        long,
540        env = "BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR",
541        default_value = SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
542        value_parser = clap::value_parser!(u32).range(0..=2)
543    )]
544    bootstrap_builtin_system_cluster_replication_factor: u32,
545    /// The replication factor of the builtin catalog server cluster replicas if bootstrapping.
546    #[clap(
547        long,
548        env = "BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICATION_FACTOR",
549        default_value = CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
550        value_parser = clap::value_parser!(u32).range(0..=2)
551    )]
552    bootstrap_builtin_catalog_server_cluster_replication_factor: u32,
553    /// The replication factor of the builtin probe cluster replicas if bootstrapping.
554    #[clap(
555        long,
556        env = "BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR",
557        default_value = PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
558        value_parser = clap::value_parser!(u32).range(0..=2)
559    )]
560    bootstrap_builtin_probe_cluster_replication_factor: u32,
561    /// The replication factor of the builtin support cluster replicas if bootstrapping.
562    #[clap(
563        long,
564        env = "BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICATION_FACTOR",
565        default_value = SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
566        value_parser = clap::value_parser!(u32).range(0..=2)
567    )]
568    bootstrap_builtin_support_cluster_replication_factor: u32,
569    /// The replication factor of the builtin analytics cluster replicas if bootstrapping.
570    #[clap(
571        long,
572        env = "BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICATION_FACTOR",
573        default_value = ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR.to_string(),
574        value_parser = clap::value_parser!(u32).range(0..=2)
575    )]
576    bootstrap_builtin_analytics_cluster_replication_factor: u32,
577    /// An list of NAME=VALUE pairs used to override static defaults
578    /// for system parameters.
579    #[clap(
580        long,
581        env = "SYSTEM_PARAMETER_DEFAULT",
582        action = ArgAction::Append,
583        value_delimiter = ';'
584    )]
585    system_parameter_default: Vec<KeyValueArg<String, String>>,
586    /// File containing a valid Materialize license key.
587    #[clap(long, env = "LICENSE_KEY")]
588    license_key: Option<String>,
589
590    // === AWS options. ===
591    /// The AWS account ID, which will be used to generate ARNs for
592    /// Materialize-controlled AWS resources.
593    #[clap(long, env = "AWS_ACCOUNT_ID")]
594    aws_account_id: Option<String>,
595    /// The ARN for a Materialize-controlled role to assume before assuming
596    /// a customer's requested role for an AWS connection.
597    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
598    aws_connection_role_arn: Option<String>,
599    /// Prefix for an external ID to be supplied to all AWS AssumeRole operations.
600    ///
601    /// Details: <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>
602    #[clap(long, env = "AWS_EXTERNAL_ID_PREFIX", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
603    aws_external_id_prefix: Option<AwsExternalIdPrefix>,
604    /// The list of supported AWS PrivateLink availability zone ids.
605    /// Must be zone IDs, of format e.g. "use-az1".
606    #[clap(
607        long,
608        env = "AWS_PRIVATELINK_AVAILABILITY_ZONES",
609        action = ArgAction::Append,
610        use_value_delimiter = true
611    )]
612    aws_privatelink_availability_zones: Option<Vec<String>>,
613
614    // === Tracing options. ===
615    #[clap(flatten)]
616    tracing: TracingCliArgs,
617
618    // === Testing options. ===
619    /// Forces the migration of all builtin storage collections using the
620    /// specified migration mechanism (either "evolution" or "replacement").
621    ///
622    /// This argument is meant for testing only and as the names suggests
623    /// should not be set in production.
624    #[clap(long, value_enum, requires = "unsafe_mode")]
625    unsafe_force_builtin_schema_migration: Option<String>,
626}
627
/// The service orchestrator implementations selectable via `--orchestrator`.
//
// NOTE(review): plain `//` comments are used on the variants because `///`
// doc comments on `ValueEnum` variants would surface as possible-value help
// text in the CLI, changing `--help` output.
#[derive(ValueEnum, Debug, Clone)]
enum OrchestratorKind {
    // Orchestrate services via Kubernetes (see `KubernetesOrchestrator`).
    Kubernetes,
    // Orchestrate services as local child processes (see `ProcessOrchestrator`).
    Process,
}
633
634// TODO [Alex Hunt] move this to a shared function that can be imported by the
635// region-controller.
636fn aws_secrets_controller_prefix(env_id: &EnvironmentId) -> String {
637    format!("/user-managed/{}/", env_id)
638}
639fn aws_secrets_controller_key_alias(env_id: &EnvironmentId) -> String {
640    // TODO [Alex Hunt] move this to a shared function that can be imported by the
641    // region-controller.
642    format!("alias/customer_key_{}", env_id)
643}
644
645pub fn main() {
646    let args = cli::parse_args(CliConfig {
647        env_prefix: Some("MZ_"),
648        enable_version_flag: true,
649    });
650    if let Err(err) = run(args) {
651        panic!("environmentd: fatal: {}", err.display_with_causes());
652    }
653}
654
655fn run(mut args: Args) -> Result<(), anyhow::Error> {
656    mz_ore::panic::install_enhanced_handler();
657    let envd_start = Instant::now();
658
659    // Configure signal handling as soon as possible. We want signals to be
660    // handled to our liking ASAP.
661    sys::enable_sigusr2_coverage_dump()?;
662    sys::enable_termination_signal_cleanup()?;
663
664    let license_key = if let Some(license_key_file) = args.license_key {
665        let license_key_text = std::fs::read_to_string(&license_key_file)
666            .context("failed to open license key file")?;
667        let license_key = mz_license_keys::validate(license_key_text.trim())
668            .context("failed to validate license key file")?;
669        if license_key.expired {
670            let message = format!(
671                "The license key provided at {license_key_file} is expired! Please contact Materialize for assistance."
672            );
673            match license_key.expiration_behavior {
674                ExpirationBehavior::Warn | ExpirationBehavior::DisableClusterCreation => {
675                    warn!("{message}");
676                }
677                ExpirationBehavior::Disable => bail!("{message}"),
678            }
679        }
680        license_key
681    } else if matches!(args.orchestrator, OrchestratorKind::Kubernetes) {
682        bail!("--license-key is required when running in Kubernetes");
683    } else {
684        // license key checks are optional for the emulator
685        ValidatedLicenseKey::disabled()
686    };
687
688    // Configure testing options.
689    let force_builtin_schema_migration = args
690        .unsafe_force_builtin_schema_migration
691        .inspect(|_mechanism| assert!(args.unsafe_mode));
692
693    // Start Tokio runtime.
694
695    let ncpus_useful = usize::max(1, cmp::min(num_cpus::get(), num_cpus::get_physical()));
696    let runtime = Arc::new(
697        tokio::runtime::Builder::new_multi_thread()
698            .worker_threads(ncpus_useful)
699            .thread_stack_size(3 * 1024 * 1024) // 3 MiB
700            // The default thread name exceeds the Linux limit on thread name
701            // length, so pick something shorter.
702            .thread_name_fn(|| {
703                static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
704                let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
705                format!("tokio:work-{}", id)
706            })
707            .enable_all()
708            .build()?,
709    );
710
711    // Configure tracing to log the service name when using the process
712    // orchestrator, which intermingles log output from multiple services. Other
713    // orchestrators separate log output from different services.
714    args.tracing.log_prefix = if matches!(args.orchestrator, OrchestratorKind::Process) {
715        Some("environmentd".to_string())
716    } else {
717        None
718    };
719
720    let metrics_registry = MetricsRegistry::new();
721    let tracing_handle = runtime.block_on(args.tracing.configure_tracing(
722        StaticTracingConfig {
723            service_name: "environmentd",
724            build_info: BUILD_INFO,
725        },
726        metrics_registry.clone(),
727    ))?;
728    register_runtime_metrics("main", runtime.metrics(), &metrics_registry);
729
730    let span = tracing::info_span!("environmentd::run").entered();
731
732    info!("startup: envd init: beginning");
733    info!("startup: envd init: preamble beginning");
734
735    let metrics = Metrics::register_into(&metrics_registry, BUILD_INFO);
736
737    runtime.block_on(mz_alloc::register_metrics_into(&metrics_registry));
738    runtime.block_on(mz_metrics::register_metrics_into(
739        &metrics_registry,
740        mz_dyncfgs::all_dyncfgs(),
741    ));
742
743    // Initialize fail crate for failpoint support
744    let _failpoint_scenario = FailScenario::setup();
745
746    // Configure connections.
747    let tls = args.tls.into_config()?;
748    let frontegg = FronteggAuthenticator::from_args(args.frontegg, &metrics_registry)?;
749    let listeners_config: ListenersConfig = {
750        let f = File::open(args.listeners_config_path)?;
751        serde_json::from_reader(f)?
752    };
753
754    for (_, listener) in &listeners_config.sql {
755        listener
756            .validate()
757            .map_err(|e| anyhow::anyhow!("invalid SQL listener: {}", e))?;
758    }
759
760    for (_, listener) in &listeners_config.http {
761        listener
762            .validate()
763            .map_err(|e| anyhow::anyhow!("invalid HTTP listener: {}", e))?;
764    }
765
766    // Configure CORS.
767    let allowed_origins = if !args.cors_allowed_origin.is_empty() {
768        args.cors_allowed_origin
769    } else {
770        let mut allowed_origins = Vec::with_capacity(listeners_config.http.len() * 6);
771        for (_, listener) in &listeners_config.http {
772            let port = listener.addr().port();
773            allowed_origins.extend([
774                HeaderValue::from_str(&format!("http://localhost:{}", port)).unwrap(),
775                HeaderValue::from_str(&format!("http://127.0.0.1:{}", port)).unwrap(),
776                HeaderValue::from_str(&format!("http://[::1]:{}", port)).unwrap(),
777                HeaderValue::from_str(&format!("https://localhost:{}", port)).unwrap(),
778                HeaderValue::from_str(&format!("https://127.0.0.1:{}", port)).unwrap(),
779                HeaderValue::from_str(&format!("https://[::1]:{}", port)).unwrap(),
780            ])
781        }
782        allowed_origins
783    };
784    let cors_allowed_origin = mz_http_util::build_cors_allowed_origin(&allowed_origins);
785    let cors_allowed_origin_list = allowed_origins.clone();
786
787    // Configure controller.
788    let entered = info_span!("environmentd::configure_controller").entered();
789    let (orchestrator, secrets_controller, cloud_resource_controller): (
790        Arc<dyn Orchestrator>,
791        Arc<dyn SecretsController>,
792        Option<Arc<dyn CloudResourceController>>,
793    ) = match args.orchestrator {
794        OrchestratorKind::Kubernetes => {
795            if args.orchestrator_process_scratch_directory.is_some() {
796                bail!(
797                    "--orchestrator-process-scratch-directory is \
798                      not currently usable with the kubernetes orchestrator"
799                );
800            }
801
802            let orchestrator = Arc::new(
803                runtime
804                    .block_on(KubernetesOrchestrator::new(KubernetesOrchestratorConfig {
805                        context: args.orchestrator_kubernetes_context.clone(),
806                        scheduler_name: args.orchestrator_kubernetes_scheduler_name,
807                        service_annotations: args
808                            .orchestrator_kubernetes_service_annotation
809                            .into_iter()
810                            .map(|l| (l.key, l.value))
811                            .collect(),
812                        service_labels: args
813                            .orchestrator_kubernetes_service_label
814                            .into_iter()
815                            .map(|l| (l.key, l.value))
816                            .collect(),
817                        service_node_selector: args
818                            .orchestrator_kubernetes_service_node_selector
819                            .into_iter()
820                            .map(|l| (l.key, l.value))
821                            .collect(),
822                        service_affinity: args.orchestrator_kubernetes_service_affinity,
823                        service_tolerations: args.orchestrator_kubernetes_service_tolerations,
824                        service_account: args.orchestrator_kubernetes_service_account,
825                        image_pull_policy: args.orchestrator_kubernetes_image_pull_policy,
826                        aws_external_id_prefix: args.aws_external_id_prefix.clone(),
827                        coverage: args.orchestrator_kubernetes_coverage,
828                        ephemeral_volume_storage_class: args
829                            .orchestrator_kubernetes_ephemeral_volume_class
830                            .clone(),
831                        service_fs_group: args.orchestrator_kubernetes_service_fs_group.clone(),
832                        name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
833                        collect_pod_metrics: !args
834                            .orchestrator_kubernetes_disable_pod_metrics_collection,
835                        enable_prometheus_scrape_annotations: args
836                            .orchestrator_kubernetes_enable_prometheus_scrape_annotations,
837                    }))
838                    .context("creating kubernetes orchestrator")?,
839            );
840            let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
841                SecretsControllerKind::Kubernetes => {
842                    let sc = Arc::clone(&orchestrator);
843                    let sc: Arc<dyn SecretsController> = sc;
844                    sc
845                }
846                SecretsControllerKind::AwsSecretsManager => {
847                    Arc::new(
848                        runtime.block_on(AwsSecretsController::new(
849                            // TODO [Alex Hunt] move this to a shared function that can be imported by the
850                            // region-controller.
851                            &aws_secrets_controller_prefix(&args.environment_id),
852                            &aws_secrets_controller_key_alias(&args.environment_id),
853                            args.aws_secrets_controller_tags
854                                .into_iter()
855                                .map(|tag| (tag.key, tag.value))
856                                .collect(),
857                        )),
858                    )
859                }
860                SecretsControllerKind::LocalFile => bail!(
861                    "SecretsControllerKind::LocalFile is not compatible with Orchestrator::Kubernetes."
862                ),
863            };
864            let cloud_resource_controller = Arc::clone(&orchestrator);
865            (
866                orchestrator,
867                secrets_controller,
868                Some(cloud_resource_controller),
869            )
870        }
871        OrchestratorKind::Process => {
872            if args
873                .orchestrator_kubernetes_ephemeral_volume_class
874                .is_some()
875            {
876                bail!(
877                    "--orchestrator-kubernetes-ephemeral-volume-class is \
878                      not usable with the process orchestrator"
879                );
880            }
881            let orchestrator = Arc::new(
882                runtime
883                    .block_on(ProcessOrchestrator::new(ProcessOrchestratorConfig {
884                        // Look for binaries in the same directory as the
885                        // running binary. When running via `cargo run`, this
886                        // means that debug binaries look for other debug
887                        // binaries and release binaries look for other release
888                        // binaries.
889                        image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
890                        suppress_output: false,
891                        environment_id: args.environment_id.to_string(),
892                        secrets_dir: args
893                            .orchestrator_process_secrets_directory
894                            .clone()
895                            .expect("clap enforced"),
896                        command_wrapper: args
897                            .orchestrator_process_wrapper
898                            .map_or(Ok(vec![]), |s| shell_words::split(&s))?,
899                        propagate_crashes: args.orchestrator_process_propagate_crashes,
900                        tcp_proxy: args.orchestrator_process_tcp_proxy_listen_addr.map(
901                            |listen_addr| ProcessOrchestratorTcpProxyConfig {
902                                listen_addr,
903                                prometheus_service_discovery_dir: args
904                                    .orchestrator_process_prometheus_service_discovery_directory,
905                            },
906                        ),
907                        scratch_directory: args
908                            .orchestrator_process_scratch_directory
909                            .expect("process orchestrator requires scratch directory"),
910                    }))
911                    .context("creating process orchestrator")?,
912            );
913            let secrets_controller: Arc<dyn SecretsController> = match args.secrets_controller {
914                SecretsControllerKind::Kubernetes => bail!(
915                    "SecretsControllerKind::Kubernetes is not compatible with Orchestrator::Process."
916                ),
917                SecretsControllerKind::AwsSecretsManager => Arc::new(
918                    runtime.block_on(AwsSecretsController::new(
919                        &aws_secrets_controller_prefix(&args.environment_id),
920                        &aws_secrets_controller_key_alias(&args.environment_id),
921                        args.aws_secrets_controller_tags
922                            .into_iter()
923                            .map(|tag| (tag.key, tag.value))
924                            .collect(),
925                    )),
926                ),
927                SecretsControllerKind::LocalFile => {
928                    let sc = Arc::clone(&orchestrator);
929                    let sc: Arc<dyn SecretsController> = sc;
930                    sc
931                }
932            };
933            (orchestrator, secrets_controller, None)
934        }
935    };
936    drop(entered);
937    let cloud_resource_reader = cloud_resource_controller.as_ref().map(|c| c.reader());
938    let secrets_reader = secrets_controller.reader();
939    let now = SYSTEM_TIME.clone();
940
941    let mut persist_config =
942        PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs());
943    // Start with compaction disabled, later enable it if we're not in read-only mode.
944    persist_config.disable_compaction();
945
946    let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
947    let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
948
949    match args.persist_isolated_runtime_threads {
950        // Use the default.
951        None | Some(0) => (),
952        Some(x @ ..=-1) => {
953            let threads = num_cpus::get().saturating_add_signed(x).max(1);
954            persist_config.isolated_runtime_worker_threads = threads;
955        }
956        Some(x @ 1..) => {
957            let threads = usize::try_from(x).expect("pattern matched a positive value");
958            persist_config.isolated_runtime_worker_threads = threads;
959        }
960    };
961
962    let _server = runtime.spawn_named(
963        || "persist::rpc::server",
964        async move {
965            info!(
966                "listening for Persist PubSub connections on {}",
967                args.internal_persist_pubsub_listen_addr
968            );
969            // Intentionally do not bubble up errors here, we don't want to take
970            // down environmentd if there are any issues with the pubsub server.
971            let res = persist_pubsub_server
972                .serve(args.internal_persist_pubsub_listen_addr)
973                .await;
974            error!("Persist Pubsub server exited {:?}", res);
975        }
976        .instrument(tracing::info_span!("persist::rpc::server")),
977    );
978
979    let persist_clients = {
980        // PersistClientCache may spawn tasks, so run within a tokio runtime context
981        let _tokio_guard = runtime.enter();
982        PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
983            let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
984                cfg,
985                persist_pubsub_client.sender,
986                metrics,
987            ));
988            PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
989        })
990    };
991
992    let consensus_uri = args.persist_consensus_url.unwrap_or_else(|| {
993        args.metadata_backend_url
994            .as_ref()
995            .map(|metadata_backend_url| {
996                SensitiveUrl(
997                    Url::parse_with_params(
998                        metadata_backend_url.0.as_ref(),
999                        &[("options", "--search_path=consensus")],
1000                    )
1001                    .unwrap(),
1002                )
1003            })
1004            .expect("either --persist-consensus-url or --metadata-backend-url must be provided")
1005    });
1006    let timestamp_oracle_url = args.timestamp_oracle_url.or_else(|| {
1007        args.metadata_backend_url
1008            .as_ref()
1009            .map(|metadata_backend_url| {
1010                SensitiveUrl(
1011                    Url::parse_with_params(
1012                        metadata_backend_url.0.as_ref(),
1013                        &[("options", "--search_path=tsoracle")],
1014                    )
1015                    .unwrap(),
1016                )
1017            })
1018    });
1019
1020    let persist_clients = Arc::new(persist_clients);
1021    let connection_context = ConnectionContext::from_cli_args(
1022        args.environment_id.to_string(),
1023        &args.tracing.startup_log_filter,
1024        args.aws_external_id_prefix,
1025        args.aws_connection_role_arn,
1026        secrets_reader,
1027        cloud_resource_reader,
1028    );
1029    let orchestrator = Arc::new(TracingOrchestrator::new(orchestrator, args.tracing.clone()));
1030    let replica_http_locator = Arc::new(ReplicaHttpLocator::default());
1031    let controller = ControllerConfig {
1032        build_info: &BUILD_INFO,
1033        orchestrator,
1034        persist_location: PersistLocation {
1035            blob_uri: args.persist_blob_url,
1036            consensus_uri,
1037        },
1038        persist_clients: Arc::clone(&persist_clients),
1039        clusterd_image: args.clusterd_image.expect("clap enforced"),
1040        init_container_image: args.orchestrator_kubernetes_init_container_image,
1041        deploy_generation: args.deploy_generation,
1042        now: SYSTEM_TIME.clone(),
1043        metrics_registry: metrics_registry.clone(),
1044        persist_pubsub_url: args.persist_pubsub_url,
1045        connection_context,
1046        // When serialized to args in the controller, only the relevant flags will be passed
1047        // through, so we just set all of them
1048        secrets_args: SecretsReaderCliArgs {
1049            secrets_reader: args.secrets_controller,
1050            secrets_reader_local_file_dir: args.orchestrator_process_secrets_directory,
1051            secrets_reader_kubernetes_context: Some(args.orchestrator_kubernetes_context),
1052            secrets_reader_aws_prefix: Some(aws_secrets_controller_prefix(&args.environment_id)),
1053            secrets_reader_name_prefix: args.orchestrator_kubernetes_name_prefix.clone(),
1054        },
1055        replica_http_locator: Arc::clone(&replica_http_locator),
1056    };
1057
1058    let cluster_replica_sizes = ClusterReplicaSizeMap::parse_from_str(
1059        &args.cluster_replica_sizes,
1060        !license_key.allow_credit_consumption_override,
1061    )
1062    .context("parsing replica size map")?;
1063
1064    emit_boot_diagnostics!(&BUILD_INFO);
1065    sys::adjust_rlimits();
1066
1067    info!(
1068        "startup: envd init: preamble complete in {:?}",
1069        envd_start.elapsed()
1070    );
1071
1072    let serve_start = Instant::now();
1073    info!("startup: envd init: serving beginning");
1074    let server = runtime.block_on(async {
1075        let listeners = Listeners::bind(listeners_config).await?;
1076        let catalog_config = CatalogConfig {
1077            persist_clients,
1078            metrics: Arc::new(mz_catalog::durable::Metrics::new(&metrics_registry)),
1079        };
1080        let server = listeners
1081            .serve(crate::Config {
1082                // Special modes.
1083                unsafe_mode: args.unsafe_mode,
1084                all_features: args.all_features,
1085                // Connection options.
1086                tls,
1087                tls_reload_certs: mz_server_core::default_cert_reload_ticker(),
1088                external_login_password_mz_system: args.external_login_password_mz_system,
1089                frontegg,
1090                cors_allowed_origin,
1091                cors_allowed_origin_list,
1092                egress_addresses: args.announce_egress_address,
1093                http_host_name: args.http_host_name,
1094                internal_console_redirect_url: args.internal_console_redirect_url,
1095                // Controller options.
1096                controller,
1097                secrets_controller,
1098                cloud_resource_controller,
1099                // Storage options.
1100                storage_usage_collection_interval: args.storage_usage_collection_interval_sec,
1101                storage_usage_retention_period: args.storage_usage_retention_period,
1102                // Adapter options.
1103                catalog_config,
1104                availability_zones: args.availability_zone,
1105                cluster_replica_sizes,
1106                timestamp_oracle_url,
1107                segment_api_key: args.segment_api_key,
1108                segment_client_side: args.segment_client_side,
1109                test_only_dummy_segment_client: args.test_only_dummy_segment_client,
1110                launchdarkly_sdk_key: args.launchdarkly_sdk_key,
1111                launchdarkly_key_map: args
1112                    .launchdarkly_key_map
1113                    .into_iter()
1114                    .map(|kv| (kv.key, kv.value))
1115                    .collect(),
1116                config_sync_timeout: args.config_sync_timeout,
1117                config_sync_loop_interval: args.config_sync_loop_interval,
1118                config_sync_file_path: args.config_sync_file_path,
1119
1120                // Bootstrap options.
1121                environment_id: args.environment_id,
1122                bootstrap_role: args.bootstrap_role,
1123                bootstrap_default_cluster_replica_size: args.bootstrap_default_cluster_replica_size,
1124                bootstrap_default_cluster_replication_factor: args
1125                    .bootstrap_default_cluster_replication_factor,
1126                bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
1127                    size: args.bootstrap_builtin_system_cluster_replica_size,
1128                    replication_factor: args.bootstrap_builtin_system_cluster_replication_factor,
1129                },
1130                bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
1131                    size: args.bootstrap_builtin_catalog_server_cluster_replica_size,
1132                    replication_factor: args
1133                        .bootstrap_builtin_catalog_server_cluster_replication_factor,
1134                },
1135                bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
1136                    size: args.bootstrap_builtin_probe_cluster_replica_size,
1137                    replication_factor: args.bootstrap_builtin_probe_cluster_replication_factor,
1138                },
1139                bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
1140                    size: args.bootstrap_builtin_support_cluster_replica_size,
1141                    replication_factor: args.bootstrap_builtin_support_cluster_replication_factor,
1142                },
1143                bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
1144                    size: args.bootstrap_builtin_analytics_cluster_replica_size,
1145                    replication_factor: args.bootstrap_builtin_analytics_cluster_replication_factor,
1146                },
1147                system_parameter_defaults: args
1148                    .system_parameter_default
1149                    .into_iter()
1150                    .map(|kv| (kv.key, kv.value))
1151                    .collect(),
1152                helm_chart_version: args.helm_chart_version.clone(),
1153                license_key,
1154                // AWS options.
1155                aws_account_id: args.aws_account_id,
1156                aws_privatelink_availability_zones: args.aws_privatelink_availability_zones,
1157                // Observability options.
1158                metrics_registry,
1159                tracing_handle,
1160                // Testing options.
1161                now,
1162                force_builtin_schema_migration,
1163            })
1164            .await
1165            .maybe_terminate("booting server")?;
1166        Ok::<_, anyhow::Error>(server)
1167    })?;
1168    info!(
1169        "startup: envd init: serving complete in {:?}",
1170        serve_start.elapsed()
1171    );
1172
1173    let start_duration = envd_start.elapsed();
1174    metrics
1175        .start_time_environmentd
1176        .set(start_duration.as_millis().try_into().expect("must fit"));
1177    let span = span.exit();
1178    let id = span.context().span().span_context().trace_id();
1179    drop(span);
1180
1181    info!("startup: envd init: complete in {start_duration:?}");
1182
1183    println!(
1184        "environmentd {} listening...",
1185        BUILD_INFO.human_version(args.helm_chart_version)
1186    );
1187    for (name, handle) in &server.sql_listener_handles {
1188        println!("{} SQL address: {}", name, handle.local_addr);
1189    }
1190    for (name, handle) in &server.http_listener_handles {
1191        println!("{} HTTP address: {}", name, handle.local_addr);
1192    }
1193    // TODO move persist pubsub address like metrics address?
1194    println!(
1195        " Internal Persist PubSub address: {}",
1196        args.internal_persist_pubsub_listen_addr
1197    );
1198
1199    println!(" Root trace ID: {id}");
1200
1201    // Block forever.
1202    loop {
1203        thread::park();
1204    }
1205}
1206
1207fn build_info() -> Vec<String> {
1208    let openssl_version =
1209        unsafe { CStr::from_ptr(openssl_sys::OpenSSL_version(openssl_sys::OPENSSL_VERSION)) };
1210    let rdkafka_version = unsafe { CStr::from_ptr(rdkafka_sys::bindings::rd_kafka_version_str()) };
1211    vec![
1212        openssl_version.to_string_lossy().into_owned(),
1213        format!("librdkafka v{}", rdkafka_version.to_string_lossy()),
1214    ]
1215}
1216
/// Prometheus metrics reported by `environmentd` about its own lifecycle.
#[derive(Debug, Clone)]
struct Metrics {
    /// Time in milliseconds from `environmentd` start until the adapter is
    /// ready; set exactly once, at the end of startup.
    pub start_time_environmentd: IntGauge,
}
1221
1222impl Metrics {
1223    pub fn register_into(registry: &MetricsRegistry, build_info: BuildInfo) -> Metrics {
1224        Metrics {
1225            start_time_environmentd: registry.register(metric!(
1226                name: "mz_start_time_environmentd",
1227                help: "Time in milliseconds from environmentd start until the adapter is ready.",
1228                const_labels: {
1229                    "version" => build_info.version,
1230                    "build_type" => if cfg!(release) { "release" } else { "debug" }
1231                },
1232            )),
1233        }
1234    }
1235}