1use ::http::HeaderValue;
17use std::collections::BTreeMap;
18use std::panic::AssertUnwindSafe;
19use std::path::PathBuf;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use std::{env, io};
24
25use anyhow::{Context, anyhow};
26use derivative::Derivative;
27use futures::FutureExt;
28use ipnet::IpNet;
29use mz_adapter::config::{
30 SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
31};
32use mz_adapter::webhook::WebhookConcurrencyLimiter;
33use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
34use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
35use mz_adapter_types::dyncfgs::{
36 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
37 WITH_0DT_DEPLOYMENT_MAX_WAIT,
38};
39use mz_auth::password::Password;
40use mz_authenticator::GenericOidcAuthenticator;
41use mz_build_info::{BuildInfo, build_info};
42use mz_catalog::config::ClusterReplicaSizeMap;
43use mz_catalog::durable::BootstrapArgs;
44use mz_cloud_resources::CloudResourceController;
45use mz_controller::ControllerConfig;
46use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
47use mz_license_keys::ValidatedLicenseKey;
48use mz_ore::future::OreFutureExt;
49use mz_ore::metrics::MetricsRegistry;
50use mz_ore::now::NowFn;
51use mz_ore::tracing::TracingHandle;
52use mz_ore::url::SensitiveUrl;
53use mz_ore::{instrument, task};
54use mz_persist_client::cache::PersistClientCache;
55use mz_persist_client::usage::StorageUsageClient;
56use mz_pgwire::MetricsConfig;
57use mz_pgwire_common::ConnectionCounter;
58use mz_repr::strconv;
59use mz_secrets::SecretsController;
60use mz_server_core::listeners::{
61 HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
62};
63use mz_server_core::{
64 ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
65 TlsCertConfig, TlsMode,
66};
67use mz_sql::catalog::EnvironmentId;
68use mz_sql::session::vars::{Value, VarInput};
69use tokio::sync::oneshot;
70use tower_http::cors::AllowOrigin;
71use tracing::{Instrument, info, info_span};
72
73use crate::deployment::preflight::{PreflightInput, PreflightOutput};
74use crate::deployment::state::DeploymentState;
75use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
76
77pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
78
79mod deployment;
80pub mod environmentd;
81pub mod http;
82mod telemetry;
83#[cfg(feature = "test")]
84pub mod test_util;
85
86pub const BUILD_INFO: BuildInfo = build_info!();
87
88#[derive(Derivative)]
90#[derivative(Debug)]
91pub struct Config {
92 pub unsafe_mode: bool,
96 pub all_features: bool,
99
100 pub tls: Option<TlsCertConfig>,
103 #[derivative(Debug = "ignore")]
105 pub tls_reload_certs: ReloadTrigger,
106 pub external_login_password_mz_system: Option<Password>,
108 pub frontegg: Option<FronteggAuthenticator>,
110 pub frontegg_oauth_issuer_url: Option<String>,
112 pub cors_allowed_origin: AllowOrigin,
115 pub cors_allowed_origin_list: Vec<HeaderValue>,
120 pub egress_addresses: Vec<IpNet>,
123 pub http_host_name: Option<String>,
129 pub internal_console_redirect_url: Option<String>,
132
133 pub controller: ControllerConfig,
136 pub secrets_controller: Arc<dyn SecretsController>,
138 pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
140
141 pub storage_usage_collection_interval: Duration,
144 pub storage_usage_retention_period: Option<Duration>,
146
147 pub catalog_config: CatalogConfig,
150 pub availability_zones: Vec<String>,
153 pub cluster_replica_sizes: ClusterReplicaSizeMap,
155 pub timestamp_oracle_url: Option<SensitiveUrl>,
157 pub segment_api_key: Option<String>,
159 pub segment_client_side: bool,
162 pub test_only_dummy_segment_client: bool,
164 pub launchdarkly_sdk_key: Option<String>,
167 pub launchdarkly_key_map: BTreeMap<String, String>,
170 pub config_sync_timeout: Duration,
172 pub config_sync_loop_interval: Option<Duration>,
174 pub config_sync_file_path: Option<PathBuf>,
176
177 pub environment_id: EnvironmentId,
180 pub bootstrap_role: Option<String>,
182 pub bootstrap_default_cluster_replica_size: String,
184 pub bootstrap_default_cluster_replication_factor: u32,
186 pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
188 pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
190 pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
192 pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
194 pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
196 pub system_parameter_defaults: BTreeMap<String, String>,
199 pub helm_chart_version: Option<String>,
201 pub license_key: ValidatedLicenseKey,
203
204 pub aws_account_id: Option<String>,
208 pub aws_privatelink_availability_zones: Option<Vec<String>>,
210
211 pub metrics_registry: MetricsRegistry,
214 pub tracing_handle: TracingHandle,
216
217 pub now: NowFn,
220 pub force_builtin_schema_migration: Option<String>,
223}
224
225#[derive(Debug, Clone)]
227pub struct CatalogConfig {
228 pub persist_clients: Arc<PersistClientCache>,
230 pub metrics: Arc<mz_catalog::durable::Metrics>,
232}
233
234pub struct Listener<C> {
235 pub handle: ListenerHandle,
236 connection_stream: Pin<Box<dyn ConnectionStream>>,
237 config: C,
238}
239impl<C> Listener<C>
240where
241 C: ListenerConfig,
242{
243 async fn bind(config: C) -> Result<Self, io::Error> {
255 let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
256 Ok(Self {
257 handle,
258 connection_stream,
259 config,
260 })
261 }
262}
263
264impl Listener<SqlListenerConfig> {
265 #[instrument(name = "environmentd::serve_sql")]
266 pub async fn serve_sql(
267 self,
268 name: String,
269 active_connection_counter: ConnectionCounter,
270 tls_reloading_context: Option<ReloadingSslContext>,
271 frontegg: Option<FronteggAuthenticator>,
272 adapter_client: AdapterClient,
273 oidc: GenericOidcAuthenticator,
274 metrics: MetricsConfig,
275 helm_chart_version: Option<String>,
276 ) -> ListenerHandle {
277 let label: &'static str = Box::leak(name.into_boxed_str());
278 let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
279 context,
280 mode: if self.config.enable_tls {
281 TlsMode::Require
282 } else {
283 TlsMode::Allow
284 },
285 });
286
287 task::spawn(|| format!("{}_sql_server", label), {
288 let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
289 label,
290 tls,
291 adapter_client,
292 authenticator_kind: self.config.authenticator_kind,
293 frontegg,
294 oidc,
295 metrics,
296 active_connection_counter,
297 helm_chart_version,
298 allowed_roles: self.config.allowed_roles,
299 });
300 mz_server_core::serve(ServeConfig {
301 conns: self.connection_stream,
302 server: sql_server,
303 dyncfg: None,
306 })
307 });
308 self.handle
309 }
310}
311
312impl Listener<HttpListenerConfig> {
313 #[instrument(name = "environmentd::serve_http")]
314 pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
315 let task_name = format!("{}_http_server", &config.source);
316 task::spawn(|| task_name, {
317 let http_server = HttpServer::new(config);
318 mz_server_core::serve(ServeConfig {
319 conns: self.connection_stream,
320 server: http_server,
321 dyncfg: None,
324 })
325 });
326 self.handle
327 }
328}
329
330pub struct Listeners {
331 pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
332 pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
333}
334
335impl Listeners {
336 pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
337 let mut sql = BTreeMap::new();
338 for (name, config) in config.sql {
339 sql.insert(name, Listener::bind(config).await?);
340 }
341
342 let mut http = BTreeMap::new();
343 for (name, config) in config.http {
344 http.insert(name, Listener::bind(config).await?);
345 }
346
347 Ok(Listeners { http, sql })
348 }
349
350 #[instrument(name = "environmentd::serve")]
354 pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
355 let serve_start = Instant::now();
356 info!("startup: envd serve: beginning");
357 info!("startup: envd serve: preamble beginning");
358
359 let tls_reloading_context = match config.tls {
361 Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
362 None => None,
363 };
364
365 let active_connection_counter = ConnectionCounter::default();
366 let (deployment_state, deployment_state_handle) = DeploymentState::new();
367
368 let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
377 let internal_route_config = Arc::new(InternalRouteConfig {
378 deployment_state_handle,
379 internal_console_redirect_url: config.internal_console_redirect_url,
380 });
381
382 let (authenticator_oidc_tx, authenticator_oidc_rx) = oneshot::channel();
383 let authenticator_oidc_rx = authenticator_oidc_rx.shared();
384 let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
385 let adapter_client_rx = adapter_client_rx.shared();
386
387 let metrics_registry = config.metrics_registry.clone();
388 let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
389 let mcp_metrics = http::mcp_metrics::McpMetrics::register_into(&metrics_registry);
390 let oauth_metadata_metrics =
391 http::oauth_metadata::OauthMetadataMetrics::register_into(&metrics_registry);
392 let mut http_listener_handles = BTreeMap::new();
393 for (name, listener) in self.http {
394 let authenticator_kind = listener.config.authenticator_kind();
395 let source: &'static str = Box::leak(name.clone().into_boxed_str());
396 let tls = if listener.config.enable_tls() {
397 tls_reloading_context.clone()
398 } else {
399 None
400 };
401 let http_config = HttpConfig {
402 adapter_client_rx: adapter_client_rx.clone(),
403 active_connection_counter: active_connection_counter.clone(),
404 helm_chart_version: config.helm_chart_version.clone(),
405 http_host_name: config.http_host_name.clone(),
406 frontegg_oauth_issuer_url: config.frontegg_oauth_issuer_url.clone(),
407 source,
408 tls,
409 authenticator_kind,
410 frontegg: config.frontegg.clone(),
411 oidc_rx: authenticator_oidc_rx.clone(),
412 allowed_origin: config.cors_allowed_origin.clone(),
413 allowed_origin_list: config.cors_allowed_origin_list.clone(),
414 concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
415 metrics: metrics.clone(),
416 metrics_registry: metrics_registry.clone(),
417 mcp_metrics: mcp_metrics.clone(),
418 oauth_metadata_metrics: oauth_metadata_metrics.clone(),
419 allowed_roles: listener.config.allowed_roles(),
420 internal_route_config: Arc::clone(&internal_route_config),
421 routes_enabled: listener.config.routes.clone(),
422 replica_http_locator: Arc::clone(&config.controller.replica_http_locator),
423 };
424 http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
425 }
426
427 info!(
428 "startup: envd serve: preamble complete in {:?}",
429 serve_start.elapsed()
430 );
431
432 let catalog_init_start = Instant::now();
433 info!("startup: envd serve: catalog init beginning");
434
435 let boot_ts = (config.now)().into();
437
438 let persist_client = config
439 .catalog_config
440 .persist_clients
441 .open(config.controller.persist_location.clone())
442 .await
443 .context("opening persist client")?;
444 let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
445 persist_client.clone(),
446 config.environment_id.organization_id(),
447 BUILD_INFO.semver_version(),
448 Some(config.controller.deploy_generation),
449 Arc::clone(&config.catalog_config.metrics),
450 )
451 .await?;
452
453 info!(
454 "startup: envd serve: catalog init complete in {:?}",
455 catalog_init_start.elapsed()
456 );
457
458 let system_param_sync_start = Instant::now();
459 info!("startup: envd serve: system parameter sync beginning");
460 let system_parameter_sync_config =
462 match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
463 (None, None) => None,
464 (None, Some(f)) => {
465 info!("Using config file path {:?}", f);
466 Some(SystemParameterSyncConfig::new(
467 config.environment_id.clone(),
468 &BUILD_INFO,
469 &config.metrics_registry,
470 config.launchdarkly_key_map,
471 SystemParameterSyncClientConfig::File { path: f },
472 ))
473 }
474 (Some(key), None) => Some(SystemParameterSyncConfig::new(
475 config.environment_id.clone(),
476 &BUILD_INFO,
477 &config.metrics_registry,
478 config.launchdarkly_key_map,
479 SystemParameterSyncClientConfig::LaunchDarkly {
480 sdk_key: key,
481 now_fn: config.now.clone(),
482 },
483 )),
484
485 (Some(_), Some(_)) => {
486 panic!("Cannot configure both file and Launchdarkly based config syncing")
487 }
488 };
489
490 let remote_system_parameters = load_remote_system_parameters(
491 &mut openable_adapter_storage,
492 system_parameter_sync_config.clone(),
493 config.config_sync_timeout,
494 )
495 .await?;
496 info!(
497 "startup: envd serve: system parameter sync complete in {:?}",
498 system_param_sync_start.elapsed()
499 );
500
501 let preflight_checks_start = Instant::now();
502 info!("startup: envd serve: preflight checks beginning");
503
504 let with_0dt_deployment_max_wait = {
506 let cli_default = config
507 .system_parameter_defaults
508 .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
509 .map(|x| {
510 Duration::parse(VarInput::Flat(x)).map_err(|err| {
511 anyhow!(
512 "failed to parse default for {}: {:?}",
513 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
514 err
515 )
516 })
517 })
518 .transpose()?;
519 let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
520 let ld = get_ld_value(
521 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
522 &remote_system_parameters,
523 |x| {
524 Duration::parse(VarInput::Flat(x)).map_err(|err| {
525 format!(
526 "failed to parse LD value {} for {}: {:?}",
527 x,
528 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
529 err
530 )
531 })
532 },
533 )?;
534 let catalog = openable_adapter_storage
535 .get_0dt_deployment_max_wait()
536 .await?;
537 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
538 info!(
539 ?computed,
540 ?ld,
541 ?catalog,
542 ?cli_default,
543 ?compiled_default,
544 "determined value for {} system parameter",
545 WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
546 );
547 computed
548 };
549 let with_0dt_deployment_ddl_check_interval = {
551 let cli_default = config
552 .system_parameter_defaults
553 .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
554 .map(|x| {
555 Duration::parse(VarInput::Flat(x)).map_err(|err| {
556 anyhow!(
557 "failed to parse default for {}: {:?}",
558 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
559 err
560 )
561 })
562 })
563 .transpose()?;
564 let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
565 let ld = get_ld_value(
566 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
567 &remote_system_parameters,
568 |x| {
569 Duration::parse(VarInput::Flat(x)).map_err(|err| {
570 format!(
571 "failed to parse LD value {} for {}: {:?}",
572 x,
573 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
574 err
575 )
576 })
577 },
578 )?;
579 let catalog = openable_adapter_storage
580 .get_0dt_deployment_ddl_check_interval()
581 .await?;
582 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
583 info!(
584 ?computed,
585 ?ld,
586 ?catalog,
587 ?cli_default,
588 ?compiled_default,
589 "determined value for {} system parameter",
590 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
591 );
592 computed
593 };
594
595 let enable_0dt_deployment_panic_after_timeout = {
598 let cli_default = config
599 .system_parameter_defaults
600 .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
601 .map(|x| {
602 strconv::parse_bool(x).map_err(|err| {
603 anyhow!(
604 "failed to parse default for {}: {}",
605 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
606 err
607 )
608 })
609 })
610 .transpose()?;
611 let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
612 let ld = get_ld_value(
613 "enable_0dt_deployment_panic_after_timeout",
614 &remote_system_parameters,
615 |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
616 )?;
617 let catalog = openable_adapter_storage
618 .get_enable_0dt_deployment_panic_after_timeout()
619 .await?;
620 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
621 info!(
622 %computed,
623 ?ld,
624 ?catalog,
625 ?cli_default,
626 ?compiled_default,
627 "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
628 );
629 computed
630 };
631
632 let bootstrap_args = BootstrapArgs {
636 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
637 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
638 bootstrap_role: config.bootstrap_role.clone(),
639 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
640 };
641 let preflight_config = PreflightInput {
642 boot_ts,
643 environment_id: config.environment_id.clone(),
644 persist_client,
645 deploy_generation: config.controller.deploy_generation,
646 deployment_state: deployment_state.clone(),
647 openable_adapter_storage,
648 catalog_metrics: Arc::clone(&config.catalog_config.metrics),
649 caught_up_max_wait: with_0dt_deployment_max_wait,
650 panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
651 bootstrap_args,
652 ddl_check_interval: with_0dt_deployment_ddl_check_interval,
653 };
654 let PreflightOutput {
655 openable_adapter_storage,
656 read_only,
657 caught_up_trigger,
658 } = deployment::preflight::preflight_0dt(preflight_config).await?;
659
660 info!(
661 "startup: envd serve: preflight checks complete in {:?}",
662 preflight_checks_start.elapsed()
663 );
664
665 let catalog_open_start = Instant::now();
666 info!("startup: envd serve: durable catalog open beginning");
667
668 let bootstrap_args = BootstrapArgs {
669 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
670 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
671 bootstrap_role: config.bootstrap_role,
672 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
673 };
674
675 let (adapter_storage, audit_logs_iterator) = if read_only {
677 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
680 .open_savepoint(boot_ts, &bootstrap_args)
681 .await?;
682 (adapter_storage, audit_logs_iterator)
687 } else {
688 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
689 .open(boot_ts, &bootstrap_args)
690 .await?;
691
692 deployment_state.set_is_leader();
696
697 (adapter_storage, audit_logs_iterator)
698 };
699
700 if !read_only {
702 config.controller.persist_clients.cfg().enable_compaction();
703 }
704
705 info!(
706 "startup: envd serve: durable catalog open complete in {:?}",
707 catalog_open_start.elapsed()
708 );
709
710 let coord_init_start = Instant::now();
711 info!("startup: envd serve: coordinator init beginning");
712
713 if !config
714 .cluster_replica_sizes
715 .0
716 .contains_key(&config.bootstrap_default_cluster_replica_size)
717 {
718 return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
719 }
720 let envd_epoch = adapter_storage.epoch();
721
722 let storage_usage_client = StorageUsageClient::open(
724 config
725 .controller
726 .persist_clients
727 .open(config.controller.persist_location.clone())
728 .await
729 .context("opening storage usage client")?,
730 );
731
732 let segment_client = config.segment_api_key.map(|api_key| {
734 mz_segment::Client::new(mz_segment::Config {
735 api_key,
736 client_side: config.segment_client_side,
737 })
738 });
739 let connection_limiter = active_connection_counter.clone();
740 let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
741 connection_limiter.update_limit(limit);
742 connection_limiter.update_superuser_reserved(superuser_reserved);
743 });
744
745 let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
746 connection_context: config.controller.connection_context.clone(),
747 connection_limit_callback,
748 controller_config: config.controller,
749 controller_envd_epoch: envd_epoch,
750 storage: adapter_storage,
751 audit_logs_iterator,
752 timestamp_oracle_url: config.timestamp_oracle_url,
753 unsafe_mode: config.unsafe_mode,
754 all_features: config.all_features,
755 build_info: &BUILD_INFO,
756 environment_id: config.environment_id.clone(),
757 metrics_registry: config.metrics_registry.clone(),
758 now: config.now,
759 secrets_controller: config.secrets_controller,
760 cloud_resource_controller: config.cloud_resource_controller,
761 cluster_replica_sizes: config.cluster_replica_sizes,
762 builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
763 builtin_catalog_server_cluster_config: config
764 .bootstrap_builtin_catalog_server_cluster_config,
765 builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
766 builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
767 builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
768 availability_zones: config.availability_zones,
769 system_parameter_defaults: config.system_parameter_defaults,
770 storage_usage_client,
771 storage_usage_collection_interval: config.storage_usage_collection_interval,
772 storage_usage_retention_period: config.storage_usage_retention_period,
773 segment_client: segment_client.clone(),
774 egress_addresses: config.egress_addresses,
775 remote_system_parameters,
776 aws_account_id: config.aws_account_id,
777 aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
778 webhook_concurrency_limit: webhook_concurrency_limit.clone(),
779 http_host_name: config.http_host_name,
780 tracing_handle: config.tracing_handle,
781 read_only_controllers: read_only,
782 caught_up_trigger,
783 helm_chart_version: config.helm_chart_version.clone(),
784 license_key: config.license_key,
785 external_login_password_mz_system: config.external_login_password_mz_system,
786 force_builtin_schema_migration: config.force_builtin_schema_migration,
787 })
788 .instrument(info_span!("adapter::serve"))
789 .await?;
790
791 let oidc = GenericOidcAuthenticator::new(adapter_client.clone());
793
794 info!(
795 "startup: envd serve: coordinator init complete in {:?}",
796 coord_init_start.elapsed()
797 );
798
799 let serve_postamble_start = Instant::now();
800 info!("startup: envd serve: postamble beginning");
801
802 authenticator_oidc_tx
804 .send(oidc.clone())
805 .expect("rx known to be live");
806 adapter_client_tx
807 .send(adapter_client.clone())
808 .expect("internal HTTP server should not drop first");
809
810 let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
811
812 let mut sql_listener_handles = BTreeMap::new();
814 for (name, listener) in self.sql {
815 sql_listener_handles.insert(
816 name.clone(),
817 listener
818 .serve_sql(
819 name,
820 active_connection_counter.clone(),
821 tls_reloading_context.clone(),
822 config.frontegg.clone(),
823 adapter_client.clone(),
824 oidc.clone(),
825 metrics.clone(),
826 config.helm_chart_version.clone(),
827 )
828 .await,
829 );
830 }
831
832 if let Some(segment_client) = segment_client {
834 telemetry::start_reporting(telemetry::Config {
835 segment_client,
836 adapter_client: adapter_client.clone(),
837 environment_id: config.environment_id,
838 report_interval: Duration::from_secs(3600),
839 });
840 } else if config.test_only_dummy_segment_client {
841 tracing::debug!("starting telemetry reporting with a dummy segment client");
847 let segment_client = mz_segment::Client::new_dummy_client();
848 telemetry::start_reporting(telemetry::Config {
849 segment_client,
850 adapter_client: adapter_client.clone(),
851 environment_id: config.environment_id,
852 report_interval: Duration::from_secs(180),
853 });
854 }
855
856 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
859 task::spawn(
860 || "system_parameter_sync",
861 AssertUnwindSafe(system_parameter_sync(
862 system_parameter_sync_config,
863 adapter_client.clone(),
864 config.config_sync_loop_interval,
865 ))
866 .ore_catch_unwind(),
867 );
868 }
869
870 info!(
871 "startup: envd serve: postamble complete in {:?}",
872 serve_postamble_start.elapsed()
873 );
874 info!(
875 "startup: envd serve: complete in {:?}",
876 serve_start.elapsed()
877 );
878
879 Ok(Server {
880 sql_listener_handles,
881 http_listener_handles,
882 _adapter_handle: adapter_handle,
883 })
884 }
885}
886
887fn get_ld_value<V>(
888 name: &str,
889 remote_system_parameters: &Option<BTreeMap<String, String>>,
890 parse: impl Fn(&str) -> Result<V, String>,
891) -> Result<Option<V>, anyhow::Error> {
892 remote_system_parameters
893 .as_ref()
894 .and_then(|params| params.get(name))
895 .map(|x| {
896 parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
897 })
898 .transpose()
899}
900
901pub struct Server {
903 pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
905 pub http_listener_handles: BTreeMap<String, ListenerHandle>,
906 _adapter_handle: mz_adapter::Handle,
907}