Skip to main content

mz_environmentd/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A SQL stream processor built on top of [timely dataflow] and
11//! [differential dataflow].
12//!
13//! [differential dataflow]: ../differential_dataflow/index.html
14//! [timely dataflow]: ../timely/index.html
15
16use ::http::HeaderValue;
17use std::collections::BTreeMap;
18use std::panic::AssertUnwindSafe;
19use std::path::PathBuf;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use std::{env, io};
24
25use anyhow::{Context, anyhow};
26use derivative::Derivative;
27use futures::FutureExt;
28use ipnet::IpNet;
29use mz_adapter::config::{
30    SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
31};
32use mz_adapter::webhook::WebhookConcurrencyLimiter;
33use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
34use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
35use mz_adapter_types::dyncfgs::{
36    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
37    WITH_0DT_DEPLOYMENT_MAX_WAIT,
38};
39use mz_auth::password::Password;
40use mz_authenticator::GenericOidcAuthenticator;
41use mz_build_info::{BuildInfo, build_info};
42use mz_catalog::config::ClusterReplicaSizeMap;
43use mz_catalog::durable::BootstrapArgs;
44use mz_cloud_resources::CloudResourceController;
45use mz_controller::ControllerConfig;
46use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
47use mz_license_keys::ValidatedLicenseKey;
48use mz_ore::future::OreFutureExt;
49use mz_ore::metrics::MetricsRegistry;
50use mz_ore::now::NowFn;
51use mz_ore::tracing::TracingHandle;
52use mz_ore::url::SensitiveUrl;
53use mz_ore::{instrument, task};
54use mz_persist_client::cache::PersistClientCache;
55use mz_persist_client::usage::StorageUsageClient;
56use mz_pgwire::MetricsConfig;
57use mz_pgwire_common::ConnectionCounter;
58use mz_repr::strconv;
59use mz_secrets::SecretsController;
60use mz_server_core::listeners::{
61    HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
62};
63use mz_server_core::{
64    ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
65    TlsCertConfig, TlsMode,
66};
67use mz_sql::catalog::EnvironmentId;
68use mz_sql::session::vars::{Value, VarInput};
69use tokio::sync::oneshot;
70use tower_http::cors::AllowOrigin;
71use tracing::{Instrument, info, info_span};
72
73use crate::deployment::preflight::{PreflightInput, PreflightOutput};
74use crate::deployment::state::DeploymentState;
75use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
76
77pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
78
79mod deployment;
80pub mod environmentd;
81pub mod http;
82mod telemetry;
83#[cfg(feature = "test")]
84pub mod test_util;
85
/// Build metadata for `environmentd`, generated by the `build_info!` macro.
///
/// Passed to the adapter and system-parameter sync as `&BUILD_INFO`, and its
/// semver version is supplied when opening the persist-backed durable catalog.
pub const BUILD_INFO: BuildInfo = build_info!();
87
/// Configuration for an `environmentd` server.
///
/// Consumed by `Listeners::serve`, which moves most fields out of this struct
/// while booting the server.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Config {
    // === Special modes. ===
    /// Whether to permit usage of unsafe features. This is never meant to run
    /// in production.
    pub unsafe_mode: bool,
    /// Whether `environmentd` is running on a local dev machine. This is
    /// never meant to run in production or CI.
    // NOTE(review): the field is named `all_features` and is forwarded to the
    // adapter config as `all_features`; confirm the description above still
    // matches its meaning.
    pub all_features: bool,

    // === Connection options. ===
    /// TLS encryption and authentication configuration.
    pub tls: Option<TlsCertConfig>,
    /// Trigger to attempt to reload TLS certificates.
    #[derivative(Debug = "ignore")]
    pub tls_reload_certs: ReloadTrigger,
    /// Password of the mz_system user.
    pub external_login_password_mz_system: Option<Password>,
    /// Frontegg JWT authenticator.
    pub frontegg: Option<FronteggAuthenticator>,
    /// Origins for which cross-origin resource sharing (CORS) for HTTP requests
    /// is permitted.
    pub cors_allowed_origin: AllowOrigin,
    /// Raw list of allowed CORS origins. Retained alongside `cors_allowed_origin`
    /// (which is the computed predicate) so that endpoints like MCP can perform
    /// server-side Origin validation to defend against DNS rebinding attacks
    /// (where same-origin requests bypass CORS enforcement).
    pub cors_allowed_origin_list: Vec<HeaderValue>,
    /// Public IP addresses which the cloud environment has configured for
    /// egress.
    pub egress_addresses: Vec<IpNet>,
    /// The external host name to connect to the HTTP server of this
    /// environment.
    ///
    /// Presently used to render webhook URLs for end users in notices and the
    /// system catalog. Not used to establish connections directly.
    pub http_host_name: Option<String>,
    /// The URL of the Materialize console to proxy from the /internal-console
    /// endpoint on the internal HTTP server.
    pub internal_console_redirect_url: Option<String>,

    // === Controller options. ===
    /// Storage and compute controller configuration.
    pub controller: ControllerConfig,
    /// Secrets controller configuration.
    pub secrets_controller: Arc<dyn SecretsController>,
    /// VpcEndpoint controller configuration.
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    // === Storage options. ===
    /// The interval at which to collect storage usage information.
    pub storage_usage_collection_interval: Duration,
    /// How long to retain storage usage records.
    pub storage_usage_retention_period: Option<Duration>,

    // === Adapter options. ===
    /// Catalog configuration.
    pub catalog_config: CatalogConfig,
    /// Availability zones in which storage and compute resources may be
    /// deployed.
    pub availability_zones: Vec<String>,
    /// A map from size name to resource allocations for cluster replicas.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// The PostgreSQL URL for the Postgres-backed timestamp oracle.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    /// An API key for Segment. Enables export of audit events to Segment.
    pub segment_api_key: Option<String>,
    /// Whether the Segment client is being used on the client side
    /// (rather than the server side).
    pub segment_client_side: bool,
    /// Create only a dummy Segment client. Used solely to increase test
    /// coverage.
    pub test_only_dummy_segment_client: bool,
    /// An SDK key for LaunchDarkly. Enables system parameter synchronization
    /// with LaunchDarkly.
    ///
    /// Mutually exclusive with `config_sync_file_path`. Setting both causes
    /// `Listeners::serve` to panic.
    pub launchdarkly_sdk_key: Option<String>,
    /// An invertible map from system parameter names to LaunchDarkly feature
    /// keys to use when propagating values from the latter to the former.
    pub launchdarkly_key_map: BTreeMap<String, String>,
    /// How long system parameter synchronization may take during startup
    /// before it times out.
    pub config_sync_timeout: Duration,
    /// The interval at which to synchronize system parameter values.
    pub config_sync_loop_interval: Option<Duration>,
    /// The path of the file used for file-based system parameter sync.
    ///
    /// Mutually exclusive with `launchdarkly_sdk_key`. Setting both causes
    /// `Listeners::serve` to panic.
    pub config_sync_file_path: Option<PathBuf>,

    // === Bootstrap options. ===
    /// The cloud ID of this environment.
    pub environment_id: EnvironmentId,
    /// What role, if any, should be initially created with elevated privileges.
    pub bootstrap_role: Option<String>,
    /// The size of the default cluster replica if bootstrapping.
    ///
    /// Must be a key of `cluster_replica_sizes`. `Listeners::serve` returns an
    /// error otherwise.
    pub bootstrap_default_cluster_replica_size: String,
    /// The default number of replicas if bootstrapping.
    pub bootstrap_default_cluster_replication_factor: u32,
    /// The config of the builtin system cluster replicas if bootstrapping.
    pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// The config of the builtin catalog server cluster replicas if bootstrapping.
    pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// The config of the builtin probe cluster replicas if bootstrapping.
    pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// The config of the builtin support cluster replicas if bootstrapping.
    pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// The config of the builtin analytics cluster replicas if bootstrapping.
    pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Values to set for system parameters, if those system parameters have not
    /// already been set by the system user.
    ///
    /// Also consulted during startup as a fallback (after LaunchDarkly and the
    /// catalog) when resolving the 0dt deployment parameters.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Helm chart version.
    pub helm_chart_version: Option<String>,
    /// Configuration managed by license keys.
    pub license_key: ValidatedLicenseKey,

    // === AWS options. ===
    /// The AWS account ID, which will be used to generate ARNs for
    /// Materialize-controlled AWS resources.
    pub aws_account_id: Option<String>,
    /// Supported AWS PrivateLink availability zone ids.
    pub aws_privatelink_availability_zones: Option<Vec<String>>,

    // === Observability options. ===
    /// The metrics registry to use.
    pub metrics_registry: MetricsRegistry,
    /// Handle to tracing.
    pub tracing_handle: TracingHandle,

    // === Testing options. ===
    /// A "now" generation function, allowing time to be mocked.
    pub now: NowFn,
    /// If `Some`, force running builtin schema migration using the specified
    /// migration mechanism ("evolution" or "replacement").
    pub force_builtin_schema_migration: Option<String>,
}
222
/// Configuration for the Catalog.
///
/// During startup, `persist_clients` is used to open the persist client that
/// backs the durable catalog, and `metrics` is handed to the catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    pub persist_clients: Arc<PersistClientCache>,
    /// Persist catalog metrics.
    pub metrics: Arc<mz_catalog::durable::Metrics>,
}
231
/// A network listener that has been bound but is not yet serving.
///
/// Created by `Listener::bind`. Consumed by `serve_sql` / `serve_http`, which
/// hand `connection_stream` to a spawned server task and return `handle`.
pub struct Listener<C> {
    /// Handle for the bound listener, returned once serving has started.
    pub handle: ListenerHandle,
    /// Stream of incoming connections produced by `mz_server_core::listen`.
    connection_stream: Pin<Box<dyn ConnectionStream>>,
    /// The configuration this listener was bound with.
    config: C,
}
237impl<C> Listener<C>
238where
239    C: ListenerConfig,
240{
241    /// Initializes network listeners for a later call to `serve` at the
242    /// specified addresses.
243    ///
244    /// Splitting this function out from `serve` has two benefits:
245    ///
246    ///   * It initializes network listeners as early as possible, so that the OS
247    ///     will queue incoming connections while the server is booting.
248    ///
249    ///   * It allows the caller to communicate with the server via the internal
250    ///     HTTP port while it is booting.
251    ///
252    async fn bind(config: C) -> Result<Self, io::Error> {
253        let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
254        Ok(Self {
255            handle,
256            connection_stream,
257            config,
258        })
259    }
260}
261
262impl Listener<SqlListenerConfig> {
263    #[instrument(name = "environmentd::serve_sql")]
264    pub async fn serve_sql(
265        self,
266        name: String,
267        active_connection_counter: ConnectionCounter,
268        tls_reloading_context: Option<ReloadingSslContext>,
269        frontegg: Option<FronteggAuthenticator>,
270        adapter_client: AdapterClient,
271        oidc: GenericOidcAuthenticator,
272        metrics: MetricsConfig,
273        helm_chart_version: Option<String>,
274    ) -> ListenerHandle {
275        let label: &'static str = Box::leak(name.into_boxed_str());
276        let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
277            context,
278            mode: if self.config.enable_tls {
279                TlsMode::Require
280            } else {
281                TlsMode::Allow
282            },
283        });
284
285        task::spawn(|| format!("{}_sql_server", label), {
286            let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
287                label,
288                tls,
289                adapter_client,
290                authenticator_kind: self.config.authenticator_kind,
291                frontegg,
292                oidc,
293                metrics,
294                active_connection_counter,
295                helm_chart_version,
296                allowed_roles: self.config.allowed_roles,
297            });
298            mz_server_core::serve(ServeConfig {
299                conns: self.connection_stream,
300                server: sql_server,
301                // `environmentd` does not currently need to dynamically
302                // configure graceful termination behavior.
303                dyncfg: None,
304            })
305        });
306        self.handle
307    }
308}
309
310impl Listener<HttpListenerConfig> {
311    #[instrument(name = "environmentd::serve_http")]
312    pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
313        let task_name = format!("{}_http_server", &config.source);
314        task::spawn(|| task_name, {
315            let http_server = HttpServer::new(config);
316            mz_server_core::serve(ServeConfig {
317                conns: self.connection_stream,
318                server: http_server,
319                // `environmentd` does not currently need to dynamically
320                // configure graceful termination behavior.
321                dyncfg: None,
322            })
323        });
324        self.handle
325    }
326}
327
/// The bound HTTP and SQL listeners of an `environmentd` server, keyed by
/// listener name.
///
/// Built by `Listeners::bind`. `Listeners::serve` then starts serving on them.
pub struct Listeners {
    /// HTTP listeners, keyed by listener name.
    pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
    /// SQL (pgwire) listeners, keyed by listener name.
    pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
}
332
333impl Listeners {
334    pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
335        let mut sql = BTreeMap::new();
336        for (name, config) in config.sql {
337            sql.insert(name, Listener::bind(config).await?);
338        }
339
340        let mut http = BTreeMap::new();
341        for (name, config) in config.http {
342            http.insert(name, Listener::bind(config).await?);
343        }
344
345        Ok(Listeners { http, sql })
346    }
347
348    /// Starts an `environmentd` server.
349    ///
350    /// Returns a handle to the server once it is fully booted.
351    #[instrument(name = "environmentd::serve")]
352    pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
353        let serve_start = Instant::now();
354        info!("startup: envd serve: beginning");
355        info!("startup: envd serve: preamble beginning");
356
357        // Validate TLS configuration, if present.
358        let tls_reloading_context = match config.tls {
359            Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
360            None => None,
361        };
362
363        let active_connection_counter = ConnectionCounter::default();
364        let (deployment_state, deployment_state_handle) = DeploymentState::new();
365
366        // Launch HTTP servers.
367        //
368        // We start these servers before we've completed initialization so that
369        // metrics are accessible during initialization. Some HTTP
370        // endpoints require the adapter to be initialized; requests to those
371        // endpoints block until the adapter client is installed.
372        // One of these endpoints is /api/readyz,
373        // which assumes we're ready when the adapter client exists.
374        let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
375        let internal_route_config = Arc::new(InternalRouteConfig {
376            deployment_state_handle,
377            internal_console_redirect_url: config.internal_console_redirect_url,
378        });
379
380        let (authenticator_oidc_tx, authenticator_oidc_rx) = oneshot::channel();
381        let authenticator_oidc_rx = authenticator_oidc_rx.shared();
382        let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
383        let adapter_client_rx = adapter_client_rx.shared();
384
385        let metrics_registry = config.metrics_registry.clone();
386        let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
387        let mcp_metrics = http::mcp_metrics::McpMetrics::register_into(&metrics_registry);
388        let mut http_listener_handles = BTreeMap::new();
389        for (name, listener) in self.http {
390            let authenticator_kind = listener.config.authenticator_kind();
391            let source: &'static str = Box::leak(name.clone().into_boxed_str());
392            let tls = if listener.config.enable_tls() {
393                tls_reloading_context.clone()
394            } else {
395                None
396            };
397            let http_config = HttpConfig {
398                adapter_client_rx: adapter_client_rx.clone(),
399                active_connection_counter: active_connection_counter.clone(),
400                helm_chart_version: config.helm_chart_version.clone(),
401                source,
402                tls,
403                authenticator_kind,
404                frontegg: config.frontegg.clone(),
405                oidc_rx: authenticator_oidc_rx.clone(),
406                allowed_origin: config.cors_allowed_origin.clone(),
407                allowed_origin_list: config.cors_allowed_origin_list.clone(),
408                concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
409                metrics: metrics.clone(),
410                metrics_registry: metrics_registry.clone(),
411                mcp_metrics: mcp_metrics.clone(),
412                allowed_roles: listener.config.allowed_roles(),
413                internal_route_config: Arc::clone(&internal_route_config),
414                routes_enabled: listener.config.routes.clone(),
415                replica_http_locator: Arc::clone(&config.controller.replica_http_locator),
416            };
417            http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
418        }
419
420        info!(
421            "startup: envd serve: preamble complete in {:?}",
422            serve_start.elapsed()
423        );
424
425        let catalog_init_start = Instant::now();
426        info!("startup: envd serve: catalog init beginning");
427
428        // Get the current timestamp so we can record when we booted.
429        let boot_ts = (config.now)().into();
430
431        let persist_client = config
432            .catalog_config
433            .persist_clients
434            .open(config.controller.persist_location.clone())
435            .await
436            .context("opening persist client")?;
437        let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
438            persist_client.clone(),
439            config.environment_id.organization_id(),
440            BUILD_INFO.semver_version(),
441            Some(config.controller.deploy_generation),
442            Arc::clone(&config.catalog_config.metrics),
443        )
444        .await?;
445
446        info!(
447            "startup: envd serve: catalog init complete in {:?}",
448            catalog_init_start.elapsed()
449        );
450
451        let system_param_sync_start = Instant::now();
452        info!("startup: envd serve: system parameter sync beginning");
453        // Initialize the system parameter frontend
454        let system_parameter_sync_config =
455            match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
456                (None, None) => None,
457                (None, Some(f)) => {
458                    info!("Using config file path {:?}", f);
459                    Some(SystemParameterSyncConfig::new(
460                        config.environment_id.clone(),
461                        &BUILD_INFO,
462                        &config.metrics_registry,
463                        config.launchdarkly_key_map,
464                        SystemParameterSyncClientConfig::File { path: f },
465                    ))
466                }
467                (Some(key), None) => Some(SystemParameterSyncConfig::new(
468                    config.environment_id.clone(),
469                    &BUILD_INFO,
470                    &config.metrics_registry,
471                    config.launchdarkly_key_map,
472                    SystemParameterSyncClientConfig::LaunchDarkly {
473                        sdk_key: key,
474                        now_fn: config.now.clone(),
475                    },
476                )),
477
478                (Some(_), Some(_)) => {
479                    panic!("Cannot configure both file and Launchdarkly based config syncing")
480                }
481            };
482
483        let remote_system_parameters = load_remote_system_parameters(
484            &mut openable_adapter_storage,
485            system_parameter_sync_config.clone(),
486            config.config_sync_timeout,
487        )
488        .await?;
489        info!(
490            "startup: envd serve: system parameter sync complete in {:?}",
491            system_param_sync_start.elapsed()
492        );
493
494        let preflight_checks_start = Instant::now();
495        info!("startup: envd serve: preflight checks beginning");
496
497        // Determine the maximum wait time when doing a 0dt deployment.
498        let with_0dt_deployment_max_wait = {
499            let cli_default = config
500                .system_parameter_defaults
501                .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
502                .map(|x| {
503                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
504                        anyhow!(
505                            "failed to parse default for {}: {:?}",
506                            WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
507                            err
508                        )
509                    })
510                })
511                .transpose()?;
512            let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
513            let ld = get_ld_value(
514                WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
515                &remote_system_parameters,
516                |x| {
517                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
518                        format!(
519                            "failed to parse LD value {} for {}: {:?}",
520                            x,
521                            WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
522                            err
523                        )
524                    })
525                },
526            )?;
527            let catalog = openable_adapter_storage
528                .get_0dt_deployment_max_wait()
529                .await?;
530            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
531            info!(
532                ?computed,
533                ?ld,
534                ?catalog,
535                ?cli_default,
536                ?compiled_default,
537                "determined value for {} system parameter",
538                WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
539            );
540            computed
541        };
542        // Determine the DDL check interval when doing a 0dt deployment.
543        let with_0dt_deployment_ddl_check_interval = {
544            let cli_default = config
545                .system_parameter_defaults
546                .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
547                .map(|x| {
548                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
549                        anyhow!(
550                            "failed to parse default for {}: {:?}",
551                            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
552                            err
553                        )
554                    })
555                })
556                .transpose()?;
557            let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
558            let ld = get_ld_value(
559                WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
560                &remote_system_parameters,
561                |x| {
562                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
563                        format!(
564                            "failed to parse LD value {} for {}: {:?}",
565                            x,
566                            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
567                            err
568                        )
569                    })
570                },
571            )?;
572            let catalog = openable_adapter_storage
573                .get_0dt_deployment_ddl_check_interval()
574                .await?;
575            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
576            info!(
577                ?computed,
578                ?ld,
579                ?catalog,
580                ?cli_default,
581                ?compiled_default,
582                "determined value for {} system parameter",
583                WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
584            );
585            computed
586        };
587
588        // Determine whether we should panic if we reach the maximum wait time
589        // without the preflight checks succeeding.
590        let enable_0dt_deployment_panic_after_timeout = {
591            let cli_default = config
592                .system_parameter_defaults
593                .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
594                .map(|x| {
595                    strconv::parse_bool(x).map_err(|err| {
596                        anyhow!(
597                            "failed to parse default for {}: {}",
598                            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
599                            err
600                        )
601                    })
602                })
603                .transpose()?;
604            let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
605            let ld = get_ld_value(
606                "enable_0dt_deployment_panic_after_timeout",
607                &remote_system_parameters,
608                |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
609            )?;
610            let catalog = openable_adapter_storage
611                .get_enable_0dt_deployment_panic_after_timeout()
612                .await?;
613            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
614            info!(
615                %computed,
616                ?ld,
617                ?catalog,
618                ?cli_default,
619                ?compiled_default,
620                "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
621            );
622            computed
623        };
624
625        // Perform preflight checks.
626        //
627        // Preflight checks determine whether to boot in read-only mode or not.
628        let bootstrap_args = BootstrapArgs {
629            default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
630            default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
631            bootstrap_role: config.bootstrap_role.clone(),
632            cluster_replica_size_map: config.cluster_replica_sizes.clone(),
633        };
634        let preflight_config = PreflightInput {
635            boot_ts,
636            environment_id: config.environment_id.clone(),
637            persist_client,
638            deploy_generation: config.controller.deploy_generation,
639            deployment_state: deployment_state.clone(),
640            openable_adapter_storage,
641            catalog_metrics: Arc::clone(&config.catalog_config.metrics),
642            caught_up_max_wait: with_0dt_deployment_max_wait,
643            panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
644            bootstrap_args,
645            ddl_check_interval: with_0dt_deployment_ddl_check_interval,
646        };
647        let PreflightOutput {
648            openable_adapter_storage,
649            read_only,
650            caught_up_trigger,
651        } = deployment::preflight::preflight_0dt(preflight_config).await?;
652
653        info!(
654            "startup: envd serve: preflight checks complete in {:?}",
655            preflight_checks_start.elapsed()
656        );
657
658        let catalog_open_start = Instant::now();
659        info!("startup: envd serve: durable catalog open beginning");
660
661        let bootstrap_args = BootstrapArgs {
662            default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
663            default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
664            bootstrap_role: config.bootstrap_role,
665            cluster_replica_size_map: config.cluster_replica_sizes.clone(),
666        };
667
668        // Load the adapter durable storage.
669        let (adapter_storage, audit_logs_iterator) = if read_only {
670            // TODO: behavior of migrations when booting in savepoint mode is
671            // not well defined.
672            let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
673                .open_savepoint(boot_ts, &bootstrap_args)
674                .await?;
675            // In read-only mode, we intentionally do not call `set_is_leader`,
676            // because we are by definition not the leader if we are in
677            // read-only mode.
678
679            (adapter_storage, audit_logs_iterator)
680        } else {
681            let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
682                .open(boot_ts, &bootstrap_args)
683                .await?;
684
685            // Once we have successfully opened the adapter storage in
686            // read/write mode, we can announce we are the leader, as we've
687            // fenced out all other environments using the adapter storage.
688            deployment_state.set_is_leader();
689
690            (adapter_storage, audit_logs_iterator)
691        };
692
693        // Enable Persist compaction if we're not in read only.
694        if !read_only {
695            config.controller.persist_clients.cfg().enable_compaction();
696        }
697
698        info!(
699            "startup: envd serve: durable catalog open complete in {:?}",
700            catalog_open_start.elapsed()
701        );
702
703        let coord_init_start = Instant::now();
704        info!("startup: envd serve: coordinator init beginning");
705
706        if !config
707            .cluster_replica_sizes
708            .0
709            .contains_key(&config.bootstrap_default_cluster_replica_size)
710        {
711            return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
712        }
713        let envd_epoch = adapter_storage.epoch();
714
715        // Initialize storage usage client.
716        let storage_usage_client = StorageUsageClient::open(
717            config
718                .controller
719                .persist_clients
720                .open(config.controller.persist_location.clone())
721                .await
722                .context("opening storage usage client")?,
723        );
724
725        // Initialize adapter.
726        let segment_client = config.segment_api_key.map(|api_key| {
727            mz_segment::Client::new(mz_segment::Config {
728                api_key,
729                client_side: config.segment_client_side,
730            })
731        });
732        let connection_limiter = active_connection_counter.clone();
733        let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
734            connection_limiter.update_limit(limit);
735            connection_limiter.update_superuser_reserved(superuser_reserved);
736        });
737
738        let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
739            connection_context: config.controller.connection_context.clone(),
740            connection_limit_callback,
741            controller_config: config.controller,
742            controller_envd_epoch: envd_epoch,
743            storage: adapter_storage,
744            audit_logs_iterator,
745            timestamp_oracle_url: config.timestamp_oracle_url,
746            unsafe_mode: config.unsafe_mode,
747            all_features: config.all_features,
748            build_info: &BUILD_INFO,
749            environment_id: config.environment_id.clone(),
750            metrics_registry: config.metrics_registry.clone(),
751            now: config.now,
752            secrets_controller: config.secrets_controller,
753            cloud_resource_controller: config.cloud_resource_controller,
754            cluster_replica_sizes: config.cluster_replica_sizes,
755            builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
756            builtin_catalog_server_cluster_config: config
757                .bootstrap_builtin_catalog_server_cluster_config,
758            builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
759            builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
760            builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
761            availability_zones: config.availability_zones,
762            system_parameter_defaults: config.system_parameter_defaults,
763            storage_usage_client,
764            storage_usage_collection_interval: config.storage_usage_collection_interval,
765            storage_usage_retention_period: config.storage_usage_retention_period,
766            segment_client: segment_client.clone(),
767            egress_addresses: config.egress_addresses,
768            remote_system_parameters,
769            aws_account_id: config.aws_account_id,
770            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
771            webhook_concurrency_limit: webhook_concurrency_limit.clone(),
772            http_host_name: config.http_host_name,
773            tracing_handle: config.tracing_handle,
774            read_only_controllers: read_only,
775            caught_up_trigger,
776            helm_chart_version: config.helm_chart_version.clone(),
777            license_key: config.license_key,
778            external_login_password_mz_system: config.external_login_password_mz_system,
779            force_builtin_schema_migration: config.force_builtin_schema_migration,
780        })
781        .instrument(info_span!("adapter::serve"))
782        .await?;
783
784        // Initialize the OIDC authenticator, shared between the HTTP and SQL servers.
785        let oidc = GenericOidcAuthenticator::new(adapter_client.clone());
786
787        info!(
788            "startup: envd serve: coordinator init complete in {:?}",
789            coord_init_start.elapsed()
790        );
791
792        let serve_postamble_start = Instant::now();
793        info!("startup: envd serve: postamble beginning");
794
795        // Send adapter client and OIDC authenticator to the HTTP servers.
796        authenticator_oidc_tx
797            .send(oidc.clone())
798            .expect("rx known to be live");
799        adapter_client_tx
800            .send(adapter_client.clone())
801            .expect("internal HTTP server should not drop first");
802
803        let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
804
805        // Launch SQL server.
806        let mut sql_listener_handles = BTreeMap::new();
807        for (name, listener) in self.sql {
808            sql_listener_handles.insert(
809                name.clone(),
810                listener
811                    .serve_sql(
812                        name,
813                        active_connection_counter.clone(),
814                        tls_reloading_context.clone(),
815                        config.frontegg.clone(),
816                        adapter_client.clone(),
817                        oidc.clone(),
818                        metrics.clone(),
819                        config.helm_chart_version.clone(),
820                    )
821                    .await,
822            );
823        }
824
825        // Start telemetry reporting loop.
826        if let Some(segment_client) = segment_client {
827            telemetry::start_reporting(telemetry::Config {
828                segment_client,
829                adapter_client: adapter_client.clone(),
830                environment_id: config.environment_id,
831                report_interval: Duration::from_secs(3600),
832            });
833        } else if config.test_only_dummy_segment_client {
834            // We only have access to a segment client in production but we
835            // still want to exercise the telemetry reporting code to a degree.
836            // So we create a dummy client and report telemetry into the void.
837            // This way we at least run the telemetry queries the way a
838            // production environment would.
839            tracing::debug!("starting telemetry reporting with a dummy segment client");
840            let segment_client = mz_segment::Client::new_dummy_client();
841            telemetry::start_reporting(telemetry::Config {
842                segment_client,
843                adapter_client: adapter_client.clone(),
844                environment_id: config.environment_id,
845                report_interval: Duration::from_secs(180),
846            });
847        }
848
849        // If system_parameter_sync_config and config_sync_loop_interval are present,
850        // start the system_parameter_sync loop.
851        if let Some(system_parameter_sync_config) = system_parameter_sync_config {
852            task::spawn(
853                || "system_parameter_sync",
854                AssertUnwindSafe(system_parameter_sync(
855                    system_parameter_sync_config,
856                    adapter_client.clone(),
857                    config.config_sync_loop_interval,
858                ))
859                .ore_catch_unwind(),
860            );
861        }
862
863        info!(
864            "startup: envd serve: postamble complete in {:?}",
865            serve_postamble_start.elapsed()
866        );
867        info!(
868            "startup: envd serve: complete in {:?}",
869            serve_start.elapsed()
870        );
871
872        Ok(Server {
873            sql_listener_handles,
874            http_listener_handles,
875            _adapter_handle: adapter_handle,
876        })
877    }
878}
879
880fn get_ld_value<V>(
881    name: &str,
882    remote_system_parameters: &Option<BTreeMap<String, String>>,
883    parse: impl Fn(&str) -> Result<V, String>,
884) -> Result<Option<V>, anyhow::Error> {
885    remote_system_parameters
886        .as_ref()
887        .and_then(|params| params.get(name))
888        .map(|x| {
889            parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
890        })
891        .transpose()
892}
893
/// A running `environmentd` server.
///
/// Built at the end of the serve routine from the SQL listener handles, the
/// HTTP listener handles, and the handle returned by `mz_adapter::serve`.
/// NOTE(review): presumably dropping this value shuts the server down; confirm
/// against `ListenerHandle` / `mz_adapter::Handle` drop behavior.
pub struct Server {
    // Drop order matters for these fields. Rust drops struct fields in
    // declaration order, so both sets of listener handles are dropped before
    // `_adapter_handle`. Presumably this lets the listeners stop before the
    // adapter goes away (TODO confirm). Do not reorder these fields.
    /// Handles for the SQL listeners, keyed by listener name.
    pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handles for the HTTP listeners (presumably keyed by listener name;
    /// verify where this map is built).
    pub http_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handle returned by `mz_adapter::serve`. It is never read and is held
    /// only so that it lives, and is dropped, together with the server.
    _adapter_handle: mz_adapter::Handle,
}