Skip to main content

mz_environmentd/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A SQL stream processor built on top of [timely dataflow] and
11//! [differential dataflow].
12//!
13//! [differential dataflow]: ../differential_dataflow/index.html
14//! [timely dataflow]: ../timely/index.html
15
16use ::http::HeaderValue;
17use std::collections::BTreeMap;
18use std::panic::AssertUnwindSafe;
19use std::path::PathBuf;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use std::{env, io};
24
25use anyhow::{Context, anyhow};
26use derivative::Derivative;
27use futures::FutureExt;
28use ipnet::IpNet;
29use mz_adapter::config::{
30    SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
31};
32use mz_adapter::webhook::WebhookConcurrencyLimiter;
33use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
34use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
35use mz_adapter_types::dyncfgs::{
36    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
37    WITH_0DT_DEPLOYMENT_MAX_WAIT,
38};
39use mz_auth::password::Password;
40use mz_authenticator::GenericOidcAuthenticator;
41use mz_build_info::{BuildInfo, build_info};
42use mz_catalog::config::ClusterReplicaSizeMap;
43use mz_catalog::durable::BootstrapArgs;
44use mz_cloud_resources::CloudResourceController;
45use mz_controller::ControllerConfig;
46use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
47use mz_license_keys::ValidatedLicenseKey;
48use mz_ore::future::OreFutureExt;
49use mz_ore::metrics::MetricsRegistry;
50use mz_ore::now::NowFn;
51use mz_ore::tracing::TracingHandle;
52use mz_ore::url::SensitiveUrl;
53use mz_ore::{instrument, task};
54use mz_persist_client::cache::PersistClientCache;
55use mz_persist_client::usage::StorageUsageClient;
56use mz_pgwire::MetricsConfig;
57use mz_pgwire_common::ConnectionCounter;
58use mz_repr::strconv;
59use mz_secrets::SecretsController;
60use mz_server_core::listeners::{
61    HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
62};
63use mz_server_core::{
64    ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
65    TlsCertConfig, TlsMode,
66};
67use mz_sql::catalog::EnvironmentId;
68use mz_sql::session::vars::{Value, VarInput};
69use tokio::sync::oneshot;
70use tower_http::cors::AllowOrigin;
71use tracing::{Instrument, info, info_span};
72
73use crate::deployment::preflight::{PreflightInput, PreflightOutput};
74use crate::deployment::state::DeploymentState;
75use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
76
77pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
78
79mod deployment;
80pub mod environmentd;
81pub mod http;
82mod telemetry;
83#[cfg(feature = "test")]
84pub mod test_util;
85
/// Build information for this crate, produced by the `build_info!` macro.
///
/// Within this file it supplies the semver version used when opening the
/// durable catalog state, and it is passed by reference to the system
/// parameter sync configuration and to the adapter.
pub const BUILD_INFO: BuildInfo = build_info!();
87
/// Configuration for an `environmentd` server.
///
/// A value of this type is consumed by [`Listeners::serve`], which uses it to
/// start the HTTP servers, open the catalog, and boot the adapter.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Config {
    // === Special modes. ===
    /// Whether to permit usage of unsafe features. This is never meant to run
    /// in production.
    pub unsafe_mode: bool,
    /// Whether the environmentd is running on a local dev machine. This is
    /// never meant to run in production or CI.
    pub all_features: bool,

    // === Connection options. ===
    /// TLS encryption and authentication configuration.
    pub tls: Option<TlsCertConfig>,
    /// Trigger to attempt to reload TLS certificates.
    ///
    /// Excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    pub tls_reload_certs: ReloadTrigger,
    /// Password of the mz_system user.
    pub external_login_password_mz_system: Option<Password>,
    /// Frontegg JWT authenticator.
    pub frontegg: Option<FronteggAuthenticator>,
    /// Frontegg workspace URL advertised in MCP OAuth discovery.
    pub frontegg_oauth_issuer_url: Option<String>,
    /// Origins for which cross-origin resource sharing (CORS) for HTTP requests
    /// is permitted.
    pub cors_allowed_origin: AllowOrigin,
    /// Raw list of allowed CORS origins. Retained alongside `cors_allowed_origin`
    /// (which is the computed predicate) so that endpoints like MCP can perform
    /// server-side Origin validation to defend against DNS rebinding attacks
    /// (where same-origin requests bypass CORS enforcement).
    pub cors_allowed_origin_list: Vec<HeaderValue>,
    /// Public IP addresses which the cloud environment has configured for
    /// egress.
    pub egress_addresses: Vec<IpNet>,
    /// The external host name to connect to the HTTP server of this
    /// environment.
    ///
    /// Presently used to render webhook URLs for end users in notices and the
    /// system catalog. Not used to establish connections directly.
    pub http_host_name: Option<String>,
    /// The URL of the Materialize console to proxy from the /internal-console
    /// endpoint on the internal HTTP server.
    pub internal_console_redirect_url: Option<String>,

    // === Controller options. ===
    /// Storage and compute controller configuration.
    pub controller: ControllerConfig,
    /// Secrets controller configuration.
    pub secrets_controller: Arc<dyn SecretsController>,
    /// VpcEndpoint controller configuration.
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    // === Storage options. ===
    /// The interval at which to collect storage usage information.
    pub storage_usage_collection_interval: Duration,
    /// How long to retain storage usage records for.
    pub storage_usage_retention_period: Option<Duration>,

    // === Adapter options. ===
    /// Catalog configuration.
    pub catalog_config: CatalogConfig,
    /// Availability zones in which storage and compute resources may be
    /// deployed.
    pub availability_zones: Vec<String>,
    /// A map from size name to resource allocations for cluster replicas.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// The PostgreSQL URL for the Postgres-backed timestamp oracle.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    /// An API key for Segment. Enables export of audit events to Segment.
    pub segment_api_key: Option<String>,
    /// Whether the Segment client is being used on the client side
    /// (rather than the server side).
    pub segment_client_side: bool,
    /// Only create a dummy segment client, only to get more testing coverage.
    pub test_only_dummy_segment_client: bool,
    /// An SDK key for LaunchDarkly. Enables system parameter synchronization
    /// with LaunchDarkly.
    ///
    /// Mutually exclusive with `config_sync_file_path`: `Listeners::serve`
    /// panics if both are set.
    pub launchdarkly_sdk_key: Option<String>,
    /// An invertible map from system parameter names to LaunchDarkly feature
    /// keys to use when propagating values from the latter to the former.
    pub launchdarkly_key_map: BTreeMap<String, String>,
    /// The duration at which the system parameter synchronization times out during startup.
    pub config_sync_timeout: Duration,
    /// The interval in seconds at which to synchronize system parameter values.
    pub config_sync_loop_interval: Option<Duration>,
    /// The path for file-based config sync.
    ///
    /// Mutually exclusive with `launchdarkly_sdk_key`: `Listeners::serve`
    /// panics if both are set.
    pub config_sync_file_path: Option<PathBuf>,

    // === Bootstrap options. ===
    /// The cloud ID of this environment.
    pub environment_id: EnvironmentId,
    /// What role, if any, should be initially created with elevated privileges.
    pub bootstrap_role: Option<String>,
    /// The size of the default cluster replica if bootstrapping.
    pub bootstrap_default_cluster_replica_size: String,
    /// The default number of replicas if bootstrapping.
    pub bootstrap_default_cluster_replication_factor: u32,
    /// The config of the builtin system cluster replicas if bootstrapping.
    pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// The config of the builtin catalog server cluster replicas if bootstrapping.
    pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// The config of the builtin probe cluster replicas if bootstrapping.
    pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// The config of the builtin support cluster replicas if bootstrapping.
    pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// The config of the builtin analytics cluster replicas if bootstrapping.
    pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Values to set for system parameters, if those system parameters have not
    /// already been set by the system user.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Helm chart version
    pub helm_chart_version: Option<String>,
    /// Configuration managed by license keys
    pub license_key: ValidatedLicenseKey,

    // === AWS options. ===
    /// The AWS account ID, which will be used to generate ARNs for
    /// Materialize-controlled AWS resources.
    pub aws_account_id: Option<String>,
    /// Supported AWS PrivateLink availability zone ids.
    pub aws_privatelink_availability_zones: Option<Vec<String>>,

    // === Observability options. ===
    /// The metrics registry to use.
    pub metrics_registry: MetricsRegistry,
    /// Handle to tracing.
    pub tracing_handle: TracingHandle,

    // === Testing options. ===
    /// A now generation function for mocking time.
    pub now: NowFn,
    /// If `Some`, force running builtin schema migration using the specified
    /// migration mechanism ("evolution" or "replacement").
    pub force_builtin_schema_migration: Option<String>,
}
224
/// Configuration for the Catalog.
///
/// Both fields are reference-counted handles, so cloning this struct only
/// bumps reference counts.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    pub persist_clients: Arc<PersistClientCache>,
    /// Persist catalog metrics.
    pub metrics: Arc<mz_catalog::durable::Metrics>,
}
233
/// A bound network listener together with the configuration used to serve it.
///
/// Instances are created by `Listener::bind` and consumed by `serve_sql` or
/// `serve_http`, depending on the configuration type `C`.
pub struct Listener<C> {
    /// Handle for the bound listener, as returned by `mz_server_core::listen`.
    /// The `serve_*` methods hand it back to the caller.
    pub handle: ListenerHandle,
    /// Stream of incoming connections, passed to `mz_server_core::serve` when
    /// the listener starts serving.
    connection_stream: Pin<Box<dyn ConnectionStream>>,
    /// Listener-specific configuration.
    config: C,
}
239impl<C> Listener<C>
240where
241    C: ListenerConfig,
242{
243    /// Initializes network listeners for a later call to `serve` at the
244    /// specified addresses.
245    ///
246    /// Splitting this function out from `serve` has two benefits:
247    ///
248    ///   * It initializes network listeners as early as possible, so that the OS
249    ///     will queue incoming connections while the server is booting.
250    ///
251    ///   * It allows the caller to communicate with the server via the internal
252    ///     HTTP port while it is booting.
253    ///
254    async fn bind(config: C) -> Result<Self, io::Error> {
255        let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
256        Ok(Self {
257            handle,
258            connection_stream,
259            config,
260        })
261    }
262}
263
264impl Listener<SqlListenerConfig> {
265    #[instrument(name = "environmentd::serve_sql")]
266    pub async fn serve_sql(
267        self,
268        name: String,
269        active_connection_counter: ConnectionCounter,
270        tls_reloading_context: Option<ReloadingSslContext>,
271        frontegg: Option<FronteggAuthenticator>,
272        adapter_client: AdapterClient,
273        oidc: GenericOidcAuthenticator,
274        metrics: MetricsConfig,
275        helm_chart_version: Option<String>,
276    ) -> ListenerHandle {
277        let label: &'static str = Box::leak(name.into_boxed_str());
278        let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
279            context,
280            mode: if self.config.enable_tls {
281                TlsMode::Require
282            } else {
283                TlsMode::Allow
284            },
285        });
286
287        task::spawn(|| format!("{}_sql_server", label), {
288            let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
289                label,
290                tls,
291                adapter_client,
292                authenticator_kind: self.config.authenticator_kind,
293                frontegg,
294                oidc,
295                metrics,
296                active_connection_counter,
297                helm_chart_version,
298                allowed_roles: self.config.allowed_roles,
299            });
300            mz_server_core::serve(ServeConfig {
301                conns: self.connection_stream,
302                server: sql_server,
303                // `environmentd` does not currently need to dynamically
304                // configure graceful termination behavior.
305                dyncfg: None,
306            })
307        });
308        self.handle
309    }
310}
311
312impl Listener<HttpListenerConfig> {
313    #[instrument(name = "environmentd::serve_http")]
314    pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
315        let task_name = format!("{}_http_server", &config.source);
316        task::spawn(|| task_name, {
317            let http_server = HttpServer::new(config);
318            mz_server_core::serve(ServeConfig {
319                conns: self.connection_stream,
320                server: http_server,
321                // `environmentd` does not currently need to dynamically
322                // configure graceful termination behavior.
323                dyncfg: None,
324            })
325        });
326        self.handle
327    }
328}
329
/// The bound HTTP and SQL listeners of an `environmentd` server, keyed by
/// listener name.
///
/// Built by `Listeners::bind` and consumed by `Listeners::serve`.
pub struct Listeners {
    /// Bound HTTP listeners, by name.
    pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
    /// Bound SQL listeners, by name.
    pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
}
334
335impl Listeners {
336    pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
337        let mut sql = BTreeMap::new();
338        for (name, config) in config.sql {
339            sql.insert(name, Listener::bind(config).await?);
340        }
341
342        let mut http = BTreeMap::new();
343        for (name, config) in config.http {
344            http.insert(name, Listener::bind(config).await?);
345        }
346
347        Ok(Listeners { http, sql })
348    }
349
350    /// Starts an `environmentd` server.
351    ///
352    /// Returns a handle to the server once it is fully booted.
353    #[instrument(name = "environmentd::serve")]
354    pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
355        let serve_start = Instant::now();
356        info!("startup: envd serve: beginning");
357        info!("startup: envd serve: preamble beginning");
358
359        // Validate TLS configuration, if present.
360        let tls_reloading_context = match config.tls {
361            Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
362            None => None,
363        };
364
365        let active_connection_counter = ConnectionCounter::default();
366        let (deployment_state, deployment_state_handle) = DeploymentState::new();
367
368        // Launch HTTP servers.
369        //
370        // We start these servers before we've completed initialization so that
371        // metrics are accessible during initialization. Some HTTP
372        // endpoints require the adapter to be initialized; requests to those
373        // endpoints block until the adapter client is installed.
374        // One of these endpoints is /api/readyz,
375        // which assumes we're ready when the adapter client exists.
376        let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
377        let internal_route_config = Arc::new(InternalRouteConfig {
378            deployment_state_handle,
379            internal_console_redirect_url: config.internal_console_redirect_url,
380        });
381
382        let (authenticator_oidc_tx, authenticator_oidc_rx) = oneshot::channel();
383        let authenticator_oidc_rx = authenticator_oidc_rx.shared();
384        let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
385        let adapter_client_rx = adapter_client_rx.shared();
386
387        let metrics_registry = config.metrics_registry.clone();
388        let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
389        let mcp_metrics = http::mcp_metrics::McpMetrics::register_into(&metrics_registry);
390        let oauth_metadata_metrics =
391            http::oauth_metadata::OauthMetadataMetrics::register_into(&metrics_registry);
392        let mut http_listener_handles = BTreeMap::new();
393        for (name, listener) in self.http {
394            let authenticator_kind = listener.config.authenticator_kind();
395            let source: &'static str = Box::leak(name.clone().into_boxed_str());
396            let tls = if listener.config.enable_tls() {
397                tls_reloading_context.clone()
398            } else {
399                None
400            };
401            let http_config = HttpConfig {
402                adapter_client_rx: adapter_client_rx.clone(),
403                active_connection_counter: active_connection_counter.clone(),
404                helm_chart_version: config.helm_chart_version.clone(),
405                http_host_name: config.http_host_name.clone(),
406                frontegg_oauth_issuer_url: config.frontegg_oauth_issuer_url.clone(),
407                source,
408                tls,
409                authenticator_kind,
410                frontegg: config.frontegg.clone(),
411                oidc_rx: authenticator_oidc_rx.clone(),
412                allowed_origin: config.cors_allowed_origin.clone(),
413                allowed_origin_list: config.cors_allowed_origin_list.clone(),
414                concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
415                metrics: metrics.clone(),
416                metrics_registry: metrics_registry.clone(),
417                mcp_metrics: mcp_metrics.clone(),
418                oauth_metadata_metrics: oauth_metadata_metrics.clone(),
419                allowed_roles: listener.config.allowed_roles(),
420                internal_route_config: Arc::clone(&internal_route_config),
421                routes_enabled: listener.config.routes.clone(),
422                replica_http_locator: Arc::clone(&config.controller.replica_http_locator),
423            };
424            http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
425        }
426
427        info!(
428            "startup: envd serve: preamble complete in {:?}",
429            serve_start.elapsed()
430        );
431
432        let catalog_init_start = Instant::now();
433        info!("startup: envd serve: catalog init beginning");
434
435        // Get the current timestamp so we can record when we booted.
436        let boot_ts = (config.now)().into();
437
438        let persist_client = config
439            .catalog_config
440            .persist_clients
441            .open(config.controller.persist_location.clone())
442            .await
443            .context("opening persist client")?;
444        let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
445            persist_client.clone(),
446            config.environment_id.organization_id(),
447            BUILD_INFO.semver_version(),
448            Some(config.controller.deploy_generation),
449            Arc::clone(&config.catalog_config.metrics),
450        )
451        .await?;
452
453        info!(
454            "startup: envd serve: catalog init complete in {:?}",
455            catalog_init_start.elapsed()
456        );
457
458        let system_param_sync_start = Instant::now();
459        info!("startup: envd serve: system parameter sync beginning");
460        // Initialize the system parameter frontend
461        let system_parameter_sync_config =
462            match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
463                (None, None) => None,
464                (None, Some(f)) => {
465                    info!("Using config file path {:?}", f);
466                    Some(SystemParameterSyncConfig::new(
467                        config.environment_id.clone(),
468                        &BUILD_INFO,
469                        &config.metrics_registry,
470                        config.launchdarkly_key_map,
471                        SystemParameterSyncClientConfig::File { path: f },
472                    ))
473                }
474                (Some(key), None) => Some(SystemParameterSyncConfig::new(
475                    config.environment_id.clone(),
476                    &BUILD_INFO,
477                    &config.metrics_registry,
478                    config.launchdarkly_key_map,
479                    SystemParameterSyncClientConfig::LaunchDarkly {
480                        sdk_key: key,
481                        now_fn: config.now.clone(),
482                    },
483                )),
484
485                (Some(_), Some(_)) => {
486                    panic!("Cannot configure both file and Launchdarkly based config syncing")
487                }
488            };
489
490        let remote_system_parameters = load_remote_system_parameters(
491            &mut openable_adapter_storage,
492            system_parameter_sync_config.clone(),
493            config.config_sync_timeout,
494        )
495        .await?;
496        info!(
497            "startup: envd serve: system parameter sync complete in {:?}",
498            system_param_sync_start.elapsed()
499        );
500
501        let preflight_checks_start = Instant::now();
502        info!("startup: envd serve: preflight checks beginning");
503
504        // Determine the maximum wait time when doing a 0dt deployment.
505        let with_0dt_deployment_max_wait = {
506            let cli_default = config
507                .system_parameter_defaults
508                .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
509                .map(|x| {
510                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
511                        anyhow!(
512                            "failed to parse default for {}: {:?}",
513                            WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
514                            err
515                        )
516                    })
517                })
518                .transpose()?;
519            let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
520            let ld = get_ld_value(
521                WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
522                &remote_system_parameters,
523                |x| {
524                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
525                        format!(
526                            "failed to parse LD value {} for {}: {:?}",
527                            x,
528                            WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
529                            err
530                        )
531                    })
532                },
533            )?;
534            let catalog = openable_adapter_storage
535                .get_0dt_deployment_max_wait()
536                .await?;
537            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
538            info!(
539                ?computed,
540                ?ld,
541                ?catalog,
542                ?cli_default,
543                ?compiled_default,
544                "determined value for {} system parameter",
545                WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
546            );
547            computed
548        };
549        // Determine the DDL check interval when doing a 0dt deployment.
550        let with_0dt_deployment_ddl_check_interval = {
551            let cli_default = config
552                .system_parameter_defaults
553                .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
554                .map(|x| {
555                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
556                        anyhow!(
557                            "failed to parse default for {}: {:?}",
558                            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
559                            err
560                        )
561                    })
562                })
563                .transpose()?;
564            let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
565            let ld = get_ld_value(
566                WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
567                &remote_system_parameters,
568                |x| {
569                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
570                        format!(
571                            "failed to parse LD value {} for {}: {:?}",
572                            x,
573                            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
574                            err
575                        )
576                    })
577                },
578            )?;
579            let catalog = openable_adapter_storage
580                .get_0dt_deployment_ddl_check_interval()
581                .await?;
582            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
583            info!(
584                ?computed,
585                ?ld,
586                ?catalog,
587                ?cli_default,
588                ?compiled_default,
589                "determined value for {} system parameter",
590                WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
591            );
592            computed
593        };
594
595        // Determine whether we should panic if we reach the maximum wait time
596        // without the preflight checks succeeding.
597        let enable_0dt_deployment_panic_after_timeout = {
598            let cli_default = config
599                .system_parameter_defaults
600                .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
601                .map(|x| {
602                    strconv::parse_bool(x).map_err(|err| {
603                        anyhow!(
604                            "failed to parse default for {}: {}",
605                            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
606                            err
607                        )
608                    })
609                })
610                .transpose()?;
611            let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
612            let ld = get_ld_value(
613                "enable_0dt_deployment_panic_after_timeout",
614                &remote_system_parameters,
615                |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
616            )?;
617            let catalog = openable_adapter_storage
618                .get_enable_0dt_deployment_panic_after_timeout()
619                .await?;
620            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
621            info!(
622                %computed,
623                ?ld,
624                ?catalog,
625                ?cli_default,
626                ?compiled_default,
627                "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
628            );
629            computed
630        };
631
632        // Perform preflight checks.
633        //
634        // Preflight checks determine whether to boot in read-only mode or not.
635        let bootstrap_args = BootstrapArgs {
636            default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
637            default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
638            bootstrap_role: config.bootstrap_role.clone(),
639            cluster_replica_size_map: config.cluster_replica_sizes.clone(),
640        };
641        let preflight_config = PreflightInput {
642            boot_ts,
643            environment_id: config.environment_id.clone(),
644            persist_client,
645            deploy_generation: config.controller.deploy_generation,
646            deployment_state: deployment_state.clone(),
647            openable_adapter_storage,
648            catalog_metrics: Arc::clone(&config.catalog_config.metrics),
649            caught_up_max_wait: with_0dt_deployment_max_wait,
650            panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
651            bootstrap_args,
652            ddl_check_interval: with_0dt_deployment_ddl_check_interval,
653        };
654        let PreflightOutput {
655            openable_adapter_storage,
656            read_only,
657            caught_up_trigger,
658        } = deployment::preflight::preflight_0dt(preflight_config).await?;
659
660        info!(
661            "startup: envd serve: preflight checks complete in {:?}",
662            preflight_checks_start.elapsed()
663        );
664
665        let catalog_open_start = Instant::now();
666        info!("startup: envd serve: durable catalog open beginning");
667
668        let bootstrap_args = BootstrapArgs {
669            default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
670            default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
671            bootstrap_role: config.bootstrap_role,
672            cluster_replica_size_map: config.cluster_replica_sizes.clone(),
673        };
674
675        // Load the adapter durable storage.
676        let (adapter_storage, audit_logs_iterator) = if read_only {
677            // TODO: behavior of migrations when booting in savepoint mode is
678            // not well defined.
679            let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
680                .open_savepoint(boot_ts, &bootstrap_args)
681                .await?;
682            // In read-only mode, we intentionally do not call `set_is_leader`,
683            // because we are by definition not the leader if we are in
684            // read-only mode.
685
686            (adapter_storage, audit_logs_iterator)
687        } else {
688            let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
689                .open(boot_ts, &bootstrap_args)
690                .await?;
691
692            // Once we have successfully opened the adapter storage in
693            // read/write mode, we can announce we are the leader, as we've
694            // fenced out all other environments using the adapter storage.
695            deployment_state.set_is_leader();
696
697            (adapter_storage, audit_logs_iterator)
698        };
699
700        // Enable Persist compaction if we're not in read only.
701        if !read_only {
702            config.controller.persist_clients.cfg().enable_compaction();
703        }
704
705        info!(
706            "startup: envd serve: durable catalog open complete in {:?}",
707            catalog_open_start.elapsed()
708        );
709
710        let coord_init_start = Instant::now();
711        info!("startup: envd serve: coordinator init beginning");
712
713        if !config
714            .cluster_replica_sizes
715            .0
716            .contains_key(&config.bootstrap_default_cluster_replica_size)
717        {
718            return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
719        }
720        let envd_epoch = adapter_storage.epoch();
721
722        // Initialize storage usage client.
723        let storage_usage_client = StorageUsageClient::open(
724            config
725                .controller
726                .persist_clients
727                .open(config.controller.persist_location.clone())
728                .await
729                .context("opening storage usage client")?,
730        );
731
732        // Initialize adapter.
733        let segment_client = config.segment_api_key.map(|api_key| {
734            mz_segment::Client::new(mz_segment::Config {
735                api_key,
736                client_side: config.segment_client_side,
737            })
738        });
739        let connection_limiter = active_connection_counter.clone();
740        let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
741            connection_limiter.update_limit(limit);
742            connection_limiter.update_superuser_reserved(superuser_reserved);
743        });
744
745        let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
746            connection_context: config.controller.connection_context.clone(),
747            connection_limit_callback,
748            controller_config: config.controller,
749            controller_envd_epoch: envd_epoch,
750            storage: adapter_storage,
751            audit_logs_iterator,
752            timestamp_oracle_url: config.timestamp_oracle_url,
753            unsafe_mode: config.unsafe_mode,
754            all_features: config.all_features,
755            build_info: &BUILD_INFO,
756            environment_id: config.environment_id.clone(),
757            metrics_registry: config.metrics_registry.clone(),
758            now: config.now,
759            secrets_controller: config.secrets_controller,
760            cloud_resource_controller: config.cloud_resource_controller,
761            cluster_replica_sizes: config.cluster_replica_sizes,
762            builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
763            builtin_catalog_server_cluster_config: config
764                .bootstrap_builtin_catalog_server_cluster_config,
765            builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
766            builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
767            builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
768            availability_zones: config.availability_zones,
769            system_parameter_defaults: config.system_parameter_defaults,
770            storage_usage_client,
771            storage_usage_collection_interval: config.storage_usage_collection_interval,
772            storage_usage_retention_period: config.storage_usage_retention_period,
773            segment_client: segment_client.clone(),
774            egress_addresses: config.egress_addresses,
775            remote_system_parameters,
776            aws_account_id: config.aws_account_id,
777            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
778            webhook_concurrency_limit: webhook_concurrency_limit.clone(),
779            http_host_name: config.http_host_name,
780            tracing_handle: config.tracing_handle,
781            read_only_controllers: read_only,
782            caught_up_trigger,
783            helm_chart_version: config.helm_chart_version.clone(),
784            license_key: config.license_key,
785            external_login_password_mz_system: config.external_login_password_mz_system,
786            force_builtin_schema_migration: config.force_builtin_schema_migration,
787        })
788        .instrument(info_span!("adapter::serve"))
789        .await?;
790
791        // Initialize the OIDC authenticator, shared between the HTTP and SQL servers.
792        let oidc = GenericOidcAuthenticator::new(adapter_client.clone());
793
794        info!(
795            "startup: envd serve: coordinator init complete in {:?}",
796            coord_init_start.elapsed()
797        );
798
799        let serve_postamble_start = Instant::now();
800        info!("startup: envd serve: postamble beginning");
801
802        // Send adapter client and OIDC authenticator to the HTTP servers.
803        authenticator_oidc_tx
804            .send(oidc.clone())
805            .expect("rx known to be live");
806        adapter_client_tx
807            .send(adapter_client.clone())
808            .expect("internal HTTP server should not drop first");
809
810        let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
811
812        // Launch SQL server.
813        let mut sql_listener_handles = BTreeMap::new();
814        for (name, listener) in self.sql {
815            sql_listener_handles.insert(
816                name.clone(),
817                listener
818                    .serve_sql(
819                        name,
820                        active_connection_counter.clone(),
821                        tls_reloading_context.clone(),
822                        config.frontegg.clone(),
823                        adapter_client.clone(),
824                        oidc.clone(),
825                        metrics.clone(),
826                        config.helm_chart_version.clone(),
827                    )
828                    .await,
829            );
830        }
831
832        // Start telemetry reporting loop.
833        if let Some(segment_client) = segment_client {
834            telemetry::start_reporting(telemetry::Config {
835                segment_client,
836                adapter_client: adapter_client.clone(),
837                environment_id: config.environment_id,
838                report_interval: Duration::from_secs(3600),
839            });
840        } else if config.test_only_dummy_segment_client {
841            // We only have access to a segment client in production but we
842            // still want to exercise the telemetry reporting code to a degree.
843            // So we create a dummy client and report telemetry into the void.
844            // This way we at least run the telemetry queries the way a
845            // production environment would.
846            tracing::debug!("starting telemetry reporting with a dummy segment client");
847            let segment_client = mz_segment::Client::new_dummy_client();
848            telemetry::start_reporting(telemetry::Config {
849                segment_client,
850                adapter_client: adapter_client.clone(),
851                environment_id: config.environment_id,
852                report_interval: Duration::from_secs(180),
853            });
854        }
855
856        // If system_parameter_sync_config and config_sync_loop_interval are present,
857        // start the system_parameter_sync loop.
858        if let Some(system_parameter_sync_config) = system_parameter_sync_config {
859            task::spawn(
860                || "system_parameter_sync",
861                AssertUnwindSafe(system_parameter_sync(
862                    system_parameter_sync_config,
863                    adapter_client.clone(),
864                    config.config_sync_loop_interval,
865                ))
866                .ore_catch_unwind(),
867            );
868        }
869
870        info!(
871            "startup: envd serve: postamble complete in {:?}",
872            serve_postamble_start.elapsed()
873        );
874        info!(
875            "startup: envd serve: complete in {:?}",
876            serve_start.elapsed()
877        );
878
879        Ok(Server {
880            sql_listener_handles,
881            http_listener_handles,
882            _adapter_handle: adapter_handle,
883        })
884    }
885}
886
887fn get_ld_value<V>(
888    name: &str,
889    remote_system_parameters: &Option<BTreeMap<String, String>>,
890    parse: impl Fn(&str) -> Result<V, String>,
891) -> Result<Option<V>, anyhow::Error> {
892    remote_system_parameters
893        .as_ref()
894        .and_then(|params| params.get(name))
895        .map(|x| {
896            parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
897        })
898        .transpose()
899}
900
901/// A running `environmentd` server.
902pub struct Server {
903    // Drop order matters for these fields.
904    pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
905    pub http_listener_handles: BTreeMap<String, ListenerHandle>,
906    _adapter_handle: mz_adapter::Handle,
907}