1use std::collections::BTreeMap;
17use std::panic::AssertUnwindSafe;
18use std::path::PathBuf;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use std::{env, io};
23
24use anyhow::{Context, anyhow};
25use derivative::Derivative;
26use futures::FutureExt;
27use ipnet::IpNet;
28use mz_adapter::config::{
29 SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
30};
31use mz_adapter::webhook::WebhookConcurrencyLimiter;
32use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
33use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
34use mz_adapter_types::dyncfgs::{
35 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
36 WITH_0DT_DEPLOYMENT_MAX_WAIT,
37};
38use mz_auth::password::Password;
39use mz_authenticator::{Authenticator, GenericOidcAuthenticator};
40use mz_build_info::{BuildInfo, build_info};
41use mz_catalog::config::ClusterReplicaSizeMap;
42use mz_catalog::durable::BootstrapArgs;
43use mz_cloud_resources::CloudResourceController;
44use mz_controller::ControllerConfig;
45use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
46use mz_license_keys::ValidatedLicenseKey;
47use mz_ore::future::OreFutureExt;
48use mz_ore::metrics::MetricsRegistry;
49use mz_ore::now::NowFn;
50use mz_ore::tracing::TracingHandle;
51use mz_ore::url::SensitiveUrl;
52use mz_ore::{instrument, task};
53use mz_persist_client::cache::PersistClientCache;
54use mz_persist_client::usage::StorageUsageClient;
55use mz_pgwire::MetricsConfig;
56use mz_pgwire_common::ConnectionCounter;
57use mz_repr::strconv;
58use mz_secrets::SecretsController;
59use mz_server_core::listeners::{
60 AuthenticatorKind, HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
61};
62use mz_server_core::{
63 ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
64 TlsCertConfig, TlsMode,
65};
66use mz_sql::catalog::EnvironmentId;
67use mz_sql::session::vars::{Value, VarInput};
68use tokio::sync::oneshot;
69use tower_http::cors::AllowOrigin;
70use tracing::{Instrument, info, info_span};
71
72use crate::deployment::preflight::{PreflightInput, PreflightOutput};
73use crate::deployment::state::DeploymentState;
74use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
75
76pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
77
78mod deployment;
79pub mod environmentd;
80pub mod http;
81mod telemetry;
82#[cfg(feature = "test")]
83pub mod test_util;
84
85pub const BUILD_INFO: BuildInfo = build_info!();
86
87#[derive(Derivative)]
89#[derivative(Debug)]
90pub struct Config {
91 pub unsafe_mode: bool,
95 pub all_features: bool,
98
99 pub tls: Option<TlsCertConfig>,
102 #[derivative(Debug = "ignore")]
104 pub tls_reload_certs: ReloadTrigger,
105 pub external_login_password_mz_system: Option<Password>,
107 pub frontegg: Option<FronteggAuthenticator>,
109 pub cors_allowed_origin: AllowOrigin,
112 pub egress_addresses: Vec<IpNet>,
115 pub http_host_name: Option<String>,
121 pub internal_console_redirect_url: Option<String>,
124
125 pub controller: ControllerConfig,
128 pub secrets_controller: Arc<dyn SecretsController>,
130 pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
132
133 pub storage_usage_collection_interval: Duration,
136 pub storage_usage_retention_period: Option<Duration>,
138
139 pub catalog_config: CatalogConfig,
142 pub availability_zones: Vec<String>,
145 pub cluster_replica_sizes: ClusterReplicaSizeMap,
147 pub timestamp_oracle_url: Option<SensitiveUrl>,
149 pub segment_api_key: Option<String>,
151 pub segment_client_side: bool,
154 pub test_only_dummy_segment_client: bool,
156 pub launchdarkly_sdk_key: Option<String>,
159 pub launchdarkly_key_map: BTreeMap<String, String>,
162 pub config_sync_timeout: Duration,
164 pub config_sync_loop_interval: Option<Duration>,
166 pub config_sync_file_path: Option<PathBuf>,
168
169 pub environment_id: EnvironmentId,
172 pub bootstrap_role: Option<String>,
174 pub bootstrap_default_cluster_replica_size: String,
176 pub bootstrap_default_cluster_replication_factor: u32,
178 pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
180 pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
182 pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
184 pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
186 pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
188 pub system_parameter_defaults: BTreeMap<String, String>,
191 pub helm_chart_version: Option<String>,
193 pub license_key: ValidatedLicenseKey,
195
196 pub aws_account_id: Option<String>,
200 pub aws_privatelink_availability_zones: Option<Vec<String>>,
202
203 pub metrics_registry: MetricsRegistry,
206 pub tracing_handle: TracingHandle,
208
209 pub now: NowFn,
212 pub force_builtin_schema_migration: Option<String>,
215}
216
217#[derive(Debug, Clone)]
219pub struct CatalogConfig {
220 pub persist_clients: Arc<PersistClientCache>,
222 pub metrics: Arc<mz_catalog::durable::Metrics>,
224}
225
226pub struct Listener<C> {
227 pub handle: ListenerHandle,
228 connection_stream: Pin<Box<dyn ConnectionStream>>,
229 config: C,
230}
231impl<C> Listener<C>
232where
233 C: ListenerConfig,
234{
235 async fn bind(config: C) -> Result<Self, io::Error> {
247 let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
248 Ok(Self {
249 handle,
250 connection_stream,
251 config,
252 })
253 }
254}
255
256impl Listener<SqlListenerConfig> {
257 #[instrument(name = "environmentd::serve_sql")]
258 pub async fn serve_sql(
259 self,
260 name: String,
261 active_connection_counter: ConnectionCounter,
262 tls_reloading_context: Option<ReloadingSslContext>,
263 frontegg: Option<FronteggAuthenticator>,
264 adapter_client: AdapterClient,
265 oidc: GenericOidcAuthenticator,
266 metrics: MetricsConfig,
267 helm_chart_version: Option<String>,
268 ) -> ListenerHandle {
269 let label: &'static str = Box::leak(name.into_boxed_str());
270 let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
271 context,
272 mode: if self.config.enable_tls {
273 TlsMode::Require
274 } else {
275 TlsMode::Allow
276 },
277 });
278
279 task::spawn(|| format!("{}_sql_server", label), {
280 let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
281 label,
282 tls,
283 adapter_client,
284 authenticator_kind: self.config.authenticator_kind,
285 frontegg,
286 oidc,
287 metrics,
288 active_connection_counter,
289 helm_chart_version,
290 allowed_roles: self.config.allowed_roles,
291 });
292 mz_server_core::serve(ServeConfig {
293 conns: self.connection_stream,
294 server: sql_server,
295 dyncfg: None,
298 })
299 });
300 self.handle
301 }
302}
303
304impl Listener<HttpListenerConfig> {
305 #[instrument(name = "environmentd::serve_http")]
306 pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
307 let task_name = format!("{}_http_server", &config.source);
308 task::spawn(|| task_name, {
309 let http_server = HttpServer::new(config);
310 mz_server_core::serve(ServeConfig {
311 conns: self.connection_stream,
312 server: http_server,
313 dyncfg: None,
316 })
317 });
318 self.handle
319 }
320}
321
322pub struct Listeners {
323 pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
324 pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
325}
326
327impl Listeners {
328 pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
329 let mut sql = BTreeMap::new();
330 for (name, config) in config.sql {
331 sql.insert(name, Listener::bind(config).await?);
332 }
333
334 let mut http = BTreeMap::new();
335 for (name, config) in config.http {
336 http.insert(name, Listener::bind(config).await?);
337 }
338
339 Ok(Listeners { http, sql })
340 }
341
342 #[instrument(name = "environmentd::serve")]
346 pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
347 let serve_start = Instant::now();
348 info!("startup: envd serve: beginning");
349 info!("startup: envd serve: preamble beginning");
350
351 let tls_reloading_context = match config.tls {
353 Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
354 None => None,
355 };
356
357 let active_connection_counter = ConnectionCounter::default();
358 let (deployment_state, deployment_state_handle) = DeploymentState::new();
359
360 let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
369 let internal_route_config = Arc::new(InternalRouteConfig {
370 deployment_state_handle,
371 internal_console_redirect_url: config.internal_console_redirect_url,
372 });
373
374 let (authenticator_frontegg_tx, authenticator_frontegg_rx) = oneshot::channel();
375 let authenticator_frontegg_rx = authenticator_frontegg_rx.shared();
376 let (authenticator_password_tx, authenticator_password_rx) = oneshot::channel();
377 let authenticator_password_rx = authenticator_password_rx.shared();
378 let (authenticator_oidc_tx, authenticator_oidc_rx) = oneshot::channel();
379 let authenticator_oidc_rx = authenticator_oidc_rx.shared();
380 let (authenticator_none_tx, authenticator_none_rx) = oneshot::channel();
381 let authenticator_none_rx = authenticator_none_rx.shared();
382
383 if let Some(frontegg) = &config.frontegg {
386 authenticator_frontegg_tx
387 .send(Arc::new(Authenticator::Frontegg(frontegg.clone())))
388 .expect("rx known to be live");
389 }
390 authenticator_none_tx
391 .send(Arc::new(Authenticator::None))
392 .expect("rx known to be live");
393
394 let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
395 let adapter_client_rx = adapter_client_rx.shared();
396
397 let metrics_registry = config.metrics_registry.clone();
398 let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
399 let mut http_listener_handles = BTreeMap::new();
400 for (name, listener) in self.http {
401 let authenticator_kind = listener.config.authenticator_kind();
402 let authenticator_rx = match authenticator_kind {
403 AuthenticatorKind::Frontegg => authenticator_frontegg_rx.clone(),
404 AuthenticatorKind::Password => authenticator_password_rx.clone(),
405 AuthenticatorKind::Sasl => authenticator_password_rx.clone(),
406 AuthenticatorKind::Oidc => authenticator_oidc_rx.clone(),
407 AuthenticatorKind::None => authenticator_none_rx.clone(),
408 };
409 let source: &'static str = Box::leak(name.clone().into_boxed_str());
410 let tls = if listener.config.enable_tls() {
411 tls_reloading_context.clone()
412 } else {
413 None
414 };
415 let http_config = HttpConfig {
416 adapter_client_rx: adapter_client_rx.clone(),
417 active_connection_counter: active_connection_counter.clone(),
418 helm_chart_version: config.helm_chart_version.clone(),
419 source,
420 tls,
421 authenticator_kind,
422 authenticator_rx,
423 allowed_origin: config.cors_allowed_origin.clone(),
424 concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
425 metrics: metrics.clone(),
426 metrics_registry: metrics_registry.clone(),
427 allowed_roles: listener.config.allowed_roles(),
428 internal_route_config: Arc::clone(&internal_route_config),
429 routes_enabled: listener.config.routes.clone(),
430 replica_http_locator: Arc::clone(&config.controller.replica_http_locator),
431 };
432 http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
433 }
434
435 info!(
436 "startup: envd serve: preamble complete in {:?}",
437 serve_start.elapsed()
438 );
439
440 let catalog_init_start = Instant::now();
441 info!("startup: envd serve: catalog init beginning");
442
443 let boot_ts = (config.now)().into();
445
446 let persist_client = config
447 .catalog_config
448 .persist_clients
449 .open(config.controller.persist_location.clone())
450 .await
451 .context("opening persist client")?;
452 let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
453 persist_client.clone(),
454 config.environment_id.organization_id(),
455 BUILD_INFO.semver_version(),
456 Some(config.controller.deploy_generation),
457 Arc::clone(&config.catalog_config.metrics),
458 )
459 .await?;
460
461 info!(
462 "startup: envd serve: catalog init complete in {:?}",
463 catalog_init_start.elapsed()
464 );
465
466 let system_param_sync_start = Instant::now();
467 info!("startup: envd serve: system parameter sync beginning");
468 let system_parameter_sync_config =
470 match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
471 (None, None) => None,
472 (None, Some(f)) => {
473 info!("Using config file path {:?}", f);
474 Some(SystemParameterSyncConfig::new(
475 config.environment_id.clone(),
476 &BUILD_INFO,
477 &config.metrics_registry,
478 config.launchdarkly_key_map,
479 SystemParameterSyncClientConfig::File { path: f },
480 ))
481 }
482 (Some(key), None) => Some(SystemParameterSyncConfig::new(
483 config.environment_id.clone(),
484 &BUILD_INFO,
485 &config.metrics_registry,
486 config.launchdarkly_key_map,
487 SystemParameterSyncClientConfig::LaunchDarkly {
488 sdk_key: key,
489 now_fn: config.now.clone(),
490 },
491 )),
492
493 (Some(_), Some(_)) => {
494 panic!("Cannot configure both file and Launchdarkly based config syncing")
495 }
496 };
497
498 let remote_system_parameters = load_remote_system_parameters(
499 &mut openable_adapter_storage,
500 system_parameter_sync_config.clone(),
501 config.config_sync_timeout,
502 )
503 .await?;
504 info!(
505 "startup: envd serve: system parameter sync complete in {:?}",
506 system_param_sync_start.elapsed()
507 );
508
509 let preflight_checks_start = Instant::now();
510 info!("startup: envd serve: preflight checks beginning");
511
512 let with_0dt_deployment_max_wait = {
514 let cli_default = config
515 .system_parameter_defaults
516 .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
517 .map(|x| {
518 Duration::parse(VarInput::Flat(x)).map_err(|err| {
519 anyhow!(
520 "failed to parse default for {}: {:?}",
521 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
522 err
523 )
524 })
525 })
526 .transpose()?;
527 let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
528 let ld = get_ld_value(
529 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
530 &remote_system_parameters,
531 |x| {
532 Duration::parse(VarInput::Flat(x)).map_err(|err| {
533 format!(
534 "failed to parse LD value {} for {}: {:?}",
535 x,
536 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
537 err
538 )
539 })
540 },
541 )?;
542 let catalog = openable_adapter_storage
543 .get_0dt_deployment_max_wait()
544 .await?;
545 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
546 info!(
547 ?computed,
548 ?ld,
549 ?catalog,
550 ?cli_default,
551 ?compiled_default,
552 "determined value for {} system parameter",
553 WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
554 );
555 computed
556 };
557 let with_0dt_deployment_ddl_check_interval = {
559 let cli_default = config
560 .system_parameter_defaults
561 .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
562 .map(|x| {
563 Duration::parse(VarInput::Flat(x)).map_err(|err| {
564 anyhow!(
565 "failed to parse default for {}: {:?}",
566 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
567 err
568 )
569 })
570 })
571 .transpose()?;
572 let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
573 let ld = get_ld_value(
574 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
575 &remote_system_parameters,
576 |x| {
577 Duration::parse(VarInput::Flat(x)).map_err(|err| {
578 format!(
579 "failed to parse LD value {} for {}: {:?}",
580 x,
581 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
582 err
583 )
584 })
585 },
586 )?;
587 let catalog = openable_adapter_storage
588 .get_0dt_deployment_ddl_check_interval()
589 .await?;
590 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
591 info!(
592 ?computed,
593 ?ld,
594 ?catalog,
595 ?cli_default,
596 ?compiled_default,
597 "determined value for {} system parameter",
598 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
599 );
600 computed
601 };
602
603 let enable_0dt_deployment_panic_after_timeout = {
606 let cli_default = config
607 .system_parameter_defaults
608 .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
609 .map(|x| {
610 strconv::parse_bool(x).map_err(|err| {
611 anyhow!(
612 "failed to parse default for {}: {}",
613 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
614 err
615 )
616 })
617 })
618 .transpose()?;
619 let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
620 let ld = get_ld_value(
621 "enable_0dt_deployment_panic_after_timeout",
622 &remote_system_parameters,
623 |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
624 )?;
625 let catalog = openable_adapter_storage
626 .get_enable_0dt_deployment_panic_after_timeout()
627 .await?;
628 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
629 info!(
630 %computed,
631 ?ld,
632 ?catalog,
633 ?cli_default,
634 ?compiled_default,
635 "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
636 );
637 computed
638 };
639
640 let bootstrap_args = BootstrapArgs {
644 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
645 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
646 bootstrap_role: config.bootstrap_role.clone(),
647 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
648 };
649 let preflight_config = PreflightInput {
650 boot_ts,
651 environment_id: config.environment_id.clone(),
652 persist_client,
653 deploy_generation: config.controller.deploy_generation,
654 deployment_state: deployment_state.clone(),
655 openable_adapter_storage,
656 catalog_metrics: Arc::clone(&config.catalog_config.metrics),
657 caught_up_max_wait: with_0dt_deployment_max_wait,
658 panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
659 bootstrap_args,
660 ddl_check_interval: with_0dt_deployment_ddl_check_interval,
661 };
662 let PreflightOutput {
663 openable_adapter_storage,
664 read_only,
665 caught_up_trigger,
666 } = deployment::preflight::preflight_0dt(preflight_config).await?;
667
668 info!(
669 "startup: envd serve: preflight checks complete in {:?}",
670 preflight_checks_start.elapsed()
671 );
672
673 let catalog_open_start = Instant::now();
674 info!("startup: envd serve: durable catalog open beginning");
675
676 let bootstrap_args = BootstrapArgs {
677 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
678 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
679 bootstrap_role: config.bootstrap_role,
680 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
681 };
682
683 let (adapter_storage, audit_logs_iterator) = if read_only {
685 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
688 .open_savepoint(boot_ts, &bootstrap_args)
689 .await?;
690 (adapter_storage, audit_logs_iterator)
695 } else {
696 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
697 .open(boot_ts, &bootstrap_args)
698 .await?;
699
700 deployment_state.set_is_leader();
704
705 (adapter_storage, audit_logs_iterator)
706 };
707
708 if !read_only {
710 config.controller.persist_clients.cfg().enable_compaction();
711 }
712
713 info!(
714 "startup: envd serve: durable catalog open complete in {:?}",
715 catalog_open_start.elapsed()
716 );
717
718 let coord_init_start = Instant::now();
719 info!("startup: envd serve: coordinator init beginning");
720
721 if !config
722 .cluster_replica_sizes
723 .0
724 .contains_key(&config.bootstrap_default_cluster_replica_size)
725 {
726 return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
727 }
728 let envd_epoch = adapter_storage.epoch();
729
730 let storage_usage_client = StorageUsageClient::open(
732 config
733 .controller
734 .persist_clients
735 .open(config.controller.persist_location.clone())
736 .await
737 .context("opening storage usage client")?,
738 );
739
740 let segment_client = config.segment_api_key.map(|api_key| {
742 mz_segment::Client::new(mz_segment::Config {
743 api_key,
744 client_side: config.segment_client_side,
745 })
746 });
747 let connection_limiter = active_connection_counter.clone();
748 let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
749 connection_limiter.update_limit(limit);
750 connection_limiter.update_superuser_reserved(superuser_reserved);
751 });
752
753 let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
754 connection_context: config.controller.connection_context.clone(),
755 connection_limit_callback,
756 controller_config: config.controller,
757 controller_envd_epoch: envd_epoch,
758 storage: adapter_storage,
759 audit_logs_iterator,
760 timestamp_oracle_url: config.timestamp_oracle_url,
761 unsafe_mode: config.unsafe_mode,
762 all_features: config.all_features,
763 build_info: &BUILD_INFO,
764 environment_id: config.environment_id.clone(),
765 metrics_registry: config.metrics_registry.clone(),
766 now: config.now,
767 secrets_controller: config.secrets_controller,
768 cloud_resource_controller: config.cloud_resource_controller,
769 cluster_replica_sizes: config.cluster_replica_sizes,
770 builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
771 builtin_catalog_server_cluster_config: config
772 .bootstrap_builtin_catalog_server_cluster_config,
773 builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
774 builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
775 builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
776 availability_zones: config.availability_zones,
777 system_parameter_defaults: config.system_parameter_defaults,
778 storage_usage_client,
779 storage_usage_collection_interval: config.storage_usage_collection_interval,
780 storage_usage_retention_period: config.storage_usage_retention_period,
781 segment_client: segment_client.clone(),
782 egress_addresses: config.egress_addresses,
783 remote_system_parameters,
784 aws_account_id: config.aws_account_id,
785 aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
786 webhook_concurrency_limit: webhook_concurrency_limit.clone(),
787 http_host_name: config.http_host_name,
788 tracing_handle: config.tracing_handle,
789 read_only_controllers: read_only,
790 caught_up_trigger,
791 helm_chart_version: config.helm_chart_version.clone(),
792 license_key: config.license_key,
793 external_login_password_mz_system: config.external_login_password_mz_system,
794 force_builtin_schema_migration: config.force_builtin_schema_migration,
795 })
796 .instrument(info_span!("adapter::serve"))
797 .await?;
798
799 let oidc = GenericOidcAuthenticator::new(adapter_client.clone());
801
802 info!(
803 "startup: envd serve: coordinator init complete in {:?}",
804 coord_init_start.elapsed()
805 );
806
807 let serve_postamble_start = Instant::now();
808 info!("startup: envd serve: postamble beginning");
809
810 authenticator_password_tx
812 .send(Arc::new(Authenticator::Password(adapter_client.clone())))
813 .expect("rx known to be live");
814
815 authenticator_oidc_tx
816 .send(Arc::new(Authenticator::Oidc(oidc.clone())))
817 .expect("rx known to be live");
818 adapter_client_tx
819 .send(adapter_client.clone())
820 .expect("internal HTTP server should not drop first");
821
822 let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
823
824 let mut sql_listener_handles = BTreeMap::new();
826 for (name, listener) in self.sql {
827 sql_listener_handles.insert(
828 name.clone(),
829 listener
830 .serve_sql(
831 name,
832 active_connection_counter.clone(),
833 tls_reloading_context.clone(),
834 config.frontegg.clone(),
835 adapter_client.clone(),
836 oidc.clone(),
837 metrics.clone(),
838 config.helm_chart_version.clone(),
839 )
840 .await,
841 );
842 }
843
844 if let Some(segment_client) = segment_client {
846 telemetry::start_reporting(telemetry::Config {
847 segment_client,
848 adapter_client: adapter_client.clone(),
849 environment_id: config.environment_id,
850 report_interval: Duration::from_secs(3600),
851 });
852 } else if config.test_only_dummy_segment_client {
853 tracing::debug!("starting telemetry reporting with a dummy segment client");
859 let segment_client = mz_segment::Client::new_dummy_client();
860 telemetry::start_reporting(telemetry::Config {
861 segment_client,
862 adapter_client: adapter_client.clone(),
863 environment_id: config.environment_id,
864 report_interval: Duration::from_secs(180),
865 });
866 }
867
868 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
871 task::spawn(
872 || "system_parameter_sync",
873 AssertUnwindSafe(system_parameter_sync(
874 system_parameter_sync_config,
875 adapter_client.clone(),
876 config.config_sync_loop_interval,
877 ))
878 .ore_catch_unwind(),
879 );
880 }
881
882 info!(
883 "startup: envd serve: postamble complete in {:?}",
884 serve_postamble_start.elapsed()
885 );
886 info!(
887 "startup: envd serve: complete in {:?}",
888 serve_start.elapsed()
889 );
890
891 Ok(Server {
892 sql_listener_handles,
893 http_listener_handles,
894 _adapter_handle: adapter_handle,
895 })
896 }
897}
898
899fn get_ld_value<V>(
900 name: &str,
901 remote_system_parameters: &Option<BTreeMap<String, String>>,
902 parse: impl Fn(&str) -> Result<V, String>,
903) -> Result<Option<V>, anyhow::Error> {
904 remote_system_parameters
905 .as_ref()
906 .and_then(|params| params.get(name))
907 .map(|x| {
908 parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
909 })
910 .transpose()
911}
912
913pub struct Server {
915 pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
917 pub http_listener_handles: BTreeMap<String, ListenerHandle>,
918 _adapter_handle: mz_adapter::Handle,
919}