1use ::http::HeaderValue;
17use std::collections::BTreeMap;
18use std::panic::AssertUnwindSafe;
19use std::path::PathBuf;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use std::{env, io};
24
25use anyhow::{Context, anyhow};
26use derivative::Derivative;
27use futures::FutureExt;
28use ipnet::IpNet;
29use mz_adapter::config::{
30 SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
31};
32use mz_adapter::webhook::WebhookConcurrencyLimiter;
33use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
34use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
35use mz_adapter_types::dyncfgs::{
36 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
37 WITH_0DT_DEPLOYMENT_MAX_WAIT,
38};
39use mz_auth::password::Password;
40use mz_authenticator::GenericOidcAuthenticator;
41use mz_build_info::{BuildInfo, build_info};
42use mz_catalog::config::ClusterReplicaSizeMap;
43use mz_catalog::durable::BootstrapArgs;
44use mz_cloud_resources::CloudResourceController;
45use mz_controller::ControllerConfig;
46use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
47use mz_license_keys::ValidatedLicenseKey;
48use mz_ore::future::OreFutureExt;
49use mz_ore::metrics::MetricsRegistry;
50use mz_ore::now::NowFn;
51use mz_ore::tracing::TracingHandle;
52use mz_ore::url::SensitiveUrl;
53use mz_ore::{instrument, task};
54use mz_persist_client::cache::PersistClientCache;
55use mz_persist_client::usage::StorageUsageClient;
56use mz_pgwire::MetricsConfig;
57use mz_pgwire_common::ConnectionCounter;
58use mz_repr::strconv;
59use mz_secrets::SecretsController;
60use mz_server_core::listeners::{
61 HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
62};
63use mz_server_core::{
64 ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
65 TlsCertConfig, TlsMode,
66};
67use mz_sql::catalog::EnvironmentId;
68use mz_sql::session::vars::{Value, VarInput};
69use tokio::sync::oneshot;
70use tower_http::cors::AllowOrigin;
71use tracing::{Instrument, info, info_span};
72
73use crate::deployment::preflight::{PreflightInput, PreflightOutput};
74use crate::deployment::state::DeploymentState;
75use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
76
77pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
78
79mod deployment;
80pub mod environmentd;
81pub mod http;
82mod telemetry;
83#[cfg(feature = "test")]
84pub mod test_util;
85
/// Build information for this crate, produced by the `build_info!` macro.
///
/// Within this file it supplies the version handed to the durable catalog
/// and is passed by reference to the system parameter sync configuration and
/// to the adapter.
pub const BUILD_INFO: BuildInfo = build_info!();
87
/// Configuration for an `environmentd` server, consumed by `Listeners::serve`.
///
/// Many fields are forwarded unchanged to `mz_adapter::Config`. The per-field
/// notes describe what this file does with each value; the precise semantics
/// of forwarded values are defined by the adapter and are not restated here.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Config {
    /// Forwarded to the adapter as `unsafe_mode`.
    pub unsafe_mode: bool,
    /// Forwarded to the adapter as `all_features`.
    pub all_features: bool,

    /// Optional TLS certificate configuration. When present, `serve` builds a
    /// reloading SSL context from it and shares that context with listeners.
    pub tls: Option<TlsCertConfig>,
    /// Trigger handed to `TlsCertConfig::reloading_context` when `tls` is set.
    /// Excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    pub tls_reload_certs: ReloadTrigger,
    /// Forwarded to the adapter as `external_login_password_mz_system`.
    pub external_login_password_mz_system: Option<Password>,
    /// Optional Frontegg authenticator, cloned into every HTTP and SQL listener.
    pub frontegg: Option<FronteggAuthenticator>,
    /// Passed to each HTTP listener as `allowed_origin`.
    pub cors_allowed_origin: AllowOrigin,
    /// Passed to each HTTP listener as `allowed_origin_list`.
    pub cors_allowed_origin_list: Vec<HeaderValue>,
    /// Forwarded to the adapter as `egress_addresses`.
    pub egress_addresses: Vec<IpNet>,
    /// Forwarded to the adapter as `http_host_name`.
    pub http_host_name: Option<String>,
    /// Stored in the `InternalRouteConfig` shared by all HTTP listeners.
    pub internal_console_redirect_url: Option<String>,

    /// Controller configuration. `serve` reads its persist location, deploy
    /// generation, persist client cache, connection context, and replica HTTP
    /// locator before forwarding the whole value to the adapter.
    pub controller: ControllerConfig,
    /// Forwarded to the adapter as `secrets_controller`.
    pub secrets_controller: Arc<dyn SecretsController>,
    /// Forwarded to the adapter as `cloud_resource_controller`.
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Forwarded to the adapter as `storage_usage_collection_interval`.
    pub storage_usage_collection_interval: Duration,
    /// Forwarded to the adapter as `storage_usage_retention_period`.
    pub storage_usage_retention_period: Option<Duration>,

    /// Persist client cache and metrics used to open the durable catalog.
    pub catalog_config: CatalogConfig,
    /// Forwarded to the adapter as `availability_zones`.
    pub availability_zones: Vec<String>,
    /// Known cluster replica sizes. Used for the catalog bootstrap arguments,
    /// to validate `bootstrap_default_cluster_replica_size`, and forwarded to
    /// the adapter.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Forwarded to the adapter as `timestamp_oracle_url`.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    /// When set, `serve` creates a Segment client with this key, passes it to
    /// the adapter, and starts telemetry reporting.
    pub segment_api_key: Option<String>,
    /// Passed to `mz_segment::Config::client_side` when a Segment client is created.
    pub segment_client_side: bool,
    /// When `segment_api_key` is unset and this is true, `serve` starts
    /// telemetry reporting with a dummy Segment client.
    pub test_only_dummy_segment_client: bool,
    /// When set, system parameters are synced from LaunchDarkly using this
    /// key. `serve` panics if `config_sync_file_path` is set as well.
    pub launchdarkly_sdk_key: Option<String>,
    /// Passed to `SystemParameterSyncConfig::new` for either sync backend.
    pub launchdarkly_key_map: BTreeMap<String, String>,
    /// Timeout passed to `load_remote_system_parameters` during startup.
    pub config_sync_timeout: Duration,
    /// Passed to the background `system_parameter_sync` task.
    pub config_sync_loop_interval: Option<Duration>,
    /// When set, system parameters are synced from this file. `serve` panics
    /// if `launchdarkly_sdk_key` is set as well.
    pub config_sync_file_path: Option<PathBuf>,

    /// Identifier of this environment. Its organization ID is used to open
    /// the durable catalog; the ID itself is passed to system parameter sync,
    /// preflight checks, the adapter, and telemetry.
    pub environment_id: EnvironmentId,
    /// Passed to the catalog as the `bootstrap_role` bootstrap argument.
    pub bootstrap_role: Option<String>,
    /// Passed to the catalog as the default cluster replica size bootstrap
    /// argument. `serve` returns an error if it is not a key of
    /// `cluster_replica_sizes`.
    pub bootstrap_default_cluster_replica_size: String,
    /// Passed to the catalog as the default cluster replication factor
    /// bootstrap argument.
    pub bootstrap_default_cluster_replication_factor: u32,
    /// Forwarded to the adapter as `builtin_system_cluster_config`.
    pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_catalog_server_cluster_config`.
    pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_probe_cluster_config`.
    pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_support_cluster_config`.
    pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_analytics_cluster_config`.
    pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Default values for system parameters, keyed by parameter name. `serve`
    /// consults it when resolving the 0dt deployment parameters and then
    /// forwards the map to the adapter.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Passed to every HTTP and SQL listener and to the adapter.
    pub helm_chart_version: Option<String>,
    /// Forwarded to the adapter as `license_key`.
    pub license_key: ValidatedLicenseKey,

    /// Forwarded to the adapter as `aws_account_id`.
    pub aws_account_id: Option<String>,
    /// Forwarded to the adapter as `aws_privatelink_availability_zones`.
    pub aws_privatelink_availability_zones: Option<Vec<String>>,

    /// Registry into which the HTTP and pgwire metrics are registered; also
    /// passed to system parameter sync and the adapter.
    pub metrics_registry: MetricsRegistry,
    /// Forwarded to the adapter as `tracing_handle`.
    pub tracing_handle: TracingHandle,

    /// Clock function. Called once to obtain the boot timestamp, then passed
    /// to LaunchDarkly sync (when configured) and the adapter.
    pub now: NowFn,
    /// Forwarded to the adapter as `force_builtin_schema_migration`.
    pub force_builtin_schema_migration: Option<String>,
}
222
/// Configuration used by `Listeners::serve` to open the durable catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Persist client cache from which the catalog's persist client is opened.
    pub persist_clients: Arc<PersistClientCache>,
    /// Durable catalog metrics, shared with the catalog state and the
    /// preflight checks.
    pub metrics: Arc<mz_catalog::durable::Metrics>,
}
231
/// A bound network listener that has not yet started serving connections.
///
/// `C` is the listener-specific configuration; this file provides serving
/// methods for `SqlListenerConfig` and `HttpListenerConfig`.
pub struct Listener<C> {
    /// Handle returned by `mz_server_core::listen`; it is handed back to the
    /// caller once the listener starts serving.
    pub handle: ListenerHandle,
    /// Stream of incoming connections, consumed by `mz_server_core::serve`.
    connection_stream: Pin<Box<dyn ConnectionStream>>,
    /// The configuration this listener was bound with.
    config: C,
}
237impl<C> Listener<C>
238where
239 C: ListenerConfig,
240{
241 async fn bind(config: C) -> Result<Self, io::Error> {
253 let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
254 Ok(Self {
255 handle,
256 connection_stream,
257 config,
258 })
259 }
260}
261
262impl Listener<SqlListenerConfig> {
263 #[instrument(name = "environmentd::serve_sql")]
264 pub async fn serve_sql(
265 self,
266 name: String,
267 active_connection_counter: ConnectionCounter,
268 tls_reloading_context: Option<ReloadingSslContext>,
269 frontegg: Option<FronteggAuthenticator>,
270 adapter_client: AdapterClient,
271 oidc: GenericOidcAuthenticator,
272 metrics: MetricsConfig,
273 helm_chart_version: Option<String>,
274 ) -> ListenerHandle {
275 let label: &'static str = Box::leak(name.into_boxed_str());
276 let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
277 context,
278 mode: if self.config.enable_tls {
279 TlsMode::Require
280 } else {
281 TlsMode::Allow
282 },
283 });
284
285 task::spawn(|| format!("{}_sql_server", label), {
286 let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
287 label,
288 tls,
289 adapter_client,
290 authenticator_kind: self.config.authenticator_kind,
291 frontegg,
292 oidc,
293 metrics,
294 active_connection_counter,
295 helm_chart_version,
296 allowed_roles: self.config.allowed_roles,
297 });
298 mz_server_core::serve(ServeConfig {
299 conns: self.connection_stream,
300 server: sql_server,
301 dyncfg: None,
304 })
305 });
306 self.handle
307 }
308}
309
310impl Listener<HttpListenerConfig> {
311 #[instrument(name = "environmentd::serve_http")]
312 pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
313 let task_name = format!("{}_http_server", &config.source);
314 task::spawn(|| task_name, {
315 let http_server = HttpServer::new(config);
316 mz_server_core::serve(ServeConfig {
317 conns: self.connection_stream,
318 server: http_server,
319 dyncfg: None,
322 })
323 });
324 self.handle
325 }
326}
327
/// The set of bound HTTP and SQL listeners for an `environmentd` server.
///
/// Both maps are keyed by listener name.
pub struct Listeners {
    /// Bound HTTP listeners, keyed by name.
    pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
    /// Bound SQL listeners, keyed by name.
    pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
}
332
333impl Listeners {
334 pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
335 let mut sql = BTreeMap::new();
336 for (name, config) in config.sql {
337 sql.insert(name, Listener::bind(config).await?);
338 }
339
340 let mut http = BTreeMap::new();
341 for (name, config) in config.http {
342 http.insert(name, Listener::bind(config).await?);
343 }
344
345 Ok(Listeners { http, sql })
346 }
347
348 #[instrument(name = "environmentd::serve")]
352 pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
353 let serve_start = Instant::now();
354 info!("startup: envd serve: beginning");
355 info!("startup: envd serve: preamble beginning");
356
357 let tls_reloading_context = match config.tls {
359 Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
360 None => None,
361 };
362
363 let active_connection_counter = ConnectionCounter::default();
364 let (deployment_state, deployment_state_handle) = DeploymentState::new();
365
366 let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
375 let internal_route_config = Arc::new(InternalRouteConfig {
376 deployment_state_handle,
377 internal_console_redirect_url: config.internal_console_redirect_url,
378 });
379
380 let (authenticator_oidc_tx, authenticator_oidc_rx) = oneshot::channel();
381 let authenticator_oidc_rx = authenticator_oidc_rx.shared();
382 let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
383 let adapter_client_rx = adapter_client_rx.shared();
384
385 let metrics_registry = config.metrics_registry.clone();
386 let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
387 let mut http_listener_handles = BTreeMap::new();
388 for (name, listener) in self.http {
389 let authenticator_kind = listener.config.authenticator_kind();
390 let source: &'static str = Box::leak(name.clone().into_boxed_str());
391 let tls = if listener.config.enable_tls() {
392 tls_reloading_context.clone()
393 } else {
394 None
395 };
396 let http_config = HttpConfig {
397 adapter_client_rx: adapter_client_rx.clone(),
398 active_connection_counter: active_connection_counter.clone(),
399 helm_chart_version: config.helm_chart_version.clone(),
400 source,
401 tls,
402 authenticator_kind,
403 frontegg: config.frontegg.clone(),
404 oidc_rx: authenticator_oidc_rx.clone(),
405 allowed_origin: config.cors_allowed_origin.clone(),
406 allowed_origin_list: config.cors_allowed_origin_list.clone(),
407 concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
408 metrics: metrics.clone(),
409 metrics_registry: metrics_registry.clone(),
410 allowed_roles: listener.config.allowed_roles(),
411 internal_route_config: Arc::clone(&internal_route_config),
412 routes_enabled: listener.config.routes.clone(),
413 replica_http_locator: Arc::clone(&config.controller.replica_http_locator),
414 };
415 http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
416 }
417
418 info!(
419 "startup: envd serve: preamble complete in {:?}",
420 serve_start.elapsed()
421 );
422
423 let catalog_init_start = Instant::now();
424 info!("startup: envd serve: catalog init beginning");
425
426 let boot_ts = (config.now)().into();
428
429 let persist_client = config
430 .catalog_config
431 .persist_clients
432 .open(config.controller.persist_location.clone())
433 .await
434 .context("opening persist client")?;
435 let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
436 persist_client.clone(),
437 config.environment_id.organization_id(),
438 BUILD_INFO.semver_version(),
439 Some(config.controller.deploy_generation),
440 Arc::clone(&config.catalog_config.metrics),
441 )
442 .await?;
443
444 info!(
445 "startup: envd serve: catalog init complete in {:?}",
446 catalog_init_start.elapsed()
447 );
448
449 let system_param_sync_start = Instant::now();
450 info!("startup: envd serve: system parameter sync beginning");
451 let system_parameter_sync_config =
453 match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
454 (None, None) => None,
455 (None, Some(f)) => {
456 info!("Using config file path {:?}", f);
457 Some(SystemParameterSyncConfig::new(
458 config.environment_id.clone(),
459 &BUILD_INFO,
460 &config.metrics_registry,
461 config.launchdarkly_key_map,
462 SystemParameterSyncClientConfig::File { path: f },
463 ))
464 }
465 (Some(key), None) => Some(SystemParameterSyncConfig::new(
466 config.environment_id.clone(),
467 &BUILD_INFO,
468 &config.metrics_registry,
469 config.launchdarkly_key_map,
470 SystemParameterSyncClientConfig::LaunchDarkly {
471 sdk_key: key,
472 now_fn: config.now.clone(),
473 },
474 )),
475
476 (Some(_), Some(_)) => {
477 panic!("Cannot configure both file and Launchdarkly based config syncing")
478 }
479 };
480
481 let remote_system_parameters = load_remote_system_parameters(
482 &mut openable_adapter_storage,
483 system_parameter_sync_config.clone(),
484 config.config_sync_timeout,
485 )
486 .await?;
487 info!(
488 "startup: envd serve: system parameter sync complete in {:?}",
489 system_param_sync_start.elapsed()
490 );
491
492 let preflight_checks_start = Instant::now();
493 info!("startup: envd serve: preflight checks beginning");
494
495 let with_0dt_deployment_max_wait = {
497 let cli_default = config
498 .system_parameter_defaults
499 .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
500 .map(|x| {
501 Duration::parse(VarInput::Flat(x)).map_err(|err| {
502 anyhow!(
503 "failed to parse default for {}: {:?}",
504 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
505 err
506 )
507 })
508 })
509 .transpose()?;
510 let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
511 let ld = get_ld_value(
512 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
513 &remote_system_parameters,
514 |x| {
515 Duration::parse(VarInput::Flat(x)).map_err(|err| {
516 format!(
517 "failed to parse LD value {} for {}: {:?}",
518 x,
519 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
520 err
521 )
522 })
523 },
524 )?;
525 let catalog = openable_adapter_storage
526 .get_0dt_deployment_max_wait()
527 .await?;
528 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
529 info!(
530 ?computed,
531 ?ld,
532 ?catalog,
533 ?cli_default,
534 ?compiled_default,
535 "determined value for {} system parameter",
536 WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
537 );
538 computed
539 };
540 let with_0dt_deployment_ddl_check_interval = {
542 let cli_default = config
543 .system_parameter_defaults
544 .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
545 .map(|x| {
546 Duration::parse(VarInput::Flat(x)).map_err(|err| {
547 anyhow!(
548 "failed to parse default for {}: {:?}",
549 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
550 err
551 )
552 })
553 })
554 .transpose()?;
555 let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
556 let ld = get_ld_value(
557 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
558 &remote_system_parameters,
559 |x| {
560 Duration::parse(VarInput::Flat(x)).map_err(|err| {
561 format!(
562 "failed to parse LD value {} for {}: {:?}",
563 x,
564 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
565 err
566 )
567 })
568 },
569 )?;
570 let catalog = openable_adapter_storage
571 .get_0dt_deployment_ddl_check_interval()
572 .await?;
573 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
574 info!(
575 ?computed,
576 ?ld,
577 ?catalog,
578 ?cli_default,
579 ?compiled_default,
580 "determined value for {} system parameter",
581 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
582 );
583 computed
584 };
585
586 let enable_0dt_deployment_panic_after_timeout = {
589 let cli_default = config
590 .system_parameter_defaults
591 .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
592 .map(|x| {
593 strconv::parse_bool(x).map_err(|err| {
594 anyhow!(
595 "failed to parse default for {}: {}",
596 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
597 err
598 )
599 })
600 })
601 .transpose()?;
602 let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
603 let ld = get_ld_value(
604 "enable_0dt_deployment_panic_after_timeout",
605 &remote_system_parameters,
606 |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
607 )?;
608 let catalog = openable_adapter_storage
609 .get_enable_0dt_deployment_panic_after_timeout()
610 .await?;
611 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
612 info!(
613 %computed,
614 ?ld,
615 ?catalog,
616 ?cli_default,
617 ?compiled_default,
618 "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
619 );
620 computed
621 };
622
623 let bootstrap_args = BootstrapArgs {
627 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
628 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
629 bootstrap_role: config.bootstrap_role.clone(),
630 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
631 };
632 let preflight_config = PreflightInput {
633 boot_ts,
634 environment_id: config.environment_id.clone(),
635 persist_client,
636 deploy_generation: config.controller.deploy_generation,
637 deployment_state: deployment_state.clone(),
638 openable_adapter_storage,
639 catalog_metrics: Arc::clone(&config.catalog_config.metrics),
640 caught_up_max_wait: with_0dt_deployment_max_wait,
641 panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
642 bootstrap_args,
643 ddl_check_interval: with_0dt_deployment_ddl_check_interval,
644 };
645 let PreflightOutput {
646 openable_adapter_storage,
647 read_only,
648 caught_up_trigger,
649 } = deployment::preflight::preflight_0dt(preflight_config).await?;
650
651 info!(
652 "startup: envd serve: preflight checks complete in {:?}",
653 preflight_checks_start.elapsed()
654 );
655
656 let catalog_open_start = Instant::now();
657 info!("startup: envd serve: durable catalog open beginning");
658
659 let bootstrap_args = BootstrapArgs {
660 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
661 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
662 bootstrap_role: config.bootstrap_role,
663 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
664 };
665
666 let (adapter_storage, audit_logs_iterator) = if read_only {
668 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
671 .open_savepoint(boot_ts, &bootstrap_args)
672 .await?;
673 (adapter_storage, audit_logs_iterator)
678 } else {
679 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
680 .open(boot_ts, &bootstrap_args)
681 .await?;
682
683 deployment_state.set_is_leader();
687
688 (adapter_storage, audit_logs_iterator)
689 };
690
691 if !read_only {
693 config.controller.persist_clients.cfg().enable_compaction();
694 }
695
696 info!(
697 "startup: envd serve: durable catalog open complete in {:?}",
698 catalog_open_start.elapsed()
699 );
700
701 let coord_init_start = Instant::now();
702 info!("startup: envd serve: coordinator init beginning");
703
704 if !config
705 .cluster_replica_sizes
706 .0
707 .contains_key(&config.bootstrap_default_cluster_replica_size)
708 {
709 return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
710 }
711 let envd_epoch = adapter_storage.epoch();
712
713 let storage_usage_client = StorageUsageClient::open(
715 config
716 .controller
717 .persist_clients
718 .open(config.controller.persist_location.clone())
719 .await
720 .context("opening storage usage client")?,
721 );
722
723 let segment_client = config.segment_api_key.map(|api_key| {
725 mz_segment::Client::new(mz_segment::Config {
726 api_key,
727 client_side: config.segment_client_side,
728 })
729 });
730 let connection_limiter = active_connection_counter.clone();
731 let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
732 connection_limiter.update_limit(limit);
733 connection_limiter.update_superuser_reserved(superuser_reserved);
734 });
735
736 let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
737 connection_context: config.controller.connection_context.clone(),
738 connection_limit_callback,
739 controller_config: config.controller,
740 controller_envd_epoch: envd_epoch,
741 storage: adapter_storage,
742 audit_logs_iterator,
743 timestamp_oracle_url: config.timestamp_oracle_url,
744 unsafe_mode: config.unsafe_mode,
745 all_features: config.all_features,
746 build_info: &BUILD_INFO,
747 environment_id: config.environment_id.clone(),
748 metrics_registry: config.metrics_registry.clone(),
749 now: config.now,
750 secrets_controller: config.secrets_controller,
751 cloud_resource_controller: config.cloud_resource_controller,
752 cluster_replica_sizes: config.cluster_replica_sizes,
753 builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
754 builtin_catalog_server_cluster_config: config
755 .bootstrap_builtin_catalog_server_cluster_config,
756 builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
757 builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
758 builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
759 availability_zones: config.availability_zones,
760 system_parameter_defaults: config.system_parameter_defaults,
761 storage_usage_client,
762 storage_usage_collection_interval: config.storage_usage_collection_interval,
763 storage_usage_retention_period: config.storage_usage_retention_period,
764 segment_client: segment_client.clone(),
765 egress_addresses: config.egress_addresses,
766 remote_system_parameters,
767 aws_account_id: config.aws_account_id,
768 aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
769 webhook_concurrency_limit: webhook_concurrency_limit.clone(),
770 http_host_name: config.http_host_name,
771 tracing_handle: config.tracing_handle,
772 read_only_controllers: read_only,
773 caught_up_trigger,
774 helm_chart_version: config.helm_chart_version.clone(),
775 license_key: config.license_key,
776 external_login_password_mz_system: config.external_login_password_mz_system,
777 force_builtin_schema_migration: config.force_builtin_schema_migration,
778 })
779 .instrument(info_span!("adapter::serve"))
780 .await?;
781
782 let oidc = GenericOidcAuthenticator::new(adapter_client.clone());
784
785 info!(
786 "startup: envd serve: coordinator init complete in {:?}",
787 coord_init_start.elapsed()
788 );
789
790 let serve_postamble_start = Instant::now();
791 info!("startup: envd serve: postamble beginning");
792
793 authenticator_oidc_tx
795 .send(oidc.clone())
796 .expect("rx known to be live");
797 adapter_client_tx
798 .send(adapter_client.clone())
799 .expect("internal HTTP server should not drop first");
800
801 let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
802
803 let mut sql_listener_handles = BTreeMap::new();
805 for (name, listener) in self.sql {
806 sql_listener_handles.insert(
807 name.clone(),
808 listener
809 .serve_sql(
810 name,
811 active_connection_counter.clone(),
812 tls_reloading_context.clone(),
813 config.frontegg.clone(),
814 adapter_client.clone(),
815 oidc.clone(),
816 metrics.clone(),
817 config.helm_chart_version.clone(),
818 )
819 .await,
820 );
821 }
822
823 if let Some(segment_client) = segment_client {
825 telemetry::start_reporting(telemetry::Config {
826 segment_client,
827 adapter_client: adapter_client.clone(),
828 environment_id: config.environment_id,
829 report_interval: Duration::from_secs(3600),
830 });
831 } else if config.test_only_dummy_segment_client {
832 tracing::debug!("starting telemetry reporting with a dummy segment client");
838 let segment_client = mz_segment::Client::new_dummy_client();
839 telemetry::start_reporting(telemetry::Config {
840 segment_client,
841 adapter_client: adapter_client.clone(),
842 environment_id: config.environment_id,
843 report_interval: Duration::from_secs(180),
844 });
845 }
846
847 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
850 task::spawn(
851 || "system_parameter_sync",
852 AssertUnwindSafe(system_parameter_sync(
853 system_parameter_sync_config,
854 adapter_client.clone(),
855 config.config_sync_loop_interval,
856 ))
857 .ore_catch_unwind(),
858 );
859 }
860
861 info!(
862 "startup: envd serve: postamble complete in {:?}",
863 serve_postamble_start.elapsed()
864 );
865 info!(
866 "startup: envd serve: complete in {:?}",
867 serve_start.elapsed()
868 );
869
870 Ok(Server {
871 sql_listener_handles,
872 http_listener_handles,
873 _adapter_handle: adapter_handle,
874 })
875 }
876}
877
878fn get_ld_value<V>(
879 name: &str,
880 remote_system_parameters: &Option<BTreeMap<String, String>>,
881 parse: impl Fn(&str) -> Result<V, String>,
882) -> Result<Option<V>, anyhow::Error> {
883 remote_system_parameters
884 .as_ref()
885 .and_then(|params| params.get(name))
886 .map(|x| {
887 parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
888 })
889 .transpose()
890}
891
/// A running `environmentd` server, as returned by `Listeners::serve`.
pub struct Server {
    /// Handles to the running SQL listeners, keyed by listener name.
    pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handles to the running HTTP listeners, keyed by listener name.
    pub http_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handle returned by `mz_adapter::serve`. It is stored but never read in
    /// this file; presumably it is held so the adapter lives as long as the
    /// server (TODO: confirm against `mz_adapter::Handle`).
    _adapter_handle: mz_adapter::Handle,
}