mz_environmentd/
lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A SQL stream processor built on top of [timely dataflow] and
//! [differential dataflow].
//!
//! [differential dataflow]: ../differential_dataflow/index.html
//! [timely dataflow]: ../timely/index.html
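//!
//! # Boot sequence (sketch)
//!
//! A rough, non-compiling sketch of the boot sequence this crate exposes;
//! construction of the `listeners_config` and `config` values is elided:
//!
//! ```ignore
//! // Bind the network listeners first, so the OS queues incoming connections
//! // while the rest of the server boots.
//! let listeners = mz_environmentd::Listeners::bind(listeners_config).await?;
//! // Then boot the server proper; this returns once the adapter is serving.
//! let server = listeners.serve(config).await?;
//! ```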

use std::collections::BTreeMap;
use std::panic::AssertUnwindSafe;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{env, io};

use anyhow::{Context, anyhow};
use derivative::Derivative;
use futures::FutureExt;
use ipnet::IpNet;
use mz_adapter::config::{
    SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
};
use mz_adapter::webhook::WebhookConcurrencyLimiter;
use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_DEPLOYMENT, ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT,
    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use mz_auth::password::Password;
use mz_authenticator::Authenticator;
use mz_build_info::{BuildInfo, build_info};
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_catalog::durable::BootstrapArgs;
use mz_cloud_resources::CloudResourceController;
use mz_controller::ControllerConfig;
use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
use mz_license_keys::ValidatedLicenseKey;
use mz_ore::future::OreFutureExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::tracing::TracingHandle;
use mz_ore::url::SensitiveUrl;
use mz_ore::{instrument, task};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::usage::StorageUsageClient;
use mz_pgwire::MetricsConfig;
use mz_pgwire_common::ConnectionCounter;
use mz_repr::strconv;
use mz_secrets::SecretsController;
use mz_server_core::listeners::{
    AuthenticatorKind, HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
};
use mz_server_core::{
    ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
    TlsCertConfig, TlsMode,
};
use mz_sql::catalog::EnvironmentId;
use mz_sql::session::vars::{Value, VarInput};
use tokio::sync::oneshot;
use tower_http::cors::AllowOrigin;
use tracing::{Instrument, info, info_span};

use crate::deployment::preflight::{PreflightInput, PreflightOutput};
use crate::deployment::state::DeploymentState;
use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};

pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};

mod deployment;
pub mod environmentd;
pub mod http;
mod telemetry;
#[cfg(feature = "test")]
pub mod test_util;

pub const BUILD_INFO: BuildInfo = build_info!();

/// Configuration for an `environmentd` server.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Config {
    // === Special modes. ===
    /// Whether to permit usage of unsafe features. This is never meant to run
    /// in production.
    pub unsafe_mode: bool,
    /// Whether the environmentd is running on a local dev machine. This is
    /// never meant to run in production or CI.
    pub all_features: bool,

    // === Connection options. ===
    /// TLS encryption and authentication configuration.
    pub tls: Option<TlsCertConfig>,
    /// Trigger to attempt to reload TLS certificates.
    #[derivative(Debug = "ignore")]
    pub tls_reload_certs: ReloadTrigger,
    /// Password of the mz_system user.
    pub external_login_password_mz_system: Option<Password>,
    /// Frontegg JWT authentication configuration.
    pub frontegg: Option<FronteggAuthenticator>,
    /// Origins for which cross-origin resource sharing (CORS) for HTTP requests
    /// is permitted.
    pub cors_allowed_origin: AllowOrigin,
    /// Public IP addresses which the cloud environment has configured for
    /// egress.
    pub egress_addresses: Vec<IpNet>,
    /// The external host name to connect to the HTTP server of this
    /// environment.
    ///
    /// Presently used to render webhook URLs for end users in notices and the
    /// system catalog. Not used to establish connections directly.
    pub http_host_name: Option<String>,
    /// The URL of the Materialize console to proxy from the /internal-console
    /// endpoint on the internal HTTP server.
    pub internal_console_redirect_url: Option<String>,

    // === Controller options. ===
    /// Storage and compute controller configuration.
    pub controller: ControllerConfig,
    /// Secrets controller configuration.
    pub secrets_controller: Arc<dyn SecretsController>,
    /// VpcEndpoint controller configuration.
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    // === Storage options. ===
    /// The interval at which to collect storage usage information.
    pub storage_usage_collection_interval: Duration,
    /// How long to retain storage usage records for.
    pub storage_usage_retention_period: Option<Duration>,

    // === Adapter options. ===
    /// Catalog configuration.
    pub catalog_config: CatalogConfig,
    /// Availability zones in which storage and compute resources may be
    /// deployed.
    pub availability_zones: Vec<String>,
    /// A map from size name to resource allocations for cluster replicas.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// The PostgreSQL URL for the Postgres-backed timestamp oracle.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    /// An API key for Segment. Enables export of audit events to Segment.
    pub segment_api_key: Option<String>,
    /// Whether the Segment client is being used on the client side
    /// (rather than the server side).
    pub segment_client_side: bool,
    /// Whether to create a dummy Segment client, solely to get more testing coverage.
    pub test_only_dummy_segment_client: bool,
    /// An SDK key for LaunchDarkly. Enables system parameter synchronization
    /// with LaunchDarkly.
    pub launchdarkly_sdk_key: Option<String>,
    /// An invertible map from system parameter names to LaunchDarkly feature
    /// keys to use when propagating values from the latter to the former.
    pub launchdarkly_key_map: BTreeMap<String, String>,
    /// The duration after which system parameter synchronization times out during startup.
    pub config_sync_timeout: Duration,
    /// The interval at which to synchronize system parameter values.
    pub config_sync_loop_interval: Option<Duration>,
    /// The path for file-based config sync.
    pub config_sync_file_path: Option<PathBuf>,

    // === Bootstrap options. ===
    /// The cloud ID of this environment.
    pub environment_id: EnvironmentId,
    /// What role, if any, should be initially created with elevated privileges.
    pub bootstrap_role: Option<String>,
    /// The size of the default cluster replica if bootstrapping.
    pub bootstrap_default_cluster_replica_size: String,
    /// The default number of replicas if bootstrapping.
    pub bootstrap_default_cluster_replication_factor: u32,
    /// The config of the builtin system cluster replicas if bootstrapping.
    pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// The config of the builtin catalog server cluster replicas if bootstrapping.
    pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// The config of the builtin probe cluster replicas if bootstrapping.
    pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// The config of the builtin support cluster replicas if bootstrapping.
    pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// The config of the builtin analytics cluster replicas if bootstrapping.
    pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Values to set for system parameters, if those system parameters have not
    /// already been set by the system user.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Helm chart version
    pub helm_chart_version: Option<String>,
    /// Configuration managed by license keys
    pub license_key: ValidatedLicenseKey,

    // === AWS options. ===
    /// The AWS account ID, which will be used to generate ARNs for
    /// Materialize-controlled AWS resources.
    pub aws_account_id: Option<String>,
    /// Supported AWS PrivateLink availability zone ids.
    pub aws_privatelink_availability_zones: Option<Vec<String>>,

    // === Observability options. ===
    /// The metrics registry to use.
    pub metrics_registry: MetricsRegistry,
    /// Handle to tracing.
    pub tracing_handle: TracingHandle,

    // === Testing options. ===
    /// A now generation function for mocking time.
    pub now: NowFn,
}

/// Configuration for the Catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    pub persist_clients: Arc<PersistClientCache>,
    /// Persist catalog metrics.
    pub metrics: Arc<mz_catalog::durable::Metrics>,
}

pub struct Listener<C> {
    pub handle: ListenerHandle,
    connection_stream: Pin<Box<dyn ConnectionStream>>,
    config: C,
}
impl<C> Listener<C>
where
    C: ListenerConfig,
{
    /// Initializes network listeners for a later call to `serve` at the
    /// specified addresses.
    ///
    /// Splitting this function out from `serve` has two benefits:
    ///
    ///   * It initializes network listeners as early as possible, so that the OS
    ///     will queue incoming connections while the server is booting.
    ///
    ///   * It allows the caller to communicate with the server via the internal
    ///     HTTP port while it is booting.
    ///
    async fn bind(config: C) -> Result<Self, io::Error> {
        let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
        Ok(Self {
            handle,
            connection_stream,
            config,
        })
    }
}

impl Listener<SqlListenerConfig> {
    #[instrument(name = "environmentd::serve_sql")]
    pub async fn serve_sql(
        self,
        name: String,
        active_connection_counter: ConnectionCounter,
        tls_reloading_context: Option<ReloadingSslContext>,
        frontegg: Option<FronteggAuthenticator>,
        adapter_client: AdapterClient,
        metrics: MetricsConfig,
        helm_chart_version: Option<String>,
    ) -> ListenerHandle {
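        // The label is used as a `&'static str` (see the type annotation below), so
        // leak the listener name; this happens once per listener, so the leak is
        // bounded.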
        let label: &'static str = Box::leak(name.into_boxed_str());
        let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
            context,
            mode: if self.config.enable_tls {
                TlsMode::Require
            } else {
                TlsMode::Allow
            },
        });
        let authenticator = match self.config.authenticator_kind {
            AuthenticatorKind::Frontegg => Authenticator::Frontegg(
                frontegg.expect("Frontegg args are required with AuthenticatorKind::Frontegg"),
            ),
            AuthenticatorKind::Password => Authenticator::Password(adapter_client.clone()),
            AuthenticatorKind::None => Authenticator::None,
        };

        task::spawn(|| format!("{}_sql_server", label), {
            let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
                label,
                tls,
                adapter_client,
                authenticator,
                metrics,
                active_connection_counter,
                helm_chart_version,
                allowed_roles: self.config.allowed_roles,
            });
            mz_server_core::serve(ServeConfig {
                conns: self.connection_stream,
                server: sql_server,
                // `environmentd` does not currently need to dynamically
                // configure graceful termination behavior.
                dyncfg: None,
            })
        });
        self.handle
    }
}

impl Listener<HttpListenerConfig> {
    #[instrument(name = "environmentd::serve_http")]
    pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
        let task_name = format!("{}_http_server", &config.source);
        task::spawn(|| task_name, {
            let http_server = HttpServer::new(config);
            mz_server_core::serve(ServeConfig {
                conns: self.connection_stream,
                server: http_server,
                // `environmentd` does not currently need to dynamically
                // configure graceful termination behavior.
                dyncfg: None,
            })
        });
        self.handle
    }
}

pub struct Listeners {
    pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
    pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
}

impl Listeners {
    pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
        let mut sql = BTreeMap::new();
        for (name, config) in config.sql {
            sql.insert(name, Listener::bind(config).await?);
        }

        let mut http = BTreeMap::new();
        for (name, config) in config.http {
            http.insert(name, Listener::bind(config).await?);
        }

        Ok(Listeners { http, sql })
    }

    /// Starts an `environmentd` server.
    ///
    /// Returns a handle to the server once it is fully booted.
    #[instrument(name = "environmentd::serve")]
    pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
        let serve_start = Instant::now();
        info!("startup: envd serve: beginning");
        info!("startup: envd serve: preamble beginning");

        // Validate TLS configuration, if present.
        let tls_reloading_context = match config.tls {
            Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
            None => None,
        };

        let active_connection_counter = ConnectionCounter::default();
        let (deployment_state, deployment_state_handle) = DeploymentState::new();

        // Launch HTTP servers.
        //
        // We start these servers before initialization has completed so that
        // metrics are accessible during initialization. Some HTTP endpoints
        // require the adapter to be initialized; requests to those endpoints
        // block until the adapter client is installed. One such endpoint is
        // /api/readyz, which reports the server as ready once the adapter
        // client exists.
        let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
        let internal_route_config = Arc::new(InternalRouteConfig {
            deployment_state_handle,
            internal_console_redirect_url: config.internal_console_redirect_url,
        });

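        // Each authenticator kind gets its own oneshot channel; the receivers are
        // made shareable with `.shared()` so that every HTTP listener of a given
        // kind can await the same authenticator value.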
        let (authenticator_frontegg_tx, authenticator_frontegg_rx) = oneshot::channel();
        let authenticator_frontegg_rx = authenticator_frontegg_rx.shared();
        let (authenticator_password_tx, authenticator_password_rx) = oneshot::channel();
        let authenticator_password_rx = authenticator_password_rx.shared();
        let (authenticator_none_tx, authenticator_none_rx) = oneshot::channel();
        let authenticator_none_rx = authenticator_none_rx.shared();

        // We can only send the Frontegg and None variants immediately.
        // The Password variant requires an adapter client.
        if let Some(frontegg) = &config.frontegg {
            authenticator_frontegg_tx
                .send(Arc::new(Authenticator::Frontegg(frontegg.clone())))
                .expect("rx known to be live");
        }
        authenticator_none_tx
            .send(Arc::new(Authenticator::None))
            .expect("rx known to be live");

        let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
        let adapter_client_rx = adapter_client_rx.shared();

        let metrics_registry = config.metrics_registry.clone();
        let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
        let mut http_listener_handles = BTreeMap::new();
        for (name, listener) in self.http {
            let authenticator_kind = listener.config.authenticator_kind();
            let authenticator_rx = match authenticator_kind {
                AuthenticatorKind::Frontegg => authenticator_frontegg_rx.clone(),
                AuthenticatorKind::Password => authenticator_password_rx.clone(),
                AuthenticatorKind::None => authenticator_none_rx.clone(),
            };
            let source: &'static str = Box::leak(name.clone().into_boxed_str());
            let tls = if listener.config.enable_tls() {
                tls_reloading_context.clone()
            } else {
                None
            };
            let http_config = HttpConfig {
                adapter_client_rx: adapter_client_rx.clone(),
                active_connection_counter: active_connection_counter.clone(),
                helm_chart_version: config.helm_chart_version.clone(),
                source,
                tls,
                authenticator_kind,
                authenticator_rx,
                allowed_origin: config.cors_allowed_origin.clone(),
                concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
                metrics: metrics.clone(),
                metrics_registry: metrics_registry.clone(),
                allowed_roles: listener.config.allowed_roles(),
                internal_route_config: Arc::clone(&internal_route_config),
                routes_enabled: listener.config.routes.clone(),
            };
            http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
        }

        info!(
            "startup: envd serve: preamble complete in {:?}",
            serve_start.elapsed()
        );

        let catalog_init_start = Instant::now();
        info!("startup: envd serve: catalog init beginning");

        // Get the current timestamp so we can record when we booted.
        let boot_ts = (config.now)().into();

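        // Open a persist client and use it to construct a handle to the durable,
        // persist-backed catalog state. The catalog itself is only opened (see
        // `open`/`open_savepoint` below) after the preflight checks have run.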
        let persist_client = config
            .catalog_config
            .persist_clients
            .open(config.controller.persist_location.clone())
            .await
            .context("opening persist client")?;
        let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
            persist_client.clone(),
            config.environment_id.organization_id(),
            BUILD_INFO.semver_version(),
            Some(config.controller.deploy_generation),
            Arc::clone(&config.catalog_config.metrics),
        )
        .await?;

        info!(
            "startup: envd serve: catalog init complete in {:?}",
            catalog_init_start.elapsed()
        );

        let system_param_sync_start = Instant::now();
        info!("startup: envd serve: system parameter sync beginning");
        // Initialize the system parameter sync configuration, if any.
        let system_parameter_sync_config =
            match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
                (None, None) => None,
                (None, Some(f)) => Some(SystemParameterSyncConfig::new(
                    config.environment_id.clone(),
                    &BUILD_INFO,
                    &config.metrics_registry,
                    config.launchdarkly_key_map,
                    SystemParameterSyncClientConfig::File { path: f },
                )),
                (Some(key), None) => Some(SystemParameterSyncConfig::new(
                    config.environment_id.clone(),
                    &BUILD_INFO,
                    &config.metrics_registry,
                    config.launchdarkly_key_map,
                    SystemParameterSyncClientConfig::LaunchDarkly {
                        sdk_key: key,
                        now_fn: config.now.clone(),
                    },
                )),

                (Some(_), Some(_)) => {
                    panic!("Cannot configure both file and LaunchDarkly based config syncing")
                }
            };

        let remote_system_parameters = load_remote_system_parameters(
            &mut openable_adapter_storage,
            system_parameter_sync_config.clone(),
            config.config_sync_timeout,
        )
        .await?;
        info!(
            "startup: envd serve: system parameter sync complete in {:?}",
            system_param_sync_start.elapsed()
        );

        let preflight_checks_start = Instant::now();
        info!("startup: envd serve: preflight checks beginning");

        // Determine whether we should perform a 0dt deployment.
        let enable_0dt_deployment = {
            let cli_default = config
                .system_parameter_defaults
                .get(ENABLE_0DT_DEPLOYMENT.name())
                .map(|x| {
                    strconv::parse_bool(x).map_err(|err| {
                        anyhow!(
                            "failed to parse default for {}: {}",
                            ENABLE_0DT_DEPLOYMENT.name(),
                            err
                        )
                    })
                })
                .transpose()?;
            let compiled_default = ENABLE_0DT_DEPLOYMENT.default().clone();
            let ld = get_ld_value("enable_0dt_deployment", &remote_system_parameters, |x| {
                strconv::parse_bool(x).map_err(|x| x.to_string())
            })?;
            let catalog = openable_adapter_storage.get_enable_0dt_deployment().await?;
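            // Precedence, here and for the other 0dt settings below: a value synced
            // from LaunchDarkly wins, then the value recorded in the durable catalog,
            // then the CLI-provided system parameter default, and finally the
            // compiled-in default.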
            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
            info!(
                %computed,
                ?ld,
                ?catalog,
                ?cli_default,
                ?compiled_default,
                "determined value for enable_0dt_deployment system parameter",
            );
            computed
        };
        // Determine the maximum wait time when doing a 0dt deployment.
        let with_0dt_deployment_max_wait = {
            let cli_default = config
                .system_parameter_defaults
                .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
                .map(|x| {
                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
                        anyhow!(
                            "failed to parse default for {}: {:?}",
                            WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
                            err
                        )
                    })
                })
                .transpose()?;
            let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
            let ld = get_ld_value(
                WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
                &remote_system_parameters,
                |x| {
                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
                        format!(
                            "failed to parse LD value {} for {}: {:?}",
                            x,
                            WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
                            err
                        )
                    })
                },
            )?;
            let catalog = openable_adapter_storage
                .get_0dt_deployment_max_wait()
                .await?;
            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
            info!(
                ?computed,
                ?ld,
                ?catalog,
                ?cli_default,
                ?compiled_default,
                "determined value for {} system parameter",
                WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
            );
            computed
        };
        // Determine the DDL check interval when doing a 0dt deployment.
        let with_0dt_deployment_ddl_check_interval = {
            let cli_default = config
                .system_parameter_defaults
                .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
                .map(|x| {
                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
                        anyhow!(
                            "failed to parse default for {}: {:?}",
                            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
                            err
                        )
                    })
                })
                .transpose()?;
            let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
            let ld = get_ld_value(
                WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
                &remote_system_parameters,
                |x| {
                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
                        format!(
                            "failed to parse LD value {} for {}: {:?}",
                            x,
                            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
                            err
                        )
                    })
                },
            )?;
            let catalog = openable_adapter_storage
                .get_0dt_deployment_ddl_check_interval()
                .await?;
            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
            info!(
                ?computed,
                ?ld,
                ?catalog,
                ?cli_default,
                ?compiled_default,
                "determined value for {} system parameter",
                WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
            );
            computed
        };

        // Determine whether we should panic if we reach the maximum wait time
        // without the preflight checks succeeding.
        let enable_0dt_deployment_panic_after_timeout = {
            let cli_default = config
                .system_parameter_defaults
                .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
                .map(|x| {
                    strconv::parse_bool(x).map_err(|err| {
                        anyhow!(
                            "failed to parse default for {}: {}",
                            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
                            err
                        )
                    })
                })
                .transpose()?;
            let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
            let ld = get_ld_value(
                "enable_0dt_deployment_panic_after_timeout",
                &remote_system_parameters,
                |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
            )?;
            let catalog = openable_adapter_storage
                .get_enable_0dt_deployment_panic_after_timeout()
                .await?;
            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
            info!(
                %computed,
                ?ld,
                ?catalog,
                ?cli_default,
                ?compiled_default,
                "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
            );
            computed
        };

        // Perform preflight checks.
        //
        // Preflight checks determine whether to boot in read-only mode or not.
        let mut read_only = false;
        let mut caught_up_trigger = None;
        let bootstrap_args = BootstrapArgs {
            default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
            default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
            bootstrap_role: config.bootstrap_role.clone(),
            cluster_replica_size_map: config.cluster_replica_sizes.clone(),
        };
        let preflight_config = PreflightInput {
            boot_ts,
            environment_id: config.environment_id.clone(),
            persist_client,
            deploy_generation: config.controller.deploy_generation,
            deployment_state: deployment_state.clone(),
            openable_adapter_storage,
            catalog_metrics: Arc::clone(&config.catalog_config.metrics),
            caught_up_max_wait: with_0dt_deployment_max_wait,
            panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
            bootstrap_args,
            ddl_check_interval: with_0dt_deployment_ddl_check_interval,
        };
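        // In 0dt mode, preflight may flip `read_only` and `caught_up_trigger` via the
        // destructuring assignment below; the legacy path only hands back the openable
        // adapter storage.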
        if enable_0dt_deployment {
            PreflightOutput {
                openable_adapter_storage,
                read_only,
                caught_up_trigger,
            } = deployment::preflight::preflight_0dt(preflight_config).await?;
        } else {
            openable_adapter_storage =
                deployment::preflight::preflight_legacy(preflight_config).await?;
        };

        info!(
            "startup: envd serve: preflight checks complete in {:?}",
            preflight_checks_start.elapsed()
        );

        let catalog_open_start = Instant::now();
        info!("startup: envd serve: durable catalog open beginning");

        let bootstrap_args = BootstrapArgs {
            default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
            default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
            bootstrap_role: config.bootstrap_role,
            cluster_replica_size_map: config.cluster_replica_sizes.clone(),
        };

        // Load the adapter durable storage.
        let (adapter_storage, audit_logs_iterator) = if read_only {
            // TODO: behavior of migrations when booting in savepoint mode is
            // not well defined.
            let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
                .open_savepoint(boot_ts, &bootstrap_args)
                .await?;
            // In read-only mode, we intentionally do not call `set_is_leader`,
            // because we are by definition not the leader if we are in
            // read-only mode.

            (adapter_storage, audit_logs_iterator)
        } else {
            let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
                .open(boot_ts, &bootstrap_args)
                .await?;

            // Once we have successfully opened the adapter storage in
            // read/write mode, we can announce we are the leader, as we've
            // fenced out all other environments using the adapter storage.
            deployment_state.set_is_leader();

            (adapter_storage, audit_logs_iterator)
        };

        // Enable persist compaction if we're not in read-only mode.
        if !read_only {
            config.controller.persist_clients.cfg().enable_compaction();
        }

        info!(
            "startup: envd serve: durable catalog open complete in {:?}",
            catalog_open_start.elapsed()
        );

        let coord_init_start = Instant::now();
        info!("startup: envd serve: coordinator init beginning");

        if !config
            .cluster_replica_sizes
            .0
            .contains_key(&config.bootstrap_default_cluster_replica_size)
        {
            return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
        }
        let envd_epoch = adapter_storage.epoch();

        // Initialize storage usage client.
        let storage_usage_client = StorageUsageClient::open(
            config
                .controller
                .persist_clients
                .open(config.controller.persist_location.clone())
                .await
                .context("opening storage usage client")?,
        );

        // Initialize adapter.
        let segment_client = config.segment_api_key.map(|api_key| {
            mz_segment::Client::new(mz_segment::Config {
                api_key,
                client_side: config.segment_client_side,
            })
        });
        let connection_limiter = active_connection_counter.clone();
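        // Callback through which the adapter pushes updated connection limits (and the
        // superuser-reserved count) into the connection counter shared with the SQL and
        // HTTP listeners.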
        let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
            connection_limiter.update_limit(limit);
            connection_limiter.update_superuser_reserved(superuser_reserved);
        });

        let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
            connection_context: config.controller.connection_context.clone(),
            connection_limit_callback,
            controller_config: config.controller,
            controller_envd_epoch: envd_epoch,
            storage: adapter_storage,
            audit_logs_iterator,
            timestamp_oracle_url: config.timestamp_oracle_url,
            unsafe_mode: config.unsafe_mode,
            all_features: config.all_features,
            build_info: &BUILD_INFO,
            environment_id: config.environment_id.clone(),
            metrics_registry: config.metrics_registry.clone(),
            now: config.now,
            secrets_controller: config.secrets_controller,
            cloud_resource_controller: config.cloud_resource_controller,
            cluster_replica_sizes: config.cluster_replica_sizes,
            builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
            builtin_catalog_server_cluster_config: config
                .bootstrap_builtin_catalog_server_cluster_config,
            builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
            builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
            builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
            availability_zones: config.availability_zones,
            system_parameter_defaults: config.system_parameter_defaults,
            storage_usage_client,
            storage_usage_collection_interval: config.storage_usage_collection_interval,
            storage_usage_retention_period: config.storage_usage_retention_period,
            segment_client: segment_client.clone(),
            egress_addresses: config.egress_addresses,
            remote_system_parameters,
            aws_account_id: config.aws_account_id,
            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
            webhook_concurrency_limit: webhook_concurrency_limit.clone(),
            http_host_name: config.http_host_name,
            tracing_handle: config.tracing_handle,
            read_only_controllers: read_only,
            enable_0dt_deployment,
            caught_up_trigger,
            helm_chart_version: config.helm_chart_version.clone(),
            license_key: config.license_key,
            external_login_password_mz_system: config.external_login_password_mz_system,
        })
        .instrument(info_span!("adapter::serve"))
        .await?;

        info!(
            "startup: envd serve: coordinator init complete in {:?}",
            coord_init_start.elapsed()
        );

        let serve_postamble_start = Instant::now();
        info!("startup: envd serve: postamble beginning");

        // Send adapter client to the HTTP servers.
        authenticator_password_tx
            .send(Arc::new(Authenticator::Password(adapter_client.clone())))
            .expect("rx known to be live");
        adapter_client_tx
            .send(adapter_client.clone())
            .expect("internal HTTP server should not drop first");

        let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);

        // Launch SQL servers.
        let mut sql_listener_handles = BTreeMap::new();
        for (name, listener) in self.sql {
            sql_listener_handles.insert(
                name.clone(),
                listener
                    .serve_sql(
                        name,
                        active_connection_counter.clone(),
                        tls_reloading_context.clone(),
                        config.frontegg.clone(),
                        adapter_client.clone(),
                        metrics.clone(),
                        config.helm_chart_version.clone(),
                    )
                    .await,
            );
        }

        // Start telemetry reporting loop.
        if let Some(segment_client) = segment_client {
            telemetry::start_reporting(telemetry::Config {
                segment_client,
                adapter_client: adapter_client.clone(),
                environment_id: config.environment_id,
                report_interval: Duration::from_secs(3600),
            });
        } else if config.test_only_dummy_segment_client {
            // We only have access to a segment client in production but we
            // still want to exercise the telemetry reporting code to a degree.
            // So we create a dummy client and report telemetry into the void.
            // This way we at least run the telemetry queries the way a
            // production environment would.
            tracing::debug!("starting telemetry reporting with a dummy segment client");
            let segment_client = mz_segment::Client::new_dummy_client();
            telemetry::start_reporting(telemetry::Config {
                segment_client,
                adapter_client: adapter_client.clone(),
                environment_id: config.environment_id,
                report_interval: Duration::from_secs(180),
            });
        }

        // If `system_parameter_sync_config` and `config_sync_loop_interval` are both
        // present, start the `system_parameter_sync` loop.
        if let Some(system_parameter_sync_config) = system_parameter_sync_config {
            task::spawn(
                || "system_parameter_sync",
                AssertUnwindSafe(system_parameter_sync(
                    system_parameter_sync_config,
                    adapter_client.clone(),
                    config.config_sync_loop_interval,
                ))
                .ore_catch_unwind(),
            );
        }

        info!(
            "startup: envd serve: postamble complete in {:?}",
            serve_postamble_start.elapsed()
        );
        info!(
            "startup: envd serve: complete in {:?}",
            serve_start.elapsed()
        );

        Ok(Server {
            sql_listener_handles,
            http_listener_handles,
            _adapter_handle: adapter_handle,
        })
    }
}

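/// Looks up `name` in the remote (e.g. LaunchDarkly-synced) system parameters, if any
/// were loaded, and parses the value with `parse`. Returns `Ok(None)` when no remote
/// parameters were loaded or the parameter is absent.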
fn get_ld_value<V>(
    name: &str,
    remote_system_parameters: &Option<BTreeMap<String, String>>,
    parse: impl Fn(&str) -> Result<V, String>,
) -> Result<Option<V>, anyhow::Error> {
    remote_system_parameters
        .as_ref()
        .and_then(|params| params.get(name))
        .map(|x| {
            parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
        })
        .transpose()
}

/// A running `environmentd` server.
pub struct Server {
    // Drop order matters for these fields.
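    // (Rust drops struct fields in declaration order, so the listener handles are
    // dropped before the adapter handle.)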
    pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
    pub http_listener_handles: BTreeMap<String, ListenerHandle>,
    _adapter_handle: mz_adapter::Handle,
}