Skip to main content

mz_environmentd/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A SQL stream processor built on top of [timely dataflow] and
11//! [differential dataflow].
12//!
13//! [differential dataflow]: ../differential_dataflow/index.html
14//! [timely dataflow]: ../timely/index.html
15
16use ::http::HeaderValue;
17use std::collections::BTreeMap;
18use std::panic::AssertUnwindSafe;
19use std::path::PathBuf;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use std::{env, io};
24
25use anyhow::{Context, anyhow};
26use derivative::Derivative;
27use futures::FutureExt;
28use ipnet::IpNet;
29use mz_adapter::config::{
30    SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
31};
32use mz_adapter::webhook::WebhookConcurrencyLimiter;
33use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
34use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
35use mz_adapter_types::dyncfgs::{
36    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
37    WITH_0DT_DEPLOYMENT_MAX_WAIT,
38};
39use mz_auth::password::Password;
40use mz_authenticator::GenericOidcAuthenticator;
41use mz_build_info::{BuildInfo, build_info};
42use mz_catalog::config::ClusterReplicaSizeMap;
43use mz_catalog::durable::BootstrapArgs;
44use mz_cloud_resources::CloudResourceController;
45use mz_controller::ControllerConfig;
46use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
47use mz_license_keys::ValidatedLicenseKey;
48use mz_ore::future::OreFutureExt;
49use mz_ore::metrics::MetricsRegistry;
50use mz_ore::now::NowFn;
51use mz_ore::tracing::TracingHandle;
52use mz_ore::url::SensitiveUrl;
53use mz_ore::{instrument, task};
54use mz_persist_client::cache::PersistClientCache;
55use mz_persist_client::usage::StorageUsageClient;
56use mz_pgwire::MetricsConfig;
57use mz_pgwire_common::ConnectionCounter;
58use mz_repr::strconv;
59use mz_secrets::SecretsController;
60use mz_server_core::listeners::{
61    HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
62};
63use mz_server_core::{
64    ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
65    TlsCertConfig, TlsMode,
66};
67use mz_sql::catalog::EnvironmentId;
68use mz_sql::session::vars::{Value, VarInput};
69use tokio::sync::oneshot;
70use tower_http::cors::AllowOrigin;
71use tracing::{Instrument, info, info_span};
72
73use crate::deployment::preflight::{PreflightInput, PreflightOutput};
74use crate::deployment::state::DeploymentState;
75use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
76
77pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
78
79mod deployment;
80pub mod environmentd;
81pub mod http;
82mod telemetry;
83#[cfg(feature = "test")]
84pub mod test_util;
85
86pub const BUILD_INFO: BuildInfo = build_info!();
87
88/// Configuration for an `environmentd` server.
89#[derive(Derivative)]
90#[derivative(Debug)]
91pub struct Config {
92    // === Special modes. ===
93    /// Whether to permit usage of unsafe features. This is never meant to run
94    /// in production.
95    pub unsafe_mode: bool,
96    /// Whether the environmentd is running on a local dev machine. This is
97    /// never meant to run in production or CI.
98    pub all_features: bool,
99
100    // === Connection options. ===
101    /// TLS encryption and authentication configuration.
102    pub tls: Option<TlsCertConfig>,
103    /// Trigger to attempt to reload TLS certififcates.
104    #[derivative(Debug = "ignore")]
105    pub tls_reload_certs: ReloadTrigger,
106    /// Password of the mz_system user.
107    pub external_login_password_mz_system: Option<Password>,
108    /// Frontegg JWT authenticator.
109    pub frontegg: Option<FronteggAuthenticator>,
110    /// Origins for which cross-origin resource sharing (CORS) for HTTP requests
111    /// is permitted.
112    pub cors_allowed_origin: AllowOrigin,
113    /// Raw list of allowed CORS origins. Retained alongside `cors_allowed_origin`
114    /// (which is the computed predicate) so that endpoints like MCP can perform
115    /// server-side Origin validation to defend against DNS rebinding attacks
116    /// (where same-origin requests bypass CORS enforcement).
117    pub cors_allowed_origin_list: Vec<HeaderValue>,
118    /// Public IP addresses which the cloud environment has configured for
119    /// egress.
120    pub egress_addresses: Vec<IpNet>,
121    /// The external host name to connect to the HTTP server of this
122    /// environment.
123    ///
124    /// Presently used to render webhook URLs for end users in notices and the
125    /// system catalog. Not used to establish connections directly.
126    pub http_host_name: Option<String>,
127    /// The URL of the Materialize console to proxy from the /internal-console
128    /// endpoint on the internal HTTP server.
129    pub internal_console_redirect_url: Option<String>,
130
131    // === Controller options. ===
132    /// Storage and compute controller configuration.
133    pub controller: ControllerConfig,
134    /// Secrets controller configuration.
135    pub secrets_controller: Arc<dyn SecretsController>,
136    /// VpcEndpoint controller configuration.
137    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
138
139    // === Storage options. ===
140    /// The interval at which to collect storage usage information.
141    pub storage_usage_collection_interval: Duration,
142    /// How long to retain storage usage records for.
143    pub storage_usage_retention_period: Option<Duration>,
144
145    // === Adapter options. ===
146    /// Catalog configuration.
147    pub catalog_config: CatalogConfig,
148    /// Availability zones in which storage and compute resources may be
149    /// deployed.
150    pub availability_zones: Vec<String>,
151    /// A map from size name to resource allocations for cluster replicas.
152    pub cluster_replica_sizes: ClusterReplicaSizeMap,
153    /// The PostgreSQL URL for the Postgres-backed timestamp oracle.
154    pub timestamp_oracle_url: Option<SensitiveUrl>,
155    /// An API key for Segment. Enables export of audit events to Segment.
156    pub segment_api_key: Option<String>,
157    /// Whether the Segment client is being used on the client side
158    /// (rather than the server side).
159    pub segment_client_side: bool,
160    /// Only create a dummy segment client, only to get more testing coverage.
161    pub test_only_dummy_segment_client: bool,
162    /// An SDK key for LaunchDarkly. Enables system parameter synchronization
163    /// with LaunchDarkly.
164    pub launchdarkly_sdk_key: Option<String>,
165    /// An invertible map from system parameter names to LaunchDarkly feature
166    /// keys to use when propagating values from the latter to the former.
167    pub launchdarkly_key_map: BTreeMap<String, String>,
168    /// The duration at which the system parameter synchronization times out during startup.
169    pub config_sync_timeout: Duration,
170    /// The interval in seconds at which to synchronize system parameter values.
171    pub config_sync_loop_interval: Option<Duration>,
172    /// The path for file based config sync
173    pub config_sync_file_path: Option<PathBuf>,
174
175    // === Bootstrap options. ===
176    /// The cloud ID of this environment.
177    pub environment_id: EnvironmentId,
178    /// What role, if any, should be initially created with elevated privileges.
179    pub bootstrap_role: Option<String>,
180    /// The size of the default cluster replica if bootstrapping.
181    pub bootstrap_default_cluster_replica_size: String,
182    /// The default number of replicas if bootstrapping.
183    pub bootstrap_default_cluster_replication_factor: u32,
184    /// The config of the builtin system cluster replicas if bootstrapping.
185    pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
186    /// The config of the builtin catalog server cluster replicas if bootstrapping.
187    pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
188    /// The config of the builtin probe cluster replicas if bootstrapping.
189    pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
190    /// The config of the builtin support cluster replicas if bootstrapping.
191    pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
192    /// The config of the builtin analytics cluster replicas if bootstrapping.
193    pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
194    /// Values to set for system parameters, if those system parameters have not
195    /// already been set by the system user.
196    pub system_parameter_defaults: BTreeMap<String, String>,
197    /// Helm chart version
198    pub helm_chart_version: Option<String>,
199    /// Configuration managed by license keys
200    pub license_key: ValidatedLicenseKey,
201
202    // === AWS options. ===
203    /// The AWS account ID, which will be used to generate ARNs for
204    /// Materialize-controlled AWS resources.
205    pub aws_account_id: Option<String>,
206    /// Supported AWS PrivateLink availability zone ids.
207    pub aws_privatelink_availability_zones: Option<Vec<String>>,
208
209    // === Observability options. ===
210    /// The metrics registry to use.
211    pub metrics_registry: MetricsRegistry,
212    /// Handle to tracing.
213    pub tracing_handle: TracingHandle,
214
215    // === Testing options. ===
216    /// A now generation function for mocking time.
217    pub now: NowFn,
218    /// If `Some`, force running builtin schema migration using the specified
219    /// migration mechanism ("evolution" or "replacement").
220    pub force_builtin_schema_migration: Option<String>,
221}
222
223/// Configuration for the Catalog.
224#[derive(Debug, Clone)]
225pub struct CatalogConfig {
226    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
227    pub persist_clients: Arc<PersistClientCache>,
228    /// Persist catalog metrics.
229    pub metrics: Arc<mz_catalog::durable::Metrics>,
230}
231
232pub struct Listener<C> {
233    pub handle: ListenerHandle,
234    connection_stream: Pin<Box<dyn ConnectionStream>>,
235    config: C,
236}
237impl<C> Listener<C>
238where
239    C: ListenerConfig,
240{
241    /// Initializes network listeners for a later call to `serve` at the
242    /// specified addresses.
243    ///
244    /// Splitting this function out from `serve` has two benefits:
245    ///
246    ///   * It initializes network listeners as early as possible, so that the OS
247    ///     will queue incoming connections while the server is booting.
248    ///
249    ///   * It allows the caller to communicate with the server via the internal
250    ///     HTTP port while it is booting.
251    ///
252    async fn bind(config: C) -> Result<Self, io::Error> {
253        let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
254        Ok(Self {
255            handle,
256            connection_stream,
257            config,
258        })
259    }
260}
261
262impl Listener<SqlListenerConfig> {
263    #[instrument(name = "environmentd::serve_sql")]
264    pub async fn serve_sql(
265        self,
266        name: String,
267        active_connection_counter: ConnectionCounter,
268        tls_reloading_context: Option<ReloadingSslContext>,
269        frontegg: Option<FronteggAuthenticator>,
270        adapter_client: AdapterClient,
271        oidc: GenericOidcAuthenticator,
272        metrics: MetricsConfig,
273        helm_chart_version: Option<String>,
274    ) -> ListenerHandle {
275        let label: &'static str = Box::leak(name.into_boxed_str());
276        let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
277            context,
278            mode: if self.config.enable_tls {
279                TlsMode::Require
280            } else {
281                TlsMode::Allow
282            },
283        });
284
285        task::spawn(|| format!("{}_sql_server", label), {
286            let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
287                label,
288                tls,
289                adapter_client,
290                authenticator_kind: self.config.authenticator_kind,
291                frontegg,
292                oidc,
293                metrics,
294                active_connection_counter,
295                helm_chart_version,
296                allowed_roles: self.config.allowed_roles,
297            });
298            mz_server_core::serve(ServeConfig {
299                conns: self.connection_stream,
300                server: sql_server,
301                // `environmentd` does not currently need to dynamically
302                // configure graceful termination behavior.
303                dyncfg: None,
304            })
305        });
306        self.handle
307    }
308}
309
310impl Listener<HttpListenerConfig> {
311    #[instrument(name = "environmentd::serve_http")]
312    pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
313        let task_name = format!("{}_http_server", &config.source);
314        task::spawn(|| task_name, {
315            let http_server = HttpServer::new(config);
316            mz_server_core::serve(ServeConfig {
317                conns: self.connection_stream,
318                server: http_server,
319                // `environmentd` does not currently need to dynamically
320                // configure graceful termination behavior.
321                dyncfg: None,
322            })
323        });
324        self.handle
325    }
326}
327
328pub struct Listeners {
329    pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
330    pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
331}
332
333impl Listeners {
334    pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
335        let mut sql = BTreeMap::new();
336        for (name, config) in config.sql {
337            sql.insert(name, Listener::bind(config).await?);
338        }
339
340        let mut http = BTreeMap::new();
341        for (name, config) in config.http {
342            http.insert(name, Listener::bind(config).await?);
343        }
344
345        Ok(Listeners { http, sql })
346    }
347
348    /// Starts an `environmentd` server.
349    ///
350    /// Returns a handle to the server once it is fully booted.
351    #[instrument(name = "environmentd::serve")]
352    pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
353        let serve_start = Instant::now();
354        info!("startup: envd serve: beginning");
355        info!("startup: envd serve: preamble beginning");
356
357        // Validate TLS configuration, if present.
358        let tls_reloading_context = match config.tls {
359            Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
360            None => None,
361        };
362
363        let active_connection_counter = ConnectionCounter::default();
364        let (deployment_state, deployment_state_handle) = DeploymentState::new();
365
366        // Launch HTTP servers.
367        //
368        // We start these servers before we've completed initialization so that
369        // metrics are accessible during initialization. Some HTTP
370        // endpoints require the adapter to be initialized; requests to those
371        // endpoints block until the adapter client is installed.
372        // One of these endpoints is /api/readyz,
373        // which assumes we're ready when the adapter client exists.
374        let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
375        let internal_route_config = Arc::new(InternalRouteConfig {
376            deployment_state_handle,
377            internal_console_redirect_url: config.internal_console_redirect_url,
378        });
379
380        let (authenticator_oidc_tx, authenticator_oidc_rx) = oneshot::channel();
381        let authenticator_oidc_rx = authenticator_oidc_rx.shared();
382        let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
383        let adapter_client_rx = adapter_client_rx.shared();
384
385        let metrics_registry = config.metrics_registry.clone();
386        let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
387        let mut http_listener_handles = BTreeMap::new();
388        for (name, listener) in self.http {
389            let authenticator_kind = listener.config.authenticator_kind();
390            let source: &'static str = Box::leak(name.clone().into_boxed_str());
391            let tls = if listener.config.enable_tls() {
392                tls_reloading_context.clone()
393            } else {
394                None
395            };
396            let http_config = HttpConfig {
397                adapter_client_rx: adapter_client_rx.clone(),
398                active_connection_counter: active_connection_counter.clone(),
399                helm_chart_version: config.helm_chart_version.clone(),
400                source,
401                tls,
402                authenticator_kind,
403                frontegg: config.frontegg.clone(),
404                oidc_rx: authenticator_oidc_rx.clone(),
405                allowed_origin: config.cors_allowed_origin.clone(),
406                allowed_origin_list: config.cors_allowed_origin_list.clone(),
407                concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
408                metrics: metrics.clone(),
409                metrics_registry: metrics_registry.clone(),
410                allowed_roles: listener.config.allowed_roles(),
411                internal_route_config: Arc::clone(&internal_route_config),
412                routes_enabled: listener.config.routes.clone(),
413                replica_http_locator: Arc::clone(&config.controller.replica_http_locator),
414            };
415            http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
416        }
417
418        info!(
419            "startup: envd serve: preamble complete in {:?}",
420            serve_start.elapsed()
421        );
422
423        let catalog_init_start = Instant::now();
424        info!("startup: envd serve: catalog init beginning");
425
426        // Get the current timestamp so we can record when we booted.
427        let boot_ts = (config.now)().into();
428
429        let persist_client = config
430            .catalog_config
431            .persist_clients
432            .open(config.controller.persist_location.clone())
433            .await
434            .context("opening persist client")?;
435        let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
436            persist_client.clone(),
437            config.environment_id.organization_id(),
438            BUILD_INFO.semver_version(),
439            Some(config.controller.deploy_generation),
440            Arc::clone(&config.catalog_config.metrics),
441        )
442        .await?;
443
444        info!(
445            "startup: envd serve: catalog init complete in {:?}",
446            catalog_init_start.elapsed()
447        );
448
449        let system_param_sync_start = Instant::now();
450        info!("startup: envd serve: system parameter sync beginning");
451        // Initialize the system parameter frontend
452        let system_parameter_sync_config =
453            match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
454                (None, None) => None,
455                (None, Some(f)) => {
456                    info!("Using config file path {:?}", f);
457                    Some(SystemParameterSyncConfig::new(
458                        config.environment_id.clone(),
459                        &BUILD_INFO,
460                        &config.metrics_registry,
461                        config.launchdarkly_key_map,
462                        SystemParameterSyncClientConfig::File { path: f },
463                    ))
464                }
465                (Some(key), None) => Some(SystemParameterSyncConfig::new(
466                    config.environment_id.clone(),
467                    &BUILD_INFO,
468                    &config.metrics_registry,
469                    config.launchdarkly_key_map,
470                    SystemParameterSyncClientConfig::LaunchDarkly {
471                        sdk_key: key,
472                        now_fn: config.now.clone(),
473                    },
474                )),
475
476                (Some(_), Some(_)) => {
477                    panic!("Cannot configure both file and Launchdarkly based config syncing")
478                }
479            };
480
481        let remote_system_parameters = load_remote_system_parameters(
482            &mut openable_adapter_storage,
483            system_parameter_sync_config.clone(),
484            config.config_sync_timeout,
485        )
486        .await?;
487        info!(
488            "startup: envd serve: system parameter sync complete in {:?}",
489            system_param_sync_start.elapsed()
490        );
491
492        let preflight_checks_start = Instant::now();
493        info!("startup: envd serve: preflight checks beginning");
494
495        // Determine the maximum wait time when doing a 0dt deployment.
496        let with_0dt_deployment_max_wait = {
497            let cli_default = config
498                .system_parameter_defaults
499                .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
500                .map(|x| {
501                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
502                        anyhow!(
503                            "failed to parse default for {}: {:?}",
504                            WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
505                            err
506                        )
507                    })
508                })
509                .transpose()?;
510            let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
511            let ld = get_ld_value(
512                WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
513                &remote_system_parameters,
514                |x| {
515                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
516                        format!(
517                            "failed to parse LD value {} for {}: {:?}",
518                            x,
519                            WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
520                            err
521                        )
522                    })
523                },
524            )?;
525            let catalog = openable_adapter_storage
526                .get_0dt_deployment_max_wait()
527                .await?;
528            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
529            info!(
530                ?computed,
531                ?ld,
532                ?catalog,
533                ?cli_default,
534                ?compiled_default,
535                "determined value for {} system parameter",
536                WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
537            );
538            computed
539        };
540        // Determine the DDL check interval when doing a 0dt deployment.
541        let with_0dt_deployment_ddl_check_interval = {
542            let cli_default = config
543                .system_parameter_defaults
544                .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
545                .map(|x| {
546                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
547                        anyhow!(
548                            "failed to parse default for {}: {:?}",
549                            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
550                            err
551                        )
552                    })
553                })
554                .transpose()?;
555            let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
556            let ld = get_ld_value(
557                WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
558                &remote_system_parameters,
559                |x| {
560                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
561                        format!(
562                            "failed to parse LD value {} for {}: {:?}",
563                            x,
564                            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
565                            err
566                        )
567                    })
568                },
569            )?;
570            let catalog = openable_adapter_storage
571                .get_0dt_deployment_ddl_check_interval()
572                .await?;
573            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
574            info!(
575                ?computed,
576                ?ld,
577                ?catalog,
578                ?cli_default,
579                ?compiled_default,
580                "determined value for {} system parameter",
581                WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
582            );
583            computed
584        };
585
586        // Determine whether we should panic if we reach the maximum wait time
587        // without the preflight checks succeeding.
588        let enable_0dt_deployment_panic_after_timeout = {
589            let cli_default = config
590                .system_parameter_defaults
591                .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
592                .map(|x| {
593                    strconv::parse_bool(x).map_err(|err| {
594                        anyhow!(
595                            "failed to parse default for {}: {}",
596                            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
597                            err
598                        )
599                    })
600                })
601                .transpose()?;
602            let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
603            let ld = get_ld_value(
604                "enable_0dt_deployment_panic_after_timeout",
605                &remote_system_parameters,
606                |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
607            )?;
608            let catalog = openable_adapter_storage
609                .get_enable_0dt_deployment_panic_after_timeout()
610                .await?;
611            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
612            info!(
613                %computed,
614                ?ld,
615                ?catalog,
616                ?cli_default,
617                ?compiled_default,
618                "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
619            );
620            computed
621        };
622
623        // Perform preflight checks.
624        //
625        // Preflight checks determine whether to boot in read-only mode or not.
626        let bootstrap_args = BootstrapArgs {
627            default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
628            default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
629            bootstrap_role: config.bootstrap_role.clone(),
630            cluster_replica_size_map: config.cluster_replica_sizes.clone(),
631        };
632        let preflight_config = PreflightInput {
633            boot_ts,
634            environment_id: config.environment_id.clone(),
635            persist_client,
636            deploy_generation: config.controller.deploy_generation,
637            deployment_state: deployment_state.clone(),
638            openable_adapter_storage,
639            catalog_metrics: Arc::clone(&config.catalog_config.metrics),
640            caught_up_max_wait: with_0dt_deployment_max_wait,
641            panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
642            bootstrap_args,
643            ddl_check_interval: with_0dt_deployment_ddl_check_interval,
644        };
645        let PreflightOutput {
646            openable_adapter_storage,
647            read_only,
648            caught_up_trigger,
649        } = deployment::preflight::preflight_0dt(preflight_config).await?;
650
651        info!(
652            "startup: envd serve: preflight checks complete in {:?}",
653            preflight_checks_start.elapsed()
654        );
655
656        let catalog_open_start = Instant::now();
657        info!("startup: envd serve: durable catalog open beginning");
658
659        let bootstrap_args = BootstrapArgs {
660            default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
661            default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
662            bootstrap_role: config.bootstrap_role,
663            cluster_replica_size_map: config.cluster_replica_sizes.clone(),
664        };
665
666        // Load the adapter durable storage.
667        let (adapter_storage, audit_logs_iterator) = if read_only {
668            // TODO: behavior of migrations when booting in savepoint mode is
669            // not well defined.
670            let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
671                .open_savepoint(boot_ts, &bootstrap_args)
672                .await?;
673            // In read-only mode, we intentionally do not call `set_is_leader`,
674            // because we are by definition not the leader if we are in
675            // read-only mode.
676
677            (adapter_storage, audit_logs_iterator)
678        } else {
679            let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
680                .open(boot_ts, &bootstrap_args)
681                .await?;
682
683            // Once we have successfully opened the adapter storage in
684            // read/write mode, we can announce we are the leader, as we've
685            // fenced out all other environments using the adapter storage.
686            deployment_state.set_is_leader();
687
688            (adapter_storage, audit_logs_iterator)
689        };
690
691        // Enable Persist compaction if we're not in read only.
692        if !read_only {
693            config.controller.persist_clients.cfg().enable_compaction();
694        }
695
696        info!(
697            "startup: envd serve: durable catalog open complete in {:?}",
698            catalog_open_start.elapsed()
699        );
700
701        let coord_init_start = Instant::now();
702        info!("startup: envd serve: coordinator init beginning");
703
704        if !config
705            .cluster_replica_sizes
706            .0
707            .contains_key(&config.bootstrap_default_cluster_replica_size)
708        {
709            return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
710        }
711        let envd_epoch = adapter_storage.epoch();
712
713        // Initialize storage usage client.
714        let storage_usage_client = StorageUsageClient::open(
715            config
716                .controller
717                .persist_clients
718                .open(config.controller.persist_location.clone())
719                .await
720                .context("opening storage usage client")?,
721        );
722
723        // Initialize adapter.
724        let segment_client = config.segment_api_key.map(|api_key| {
725            mz_segment::Client::new(mz_segment::Config {
726                api_key,
727                client_side: config.segment_client_side,
728            })
729        });
730        let connection_limiter = active_connection_counter.clone();
731        let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
732            connection_limiter.update_limit(limit);
733            connection_limiter.update_superuser_reserved(superuser_reserved);
734        });
735
736        let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
737            connection_context: config.controller.connection_context.clone(),
738            connection_limit_callback,
739            controller_config: config.controller,
740            controller_envd_epoch: envd_epoch,
741            storage: adapter_storage,
742            audit_logs_iterator,
743            timestamp_oracle_url: config.timestamp_oracle_url,
744            unsafe_mode: config.unsafe_mode,
745            all_features: config.all_features,
746            build_info: &BUILD_INFO,
747            environment_id: config.environment_id.clone(),
748            metrics_registry: config.metrics_registry.clone(),
749            now: config.now,
750            secrets_controller: config.secrets_controller,
751            cloud_resource_controller: config.cloud_resource_controller,
752            cluster_replica_sizes: config.cluster_replica_sizes,
753            builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
754            builtin_catalog_server_cluster_config: config
755                .bootstrap_builtin_catalog_server_cluster_config,
756            builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
757            builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
758            builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
759            availability_zones: config.availability_zones,
760            system_parameter_defaults: config.system_parameter_defaults,
761            storage_usage_client,
762            storage_usage_collection_interval: config.storage_usage_collection_interval,
763            storage_usage_retention_period: config.storage_usage_retention_period,
764            segment_client: segment_client.clone(),
765            egress_addresses: config.egress_addresses,
766            remote_system_parameters,
767            aws_account_id: config.aws_account_id,
768            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
769            webhook_concurrency_limit: webhook_concurrency_limit.clone(),
770            http_host_name: config.http_host_name,
771            tracing_handle: config.tracing_handle,
772            read_only_controllers: read_only,
773            caught_up_trigger,
774            helm_chart_version: config.helm_chart_version.clone(),
775            license_key: config.license_key,
776            external_login_password_mz_system: config.external_login_password_mz_system,
777            force_builtin_schema_migration: config.force_builtin_schema_migration,
778        })
779        .instrument(info_span!("adapter::serve"))
780        .await?;
781
782        // Initialize the OIDC authenticator, shared between the HTTP and SQL servers.
783        let oidc = GenericOidcAuthenticator::new(adapter_client.clone());
784
785        info!(
786            "startup: envd serve: coordinator init complete in {:?}",
787            coord_init_start.elapsed()
788        );
789
790        let serve_postamble_start = Instant::now();
791        info!("startup: envd serve: postamble beginning");
792
793        // Send adapter client and OIDC authenticator to the HTTP servers.
794        authenticator_oidc_tx
795            .send(oidc.clone())
796            .expect("rx known to be live");
797        adapter_client_tx
798            .send(adapter_client.clone())
799            .expect("internal HTTP server should not drop first");
800
801        let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
802
803        // Launch SQL server.
804        let mut sql_listener_handles = BTreeMap::new();
805        for (name, listener) in self.sql {
806            sql_listener_handles.insert(
807                name.clone(),
808                listener
809                    .serve_sql(
810                        name,
811                        active_connection_counter.clone(),
812                        tls_reloading_context.clone(),
813                        config.frontegg.clone(),
814                        adapter_client.clone(),
815                        oidc.clone(),
816                        metrics.clone(),
817                        config.helm_chart_version.clone(),
818                    )
819                    .await,
820            );
821        }
822
823        // Start telemetry reporting loop.
824        if let Some(segment_client) = segment_client {
825            telemetry::start_reporting(telemetry::Config {
826                segment_client,
827                adapter_client: adapter_client.clone(),
828                environment_id: config.environment_id,
829                report_interval: Duration::from_secs(3600),
830            });
831        } else if config.test_only_dummy_segment_client {
832            // We only have access to a segment client in production but we
833            // still want to exercise the telemetry reporting code to a degree.
834            // So we create a dummy client and report telemetry into the void.
835            // This way we at least run the telemetry queries the way a
836            // production environment would.
837            tracing::debug!("starting telemetry reporting with a dummy segment client");
838            let segment_client = mz_segment::Client::new_dummy_client();
839            telemetry::start_reporting(telemetry::Config {
840                segment_client,
841                adapter_client: adapter_client.clone(),
842                environment_id: config.environment_id,
843                report_interval: Duration::from_secs(180),
844            });
845        }
846
847        // If system_parameter_sync_config and config_sync_loop_interval are present,
848        // start the system_parameter_sync loop.
849        if let Some(system_parameter_sync_config) = system_parameter_sync_config {
850            task::spawn(
851                || "system_parameter_sync",
852                AssertUnwindSafe(system_parameter_sync(
853                    system_parameter_sync_config,
854                    adapter_client.clone(),
855                    config.config_sync_loop_interval,
856                ))
857                .ore_catch_unwind(),
858            );
859        }
860
861        info!(
862            "startup: envd serve: postamble complete in {:?}",
863            serve_postamble_start.elapsed()
864        );
865        info!(
866            "startup: envd serve: complete in {:?}",
867            serve_start.elapsed()
868        );
869
870        Ok(Server {
871            sql_listener_handles,
872            http_listener_handles,
873            _adapter_handle: adapter_handle,
874        })
875    }
876}
877
878fn get_ld_value<V>(
879    name: &str,
880    remote_system_parameters: &Option<BTreeMap<String, String>>,
881    parse: impl Fn(&str) -> Result<V, String>,
882) -> Result<Option<V>, anyhow::Error> {
883    remote_system_parameters
884        .as_ref()
885        .and_then(|params| params.get(name))
886        .map(|x| {
887            parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
888        })
889        .transpose()
890}
891
/// A running `environmentd` server.
///
/// Bundles the handles of the SQL and HTTP listeners together with the handle
/// of the adapter, so that they share the lifetime of this value.
pub struct Server {
    // Drop order matters for these fields. Rust drops struct fields in
    // declaration order, so both sets of listener handles are dropped before
    // the adapter handle. Do not reorder the fields.
    /// Handles of the SQL listeners, keyed by listener name.
    pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handles of the HTTP listeners.
    /// NOTE(review): presumably keyed by listener name like the SQL map; confirm.
    pub http_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handle returned by `mz_adapter::serve`.
    /// NOTE(review): the leading underscore suggests it is held only so that it
    /// is dropped together with the server; confirm.
    _adapter_handle: mz_adapter::Handle,
}