1use std::collections::BTreeMap;
17use std::panic::AssertUnwindSafe;
18use std::path::PathBuf;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use std::{env, io};
23
24use anyhow::{Context, anyhow};
25use derivative::Derivative;
26use futures::FutureExt;
27use ipnet::IpNet;
28use mz_adapter::config::{
29 SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
30};
31use mz_adapter::webhook::WebhookConcurrencyLimiter;
32use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
33use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
34use mz_adapter_types::dyncfgs::{
35 ENABLE_0DT_DEPLOYMENT, ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT,
36 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
37};
38use mz_auth::password::Password;
39use mz_authenticator::Authenticator;
40use mz_build_info::{BuildInfo, build_info};
41use mz_catalog::config::ClusterReplicaSizeMap;
42use mz_catalog::durable::BootstrapArgs;
43use mz_cloud_resources::CloudResourceController;
44use mz_controller::ControllerConfig;
45use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
46use mz_license_keys::ValidatedLicenseKey;
47use mz_ore::future::OreFutureExt;
48use mz_ore::metrics::MetricsRegistry;
49use mz_ore::now::NowFn;
50use mz_ore::tracing::TracingHandle;
51use mz_ore::url::SensitiveUrl;
52use mz_ore::{instrument, task};
53use mz_persist_client::cache::PersistClientCache;
54use mz_persist_client::usage::StorageUsageClient;
55use mz_pgwire::MetricsConfig;
56use mz_pgwire_common::ConnectionCounter;
57use mz_repr::strconv;
58use mz_secrets::SecretsController;
59use mz_server_core::listeners::{
60 AuthenticatorKind, HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
61};
62use mz_server_core::{
63 ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
64 TlsCertConfig, TlsMode,
65};
66use mz_sql::catalog::EnvironmentId;
67use mz_sql::session::vars::{Value, VarInput};
68use tokio::sync::oneshot;
69use tower_http::cors::AllowOrigin;
70use tracing::{Instrument, info, info_span};
71
72use crate::deployment::preflight::{PreflightInput, PreflightOutput};
73use crate::deployment::state::DeploymentState;
74use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
75
76pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
77
78mod deployment;
79pub mod environmentd;
80pub mod http;
81mod telemetry;
82#[cfg(feature = "test")]
83pub mod test_util;
84
/// Build information for this binary, captured at compile time by the
/// `build_info!` macro.
///
/// `Listeners::serve` reads its semver version when opening the durable
/// catalog and passes a reference to it on to the adapter and to the system
/// parameter sync configuration.
pub const BUILD_INFO: BuildInfo = build_info!();
86
/// Configuration consumed by [`Listeners::serve`].
///
/// Most fields are forwarded unchanged into `mz_adapter::Config`; the field
/// comments below describe only what `serve` itself visibly does with them.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Config {
    // === Special modes. ===
    /// Forwarded to the adapter as `unsafe_mode`.
    pub unsafe_mode: bool,
    /// Forwarded to the adapter as `all_features`.
    pub all_features: bool,

    // === Connection options. ===
    /// TLS certificate configuration. When set, `serve` builds a reloading
    /// TLS context from it and hands that context to the listeners.
    pub tls: Option<TlsCertConfig>,
    /// Trigger passed to `TlsCertConfig::reloading_context` together with
    /// `tls`. Omitted from the `Debug` output.
    #[derivative(Debug = "ignore")]
    pub tls_reload_certs: ReloadTrigger,
    /// Forwarded to the adapter as `external_login_password_mz_system`.
    pub external_login_password_mz_system: Option<Password>,
    /// Frontegg authenticator, used by listeners whose authenticator kind is
    /// `AuthenticatorKind::Frontegg`.
    pub frontegg: Option<FronteggAuthenticator>,
    /// Handed to every HTTP listener as its `allowed_origin`.
    pub cors_allowed_origin: AllowOrigin,
    /// Forwarded to the adapter as `egress_addresses`.
    pub egress_addresses: Vec<IpNet>,
    /// Forwarded to the adapter as `http_host_name`.
    pub http_host_name: Option<String>,
    /// Stored in the `InternalRouteConfig` shared by all HTTP listeners.
    pub internal_console_redirect_url: Option<String>,

    // === Controller options. ===
    /// Controller configuration. `serve` also reads the persist location,
    /// persist client cache, deploy generation and connection context from
    /// it before forwarding it to the adapter.
    pub controller: ControllerConfig,
    /// Forwarded to the adapter as `secrets_controller`.
    pub secrets_controller: Arc<dyn SecretsController>,
    /// Forwarded to the adapter as `cloud_resource_controller`.
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    // === Storage options. ===
    /// Forwarded to the adapter as `storage_usage_collection_interval`.
    pub storage_usage_collection_interval: Duration,
    /// Forwarded to the adapter as `storage_usage_retention_period`.
    pub storage_usage_retention_period: Option<Duration>,

    // === Adapter options. ===
    /// Persist client cache and metrics used to open the durable catalog.
    pub catalog_config: CatalogConfig,
    /// Forwarded to the adapter as `availability_zones`.
    pub availability_zones: Vec<String>,
    /// Known cluster replica sizes. `serve` returns an error when
    /// `bootstrap_default_cluster_replica_size` is not a key of this map.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Forwarded to the adapter as `timestamp_oracle_url`.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    /// When set, `serve` creates a Segment client with this key and starts
    /// telemetry reporting.
    pub segment_api_key: Option<String>,
    /// Passed to the Segment client configuration as `client_side`.
    pub segment_client_side: bool,
    /// When `segment_api_key` is unset and this is `true`, telemetry
    /// reporting is started against a dummy Segment client.
    pub test_only_dummy_segment_client: bool,
    /// Enables LaunchDarkly-based system parameter sync. `serve` panics if
    /// this and `config_sync_file_path` are both set.
    pub launchdarkly_sdk_key: Option<String>,
    /// Passed to `SystemParameterSyncConfig::new` when parameter sync is
    /// configured.
    pub launchdarkly_key_map: BTreeMap<String, String>,
    /// Timeout passed to `load_remote_system_parameters` during startup.
    pub config_sync_timeout: Duration,
    /// Interval passed to the background `system_parameter_sync` task.
    pub config_sync_loop_interval: Option<Duration>,
    /// Enables file-based system parameter sync. `serve` panics if this and
    /// `launchdarkly_sdk_key` are both set.
    pub config_sync_file_path: Option<PathBuf>,

    // === Bootstrap options. ===
    /// Identifier of this environment; its organization id is used when
    /// opening the durable catalog.
    pub environment_id: EnvironmentId,
    /// Copied into `BootstrapArgs::bootstrap_role`.
    pub bootstrap_role: Option<String>,
    /// Copied into `BootstrapArgs::default_cluster_replica_size`; must be a
    /// key of `cluster_replica_sizes`.
    pub bootstrap_default_cluster_replica_size: String,
    /// Copied into `BootstrapArgs::default_cluster_replication_factor`.
    pub bootstrap_default_cluster_replication_factor: u32,
    /// Forwarded to the adapter as `builtin_system_cluster_config`.
    pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_catalog_server_cluster_config`.
    pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_probe_cluster_config`.
    pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_support_cluster_config`.
    pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_analytics_cluster_config`.
    pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Default values for system parameters, keyed by parameter name.
    /// `serve` consults it for the 0dt deployment parameters and then
    /// forwards the whole map to the adapter.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Passed to every HTTP and SQL listener and to the adapter.
    pub helm_chart_version: Option<String>,
    /// Forwarded to the adapter as `license_key`.
    pub license_key: ValidatedLicenseKey,

    // === AWS options. ===
    /// Forwarded to the adapter as `aws_account_id`.
    pub aws_account_id: Option<String>,
    /// Forwarded to the adapter as `aws_privatelink_availability_zones`.
    pub aws_privatelink_availability_zones: Option<Vec<String>>,

    // === Observability options. ===
    /// Registry into which the HTTP and pgwire metrics are registered.
    pub metrics_registry: MetricsRegistry,
    /// Forwarded to the adapter as `tracing_handle`.
    pub tracing_handle: TracingHandle,

    // === Testing options. ===
    /// Source of the current time; `serve` calls it once to derive the boot
    /// timestamp.
    pub now: NowFn,
}
213
/// Resources needed to open the durable catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Cache from which `Listeners::serve` opens the persist client that
    /// backs the catalog.
    pub persist_clients: Arc<PersistClientCache>,
    /// Catalog metrics, shared with the durable catalog state and with the
    /// deployment preflight checks.
    pub metrics: Arc<mz_catalog::durable::Metrics>,
}
222
/// A bound network listener together with the configuration it was bound
/// from. `C` is the listener's configuration type.
pub struct Listener<C> {
    /// Handle for the bound listener, as returned by
    /// `mz_server_core::listen`.
    pub handle: ListenerHandle,
    /// Stream of connections returned by `mz_server_core::listen`; consumed
    /// when the listener starts serving.
    connection_stream: Pin<Box<dyn ConnectionStream>>,
    /// Configuration this listener was bound with.
    config: C,
}
228impl<C> Listener<C>
229where
230 C: ListenerConfig,
231{
232 async fn bind(config: C) -> Result<Self, io::Error> {
244 let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
245 Ok(Self {
246 handle,
247 connection_stream,
248 config,
249 })
250 }
251}
252
253impl Listener<SqlListenerConfig> {
254 #[instrument(name = "environmentd::serve_sql")]
255 pub async fn serve_sql(
256 self,
257 name: String,
258 active_connection_counter: ConnectionCounter,
259 tls_reloading_context: Option<ReloadingSslContext>,
260 frontegg: Option<FronteggAuthenticator>,
261 adapter_client: AdapterClient,
262 metrics: MetricsConfig,
263 helm_chart_version: Option<String>,
264 ) -> ListenerHandle {
265 let label: &'static str = Box::leak(name.into_boxed_str());
266 let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
267 context,
268 mode: if self.config.enable_tls {
269 TlsMode::Require
270 } else {
271 TlsMode::Allow
272 },
273 });
274 let authenticator = match self.config.authenticator_kind {
275 AuthenticatorKind::Frontegg => Authenticator::Frontegg(
276 frontegg.expect("Frontegg args are required with AuthenticatorKind::Frontegg"),
277 ),
278 AuthenticatorKind::Password => Authenticator::Password(adapter_client.clone()),
279 AuthenticatorKind::None => Authenticator::None,
280 };
281
282 task::spawn(|| format!("{}_sql_server", label), {
283 let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
284 label,
285 tls,
286 adapter_client,
287 authenticator,
288 metrics,
289 active_connection_counter,
290 helm_chart_version,
291 allowed_roles: self.config.allowed_roles,
292 });
293 mz_server_core::serve(ServeConfig {
294 conns: self.connection_stream,
295 server: sql_server,
296 dyncfg: None,
299 })
300 });
301 self.handle
302 }
303}
304
305impl Listener<HttpListenerConfig> {
306 #[instrument(name = "environmentd::serve_http")]
307 pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
308 let task_name = format!("{}_http_server", &config.source);
309 task::spawn(|| task_name, {
310 let http_server = HttpServer::new(config);
311 mz_server_core::serve(ServeConfig {
312 conns: self.connection_stream,
313 server: http_server,
314 dyncfg: None,
317 })
318 });
319 self.handle
320 }
321}
322
/// The full set of bound listeners, keyed by listener name.
pub struct Listeners {
    /// Bound HTTP listeners, keyed by name.
    pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
    /// Bound SQL listeners, keyed by name.
    pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
}
327
328impl Listeners {
329 pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
330 let mut sql = BTreeMap::new();
331 for (name, config) in config.sql {
332 sql.insert(name, Listener::bind(config).await?);
333 }
334
335 let mut http = BTreeMap::new();
336 for (name, config) in config.http {
337 http.insert(name, Listener::bind(config).await?);
338 }
339
340 Ok(Listeners { http, sql })
341 }
342
343 #[instrument(name = "environmentd::serve")]
347 pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
348 let serve_start = Instant::now();
349 info!("startup: envd serve: beginning");
350 info!("startup: envd serve: preamble beginning");
351
352 let tls_reloading_context = match config.tls {
354 Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
355 None => None,
356 };
357
358 let active_connection_counter = ConnectionCounter::default();
359 let (deployment_state, deployment_state_handle) = DeploymentState::new();
360
361 let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
370 let internal_route_config = Arc::new(InternalRouteConfig {
371 deployment_state_handle,
372 internal_console_redirect_url: config.internal_console_redirect_url,
373 });
374
375 let (authenticator_frontegg_tx, authenticator_frontegg_rx) = oneshot::channel();
376 let authenticator_frontegg_rx = authenticator_frontegg_rx.shared();
377 let (authenticator_password_tx, authenticator_password_rx) = oneshot::channel();
378 let authenticator_password_rx = authenticator_password_rx.shared();
379 let (authenticator_none_tx, authenticator_none_rx) = oneshot::channel();
380 let authenticator_none_rx = authenticator_none_rx.shared();
381
382 if let Some(frontegg) = &config.frontegg {
385 authenticator_frontegg_tx
386 .send(Arc::new(Authenticator::Frontegg(frontegg.clone())))
387 .expect("rx known to be live");
388 }
389 authenticator_none_tx
390 .send(Arc::new(Authenticator::None))
391 .expect("rx known to be live");
392
393 let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
394 let adapter_client_rx = adapter_client_rx.shared();
395
396 let metrics_registry = config.metrics_registry.clone();
397 let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
398 let mut http_listener_handles = BTreeMap::new();
399 for (name, listener) in self.http {
400 let authenticator_kind = listener.config.authenticator_kind();
401 let authenticator_rx = match authenticator_kind {
402 AuthenticatorKind::Frontegg => authenticator_frontegg_rx.clone(),
403 AuthenticatorKind::Password => authenticator_password_rx.clone(),
404 AuthenticatorKind::None => authenticator_none_rx.clone(),
405 };
406 let source: &'static str = Box::leak(name.clone().into_boxed_str());
407 let tls = if listener.config.enable_tls() {
408 tls_reloading_context.clone()
409 } else {
410 None
411 };
412 let http_config = HttpConfig {
413 adapter_client_rx: adapter_client_rx.clone(),
414 active_connection_counter: active_connection_counter.clone(),
415 helm_chart_version: config.helm_chart_version.clone(),
416 source,
417 tls,
418 authenticator_kind,
419 authenticator_rx,
420 allowed_origin: config.cors_allowed_origin.clone(),
421 concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
422 metrics: metrics.clone(),
423 metrics_registry: metrics_registry.clone(),
424 allowed_roles: listener.config.allowed_roles(),
425 internal_route_config: Arc::clone(&internal_route_config),
426 routes_enabled: listener.config.routes.clone(),
427 };
428 http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
429 }
430
431 info!(
432 "startup: envd serve: preamble complete in {:?}",
433 serve_start.elapsed()
434 );
435
436 let catalog_init_start = Instant::now();
437 info!("startup: envd serve: catalog init beginning");
438
439 let boot_ts = (config.now)().into();
441
442 let persist_client = config
443 .catalog_config
444 .persist_clients
445 .open(config.controller.persist_location.clone())
446 .await
447 .context("opening persist client")?;
448 let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
449 persist_client.clone(),
450 config.environment_id.organization_id(),
451 BUILD_INFO.semver_version(),
452 Some(config.controller.deploy_generation),
453 Arc::clone(&config.catalog_config.metrics),
454 )
455 .await?;
456
457 info!(
458 "startup: envd serve: catalog init complete in {:?}",
459 catalog_init_start.elapsed()
460 );
461
462 let system_param_sync_start = Instant::now();
463 info!("startup: envd serve: system parameter sync beginning");
464 let system_parameter_sync_config =
466 match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
467 (None, None) => None,
468 (None, Some(f)) => Some(SystemParameterSyncConfig::new(
469 config.environment_id.clone(),
470 &BUILD_INFO,
471 &config.metrics_registry,
472 config.launchdarkly_key_map,
473 SystemParameterSyncClientConfig::File { path: f },
474 )),
475 (Some(key), None) => Some(SystemParameterSyncConfig::new(
476 config.environment_id.clone(),
477 &BUILD_INFO,
478 &config.metrics_registry,
479 config.launchdarkly_key_map,
480 SystemParameterSyncClientConfig::LaunchDarkly {
481 sdk_key: key,
482 now_fn: config.now.clone(),
483 },
484 )),
485
486 (Some(_), Some(_)) => {
487 panic!("Cannot configure both file and Launchdarkly based config syncing")
488 }
489 };
490
491 let remote_system_parameters = load_remote_system_parameters(
492 &mut openable_adapter_storage,
493 system_parameter_sync_config.clone(),
494 config.config_sync_timeout,
495 )
496 .await?;
497 info!(
498 "startup: envd serve: system parameter sync complete in {:?}",
499 system_param_sync_start.elapsed()
500 );
501
502 let preflight_checks_start = Instant::now();
503 info!("startup: envd serve: preflight checks beginning");
504
505 let enable_0dt_deployment = {
507 let cli_default = config
508 .system_parameter_defaults
509 .get(ENABLE_0DT_DEPLOYMENT.name())
510 .map(|x| {
511 strconv::parse_bool(x).map_err(|err| {
512 anyhow!(
513 "failed to parse default for {}: {}",
514 ENABLE_0DT_DEPLOYMENT.name(),
515 err
516 )
517 })
518 })
519 .transpose()?;
520 let compiled_default = ENABLE_0DT_DEPLOYMENT.default().clone();
521 let ld = get_ld_value("enable_0dt_deployment", &remote_system_parameters, |x| {
522 strconv::parse_bool(x).map_err(|x| x.to_string())
523 })?;
524 let catalog = openable_adapter_storage.get_enable_0dt_deployment().await?;
525 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
526 info!(
527 %computed,
528 ?ld,
529 ?catalog,
530 ?cli_default,
531 ?compiled_default,
532 "determined value for enable_0dt_deployment system parameter",
533 );
534 computed
535 };
536 let with_0dt_deployment_max_wait = {
538 let cli_default = config
539 .system_parameter_defaults
540 .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
541 .map(|x| {
542 Duration::parse(VarInput::Flat(x)).map_err(|err| {
543 anyhow!(
544 "failed to parse default for {}: {:?}",
545 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
546 err
547 )
548 })
549 })
550 .transpose()?;
551 let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
552 let ld = get_ld_value(
553 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
554 &remote_system_parameters,
555 |x| {
556 Duration::parse(VarInput::Flat(x)).map_err(|err| {
557 format!(
558 "failed to parse LD value {} for {}: {:?}",
559 x,
560 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
561 err
562 )
563 })
564 },
565 )?;
566 let catalog = openable_adapter_storage
567 .get_0dt_deployment_max_wait()
568 .await?;
569 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
570 info!(
571 ?computed,
572 ?ld,
573 ?catalog,
574 ?cli_default,
575 ?compiled_default,
576 "determined value for {} system parameter",
577 WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
578 );
579 computed
580 };
581 let with_0dt_deployment_ddl_check_interval = {
583 let cli_default = config
584 .system_parameter_defaults
585 .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
586 .map(|x| {
587 Duration::parse(VarInput::Flat(x)).map_err(|err| {
588 anyhow!(
589 "failed to parse default for {}: {:?}",
590 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
591 err
592 )
593 })
594 })
595 .transpose()?;
596 let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
597 let ld = get_ld_value(
598 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
599 &remote_system_parameters,
600 |x| {
601 Duration::parse(VarInput::Flat(x)).map_err(|err| {
602 format!(
603 "failed to parse LD value {} for {}: {:?}",
604 x,
605 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
606 err
607 )
608 })
609 },
610 )?;
611 let catalog = openable_adapter_storage
612 .get_0dt_deployment_ddl_check_interval()
613 .await?;
614 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
615 info!(
616 ?computed,
617 ?ld,
618 ?catalog,
619 ?cli_default,
620 ?compiled_default,
621 "determined value for {} system parameter",
622 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
623 );
624 computed
625 };
626
627 let enable_0dt_deployment_panic_after_timeout = {
630 let cli_default = config
631 .system_parameter_defaults
632 .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
633 .map(|x| {
634 strconv::parse_bool(x).map_err(|err| {
635 anyhow!(
636 "failed to parse default for {}: {}",
637 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
638 err
639 )
640 })
641 })
642 .transpose()?;
643 let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
644 let ld = get_ld_value(
645 "enable_0dt_deployment_panic_after_timeout",
646 &remote_system_parameters,
647 |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
648 )?;
649 let catalog = openable_adapter_storage
650 .get_enable_0dt_deployment_panic_after_timeout()
651 .await?;
652 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
653 info!(
654 %computed,
655 ?ld,
656 ?catalog,
657 ?cli_default,
658 ?compiled_default,
659 "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
660 );
661 computed
662 };
663
664 let mut read_only = false;
668 let mut caught_up_trigger = None;
669 let bootstrap_args = BootstrapArgs {
670 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
671 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
672 bootstrap_role: config.bootstrap_role.clone(),
673 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
674 };
675 let preflight_config = PreflightInput {
676 boot_ts,
677 environment_id: config.environment_id.clone(),
678 persist_client,
679 deploy_generation: config.controller.deploy_generation,
680 deployment_state: deployment_state.clone(),
681 openable_adapter_storage,
682 catalog_metrics: Arc::clone(&config.catalog_config.metrics),
683 caught_up_max_wait: with_0dt_deployment_max_wait,
684 panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
685 bootstrap_args,
686 ddl_check_interval: with_0dt_deployment_ddl_check_interval,
687 };
688 if enable_0dt_deployment {
689 PreflightOutput {
690 openable_adapter_storage,
691 read_only,
692 caught_up_trigger,
693 } = deployment::preflight::preflight_0dt(preflight_config).await?;
694 } else {
695 openable_adapter_storage =
696 deployment::preflight::preflight_legacy(preflight_config).await?;
697 };
698
699 info!(
700 "startup: envd serve: preflight checks complete in {:?}",
701 preflight_checks_start.elapsed()
702 );
703
704 let catalog_open_start = Instant::now();
705 info!("startup: envd serve: durable catalog open beginning");
706
707 let bootstrap_args = BootstrapArgs {
708 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
709 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
710 bootstrap_role: config.bootstrap_role,
711 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
712 };
713
714 let (adapter_storage, audit_logs_iterator) = if read_only {
716 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
719 .open_savepoint(boot_ts, &bootstrap_args)
720 .await?;
721 (adapter_storage, audit_logs_iterator)
726 } else {
727 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
728 .open(boot_ts, &bootstrap_args)
729 .await?;
730
731 deployment_state.set_is_leader();
735
736 (adapter_storage, audit_logs_iterator)
737 };
738
739 if !read_only {
741 config.controller.persist_clients.cfg().enable_compaction();
742 }
743
744 info!(
745 "startup: envd serve: durable catalog open complete in {:?}",
746 catalog_open_start.elapsed()
747 );
748
749 let coord_init_start = Instant::now();
750 info!("startup: envd serve: coordinator init beginning");
751
752 if !config
753 .cluster_replica_sizes
754 .0
755 .contains_key(&config.bootstrap_default_cluster_replica_size)
756 {
757 return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
758 }
759 let envd_epoch = adapter_storage.epoch();
760
761 let storage_usage_client = StorageUsageClient::open(
763 config
764 .controller
765 .persist_clients
766 .open(config.controller.persist_location.clone())
767 .await
768 .context("opening storage usage client")?,
769 );
770
771 let segment_client = config.segment_api_key.map(|api_key| {
773 mz_segment::Client::new(mz_segment::Config {
774 api_key,
775 client_side: config.segment_client_side,
776 })
777 });
778 let connection_limiter = active_connection_counter.clone();
779 let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
780 connection_limiter.update_limit(limit);
781 connection_limiter.update_superuser_reserved(superuser_reserved);
782 });
783
784 let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
785 connection_context: config.controller.connection_context.clone(),
786 connection_limit_callback,
787 controller_config: config.controller,
788 controller_envd_epoch: envd_epoch,
789 storage: adapter_storage,
790 audit_logs_iterator,
791 timestamp_oracle_url: config.timestamp_oracle_url,
792 unsafe_mode: config.unsafe_mode,
793 all_features: config.all_features,
794 build_info: &BUILD_INFO,
795 environment_id: config.environment_id.clone(),
796 metrics_registry: config.metrics_registry.clone(),
797 now: config.now,
798 secrets_controller: config.secrets_controller,
799 cloud_resource_controller: config.cloud_resource_controller,
800 cluster_replica_sizes: config.cluster_replica_sizes,
801 builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
802 builtin_catalog_server_cluster_config: config
803 .bootstrap_builtin_catalog_server_cluster_config,
804 builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
805 builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
806 builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
807 availability_zones: config.availability_zones,
808 system_parameter_defaults: config.system_parameter_defaults,
809 storage_usage_client,
810 storage_usage_collection_interval: config.storage_usage_collection_interval,
811 storage_usage_retention_period: config.storage_usage_retention_period,
812 segment_client: segment_client.clone(),
813 egress_addresses: config.egress_addresses,
814 remote_system_parameters,
815 aws_account_id: config.aws_account_id,
816 aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
817 webhook_concurrency_limit: webhook_concurrency_limit.clone(),
818 http_host_name: config.http_host_name,
819 tracing_handle: config.tracing_handle,
820 read_only_controllers: read_only,
821 enable_0dt_deployment,
822 caught_up_trigger,
823 helm_chart_version: config.helm_chart_version.clone(),
824 license_key: config.license_key,
825 external_login_password_mz_system: config.external_login_password_mz_system,
826 })
827 .instrument(info_span!("adapter::serve"))
828 .await?;
829
830 info!(
831 "startup: envd serve: coordinator init complete in {:?}",
832 coord_init_start.elapsed()
833 );
834
835 let serve_postamble_start = Instant::now();
836 info!("startup: envd serve: postamble beginning");
837
838 authenticator_password_tx
840 .send(Arc::new(Authenticator::Password(adapter_client.clone())))
841 .expect("rx known to be live");
842 adapter_client_tx
843 .send(adapter_client.clone())
844 .expect("internal HTTP server should not drop first");
845
846 let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
847
848 let mut sql_listener_handles = BTreeMap::new();
850 for (name, listener) in self.sql {
851 sql_listener_handles.insert(
852 name.clone(),
853 listener
854 .serve_sql(
855 name,
856 active_connection_counter.clone(),
857 tls_reloading_context.clone(),
858 config.frontegg.clone(),
859 adapter_client.clone(),
860 metrics.clone(),
861 config.helm_chart_version.clone(),
862 )
863 .await,
864 );
865 }
866
867 if let Some(segment_client) = segment_client {
869 telemetry::start_reporting(telemetry::Config {
870 segment_client,
871 adapter_client: adapter_client.clone(),
872 environment_id: config.environment_id,
873 report_interval: Duration::from_secs(3600),
874 });
875 } else if config.test_only_dummy_segment_client {
876 tracing::debug!("starting telemetry reporting with a dummy segment client");
882 let segment_client = mz_segment::Client::new_dummy_client();
883 telemetry::start_reporting(telemetry::Config {
884 segment_client,
885 adapter_client: adapter_client.clone(),
886 environment_id: config.environment_id,
887 report_interval: Duration::from_secs(180),
888 });
889 }
890
891 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
894 task::spawn(
895 || "system_parameter_sync",
896 AssertUnwindSafe(system_parameter_sync(
897 system_parameter_sync_config,
898 adapter_client.clone(),
899 config.config_sync_loop_interval,
900 ))
901 .ore_catch_unwind(),
902 );
903 }
904
905 info!(
906 "startup: envd serve: postamble complete in {:?}",
907 serve_postamble_start.elapsed()
908 );
909 info!(
910 "startup: envd serve: complete in {:?}",
911 serve_start.elapsed()
912 );
913
914 Ok(Server {
915 sql_listener_handles,
916 http_listener_handles,
917 _adapter_handle: adapter_handle,
918 })
919 }
920}
921
922fn get_ld_value<V>(
923 name: &str,
924 remote_system_parameters: &Option<BTreeMap<String, String>>,
925 parse: impl Fn(&str) -> Result<V, String>,
926) -> Result<Option<V>, anyhow::Error> {
927 remote_system_parameters
928 .as_ref()
929 .and_then(|params| params.get(name))
930 .map(|x| {
931 parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
932 })
933 .transpose()
934}
935
/// A running server, as returned by `Listeners::serve`.
pub struct Server {
    /// Handles of the SQL listeners being served, keyed by listener name.
    pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handles of the HTTP listeners being served, keyed by listener name.
    pub http_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handle returned by `mz_adapter::serve`, held for as long as the
    /// `Server` exists. NOTE(review): presumably dropping it shuts the
    /// adapter down - confirm against `mz_adapter::Handle`.
    _adapter_handle: mz_adapter::Handle,
}