1use std::collections::BTreeMap;
17use std::panic::AssertUnwindSafe;
18use std::path::PathBuf;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use std::{env, io};
23
24use anyhow::{Context, anyhow};
25use derivative::Derivative;
26use futures::FutureExt;
27use ipnet::IpNet;
28use mz_adapter::config::{
29 SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
30};
31use mz_adapter::webhook::WebhookConcurrencyLimiter;
32use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
33use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
34use mz_adapter_types::dyncfgs::{
35 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
36 WITH_0DT_DEPLOYMENT_MAX_WAIT,
37};
38use mz_auth::password::Password;
39use mz_authenticator::Authenticator;
40use mz_build_info::{BuildInfo, build_info};
41use mz_catalog::config::ClusterReplicaSizeMap;
42use mz_catalog::durable::BootstrapArgs;
43use mz_cloud_resources::CloudResourceController;
44use mz_controller::ControllerConfig;
45use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
46use mz_license_keys::ValidatedLicenseKey;
47use mz_ore::future::OreFutureExt;
48use mz_ore::metrics::MetricsRegistry;
49use mz_ore::now::NowFn;
50use mz_ore::tracing::TracingHandle;
51use mz_ore::url::SensitiveUrl;
52use mz_ore::{instrument, task};
53use mz_persist_client::cache::PersistClientCache;
54use mz_persist_client::usage::StorageUsageClient;
55use mz_pgwire::MetricsConfig;
56use mz_pgwire_common::ConnectionCounter;
57use mz_repr::strconv;
58use mz_secrets::SecretsController;
59use mz_server_core::listeners::{
60 AuthenticatorKind, HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
61};
62use mz_server_core::{
63 ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
64 TlsCertConfig, TlsMode,
65};
66use mz_sql::catalog::EnvironmentId;
67use mz_sql::session::vars::{Value, VarInput};
68use tokio::sync::oneshot;
69use tower_http::cors::AllowOrigin;
70use tracing::{Instrument, info, info_span};
71
72use crate::deployment::preflight::{PreflightInput, PreflightOutput};
73use crate::deployment::state::DeploymentState;
74use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
75
76pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
77
78mod deployment;
79pub mod environmentd;
80pub mod http;
81mod telemetry;
82#[cfg(feature = "test")]
83pub mod test_util;
84
/// Build information for this crate, produced by the `build_info!` macro.
///
/// Used in this file to derive the version handed to the durable catalog and
/// passed by reference to the adapter and the system parameter sync config.
pub const BUILD_INFO: BuildInfo = build_info!();
86
/// Configuration for an `environmentd` server, consumed by [`Listeners::serve`].
///
/// Most fields are forwarded unchanged into `mz_adapter::Config`; the field
/// docs below call out the ones that `serve` itself interprets.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Config {
    /// Forwarded to the adapter as `unsafe_mode`.
    pub unsafe_mode: bool,
    /// Forwarded to the adapter as `all_features`.
    pub all_features: bool,

    /// TLS certificate configuration. When set, `serve` builds a reloading
    /// TLS context from it that is shared by the listeners.
    pub tls: Option<TlsCertConfig>,
    /// Trigger handed to `TlsCertConfig::reloading_context` together with
    /// `tls`. Excluded from the `Debug` output.
    #[derivative(Debug = "ignore")]
    pub tls_reload_certs: ReloadTrigger,
    /// Forwarded to the adapter under the same name.
    // NOTE(review): by its name, a password for external `mz_system` logins — confirm.
    pub external_login_password_mz_system: Option<Password>,
    /// Frontegg authenticator. SQL listeners configured with
    /// `AuthenticatorKind::Frontegg` panic at startup if this is `None`.
    pub frontegg: Option<FronteggAuthenticator>,
    /// Passed to every HTTP listener as its `allowed_origin`.
    pub cors_allowed_origin: AllowOrigin,
    /// Forwarded to the adapter as `egress_addresses`.
    pub egress_addresses: Vec<IpNet>,
    /// Forwarded to the adapter as `http_host_name`.
    pub http_host_name: Option<String>,
    /// Stored in the `InternalRouteConfig` shared by all HTTP listeners.
    pub internal_console_redirect_url: Option<String>,

    /// Controller configuration. `serve` also reads the persist location,
    /// persist clients, deploy generation, connection context and replica
    /// HTTP locator from it before handing it to the adapter.
    pub controller: ControllerConfig,
    /// Forwarded to the adapter as `secrets_controller`.
    pub secrets_controller: Arc<dyn SecretsController>,
    /// Forwarded to the adapter as `cloud_resource_controller`.
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Forwarded to the adapter as `storage_usage_collection_interval`.
    pub storage_usage_collection_interval: Duration,
    /// Forwarded to the adapter as `storage_usage_retention_period`.
    pub storage_usage_retention_period: Option<Duration>,

    /// Persist clients and metrics used to open the durable catalog.
    pub catalog_config: CatalogConfig,
    /// Forwarded to the adapter as `availability_zones`.
    pub availability_zones: Vec<String>,
    /// Known cluster replica sizes. `serve` returns an error if
    /// `bootstrap_default_cluster_replica_size` is not a key of this map.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Forwarded to the adapter as `timestamp_oracle_url`.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    /// When set, a Segment client is created with this key and telemetry
    /// reporting is started.
    pub segment_api_key: Option<String>,
    /// Passed to the Segment client as `client_side`.
    pub segment_client_side: bool,
    /// When no Segment API key is set, start telemetry reporting against a
    /// dummy Segment client instead.
    pub test_only_dummy_segment_client: bool,
    /// Enables LaunchDarkly-based system parameter sync. Mutually exclusive
    /// with `config_sync_file_path`: `serve` panics if both are set.
    pub launchdarkly_sdk_key: Option<String>,
    /// Passed to `SystemParameterSyncConfig::new` when parameter sync is
    /// enabled.
    pub launchdarkly_key_map: BTreeMap<String, String>,
    /// Timeout passed to `load_remote_system_parameters` during startup.
    pub config_sync_timeout: Duration,
    /// Passed to the background `system_parameter_sync` task.
    pub config_sync_loop_interval: Option<Duration>,
    /// Enables file-based system parameter sync. Mutually exclusive with
    /// `launchdarkly_sdk_key`: `serve` panics if both are set.
    pub config_sync_file_path: Option<PathBuf>,

    /// Identifier of this environment; its organization ID is used when
    /// opening the durable catalog.
    pub environment_id: EnvironmentId,
    /// Copied into `BootstrapArgs::bootstrap_role`.
    pub bootstrap_role: Option<String>,
    /// Copied into `BootstrapArgs::default_cluster_replica_size`; must be a
    /// key of `cluster_replica_sizes`.
    pub bootstrap_default_cluster_replica_size: String,
    /// Copied into `BootstrapArgs::default_cluster_replication_factor`.
    pub bootstrap_default_cluster_replication_factor: u32,
    /// Forwarded to the adapter as `builtin_system_cluster_config`.
    pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_catalog_server_cluster_config`.
    pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_probe_cluster_config`.
    pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_support_cluster_config`.
    pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_analytics_cluster_config`.
    pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Default values for system parameters, keyed by parameter name.
    /// `serve` also consults this map when resolving the 0dt deployment
    /// parameters, before falling back to the compiled-in defaults.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Passed to the HTTP listeners, the SQL listeners and the adapter.
    pub helm_chart_version: Option<String>,
    /// Forwarded to the adapter as `license_key`.
    pub license_key: ValidatedLicenseKey,

    /// Forwarded to the adapter as `aws_account_id`.
    pub aws_account_id: Option<String>,
    /// Forwarded to the adapter as `aws_privatelink_availability_zones`.
    pub aws_privatelink_availability_zones: Option<Vec<String>>,

    /// Registry into which the HTTP and pgwire metrics are registered; also
    /// passed to the adapter and the system parameter sync config.
    pub metrics_registry: MetricsRegistry,
    /// Forwarded to the adapter as `tracing_handle`.
    pub tracing_handle: TracingHandle,

    /// Clock function. Called once in `serve` to obtain the boot timestamp,
    /// then handed to the adapter (and to the LaunchDarkly sync client).
    pub now: NowFn,
    /// Forwarded to the adapter as `force_builtin_schema_migration`.
    pub force_builtin_schema_migration: Option<String>,
}
216
/// Handles needed to open the durable catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Cache from which the persist client for the catalog is opened.
    pub persist_clients: Arc<PersistClientCache>,
    /// Metrics passed to the durable catalog when it is opened.
    pub metrics: Arc<mz_catalog::durable::Metrics>,
}
225
/// A bound network listener together with the configuration it was bound
/// with. `C` is the listener-specific configuration type.
pub struct Listener<C> {
    /// Handle returned by `mz_server_core::listen` for this listener.
    pub handle: ListenerHandle,
    /// Stream of incoming connections, handed to `mz_server_core::serve` when
    /// the listener starts serving.
    connection_stream: Pin<Box<dyn ConnectionStream>>,
    /// The configuration this listener was bound with.
    config: C,
}
231impl<C> Listener<C>
232where
233 C: ListenerConfig,
234{
235 async fn bind(config: C) -> Result<Self, io::Error> {
247 let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
248 Ok(Self {
249 handle,
250 connection_stream,
251 config,
252 })
253 }
254}
255
256impl Listener<SqlListenerConfig> {
257 #[instrument(name = "environmentd::serve_sql")]
258 pub async fn serve_sql(
259 self,
260 name: String,
261 active_connection_counter: ConnectionCounter,
262 tls_reloading_context: Option<ReloadingSslContext>,
263 frontegg: Option<FronteggAuthenticator>,
264 adapter_client: AdapterClient,
265 metrics: MetricsConfig,
266 helm_chart_version: Option<String>,
267 ) -> ListenerHandle {
268 let label: &'static str = Box::leak(name.into_boxed_str());
269 let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
270 context,
271 mode: if self.config.enable_tls {
272 TlsMode::Require
273 } else {
274 TlsMode::Allow
275 },
276 });
277 let authenticator = match self.config.authenticator_kind {
278 AuthenticatorKind::Frontegg => Authenticator::Frontegg(
279 frontegg.expect("Frontegg args are required with AuthenticatorKind::Frontegg"),
280 ),
281 AuthenticatorKind::Password => Authenticator::Password(adapter_client.clone()),
282 AuthenticatorKind::Sasl => Authenticator::Sasl(adapter_client.clone()),
283 AuthenticatorKind::None => Authenticator::None,
284 };
285
286 task::spawn(|| format!("{}_sql_server", label), {
287 let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
288 label,
289 tls,
290 adapter_client,
291 authenticator,
292 metrics,
293 active_connection_counter,
294 helm_chart_version,
295 allowed_roles: self.config.allowed_roles,
296 });
297 mz_server_core::serve(ServeConfig {
298 conns: self.connection_stream,
299 server: sql_server,
300 dyncfg: None,
303 })
304 });
305 self.handle
306 }
307}
308
309impl Listener<HttpListenerConfig> {
310 #[instrument(name = "environmentd::serve_http")]
311 pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
312 let task_name = format!("{}_http_server", &config.source);
313 task::spawn(|| task_name, {
314 let http_server = HttpServer::new(config);
315 mz_server_core::serve(ServeConfig {
316 conns: self.connection_stream,
317 server: http_server,
318 dyncfg: None,
321 })
322 });
323 self.handle
324 }
325}
326
/// The set of bound HTTP and SQL listeners, each keyed by listener name.
pub struct Listeners {
    /// Bound HTTP listeners, keyed by name.
    pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
    /// Bound SQL listeners, keyed by name.
    pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
}
331
impl Listeners {
    /// Binds every listener described by `config`.
    ///
    /// SQL listeners are bound first, then HTTP listeners, one at a time.
    ///
    /// # Errors
    ///
    /// Returns the first bind error encountered; remaining listeners are not
    /// attempted.
    pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
        let mut sql = BTreeMap::new();
        for (name, config) in config.sql {
            sql.insert(name, Listener::bind(config).await?);
        }

        let mut http = BTreeMap::new();
        for (name, config) in config.http {
            http.insert(name, Listener::bind(config).await?);
        }

        Ok(Listeners { http, sql })
    }

    /// Starts the server on the already-bound listeners.
    ///
    /// The startup sequence, in order:
    ///
    /// 1. Preamble: build the TLS context and start all HTTP listeners.
    /// 2. Create the openable durable catalog state.
    /// 3. Load remote system parameters (file or LaunchDarkly based).
    /// 4. Resolve the 0dt deployment parameters and run the preflight checks.
    /// 5. Open the durable catalog (as a savepoint when read-only).
    /// 6. Start the adapter.
    /// 7. Postamble: deliver the adapter client to the HTTP servers, start
    ///    the SQL listeners, telemetry, and the parameter sync task.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the steps above fails, or if
    /// `config.bootstrap_default_cluster_replica_size` is not present in
    /// `config.cluster_replica_sizes`.
    ///
    /// # Panics
    ///
    /// Panics if both `config.launchdarkly_sdk_key` and
    /// `config.config_sync_file_path` are set.
    #[instrument(name = "environmentd::serve")]
    pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
        let serve_start = Instant::now();
        info!("startup: envd serve: beginning");
        info!("startup: envd serve: preamble beginning");

        // Build the reloading TLS context once; it is cloned into each
        // listener that needs it.
        let tls_reloading_context = match config.tls {
            Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
            None => None,
        };

        let active_connection_counter = ConnectionCounter::default();
        let (deployment_state, deployment_state_handle) = DeploymentState::new();

        let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
        let internal_route_config = Arc::new(InternalRouteConfig {
            deployment_state_handle,
            internal_console_redirect_url: config.internal_console_redirect_url,
        });

        // Authenticators are delivered to the HTTP servers through shared
        // oneshot receivers, one channel per authenticator flavor.
        let (authenticator_frontegg_tx, authenticator_frontegg_rx) = oneshot::channel();
        let authenticator_frontegg_rx = authenticator_frontegg_rx.shared();
        let (authenticator_password_tx, authenticator_password_rx) = oneshot::channel();
        let authenticator_password_rx = authenticator_password_rx.shared();
        let (authenticator_none_tx, authenticator_none_rx) = oneshot::channel();
        let authenticator_none_rx = authenticator_none_rx.shared();

        // The Frontegg and "none" authenticators do not depend on the adapter,
        // so they are sent right away. The password authenticator needs the
        // adapter client and is only sent in the postamble.
        if let Some(frontegg) = &config.frontegg {
            authenticator_frontegg_tx
                .send(Arc::new(Authenticator::Frontegg(frontegg.clone())))
                .expect("rx known to be live");
        }
        authenticator_none_tx
            .send(Arc::new(Authenticator::None))
            .expect("rx known to be live");

        // The adapter client is likewise delivered to the HTTP servers later,
        // once the adapter has started.
        let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
        let adapter_client_rx = adapter_client_rx.shared();

        // Start the HTTP listeners. This happens before the catalog is opened
        // and before the adapter exists.
        let metrics_registry = config.metrics_registry.clone();
        let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
        let mut http_listener_handles = BTreeMap::new();
        for (name, listener) in self.http {
            let authenticator_kind = listener.config.authenticator_kind();
            // Password and SASL listeners share the password authenticator
            // channel.
            let authenticator_rx = match authenticator_kind {
                AuthenticatorKind::Frontegg => authenticator_frontegg_rx.clone(),
                AuthenticatorKind::Password => authenticator_password_rx.clone(),
                AuthenticatorKind::Sasl => authenticator_password_rx.clone(),
                AuthenticatorKind::None => authenticator_none_rx.clone(),
            };
            // `HttpConfig::source` is a `&'static str`, so the listener name is
            // deliberately leaked.
            let source: &'static str = Box::leak(name.clone().into_boxed_str());
            // Only listeners that enable TLS get the TLS context.
            let tls = if listener.config.enable_tls() {
                tls_reloading_context.clone()
            } else {
                None
            };
            let http_config = HttpConfig {
                adapter_client_rx: adapter_client_rx.clone(),
                active_connection_counter: active_connection_counter.clone(),
                helm_chart_version: config.helm_chart_version.clone(),
                source,
                tls,
                authenticator_kind,
                authenticator_rx,
                allowed_origin: config.cors_allowed_origin.clone(),
                concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
                metrics: metrics.clone(),
                metrics_registry: metrics_registry.clone(),
                allowed_roles: listener.config.allowed_roles(),
                internal_route_config: Arc::clone(&internal_route_config),
                routes_enabled: listener.config.routes.clone(),
                replica_http_locator: Arc::clone(&config.controller.replica_http_locator),
            };
            http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
        }

        info!(
            "startup: envd serve: preamble complete in {:?}",
            serve_start.elapsed()
        );

        let catalog_init_start = Instant::now();
        info!("startup: envd serve: catalog init beginning");

        // The boot timestamp is taken from the configured clock.
        let boot_ts = (config.now)().into();

        let persist_client = config
            .catalog_config
            .persist_clients
            .open(config.controller.persist_location.clone())
            .await
            .context("opening persist client")?;
        let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
            persist_client.clone(),
            config.environment_id.organization_id(),
            BUILD_INFO.semver_version(),
            Some(config.controller.deploy_generation),
            Arc::clone(&config.catalog_config.metrics),
        )
        .await?;

        info!(
            "startup: envd serve: catalog init complete in {:?}",
            catalog_init_start.elapsed()
        );

        let system_param_sync_start = Instant::now();
        info!("startup: envd serve: system parameter sync beginning");
        // Parameter sync is driven either by a file or by LaunchDarkly, never
        // both; configuring both is treated as a fatal misconfiguration.
        let system_parameter_sync_config =
            match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
                (None, None) => None,
                (None, Some(f)) => {
                    info!("Using config file path {:?}", f);
                    Some(SystemParameterSyncConfig::new(
                        config.environment_id.clone(),
                        &BUILD_INFO,
                        &config.metrics_registry,
                        config.launchdarkly_key_map,
                        SystemParameterSyncClientConfig::File { path: f },
                    ))
                }
                (Some(key), None) => Some(SystemParameterSyncConfig::new(
                    config.environment_id.clone(),
                    &BUILD_INFO,
                    &config.metrics_registry,
                    config.launchdarkly_key_map,
                    SystemParameterSyncClientConfig::LaunchDarkly {
                        sdk_key: key,
                        now_fn: config.now.clone(),
                    },
                )),

                (Some(_), Some(_)) => {
                    panic!("Cannot configure both file and Launchdarkly based config syncing")
                }
            };

        let remote_system_parameters = load_remote_system_parameters(
            &mut openable_adapter_storage,
            system_parameter_sync_config.clone(),
            config.config_sync_timeout,
        )
        .await?;
        info!(
            "startup: envd serve: system parameter sync complete in {:?}",
            system_param_sync_start.elapsed()
        );

        let preflight_checks_start = Instant::now();
        info!("startup: envd serve: preflight checks beginning");

        // Each of the three 0dt deployment parameters below is resolved with
        // the same precedence: remote value, then the value stored in the
        // catalog, then the configured default from
        // `system_parameter_defaults`, then the compiled-in default.
        let with_0dt_deployment_max_wait = {
            let cli_default = config
                .system_parameter_defaults
                .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
                .map(|x| {
                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
                        anyhow!(
                            "failed to parse default for {}: {:?}",
                            WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
                            err
                        )
                    })
                })
                .transpose()?;
            let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
            let ld = get_ld_value(
                WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
                &remote_system_parameters,
                |x| {
                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
                        format!(
                            "failed to parse LD value {} for {}: {:?}",
                            x,
                            WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
                            err
                        )
                    })
                },
            )?;
            let catalog = openable_adapter_storage
                .get_0dt_deployment_max_wait()
                .await?;
            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
            info!(
                ?computed,
                ?ld,
                ?catalog,
                ?cli_default,
                ?compiled_default,
                "determined value for {} system parameter",
                WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
            );
            computed
        };
        let with_0dt_deployment_ddl_check_interval = {
            let cli_default = config
                .system_parameter_defaults
                .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
                .map(|x| {
                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
                        anyhow!(
                            "failed to parse default for {}: {:?}",
                            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
                            err
                        )
                    })
                })
                .transpose()?;
            let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
            let ld = get_ld_value(
                WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
                &remote_system_parameters,
                |x| {
                    Duration::parse(VarInput::Flat(x)).map_err(|err| {
                        format!(
                            "failed to parse LD value {} for {}: {:?}",
                            x,
                            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
                            err
                        )
                    })
                },
            )?;
            let catalog = openable_adapter_storage
                .get_0dt_deployment_ddl_check_interval()
                .await?;
            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
            info!(
                ?computed,
                ?ld,
                ?catalog,
                ?cli_default,
                ?compiled_default,
                "determined value for {} system parameter",
                WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
            );
            computed
        };

        let enable_0dt_deployment_panic_after_timeout = {
            let cli_default = config
                .system_parameter_defaults
                .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
                .map(|x| {
                    strconv::parse_bool(x).map_err(|err| {
                        anyhow!(
                            "failed to parse default for {}: {}",
                            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
                            err
                        )
                    })
                })
                .transpose()?;
            let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
            // NOTE(review): the remote lookup uses a string literal rather than
            // `ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name()`; presumably the
            // two are equal — confirm.
            let ld = get_ld_value(
                "enable_0dt_deployment_panic_after_timeout",
                &remote_system_parameters,
                |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
            )?;
            let catalog = openable_adapter_storage
                .get_enable_0dt_deployment_panic_after_timeout()
                .await?;
            let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
            info!(
                %computed,
                ?ld,
                ?catalog,
                ?cli_default,
                ?compiled_default,
                "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
            );
            computed
        };

        // These bootstrap args are moved into the preflight input; an
        // equivalent set is rebuilt below for opening the catalog.
        let bootstrap_args = BootstrapArgs {
            default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
            default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
            bootstrap_role: config.bootstrap_role.clone(),
            cluster_replica_size_map: config.cluster_replica_sizes.clone(),
        };
        let preflight_config = PreflightInput {
            boot_ts,
            environment_id: config.environment_id.clone(),
            persist_client,
            deploy_generation: config.controller.deploy_generation,
            deployment_state: deployment_state.clone(),
            openable_adapter_storage,
            catalog_metrics: Arc::clone(&config.catalog_config.metrics),
            caught_up_max_wait: with_0dt_deployment_max_wait,
            panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
            bootstrap_args,
            ddl_check_interval: with_0dt_deployment_ddl_check_interval,
        };
        // Preflight hands back the openable catalog state and decides whether
        // this process continues in read-only mode.
        let PreflightOutput {
            openable_adapter_storage,
            read_only,
            caught_up_trigger,
        } = deployment::preflight::preflight_0dt(preflight_config).await?;

        info!(
            "startup: envd serve: preflight checks complete in {:?}",
            preflight_checks_start.elapsed()
        );

        let catalog_open_start = Instant::now();
        info!("startup: envd serve: durable catalog open beginning");

        let bootstrap_args = BootstrapArgs {
            default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
            default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
            bootstrap_role: config.bootstrap_role,
            cluster_replica_size_map: config.cluster_replica_sizes.clone(),
        };

        // In read-only mode the catalog is opened via `open_savepoint`;
        // otherwise it is opened normally and this deployment is marked as the
        // leader.
        let (adapter_storage, audit_logs_iterator) = if read_only {
            let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
                .open_savepoint(boot_ts, &bootstrap_args)
                .await?;
            (adapter_storage, audit_logs_iterator)
        } else {
            let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
                .open(boot_ts, &bootstrap_args)
                .await?;

            deployment_state.set_is_leader();

            (adapter_storage, audit_logs_iterator)
        };

        // Persist compaction is only enabled when not in read-only mode.
        if !read_only {
            config.controller.persist_clients.cfg().enable_compaction();
        }

        info!(
            "startup: envd serve: durable catalog open complete in {:?}",
            catalog_open_start.elapsed()
        );

        let coord_init_start = Instant::now();
        info!("startup: envd serve: coordinator init beginning");

        // Reject a bootstrap default replica size that is not in the size map.
        if !config
            .cluster_replica_sizes
            .0
            .contains_key(&config.bootstrap_default_cluster_replica_size)
        {
            return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
        }
        let envd_epoch = adapter_storage.epoch();

        let storage_usage_client = StorageUsageClient::open(
            config
                .controller
                .persist_clients
                .open(config.controller.persist_location.clone())
                .await
                .context("opening storage usage client")?,
        );

        // A Segment client only exists when an API key is configured.
        let segment_client = config.segment_api_key.map(|api_key| {
            mz_segment::Client::new(mz_segment::Config {
                api_key,
                client_side: config.segment_client_side,
            })
        });
        // Callback through which the adapter updates the limits on the shared
        // connection counter.
        let connection_limiter = active_connection_counter.clone();
        let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
            connection_limiter.update_limit(limit);
            connection_limiter.update_superuser_reserved(superuser_reserved);
        });

        let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
            connection_context: config.controller.connection_context.clone(),
            connection_limit_callback,
            controller_config: config.controller,
            controller_envd_epoch: envd_epoch,
            storage: adapter_storage,
            audit_logs_iterator,
            timestamp_oracle_url: config.timestamp_oracle_url,
            unsafe_mode: config.unsafe_mode,
            all_features: config.all_features,
            build_info: &BUILD_INFO,
            environment_id: config.environment_id.clone(),
            metrics_registry: config.metrics_registry.clone(),
            now: config.now,
            secrets_controller: config.secrets_controller,
            cloud_resource_controller: config.cloud_resource_controller,
            cluster_replica_sizes: config.cluster_replica_sizes,
            builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
            builtin_catalog_server_cluster_config: config
                .bootstrap_builtin_catalog_server_cluster_config,
            builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
            builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
            builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
            availability_zones: config.availability_zones,
            system_parameter_defaults: config.system_parameter_defaults,
            storage_usage_client,
            storage_usage_collection_interval: config.storage_usage_collection_interval,
            storage_usage_retention_period: config.storage_usage_retention_period,
            segment_client: segment_client.clone(),
            egress_addresses: config.egress_addresses,
            remote_system_parameters,
            aws_account_id: config.aws_account_id,
            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
            webhook_concurrency_limit: webhook_concurrency_limit.clone(),
            http_host_name: config.http_host_name,
            tracing_handle: config.tracing_handle,
            read_only_controllers: read_only,
            caught_up_trigger,
            helm_chart_version: config.helm_chart_version.clone(),
            license_key: config.license_key,
            external_login_password_mz_system: config.external_login_password_mz_system,
            force_builtin_schema_migration: config.force_builtin_schema_migration,
        })
        .instrument(info_span!("adapter::serve"))
        .await?;

        info!(
            "startup: envd serve: coordinator init complete in {:?}",
            coord_init_start.elapsed()
        );

        let serve_postamble_start = Instant::now();
        info!("startup: envd serve: postamble beginning");

        // Now that the adapter client exists, deliver the password
        // authenticator and the client itself to the HTTP servers started in
        // the preamble.
        authenticator_password_tx
            .send(Arc::new(Authenticator::Password(adapter_client.clone())))
            .expect("rx known to be live");
        adapter_client_tx
            .send(adapter_client.clone())
            .expect("internal HTTP server should not drop first");

        let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);

        // SQL listeners are only started once the adapter is up.
        let mut sql_listener_handles = BTreeMap::new();
        for (name, listener) in self.sql {
            sql_listener_handles.insert(
                name.clone(),
                listener
                    .serve_sql(
                        name,
                        active_connection_counter.clone(),
                        tls_reloading_context.clone(),
                        config.frontegg.clone(),
                        adapter_client.clone(),
                        metrics.clone(),
                        config.helm_chart_version.clone(),
                    )
                    .await,
            );
        }

        // Telemetry: report hourly with a real Segment client, or every three
        // minutes with a dummy client when explicitly requested.
        if let Some(segment_client) = segment_client {
            telemetry::start_reporting(telemetry::Config {
                segment_client,
                adapter_client: adapter_client.clone(),
                environment_id: config.environment_id,
                report_interval: Duration::from_secs(3600),
            });
        } else if config.test_only_dummy_segment_client {
            tracing::debug!("starting telemetry reporting with a dummy segment client");
            let segment_client = mz_segment::Client::new_dummy_client();
            telemetry::start_reporting(telemetry::Config {
                segment_client,
                adapter_client: adapter_client.clone(),
                environment_id: config.environment_id,
                report_interval: Duration::from_secs(180),
            });
        }

        // Keep syncing system parameters in a background task when a sync
        // source is configured.
        if let Some(system_parameter_sync_config) = system_parameter_sync_config {
            task::spawn(
                || "system_parameter_sync",
                AssertUnwindSafe(system_parameter_sync(
                    system_parameter_sync_config,
                    adapter_client.clone(),
                    config.config_sync_loop_interval,
                ))
                .ore_catch_unwind(),
            );
        }

        info!(
            "startup: envd serve: postamble complete in {:?}",
            serve_postamble_start.elapsed()
        );
        info!(
            "startup: envd serve: complete in {:?}",
            serve_start.elapsed()
        );

        Ok(Server {
            sql_listener_handles,
            http_listener_handles,
            _adapter_handle: adapter_handle,
        })
    }
}
892
893fn get_ld_value<V>(
894 name: &str,
895 remote_system_parameters: &Option<BTreeMap<String, String>>,
896 parse: impl Fn(&str) -> Result<V, String>,
897) -> Result<Option<V>, anyhow::Error> {
898 remote_system_parameters
899 .as_ref()
900 .and_then(|params| params.get(name))
901 .map(|x| {
902 parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
903 })
904 .transpose()
905}
906
/// A running server, as returned by [`Listeners::serve`].
pub struct Server {
    /// Handles of the serving SQL listeners, keyed by listener name.
    pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handles of the serving HTTP listeners, keyed by listener name.
    pub http_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handle returned by `mz_adapter::serve`. It is stored but never read in
    /// this file.
    // NOTE(review): presumably held so the adapter lives as long as the
    // `Server` — confirm against `mz_adapter::Handle`.
    _adapter_handle: mz_adapter::Handle,
}