mz_environmentd/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A SQL stream processor built on top of [timely dataflow] and
11//! [differential dataflow].
12//!
13//! [differential dataflow]: ../differential_dataflow/index.html
14//! [timely dataflow]: ../timely/index.html
15
16use std::collections::BTreeMap;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::panic::AssertUnwindSafe;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use std::{env, io};
23
24use anyhow::{Context, anyhow};
25use derivative::Derivative;
26use ipnet::IpNet;
27use mz_adapter::config::{SystemParameterSyncConfig, system_parameter_sync};
28use mz_adapter::webhook::WebhookConcurrencyLimiter;
29use mz_adapter::{AdapterError, load_remote_system_parameters};
30use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
31use mz_adapter_types::dyncfgs::{
32    ENABLE_0DT_DEPLOYMENT, ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT,
33    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
34};
35use mz_build_info::{BuildInfo, build_info};
36use mz_catalog::config::ClusterReplicaSizeMap;
37use mz_catalog::durable::BootstrapArgs;
38use mz_cloud_resources::CloudResourceController;
39use mz_controller::ControllerConfig;
40use mz_frontegg_auth::Authenticator as FronteggAuthentication;
41use mz_license_keys::ValidatedLicenseKey;
42use mz_ore::future::OreFutureExt;
43use mz_ore::metrics::MetricsRegistry;
44use mz_ore::now::NowFn;
45use mz_ore::tracing::TracingHandle;
46use mz_ore::url::SensitiveUrl;
47use mz_ore::{instrument, task};
48use mz_persist_client::cache::PersistClientCache;
49use mz_persist_client::usage::StorageUsageClient;
50use mz_pgwire_common::ConnectionCounter;
51use mz_repr::strconv;
52use mz_secrets::SecretsController;
53use mz_server_core::{ConnectionStream, ListenerHandle, ReloadTrigger, ServeConfig, TlsCertConfig};
54use mz_sql::catalog::EnvironmentId;
55use mz_sql::session::vars::{Value, VarInput};
56use tokio::sync::oneshot;
57use tower_http::cors::AllowOrigin;
58use tracing::{Instrument, info, info_span};
59
60use crate::deployment::preflight::{PreflightInput, PreflightOutput};
61use crate::deployment::state::DeploymentState;
62use crate::http::{HttpConfig, HttpServer, InternalHttpConfig, InternalHttpServer};
63
64pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
65
// Crate modules. `deployment` supplies the preflight checks and deployment
// state used during startup; `http` supplies the external and internal HTTP
// servers.
66mod deployment;
67pub mod environmentd;
68pub mod http;
69mod telemetry;
// Only compiled when the `test` feature is enabled.
70#[cfg(feature = "test")]
71pub mod test_util;
72
/// Build information for this crate, produced by the `build_info!` macro.
73pub const BUILD_INFO: BuildInfo = build_info!();
74
75/// Configuration for an `environmentd` server.
///
/// The whole struct is consumed by [`Listeners::serve`].
76#[derive(Derivative)]
77#[derivative(Debug)]
78pub struct Config {
79    // === Special modes. ===
80    /// Whether to permit usage of unsafe features. This is never meant to run
81    /// in production.
82    pub unsafe_mode: bool,
83    /// Whether the environmentd is running on a local dev machine. This is
84    /// never meant to run in production or CI.
85    pub all_features: bool,
86
87    // === Connection options. ===
88    /// TLS encryption and authentication configuration.
    ///
    /// When set, `serve` builds TLS configurations in `Require` mode for both
    /// pgwire and HTTP; when `None`, it builds no TLS configuration at all.
89    pub tls: Option<TlsCertConfig>,
90    /// Trigger to attempt to reload TLS certificates.
    ///
    /// Excluded from the derived `Debug` output.
91    #[derivative(Debug = "ignore")]
92    pub tls_reload_certs: ReloadTrigger,
93    /// Frontegg JWT authentication configuration.
94    pub frontegg: Option<FronteggAuthentication>,
95    /// Origins for which cross-origin resource sharing (CORS) for HTTP requests
96    /// is permitted.
97    pub cors_allowed_origin: AllowOrigin,
98    /// Public IP addresses which the cloud environment has configured for
99    /// egress.
100    pub egress_addresses: Vec<IpNet>,
101    /// The external host name to connect to the HTTP server of this
102    /// environment.
103    ///
104    /// Presently used to render webhook URLs for end users in notices and the
105    /// system catalog. Not used to establish connections directly.
106    pub http_host_name: Option<String>,
107    /// The URL of the Materialize console to proxy from the /internal-console
108    /// endpoint on the internal HTTP server.
109    pub internal_console_redirect_url: Option<String>,
110    /// Whether to enable self-hosted authentication.
111    pub self_hosted_auth: bool,
112    /// Whether to enable self-hosted authentication on the internal pg port.
113    pub self_hosted_auth_internal: bool,
114
115    // === Controller options. ===
116    /// Storage and compute controller configuration.
117    pub controller: ControllerConfig,
118    /// Secrets controller configuration.
119    pub secrets_controller: Arc<dyn SecretsController>,
120    /// VpcEndpoint controller configuration.
121    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
122
123    // === Storage options. ===
124    /// The interval at which to collect storage usage information.
125    pub storage_usage_collection_interval: Duration,
126    /// How long to retain storage usage records for.
127    pub storage_usage_retention_period: Option<Duration>,
128
129    // === Adapter options. ===
130    /// Catalog configuration.
131    pub catalog_config: CatalogConfig,
132    /// Availability zones in which storage and compute resources may be
133    /// deployed.
134    pub availability_zones: Vec<String>,
135    /// A map from size name to resource allocations for cluster replicas.
136    pub cluster_replica_sizes: ClusterReplicaSizeMap,
137    /// The PostgreSQL URL for the Postgres-backed timestamp oracle.
138    pub timestamp_oracle_url: Option<SensitiveUrl>,
139    /// An API key for Segment. Enables export of audit events to Segment.
140    pub segment_api_key: Option<String>,
141    /// Whether the Segment client is being used on the client side
142    /// (rather than the server side).
143    pub segment_client_side: bool,
144    /// An SDK key for LaunchDarkly. Enables system parameter synchronization
145    /// with LaunchDarkly.
    ///
    /// When `None`, `serve` creates no `SystemParameterSyncConfig`.
146    pub launchdarkly_sdk_key: Option<String>,
147    /// An invertible map from system parameter names to LaunchDarkly feature
148    /// keys to use when propagating values from the latter to the former.
149    pub launchdarkly_key_map: BTreeMap<String, String>,
150    /// The duration after which system parameter synchronization times out during startup.
151    pub config_sync_timeout: Duration,
152    /// The interval at which to synchronize system parameter values.
    ///
    /// NOTE(review): the meaning of `None` is not visible in this part of the
    /// file; presumably it disables the periodic sync. Confirm.
153    pub config_sync_loop_interval: Option<Duration>,
154
155    // === Bootstrap options. ===
156    /// The cloud ID of this environment.
157    pub environment_id: EnvironmentId,
158    /// What role, if any, should be initially created with elevated privileges.
159    pub bootstrap_role: Option<String>,
160    /// The size of the default cluster replica if bootstrapping.
    ///
    /// `serve` returns an error if this size is not a key of
    /// `cluster_replica_sizes`.
161    pub bootstrap_default_cluster_replica_size: String,
162    /// The default number of replicas if bootstrapping.
163    pub bootstrap_default_cluster_replication_factor: u32,
164    /// The config of the builtin system cluster replicas if bootstrapping.
165    pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
166    /// The config of the builtin catalog server cluster replicas if bootstrapping.
167    pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
168    /// The config of the builtin probe cluster replicas if bootstrapping.
169    pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
170    /// The config of the builtin support cluster replicas if bootstrapping.
171    pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
172    /// The config of the builtin analytics cluster replicas if bootstrapping.
173    pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
174    /// Values to set for system parameters, if those system parameters have not
175    /// already been set by the system user.
    ///
    /// `serve` also reads the 0dt deployment parameters from this map during
    /// startup, using them only when neither LaunchDarkly nor the catalog
    /// supplies a value.
176    pub system_parameter_defaults: BTreeMap<String, String>,
177    /// The Helm chart version.
178    pub helm_chart_version: Option<String>,
179    /// Configuration managed by license keys.
180    pub license_key: ValidatedLicenseKey,
181
182    // === AWS options. ===
183    /// The AWS account ID, which will be used to generate ARNs for
184    /// Materialize-controlled AWS resources.
185    pub aws_account_id: Option<String>,
186    /// Supported AWS PrivateLink availability zone ids.
187    pub aws_privatelink_availability_zones: Option<Vec<String>>,
188
189    // === Observability options. ===
190    /// The metrics registry to use.
191    pub metrics_registry: MetricsRegistry,
192    /// Handle to tracing.
193    pub tracing_handle: TracingHandle,
194
195    // === Testing options. ===
196    /// A now generation function for mocking time.
    ///
    /// `serve` calls it to obtain the boot timestamp.
197    pub now: NowFn,
198}
199
200/// Configuration for network listeners.
///
/// Consumed by [`Listeners::bind`], which binds one listener per address.
201pub struct ListenersConfig {
202    /// The IP address and port to listen for pgwire connections on.
203    pub sql_listen_addr: SocketAddr,
204    /// The IP address and port to listen for HTTP connections on.
205    pub http_listen_addr: SocketAddr,
206    /// The IP address and port to listen for pgwire connections from the cloud
207    /// system on.
208    pub internal_sql_listen_addr: SocketAddr,
209    /// The IP address and port to serve the metrics registry from.
    ///
    /// This is the address of the internal HTTP server.
210    pub internal_http_listen_addr: SocketAddr,
211}
212
213/// Configuration for the Catalog.
214#[derive(Debug, Clone)]
215pub struct CatalogConfig {
216    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    ///
    /// `serve` opens the persist client that backs the durable catalog from
    /// this cache.
217    pub persist_clients: Arc<PersistClientCache>,
218    /// Persist catalog metrics.
    ///
    /// Shared (via `Arc`) with the durable catalog state and the preflight
    /// checks.
219    pub metrics: Arc<mz_catalog::durable::Metrics>,
220}
221
222/// Listeners for an `environmentd` server.
///
/// Each field pairs a `ListenerHandle` with the connection stream returned by
/// `mz_server_core::listen` for one of the four ports.
223pub struct Listeners {
224    // Drop order matters for these fields.
    // Rust drops struct fields in declaration order (and tuple elements left
    // to right), so do not reorder them.
    // NOTE(review): the reason the order matters is not stated here; confirm
    // before changing.
    /// Listener for external pgwire connections.
225    sql: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
    /// Listener for external HTTP connections.
226    http: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
    /// Listener for internal pgwire connections.
227    internal_sql: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
    /// Listener for the internal HTTP server.
228    internal_http: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
229}
230
231impl Listeners {
232    /// Initializes network listeners for a later call to `serve` at the
233    /// specified addresses.
234    ///
235    /// Splitting this function out from `serve` has two benefits:
236    ///
237    ///   * It initializes network listeners as early as possible, so that the OS
238    ///     will queue incoming connections while the server is booting.
239    ///
240    ///   * It allows the caller to communicate with the server via the internal
241    ///     HTTP port while it is booting.
242    ///
243    pub async fn bind(
244        ListenersConfig {
245            sql_listen_addr,
246            http_listen_addr,
247            internal_sql_listen_addr,
248            internal_http_listen_addr,
249        }: ListenersConfig,
250    ) -> Result<Listeners, io::Error> {
251        let sql = mz_server_core::listen(&sql_listen_addr).await?;
252        let http = mz_server_core::listen(&http_listen_addr).await?;
253        let internal_sql = mz_server_core::listen(&internal_sql_listen_addr).await?;
254        let internal_http = mz_server_core::listen(&internal_http_listen_addr).await?;
255        Ok(Listeners {
256            sql,
257            http,
258            internal_sql,
259            internal_http,
260        })
261    }
262
263    /// Like [`Listeners::bind`], but binds each ports to an arbitrary free
264    /// local address.
265    pub async fn bind_any_local() -> Result<Listeners, io::Error> {
266        Listeners::bind(ListenersConfig {
267            sql_listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
268            http_listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
269            internal_sql_listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
270            internal_http_listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
271        })
272        .await
273    }
274
275    /// Starts an `environmentd` server.
276    ///
277    /// Returns a handle to the server once it is fully booted.
278    #[instrument(name = "environmentd::serve")]
279    pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
280        let serve_start = Instant::now();
281        info!("startup: envd serve: beginning");
282        info!("startup: envd serve: preamble beginning");
283
284        let Listeners {
285            sql: (sql_listener, sql_conns),
286            http: (http_listener, http_conns),
287            internal_sql: (internal_sql_listener, internal_sql_conns),
288            internal_http: (internal_http_listener, internal_http_conns),
289        } = self;
290
291        // Validate TLS configuration, if present.
292        let (pgwire_tls, http_tls) = match &config.tls {
293            None => (None, None),
294            Some(tls_config) => {
295                let context = tls_config.reloading_context(config.tls_reload_certs)?;
296                let pgwire_tls = mz_server_core::ReloadingTlsConfig {
297                    context: context.clone(),
298                    mode: mz_server_core::TlsMode::Require,
299                };
300                let http_tls = http::ReloadingTlsConfig {
301                    context,
302                    mode: http::TlsMode::Require,
303                };
304                (Some(pgwire_tls), Some(http_tls))
305            }
306        };
307
308        let active_connection_counter = ConnectionCounter::default();
309        let (deployment_state, deployment_state_handle) = DeploymentState::new();
310
311        // Start the internal HTTP server.
312        //
313        // We start this server before we've completed initialization so that
314        // metrics are accessible during initialization. Some internal HTTP
315        // endpoints require the adapter to be initialized; requests to those
316        // endpoints block until the adapter client is installed.
317        let (internal_http_adapter_client_tx, internal_http_adapter_client_rx) = oneshot::channel();
318        task::spawn(|| "internal_http_server", {
319            let internal_http_server = InternalHttpServer::new(InternalHttpConfig {
320                metrics_registry: config.metrics_registry.clone(),
321                adapter_client_rx: internal_http_adapter_client_rx,
322                active_connection_counter: active_connection_counter.clone(),
323                helm_chart_version: config.helm_chart_version.clone(),
324                deployment_state_handle,
325                internal_console_redirect_url: config.internal_console_redirect_url,
326            });
327            mz_server_core::serve(ServeConfig {
328                server: internal_http_server,
329                conns: internal_http_conns,
330                // `environmentd` does not currently need to dynamically
331                // configure graceful termination behavior.
332                dyncfg: None,
333            })
334        });
335
336        info!(
337            "startup: envd serve: preamble complete in {:?}",
338            serve_start.elapsed()
339        );
340
341        let catalog_init_start = Instant::now();
342        info!("startup: envd serve: catalog init beginning");
343
344        // Get the current timestamp so we can record when we booted.
345        let boot_ts = (config.now)().into();
346
347        let persist_client = config
348            .catalog_config
349            .persist_clients
350            .open(config.controller.persist_location.clone())
351            .await
352            .context("opening persist client")?;
353        let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
354            persist_client.clone(),
355            config.environment_id.organization_id(),
356            BUILD_INFO.semver_version(),
357            Some(config.controller.deploy_generation),
358            Arc::clone(&config.catalog_config.metrics),
359        )
360        .await?;
361
362        info!(
363            "startup: envd serve: catalog init complete in {:?}",
364            catalog_init_start.elapsed()
365        );
366
367        let system_param_sync_start = Instant::now();
368        info!("startup: envd serve: system parameter sync beginning");
369        // Initialize the system parameter frontend if `launchdarkly_sdk_key` is set.
370        let system_parameter_sync_config = if let Some(ld_sdk_key) = config.launchdarkly_sdk_key {
371            Some(SystemParameterSyncConfig::new(
372                config.environment_id.clone(),
373                &BUILD_INFO,
374                &config.metrics_registry,
375                config.now.clone(),
376                ld_sdk_key,
377                config.launchdarkly_key_map,
378            ))
379        } else {
380            None
381        };
382        let remote_system_parameters = load_remote_system_parameters(
383            &mut openable_adapter_storage,
384            system_parameter_sync_config.clone(),
385            config.config_sync_timeout,
386        )
387        .await?;
388        info!(
389            "startup: envd serve: system parameter sync complete in {:?}",
390            system_param_sync_start.elapsed()
391        );
392
393        let preflight_checks_start = Instant::now();
394        info!("startup: envd serve: preflight checks beginning");
395
396        // Determine whether we should perform a 0dt deployment.
397        let enable_0dt_deployment = {
398            let cli_default = config
399                .system_parameter_defaults
400                .get(ENABLE_0DT_DEPLOYMENT.name())
401                .map(|x| {
402                    strconv::parse_bool(x).map_err(|err| {
403                        anyhow!(
404                            "failed to parse default for {}: {}",
405                            ENABLE_0DT_DEPLOYMENT.name(),
406                            err
407                        )
408                    })
409                })
410                .transpose()?;
411            let compiled_default = ENABLE_0DT_DEPLOYMENT.default().clone();
412            let ld = get_ld_value("enable_0dt_deployment", &remote_system_parameters, |x| {
413                strconv::parse_bool(x).map_err(|x| x.to_string())
414            })?;
415            let catalog = openable_adapter_storage.get_enable_0dt_deployment().await?;
416            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
417            info!(
418                %computed,
419                ?ld,
420                ?catalog,
421                ?cli_default,
422                ?compiled_default,
423                "determined value for enable_0dt_deployment system parameter",
424            );
425            computed
426        };
427        // Determine the maximum wait time when doing a 0dt deployment.
428        let with_0dt_deployment_max_wait = {
429            let cli_default = config
430                .system_parameter_defaults
431                .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
432                .map(|x| {
433                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
434                        anyhow!(
435                            "failed to parse default for {}: {:?}",
436                            WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
437                            err
438                        )
439                    })
440                })
441                .transpose()?;
442            let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
443            let ld = get_ld_value(
444                WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
445                &remote_system_parameters,
446                |x| {
447                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
448                        format!(
449                            "failed to parse LD value {} for {}: {:?}",
450                            x,
451                            WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
452                            err
453                        )
454                    })
455                },
456            )?;
457            let catalog = openable_adapter_storage
458                .get_0dt_deployment_max_wait()
459                .await?;
460            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
461            info!(
462                ?computed,
463                ?ld,
464                ?catalog,
465                ?cli_default,
466                ?compiled_default,
467                "determined value for {} system parameter",
468                WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
469            );
470            computed
471        };
472        // Determine the DDL check interval when doing a 0dt deployment.
473        let with_0dt_deployment_ddl_check_interval = {
474            let cli_default = config
475                .system_parameter_defaults
476                .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
477                .map(|x| {
478                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
479                        anyhow!(
480                            "failed to parse default for {}: {:?}",
481                            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
482                            err
483                        )
484                    })
485                })
486                .transpose()?;
487            let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
488            let ld = get_ld_value(
489                WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
490                &remote_system_parameters,
491                |x| {
492                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
493                        format!(
494                            "failed to parse LD value {} for {}: {:?}",
495                            x,
496                            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
497                            err
498                        )
499                    })
500                },
501            )?;
502            let catalog = openable_adapter_storage
503                .get_0dt_deployment_ddl_check_interval()
504                .await?;
505            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
506            info!(
507                ?computed,
508                ?ld,
509                ?catalog,
510                ?cli_default,
511                ?compiled_default,
512                "determined value for {} system parameter",
513                WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
514            );
515            computed
516        };
517
518        // Determine whether we should panic if we reach the maximum wait time
519        // without the preflight checks succeeding.
520        let enable_0dt_deployment_panic_after_timeout = {
521            let cli_default = config
522                .system_parameter_defaults
523                .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
524                .map(|x| {
525                    strconv::parse_bool(x).map_err(|err| {
526                        anyhow!(
527                            "failed to parse default for {}: {}",
528                            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
529                            err
530                        )
531                    })
532                })
533                .transpose()?;
534            let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
535            let ld = get_ld_value(
536                "enable_0dt_deployment_panic_after_timeout",
537                &remote_system_parameters,
538                |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
539            )?;
540            let catalog = openable_adapter_storage
541                .get_enable_0dt_deployment_panic_after_timeout()
542                .await?;
543            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
544            info!(
545                %computed,
546                ?ld,
547                ?catalog,
548                ?cli_default,
549                ?compiled_default,
550                "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
551            );
552            computed
553        };
554
555        // Perform preflight checks.
556        //
557        // Preflight checks determine whether to boot in read-only mode or not.
558        let mut read_only = false;
559        let mut caught_up_trigger = None;
560        let bootstrap_args = BootstrapArgs {
561            default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
562            default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
563            bootstrap_role: config.bootstrap_role.clone(),
564            cluster_replica_size_map: config.cluster_replica_sizes.clone(),
565        };
566        let preflight_config = PreflightInput {
567            boot_ts,
568            environment_id: config.environment_id.clone(),
569            persist_client,
570            deploy_generation: config.controller.deploy_generation,
571            deployment_state: deployment_state.clone(),
572            openable_adapter_storage,
573            catalog_metrics: Arc::clone(&config.catalog_config.metrics),
574            caught_up_max_wait: with_0dt_deployment_max_wait,
575            panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
576            bootstrap_args,
577            ddl_check_interval: with_0dt_deployment_ddl_check_interval,
578        };
579        if enable_0dt_deployment {
580            PreflightOutput {
581                openable_adapter_storage,
582                read_only,
583                caught_up_trigger,
584            } = deployment::preflight::preflight_0dt(preflight_config).await?;
585        } else {
586            openable_adapter_storage =
587                deployment::preflight::preflight_legacy(preflight_config).await?;
588        };
589
590        info!(
591            "startup: envd serve: preflight checks complete in {:?}",
592            preflight_checks_start.elapsed()
593        );
594
595        let catalog_open_start = Instant::now();
596        info!("startup: envd serve: durable catalog open beginning");
597
598        let bootstrap_args = BootstrapArgs {
599            default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
600            default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
601            bootstrap_role: config.bootstrap_role,
602            cluster_replica_size_map: config.cluster_replica_sizes.clone(),
603        };
604
605        // Load the adapter durable storage.
606        let (adapter_storage, audit_logs_iterator) = if read_only {
607            // TODO: behavior of migrations when booting in savepoint mode is
608            // not well defined.
609            let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
610                .open_savepoint(boot_ts, &bootstrap_args)
611                .await?;
612            // In read-only mode, we intentionally do not call `set_is_leader`,
613            // because we are by definition not the leader if we are in
614            // read-only mode.
615
616            (adapter_storage, audit_logs_iterator)
617        } else {
618            let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
619                .open(boot_ts, &bootstrap_args)
620                .await?;
621
622            // Once we have successfully opened the adapter storage in
623            // read/write mode, we can announce we are the leader, as we've
624            // fenced out all other environments using the adapter storage.
625            deployment_state.set_is_leader();
626
627            (adapter_storage, audit_logs_iterator)
628        };
629
630        // Enable Persist compaction if we're not in read only.
631        if !read_only {
632            config.controller.persist_clients.cfg().enable_compaction();
633        }
634
635        info!(
636            "startup: envd serve: durable catalog open complete in {:?}",
637            catalog_open_start.elapsed()
638        );
639
640        let coord_init_start = Instant::now();
641        info!("startup: envd serve: coordinator init beginning");
642
643        if !config
644            .cluster_replica_sizes
645            .0
646            .contains_key(&config.bootstrap_default_cluster_replica_size)
647        {
648            return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
649        }
650        let envd_epoch = adapter_storage.epoch();
651
652        // Initialize storage usage client.
653        let storage_usage_client = StorageUsageClient::open(
654            config
655                .controller
656                .persist_clients
657                .open(config.controller.persist_location.clone())
658                .await
659                .context("opening storage usage client")?,
660        );
661
662        // Initialize adapter.
663        let segment_client = config.segment_api_key.map(|api_key| {
664            mz_segment::Client::new(mz_segment::Config {
665                api_key,
666                client_side: config.segment_client_side,
667            })
668        });
669        let connection_limiter = active_connection_counter.clone();
670        let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
671            connection_limiter.update_limit(limit);
672            connection_limiter.update_superuser_reserved(superuser_reserved);
673        });
674
675        let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
676        let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
677            connection_context: config.controller.connection_context.clone(),
678            connection_limit_callback,
679            controller_config: config.controller,
680            controller_envd_epoch: envd_epoch,
681            storage: adapter_storage,
682            audit_logs_iterator,
683            timestamp_oracle_url: config.timestamp_oracle_url,
684            unsafe_mode: config.unsafe_mode,
685            all_features: config.all_features,
686            build_info: &BUILD_INFO,
687            environment_id: config.environment_id.clone(),
688            metrics_registry: config.metrics_registry.clone(),
689            now: config.now,
690            secrets_controller: config.secrets_controller,
691            cloud_resource_controller: config.cloud_resource_controller,
692            cluster_replica_sizes: config.cluster_replica_sizes,
693            builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
694            builtin_catalog_server_cluster_config: config
695                .bootstrap_builtin_catalog_server_cluster_config,
696            builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
697            builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
698            builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
699            availability_zones: config.availability_zones,
700            system_parameter_defaults: config.system_parameter_defaults,
701            storage_usage_client,
702            storage_usage_collection_interval: config.storage_usage_collection_interval,
703            storage_usage_retention_period: config.storage_usage_retention_period,
704            segment_client: segment_client.clone(),
705            egress_addresses: config.egress_addresses,
706            remote_system_parameters,
707            aws_account_id: config.aws_account_id,
708            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
709            webhook_concurrency_limit: webhook_concurrency_limit.clone(),
710            http_host_name: config.http_host_name,
711            tracing_handle: config.tracing_handle,
712            read_only_controllers: read_only,
713            enable_0dt_deployment,
714            caught_up_trigger,
715            helm_chart_version: config.helm_chart_version.clone(),
716            license_key: config.license_key,
717        })
718        .instrument(info_span!("adapter::serve"))
719        .await?;
720
721        info!(
722            "startup: envd serve: coordinator init complete in {:?}",
723            coord_init_start.elapsed()
724        );
725
726        let serve_postamble_start = Instant::now();
727        info!("startup: envd serve: postamble beginning");
728
729        // Install an adapter client in the internal HTTP server.
730        internal_http_adapter_client_tx
731            .send(adapter_client.clone())
732            .expect("internal HTTP server should not drop first");
733
734        let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
735        // Launch SQL server.
736        task::spawn(|| "sql_server", {
737            let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
738                label: "external_pgwire",
739                tls: pgwire_tls.clone(),
740                adapter_client: adapter_client.clone(),
741                frontegg: config.frontegg.clone(),
742                use_self_hosted_auth: config.self_hosted_auth,
743                metrics: metrics.clone(),
744                internal: false,
745                active_connection_counter: active_connection_counter.clone(),
746                helm_chart_version: config.helm_chart_version.clone(),
747            });
748            mz_server_core::serve(ServeConfig {
749                conns: sql_conns,
750                server: sql_server,
751                // `environmentd` does not currently need to dynamically
752                // configure graceful termination behavior.
753                dyncfg: None,
754            })
755        });
756
757        // Launch internal SQL server.
758        task::spawn(|| "internal_sql_server", {
759            let internal_sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
760                label: "internal_pgwire",
761                tls: pgwire_tls.map(|mut pgwire_tls| {
762                    // Allow, but do not require, TLS connections on the internal
763                    // port. Some users of the internal SQL server do not support
764                    // TLS, while others require it, so we allow both.
765                    //
766                    // TODO(benesch): migrate all internal applications to TLS and
767                    // remove `TlsMode::Allow`.
768                    pgwire_tls.mode = mz_server_core::TlsMode::Allow;
769                    pgwire_tls
770                }),
771                adapter_client: adapter_client.clone(),
772                frontegg: None,
773                use_self_hosted_auth: config.self_hosted_auth_internal,
774                metrics: metrics.clone(),
775                internal: true,
776                active_connection_counter: active_connection_counter.clone(),
777                helm_chart_version: config.helm_chart_version.clone(),
778            });
779            mz_server_core::serve(ServeConfig {
780                conns: internal_sql_conns,
781                server: internal_sql_server,
782                // `environmentd` does not currently need to dynamically
783                // configure graceful termination behavior.
784                dyncfg: None,
785            })
786        });
787
788        // Launch HTTP server.
789        let http_metrics = http::Metrics::register_into(&config.metrics_registry, "mz_http");
790        task::spawn(|| "http_server", {
791            let http_server = HttpServer::new(HttpConfig {
792                source: "external",
793                tls: http_tls,
794                frontegg: config.frontegg.clone(),
795                adapter_client: adapter_client.clone(),
796                allowed_origin: config.cors_allowed_origin.clone(),
797                active_connection_counter: active_connection_counter.clone(),
798                helm_chart_version: config.helm_chart_version.clone(),
799                concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
800                metrics: http_metrics.clone(),
801            });
802            mz_server_core::serve(ServeConfig {
803                conns: http_conns,
804                server: http_server,
805                // `environmentd` does not currently need to dynamically
806                // configure graceful termination behavior.
807                dyncfg: None,
808            })
809        });
810
811        // Start telemetry reporting loop.
812        if let Some(segment_client) = segment_client {
813            telemetry::start_reporting(telemetry::Config {
814                segment_client,
815                adapter_client: adapter_client.clone(),
816                environment_id: config.environment_id,
817            });
818        }
819
820        // If system_parameter_sync_config and config_sync_loop_interval are present,
821        // start the system_parameter_sync loop.
822        if let Some(system_parameter_sync_config) = system_parameter_sync_config {
823            task::spawn(
824                || "system_parameter_sync",
825                AssertUnwindSafe(system_parameter_sync(
826                    system_parameter_sync_config,
827                    adapter_client,
828                    config.config_sync_loop_interval,
829                ))
830                .ore_catch_unwind(),
831            );
832        }
833
834        info!(
835            "startup: envd serve: postamble complete in {:?}",
836            serve_postamble_start.elapsed()
837        );
838        info!(
839            "startup: envd serve: complete in {:?}",
840            serve_start.elapsed()
841        );
842
843        Ok(Server {
844            sql_listener,
845            http_listener,
846            internal_sql_listener,
847            internal_http_listener,
848            _adapter_handle: adapter_handle,
849        })
850    }
851
852    pub fn sql_local_addr(&self) -> SocketAddr {
853        self.sql.0.local_addr()
854    }
855
856    pub fn http_local_addr(&self) -> SocketAddr {
857        self.http.0.local_addr()
858    }
859
860    pub fn internal_sql_local_addr(&self) -> SocketAddr {
861        self.internal_sql.0.local_addr()
862    }
863
864    pub fn internal_http_local_addr(&self) -> SocketAddr {
865        self.internal_http.0.local_addr()
866    }
867}
868
869fn get_ld_value<V>(
870    name: &str,
871    remote_system_parameters: &Option<BTreeMap<String, String>>,
872    parse: impl Fn(&str) -> Result<V, String>,
873) -> Result<Option<V>, anyhow::Error> {
874    remote_system_parameters
875        .as_ref()
876        .and_then(|params| params.get(name))
877        .map(|x| {
878            parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
879        })
880        .transpose()
881}
882
883/// A running `environmentd` server.
884pub struct Server {
885    // Drop order matters for these fields.
886    sql_listener: ListenerHandle,
887    http_listener: ListenerHandle,
888    internal_sql_listener: ListenerHandle,
889    internal_http_listener: ListenerHandle,
890    _adapter_handle: mz_adapter::Handle,
891}
892
893impl Server {
894    pub fn sql_local_addr(&self) -> SocketAddr {
895        self.sql_listener.local_addr()
896    }
897
898    pub fn http_local_addr(&self) -> SocketAddr {
899        self.http_listener.local_addr()
900    }
901
902    pub fn internal_sql_local_addr(&self) -> SocketAddr {
903        self.internal_sql_listener.local_addr()
904    }
905
906    pub fn internal_http_local_addr(&self) -> SocketAddr {
907        self.internal_http_listener.local_addr()
908    }
909}