1use ::http::HeaderValue;
17use std::collections::BTreeMap;
18use std::panic::AssertUnwindSafe;
19use std::path::PathBuf;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use std::{env, io};
24
25use anyhow::{Context, anyhow};
26use derivative::Derivative;
27use futures::FutureExt;
28use ipnet::IpNet;
29use mz_adapter::config::{
30 SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
31};
32use mz_adapter::webhook::WebhookConcurrencyLimiter;
33use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
34use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
35use mz_adapter_types::dyncfgs::{
36 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
37 WITH_0DT_DEPLOYMENT_MAX_WAIT,
38};
39use mz_auth::password::Password;
40use mz_authenticator::GenericOidcAuthenticator;
41use mz_build_info::{BuildInfo, build_info};
42use mz_catalog::config::ClusterReplicaSizeMap;
43use mz_catalog::durable::BootstrapArgs;
44use mz_cloud_resources::CloudResourceController;
45use mz_controller::ControllerConfig;
46use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
47use mz_license_keys::ValidatedLicenseKey;
48use mz_ore::future::OreFutureExt;
49use mz_ore::metrics::MetricsRegistry;
50use mz_ore::now::NowFn;
51use mz_ore::tracing::TracingHandle;
52use mz_ore::url::SensitiveUrl;
53use mz_ore::{instrument, task};
54use mz_persist_client::cache::PersistClientCache;
55use mz_persist_client::usage::StorageUsageClient;
56use mz_pgwire::MetricsConfig;
57use mz_pgwire_common::ConnectionCounter;
58use mz_repr::strconv;
59use mz_secrets::SecretsController;
60use mz_server_core::listeners::{
61 HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
62};
63use mz_server_core::{
64 ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
65 TlsCertConfig, TlsMode,
66};
67use mz_sql::catalog::EnvironmentId;
68use mz_sql::session::vars::{Value, VarInput};
69use tokio::sync::oneshot;
70use tower_http::cors::AllowOrigin;
71use tracing::{Instrument, info, info_span};
72
73use crate::deployment::preflight::{PreflightInput, PreflightOutput};
74use crate::deployment::state::DeploymentState;
75use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
76
77pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
78
79mod deployment;
80pub mod environmentd;
81pub mod http;
82mod telemetry;
83#[cfg(feature = "test")]
84pub mod test_util;
85
86pub const BUILD_INFO: BuildInfo = build_info!();
87
88#[derive(Derivative)]
90#[derivative(Debug)]
91pub struct Config {
92 pub unsafe_mode: bool,
96 pub all_features: bool,
99
100 pub tls: Option<TlsCertConfig>,
103 #[derivative(Debug = "ignore")]
105 pub tls_reload_certs: ReloadTrigger,
106 pub external_login_password_mz_system: Option<Password>,
108 pub frontegg: Option<FronteggAuthenticator>,
110 pub cors_allowed_origin: AllowOrigin,
113 pub cors_allowed_origin_list: Vec<HeaderValue>,
118 pub egress_addresses: Vec<IpNet>,
121 pub http_host_name: Option<String>,
127 pub internal_console_redirect_url: Option<String>,
130
131 pub controller: ControllerConfig,
134 pub secrets_controller: Arc<dyn SecretsController>,
136 pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
138
139 pub storage_usage_collection_interval: Duration,
142 pub storage_usage_retention_period: Option<Duration>,
144
145 pub catalog_config: CatalogConfig,
148 pub availability_zones: Vec<String>,
151 pub cluster_replica_sizes: ClusterReplicaSizeMap,
153 pub timestamp_oracle_url: Option<SensitiveUrl>,
155 pub segment_api_key: Option<String>,
157 pub segment_client_side: bool,
160 pub test_only_dummy_segment_client: bool,
162 pub launchdarkly_sdk_key: Option<String>,
165 pub launchdarkly_key_map: BTreeMap<String, String>,
168 pub config_sync_timeout: Duration,
170 pub config_sync_loop_interval: Option<Duration>,
172 pub config_sync_file_path: Option<PathBuf>,
174
175 pub environment_id: EnvironmentId,
178 pub bootstrap_role: Option<String>,
180 pub bootstrap_default_cluster_replica_size: String,
182 pub bootstrap_default_cluster_replication_factor: u32,
184 pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
186 pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
188 pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
190 pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
192 pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
194 pub system_parameter_defaults: BTreeMap<String, String>,
197 pub helm_chart_version: Option<String>,
199 pub license_key: ValidatedLicenseKey,
201
202 pub aws_account_id: Option<String>,
206 pub aws_privatelink_availability_zones: Option<Vec<String>>,
208
209 pub metrics_registry: MetricsRegistry,
212 pub tracing_handle: TracingHandle,
214
215 pub now: NowFn,
218 pub force_builtin_schema_migration: Option<String>,
221}
222
223#[derive(Debug, Clone)]
225pub struct CatalogConfig {
226 pub persist_clients: Arc<PersistClientCache>,
228 pub metrics: Arc<mz_catalog::durable::Metrics>,
230}
231
232pub struct Listener<C> {
233 pub handle: ListenerHandle,
234 connection_stream: Pin<Box<dyn ConnectionStream>>,
235 config: C,
236}
237impl<C> Listener<C>
238where
239 C: ListenerConfig,
240{
241 async fn bind(config: C) -> Result<Self, io::Error> {
253 let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
254 Ok(Self {
255 handle,
256 connection_stream,
257 config,
258 })
259 }
260}
261
262impl Listener<SqlListenerConfig> {
263 #[instrument(name = "environmentd::serve_sql")]
264 pub async fn serve_sql(
265 self,
266 name: String,
267 active_connection_counter: ConnectionCounter,
268 tls_reloading_context: Option<ReloadingSslContext>,
269 frontegg: Option<FronteggAuthenticator>,
270 adapter_client: AdapterClient,
271 oidc: GenericOidcAuthenticator,
272 metrics: MetricsConfig,
273 helm_chart_version: Option<String>,
274 ) -> ListenerHandle {
275 let label: &'static str = Box::leak(name.into_boxed_str());
276 let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
277 context,
278 mode: if self.config.enable_tls {
279 TlsMode::Require
280 } else {
281 TlsMode::Allow
282 },
283 });
284
285 task::spawn(|| format!("{}_sql_server", label), {
286 let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
287 label,
288 tls,
289 adapter_client,
290 authenticator_kind: self.config.authenticator_kind,
291 frontegg,
292 oidc,
293 metrics,
294 active_connection_counter,
295 helm_chart_version,
296 allowed_roles: self.config.allowed_roles,
297 });
298 mz_server_core::serve(ServeConfig {
299 conns: self.connection_stream,
300 server: sql_server,
301 dyncfg: None,
304 })
305 });
306 self.handle
307 }
308}
309
310impl Listener<HttpListenerConfig> {
311 #[instrument(name = "environmentd::serve_http")]
312 pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
313 let task_name = format!("{}_http_server", &config.source);
314 task::spawn(|| task_name, {
315 let http_server = HttpServer::new(config);
316 mz_server_core::serve(ServeConfig {
317 conns: self.connection_stream,
318 server: http_server,
319 dyncfg: None,
322 })
323 });
324 self.handle
325 }
326}
327
328pub struct Listeners {
329 pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
330 pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
331}
332
333impl Listeners {
334 pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
335 let mut sql = BTreeMap::new();
336 for (name, config) in config.sql {
337 sql.insert(name, Listener::bind(config).await?);
338 }
339
340 let mut http = BTreeMap::new();
341 for (name, config) in config.http {
342 http.insert(name, Listener::bind(config).await?);
343 }
344
345 Ok(Listeners { http, sql })
346 }
347
348 #[instrument(name = "environmentd::serve")]
352 pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
353 let serve_start = Instant::now();
354 info!("startup: envd serve: beginning");
355 info!("startup: envd serve: preamble beginning");
356
357 let tls_reloading_context = match config.tls {
359 Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
360 None => None,
361 };
362
363 let active_connection_counter = ConnectionCounter::default();
364 let (deployment_state, deployment_state_handle) = DeploymentState::new();
365
366 let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
375 let internal_route_config = Arc::new(InternalRouteConfig {
376 deployment_state_handle,
377 internal_console_redirect_url: config.internal_console_redirect_url,
378 });
379
380 let (authenticator_oidc_tx, authenticator_oidc_rx) = oneshot::channel();
381 let authenticator_oidc_rx = authenticator_oidc_rx.shared();
382 let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
383 let adapter_client_rx = adapter_client_rx.shared();
384
385 let metrics_registry = config.metrics_registry.clone();
386 let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
387 let mcp_metrics = http::mcp_metrics::McpMetrics::register_into(&metrics_registry);
388 let mut http_listener_handles = BTreeMap::new();
389 for (name, listener) in self.http {
390 let authenticator_kind = listener.config.authenticator_kind();
391 let source: &'static str = Box::leak(name.clone().into_boxed_str());
392 let tls = if listener.config.enable_tls() {
393 tls_reloading_context.clone()
394 } else {
395 None
396 };
397 let http_config = HttpConfig {
398 adapter_client_rx: adapter_client_rx.clone(),
399 active_connection_counter: active_connection_counter.clone(),
400 helm_chart_version: config.helm_chart_version.clone(),
401 source,
402 tls,
403 authenticator_kind,
404 frontegg: config.frontegg.clone(),
405 oidc_rx: authenticator_oidc_rx.clone(),
406 allowed_origin: config.cors_allowed_origin.clone(),
407 allowed_origin_list: config.cors_allowed_origin_list.clone(),
408 concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
409 metrics: metrics.clone(),
410 metrics_registry: metrics_registry.clone(),
411 mcp_metrics: mcp_metrics.clone(),
412 allowed_roles: listener.config.allowed_roles(),
413 internal_route_config: Arc::clone(&internal_route_config),
414 routes_enabled: listener.config.routes.clone(),
415 replica_http_locator: Arc::clone(&config.controller.replica_http_locator),
416 };
417 http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
418 }
419
420 info!(
421 "startup: envd serve: preamble complete in {:?}",
422 serve_start.elapsed()
423 );
424
425 let catalog_init_start = Instant::now();
426 info!("startup: envd serve: catalog init beginning");
427
428 let boot_ts = (config.now)().into();
430
431 let persist_client = config
432 .catalog_config
433 .persist_clients
434 .open(config.controller.persist_location.clone())
435 .await
436 .context("opening persist client")?;
437 let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
438 persist_client.clone(),
439 config.environment_id.organization_id(),
440 BUILD_INFO.semver_version(),
441 Some(config.controller.deploy_generation),
442 Arc::clone(&config.catalog_config.metrics),
443 )
444 .await?;
445
446 info!(
447 "startup: envd serve: catalog init complete in {:?}",
448 catalog_init_start.elapsed()
449 );
450
451 let system_param_sync_start = Instant::now();
452 info!("startup: envd serve: system parameter sync beginning");
453 let system_parameter_sync_config =
455 match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
456 (None, None) => None,
457 (None, Some(f)) => {
458 info!("Using config file path {:?}", f);
459 Some(SystemParameterSyncConfig::new(
460 config.environment_id.clone(),
461 &BUILD_INFO,
462 &config.metrics_registry,
463 config.launchdarkly_key_map,
464 SystemParameterSyncClientConfig::File { path: f },
465 ))
466 }
467 (Some(key), None) => Some(SystemParameterSyncConfig::new(
468 config.environment_id.clone(),
469 &BUILD_INFO,
470 &config.metrics_registry,
471 config.launchdarkly_key_map,
472 SystemParameterSyncClientConfig::LaunchDarkly {
473 sdk_key: key,
474 now_fn: config.now.clone(),
475 },
476 )),
477
478 (Some(_), Some(_)) => {
479 panic!("Cannot configure both file and Launchdarkly based config syncing")
480 }
481 };
482
483 let remote_system_parameters = load_remote_system_parameters(
484 &mut openable_adapter_storage,
485 system_parameter_sync_config.clone(),
486 config.config_sync_timeout,
487 )
488 .await?;
489 info!(
490 "startup: envd serve: system parameter sync complete in {:?}",
491 system_param_sync_start.elapsed()
492 );
493
494 let preflight_checks_start = Instant::now();
495 info!("startup: envd serve: preflight checks beginning");
496
497 let with_0dt_deployment_max_wait = {
499 let cli_default = config
500 .system_parameter_defaults
501 .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
502 .map(|x| {
503 Duration::parse(VarInput::Flat(x)).map_err(|err| {
504 anyhow!(
505 "failed to parse default for {}: {:?}",
506 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
507 err
508 )
509 })
510 })
511 .transpose()?;
512 let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
513 let ld = get_ld_value(
514 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
515 &remote_system_parameters,
516 |x| {
517 Duration::parse(VarInput::Flat(x)).map_err(|err| {
518 format!(
519 "failed to parse LD value {} for {}: {:?}",
520 x,
521 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
522 err
523 )
524 })
525 },
526 )?;
527 let catalog = openable_adapter_storage
528 .get_0dt_deployment_max_wait()
529 .await?;
530 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
531 info!(
532 ?computed,
533 ?ld,
534 ?catalog,
535 ?cli_default,
536 ?compiled_default,
537 "determined value for {} system parameter",
538 WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
539 );
540 computed
541 };
542 let with_0dt_deployment_ddl_check_interval = {
544 let cli_default = config
545 .system_parameter_defaults
546 .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
547 .map(|x| {
548 Duration::parse(VarInput::Flat(x)).map_err(|err| {
549 anyhow!(
550 "failed to parse default for {}: {:?}",
551 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
552 err
553 )
554 })
555 })
556 .transpose()?;
557 let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
558 let ld = get_ld_value(
559 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
560 &remote_system_parameters,
561 |x| {
562 Duration::parse(VarInput::Flat(x)).map_err(|err| {
563 format!(
564 "failed to parse LD value {} for {}: {:?}",
565 x,
566 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
567 err
568 )
569 })
570 },
571 )?;
572 let catalog = openable_adapter_storage
573 .get_0dt_deployment_ddl_check_interval()
574 .await?;
575 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
576 info!(
577 ?computed,
578 ?ld,
579 ?catalog,
580 ?cli_default,
581 ?compiled_default,
582 "determined value for {} system parameter",
583 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
584 );
585 computed
586 };
587
588 let enable_0dt_deployment_panic_after_timeout = {
591 let cli_default = config
592 .system_parameter_defaults
593 .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
594 .map(|x| {
595 strconv::parse_bool(x).map_err(|err| {
596 anyhow!(
597 "failed to parse default for {}: {}",
598 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
599 err
600 )
601 })
602 })
603 .transpose()?;
604 let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
605 let ld = get_ld_value(
606 "enable_0dt_deployment_panic_after_timeout",
607 &remote_system_parameters,
608 |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
609 )?;
610 let catalog = openable_adapter_storage
611 .get_enable_0dt_deployment_panic_after_timeout()
612 .await?;
613 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
614 info!(
615 %computed,
616 ?ld,
617 ?catalog,
618 ?cli_default,
619 ?compiled_default,
620 "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
621 );
622 computed
623 };
624
625 let bootstrap_args = BootstrapArgs {
629 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
630 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
631 bootstrap_role: config.bootstrap_role.clone(),
632 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
633 };
634 let preflight_config = PreflightInput {
635 boot_ts,
636 environment_id: config.environment_id.clone(),
637 persist_client,
638 deploy_generation: config.controller.deploy_generation,
639 deployment_state: deployment_state.clone(),
640 openable_adapter_storage,
641 catalog_metrics: Arc::clone(&config.catalog_config.metrics),
642 caught_up_max_wait: with_0dt_deployment_max_wait,
643 panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
644 bootstrap_args,
645 ddl_check_interval: with_0dt_deployment_ddl_check_interval,
646 };
647 let PreflightOutput {
648 openable_adapter_storage,
649 read_only,
650 caught_up_trigger,
651 } = deployment::preflight::preflight_0dt(preflight_config).await?;
652
653 info!(
654 "startup: envd serve: preflight checks complete in {:?}",
655 preflight_checks_start.elapsed()
656 );
657
658 let catalog_open_start = Instant::now();
659 info!("startup: envd serve: durable catalog open beginning");
660
661 let bootstrap_args = BootstrapArgs {
662 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
663 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
664 bootstrap_role: config.bootstrap_role,
665 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
666 };
667
668 let (adapter_storage, audit_logs_iterator) = if read_only {
670 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
673 .open_savepoint(boot_ts, &bootstrap_args)
674 .await?;
675 (adapter_storage, audit_logs_iterator)
680 } else {
681 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
682 .open(boot_ts, &bootstrap_args)
683 .await?;
684
685 deployment_state.set_is_leader();
689
690 (adapter_storage, audit_logs_iterator)
691 };
692
693 if !read_only {
695 config.controller.persist_clients.cfg().enable_compaction();
696 }
697
698 info!(
699 "startup: envd serve: durable catalog open complete in {:?}",
700 catalog_open_start.elapsed()
701 );
702
703 let coord_init_start = Instant::now();
704 info!("startup: envd serve: coordinator init beginning");
705
706 if !config
707 .cluster_replica_sizes
708 .0
709 .contains_key(&config.bootstrap_default_cluster_replica_size)
710 {
711 return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
712 }
713 let envd_epoch = adapter_storage.epoch();
714
715 let storage_usage_client = StorageUsageClient::open(
717 config
718 .controller
719 .persist_clients
720 .open(config.controller.persist_location.clone())
721 .await
722 .context("opening storage usage client")?,
723 );
724
725 let segment_client = config.segment_api_key.map(|api_key| {
727 mz_segment::Client::new(mz_segment::Config {
728 api_key,
729 client_side: config.segment_client_side,
730 })
731 });
732 let connection_limiter = active_connection_counter.clone();
733 let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
734 connection_limiter.update_limit(limit);
735 connection_limiter.update_superuser_reserved(superuser_reserved);
736 });
737
738 let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
739 connection_context: config.controller.connection_context.clone(),
740 connection_limit_callback,
741 controller_config: config.controller,
742 controller_envd_epoch: envd_epoch,
743 storage: adapter_storage,
744 audit_logs_iterator,
745 timestamp_oracle_url: config.timestamp_oracle_url,
746 unsafe_mode: config.unsafe_mode,
747 all_features: config.all_features,
748 build_info: &BUILD_INFO,
749 environment_id: config.environment_id.clone(),
750 metrics_registry: config.metrics_registry.clone(),
751 now: config.now,
752 secrets_controller: config.secrets_controller,
753 cloud_resource_controller: config.cloud_resource_controller,
754 cluster_replica_sizes: config.cluster_replica_sizes,
755 builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
756 builtin_catalog_server_cluster_config: config
757 .bootstrap_builtin_catalog_server_cluster_config,
758 builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
759 builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
760 builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
761 availability_zones: config.availability_zones,
762 system_parameter_defaults: config.system_parameter_defaults,
763 storage_usage_client,
764 storage_usage_collection_interval: config.storage_usage_collection_interval,
765 storage_usage_retention_period: config.storage_usage_retention_period,
766 segment_client: segment_client.clone(),
767 egress_addresses: config.egress_addresses,
768 remote_system_parameters,
769 aws_account_id: config.aws_account_id,
770 aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
771 webhook_concurrency_limit: webhook_concurrency_limit.clone(),
772 http_host_name: config.http_host_name,
773 tracing_handle: config.tracing_handle,
774 read_only_controllers: read_only,
775 caught_up_trigger,
776 helm_chart_version: config.helm_chart_version.clone(),
777 license_key: config.license_key,
778 external_login_password_mz_system: config.external_login_password_mz_system,
779 force_builtin_schema_migration: config.force_builtin_schema_migration,
780 })
781 .instrument(info_span!("adapter::serve"))
782 .await?;
783
784 let oidc = GenericOidcAuthenticator::new(adapter_client.clone());
786
787 info!(
788 "startup: envd serve: coordinator init complete in {:?}",
789 coord_init_start.elapsed()
790 );
791
792 let serve_postamble_start = Instant::now();
793 info!("startup: envd serve: postamble beginning");
794
795 authenticator_oidc_tx
797 .send(oidc.clone())
798 .expect("rx known to be live");
799 adapter_client_tx
800 .send(adapter_client.clone())
801 .expect("internal HTTP server should not drop first");
802
803 let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
804
805 let mut sql_listener_handles = BTreeMap::new();
807 for (name, listener) in self.sql {
808 sql_listener_handles.insert(
809 name.clone(),
810 listener
811 .serve_sql(
812 name,
813 active_connection_counter.clone(),
814 tls_reloading_context.clone(),
815 config.frontegg.clone(),
816 adapter_client.clone(),
817 oidc.clone(),
818 metrics.clone(),
819 config.helm_chart_version.clone(),
820 )
821 .await,
822 );
823 }
824
825 if let Some(segment_client) = segment_client {
827 telemetry::start_reporting(telemetry::Config {
828 segment_client,
829 adapter_client: adapter_client.clone(),
830 environment_id: config.environment_id,
831 report_interval: Duration::from_secs(3600),
832 });
833 } else if config.test_only_dummy_segment_client {
834 tracing::debug!("starting telemetry reporting with a dummy segment client");
840 let segment_client = mz_segment::Client::new_dummy_client();
841 telemetry::start_reporting(telemetry::Config {
842 segment_client,
843 adapter_client: adapter_client.clone(),
844 environment_id: config.environment_id,
845 report_interval: Duration::from_secs(180),
846 });
847 }
848
849 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
852 task::spawn(
853 || "system_parameter_sync",
854 AssertUnwindSafe(system_parameter_sync(
855 system_parameter_sync_config,
856 adapter_client.clone(),
857 config.config_sync_loop_interval,
858 ))
859 .ore_catch_unwind(),
860 );
861 }
862
863 info!(
864 "startup: envd serve: postamble complete in {:?}",
865 serve_postamble_start.elapsed()
866 );
867 info!(
868 "startup: envd serve: complete in {:?}",
869 serve_start.elapsed()
870 );
871
872 Ok(Server {
873 sql_listener_handles,
874 http_listener_handles,
875 _adapter_handle: adapter_handle,
876 })
877 }
878}
879
880fn get_ld_value<V>(
881 name: &str,
882 remote_system_parameters: &Option<BTreeMap<String, String>>,
883 parse: impl Fn(&str) -> Result<V, String>,
884) -> Result<Option<V>, anyhow::Error> {
885 remote_system_parameters
886 .as_ref()
887 .and_then(|params| params.get(name))
888 .map(|x| {
889 parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
890 })
891 .transpose()
892}
893
894pub struct Server {
896 pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
898 pub http_listener_handles: BTreeMap<String, ListenerHandle>,
899 _adapter_handle: mz_adapter::Handle,
900}