1use std::collections::BTreeMap;
17use std::panic::AssertUnwindSafe;
18use std::path::PathBuf;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use std::{env, io};
23
24use anyhow::{Context, anyhow};
25use derivative::Derivative;
26use futures::FutureExt;
27use ipnet::IpNet;
28use mz_adapter::config::{
29 SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
30};
31use mz_adapter::webhook::WebhookConcurrencyLimiter;
32use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
33use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
34use mz_adapter_types::dyncfgs::{
35 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
36 WITH_0DT_DEPLOYMENT_MAX_WAIT,
37};
38use mz_auth::password::Password;
39use mz_authenticator::GenericOidcAuthenticator;
40use mz_build_info::{BuildInfo, build_info};
41use mz_catalog::config::ClusterReplicaSizeMap;
42use mz_catalog::durable::BootstrapArgs;
43use mz_cloud_resources::CloudResourceController;
44use mz_controller::ControllerConfig;
45use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
46use mz_license_keys::ValidatedLicenseKey;
47use mz_ore::future::OreFutureExt;
48use mz_ore::metrics::MetricsRegistry;
49use mz_ore::now::NowFn;
50use mz_ore::tracing::TracingHandle;
51use mz_ore::url::SensitiveUrl;
52use mz_ore::{instrument, task};
53use mz_persist_client::cache::PersistClientCache;
54use mz_persist_client::usage::StorageUsageClient;
55use mz_pgwire::MetricsConfig;
56use mz_pgwire_common::ConnectionCounter;
57use mz_repr::strconv;
58use mz_secrets::SecretsController;
59use mz_server_core::listeners::{
60 HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
61};
62use mz_server_core::{
63 ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
64 TlsCertConfig, TlsMode,
65};
66use mz_sql::catalog::EnvironmentId;
67use mz_sql::session::vars::{Value, VarInput};
68use tokio::sync::oneshot;
69use tower_http::cors::AllowOrigin;
70use tracing::{Instrument, info, info_span};
71
72use crate::deployment::preflight::{PreflightInput, PreflightOutput};
73use crate::deployment::state::DeploymentState;
74use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
75
76pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
77
78mod deployment;
79pub mod environmentd;
80pub mod http;
81mod telemetry;
82#[cfg(feature = "test")]
83pub mod test_util;
84
/// Build information for this crate, captured at compile time by the
/// `build_info!` macro. `Listeners::serve` uses it for the catalog version
/// (`semver_version()`), the system parameter sync configuration, and the
/// adapter configuration.
pub const BUILD_INFO: BuildInfo = build_info!();
86
/// Configuration consumed by `Listeners::serve` to start an `environmentd`
/// server.
///
/// Most fields are forwarded unchanged into `mz_adapter::Config`; the notes
/// below describe how `serve` uses each field where that is visible in this
/// file.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Config {
    // === Special modes. ===
    /// Forwarded to the adapter as `unsafe_mode`.
    pub unsafe_mode: bool,
    /// Forwarded to the adapter as `all_features`.
    pub all_features: bool,

    // === Connection options. ===
    /// TLS certificate configuration. When present, a reloading TLS context
    /// is built from it and handed to the SQL listeners and to the HTTP
    /// listeners whose config enables TLS.
    pub tls: Option<TlsCertConfig>,
    /// Trigger passed to `TlsCertConfig::reloading_context`; presumably
    /// signals that certificates should be re-read — confirm against
    /// `mz_server_core`. Excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    pub tls_reload_certs: ReloadTrigger,
    /// Forwarded to the adapter as `external_login_password_mz_system`.
    pub external_login_password_mz_system: Option<Password>,
    /// Optional Frontegg authenticator, cloned into every HTTP and SQL
    /// listener.
    pub frontegg: Option<FronteggAuthenticator>,
    /// CORS origin policy handed to every HTTP listener as `allowed_origin`.
    pub cors_allowed_origin: AllowOrigin,
    /// Forwarded to the adapter as `egress_addresses`.
    pub egress_addresses: Vec<IpNet>,
    /// Forwarded to the adapter as `http_host_name`.
    pub http_host_name: Option<String>,
    /// Stored in the `InternalRouteConfig` shared by all HTTP listeners.
    pub internal_console_redirect_url: Option<String>,

    // === Controller options. ===
    /// Controller configuration. `serve` reads its `persist_location`,
    /// `deploy_generation`, `persist_clients`, `connection_context` and
    /// `replica_http_locator` before moving it into the adapter.
    pub controller: ControllerConfig,
    /// Forwarded to the adapter as `secrets_controller`.
    pub secrets_controller: Arc<dyn SecretsController>,
    /// Forwarded to the adapter as `cloud_resource_controller`.
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    // === Storage options. ===
    /// Forwarded to the adapter as `storage_usage_collection_interval`.
    pub storage_usage_collection_interval: Duration,
    /// Forwarded to the adapter as `storage_usage_retention_period`.
    pub storage_usage_retention_period: Option<Duration>,

    // === Adapter options. ===
    /// Persist client cache and metrics used to open the durable catalog.
    pub catalog_config: CatalogConfig,
    /// Forwarded to the adapter as `availability_zones`.
    pub availability_zones: Vec<String>,
    /// Map of known cluster replica sizes. `serve` rejects startup when
    /// `bootstrap_default_cluster_replica_size` is not a key of this map.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Forwarded to the adapter as `timestamp_oracle_url`.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    /// When set, a Segment client is created with this API key and used for
    /// telemetry reporting.
    pub segment_api_key: Option<String>,
    /// Passed to the Segment client configuration as `client_side`.
    pub segment_client_side: bool,
    /// When no `segment_api_key` is set and this is true, telemetry reporting
    /// is started with a dummy Segment client.
    pub test_only_dummy_segment_client: bool,
    /// LaunchDarkly SDK key. When set, system parameters are synced from
    /// LaunchDarkly. Must not be combined with `config_sync_file_path`.
    pub launchdarkly_sdk_key: Option<String>,
    /// Passed to `SystemParameterSyncConfig::new`; presumably maps system
    /// parameter names to LaunchDarkly flag keys — confirm against
    /// `mz_adapter::config`.
    pub launchdarkly_key_map: BTreeMap<String, String>,
    /// Timeout passed to `load_remote_system_parameters` during startup.
    pub config_sync_timeout: Duration,
    /// Interval passed to the background `system_parameter_sync` task.
    pub config_sync_loop_interval: Option<Duration>,
    /// Path of a file to sync system parameters from. Must not be combined
    /// with `launchdarkly_sdk_key`.
    pub config_sync_file_path: Option<PathBuf>,

    // === Bootstrap options. ===
    /// Identifier of this environment; its organization ID is used when
    /// opening the durable catalog.
    pub environment_id: EnvironmentId,
    /// Passed to the catalog `BootstrapArgs` as `bootstrap_role`.
    pub bootstrap_role: Option<String>,
    /// Passed to the catalog `BootstrapArgs` as `default_cluster_replica_size`.
    pub bootstrap_default_cluster_replica_size: String,
    /// Passed to the catalog `BootstrapArgs` as
    /// `default_cluster_replication_factor`.
    pub bootstrap_default_cluster_replication_factor: u32,
    /// Forwarded to the adapter as `builtin_system_cluster_config`.
    pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_catalog_server_cluster_config`.
    pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_probe_cluster_config`.
    pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_support_cluster_config`.
    pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_analytics_cluster_config`.
    pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Default values for system parameters, keyed by parameter name. `serve`
    /// consults it when resolving the 0dt deployment parameters and then
    /// forwards it to the adapter.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Helm chart version, handed to HTTP listeners, SQL listeners and the
    /// adapter.
    pub helm_chart_version: Option<String>,
    /// Forwarded to the adapter as `license_key`.
    pub license_key: ValidatedLicenseKey,

    // === AWS options. ===
    /// Forwarded to the adapter as `aws_account_id`.
    pub aws_account_id: Option<String>,
    /// Forwarded to the adapter as `aws_privatelink_availability_zones`.
    pub aws_privatelink_availability_zones: Option<Vec<String>>,

    // === Observability options. ===
    /// Registry into which HTTP and pgwire metrics are registered; also
    /// forwarded to the adapter.
    pub metrics_registry: MetricsRegistry,
    /// Forwarded to the adapter as `tracing_handle`.
    pub tracing_handle: TracingHandle,

    // === Testing options. ===
    /// Clock function. Called once at startup to obtain the boot timestamp
    /// and then forwarded to the adapter.
    pub now: NowFn,
    /// Forwarded to the adapter as `force_builtin_schema_migration`.
    pub force_builtin_schema_migration: Option<String>,
}
216
/// Configuration for opening the durable catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Cache from which the persist client backing the catalog is opened.
    pub persist_clients: Arc<PersistClientCache>,
    /// Catalog metrics, shared with the durable catalog state and the
    /// preflight checks.
    pub metrics: Arc<mz_catalog::durable::Metrics>,
}
225
/// A bound network listener that has not started serving yet.
///
/// `C` is the listener's configuration type (a `ListenerConfig`
/// implementation such as `SqlListenerConfig` or `HttpListenerConfig`).
pub struct Listener<C> {
    /// Handle returned by `mz_server_core::listen` for the bound listener.
    pub handle: ListenerHandle,
    /// Stream of incoming connections, consumed when serving starts.
    connection_stream: Pin<Box<dyn ConnectionStream>>,
    /// The configuration this listener was bound with.
    config: C,
}
231impl<C> Listener<C>
232where
233 C: ListenerConfig,
234{
235 async fn bind(config: C) -> Result<Self, io::Error> {
247 let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
248 Ok(Self {
249 handle,
250 connection_stream,
251 config,
252 })
253 }
254}
255
256impl Listener<SqlListenerConfig> {
257 #[instrument(name = "environmentd::serve_sql")]
258 pub async fn serve_sql(
259 self,
260 name: String,
261 active_connection_counter: ConnectionCounter,
262 tls_reloading_context: Option<ReloadingSslContext>,
263 frontegg: Option<FronteggAuthenticator>,
264 adapter_client: AdapterClient,
265 oidc: GenericOidcAuthenticator,
266 metrics: MetricsConfig,
267 helm_chart_version: Option<String>,
268 ) -> ListenerHandle {
269 let label: &'static str = Box::leak(name.into_boxed_str());
270 let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
271 context,
272 mode: if self.config.enable_tls {
273 TlsMode::Require
274 } else {
275 TlsMode::Allow
276 },
277 });
278
279 task::spawn(|| format!("{}_sql_server", label), {
280 let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
281 label,
282 tls,
283 adapter_client,
284 authenticator_kind: self.config.authenticator_kind,
285 frontegg,
286 oidc,
287 metrics,
288 active_connection_counter,
289 helm_chart_version,
290 allowed_roles: self.config.allowed_roles,
291 });
292 mz_server_core::serve(ServeConfig {
293 conns: self.connection_stream,
294 server: sql_server,
295 dyncfg: None,
298 })
299 });
300 self.handle
301 }
302}
303
304impl Listener<HttpListenerConfig> {
305 #[instrument(name = "environmentd::serve_http")]
306 pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
307 let task_name = format!("{}_http_server", &config.source);
308 task::spawn(|| task_name, {
309 let http_server = HttpServer::new(config);
310 mz_server_core::serve(ServeConfig {
311 conns: self.connection_stream,
312 server: http_server,
313 dyncfg: None,
316 })
317 });
318 self.handle
319 }
320}
321
/// The set of bound listeners for an `environmentd` server, keyed by
/// listener name.
pub struct Listeners {
    /// Bound HTTP listeners, keyed by listener name.
    pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
    /// Bound SQL listeners, keyed by listener name.
    pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
}
326
327impl Listeners {
328 pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
329 let mut sql = BTreeMap::new();
330 for (name, config) in config.sql {
331 sql.insert(name, Listener::bind(config).await?);
332 }
333
334 let mut http = BTreeMap::new();
335 for (name, config) in config.http {
336 http.insert(name, Listener::bind(config).await?);
337 }
338
339 Ok(Listeners { http, sql })
340 }
341
342 #[instrument(name = "environmentd::serve")]
346 pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
347 let serve_start = Instant::now();
348 info!("startup: envd serve: beginning");
349 info!("startup: envd serve: preamble beginning");
350
351 let tls_reloading_context = match config.tls {
353 Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
354 None => None,
355 };
356
357 let active_connection_counter = ConnectionCounter::default();
358 let (deployment_state, deployment_state_handle) = DeploymentState::new();
359
360 let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
369 let internal_route_config = Arc::new(InternalRouteConfig {
370 deployment_state_handle,
371 internal_console_redirect_url: config.internal_console_redirect_url,
372 });
373
374 let (authenticator_oidc_tx, authenticator_oidc_rx) = oneshot::channel();
375 let authenticator_oidc_rx = authenticator_oidc_rx.shared();
376 let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
377 let adapter_client_rx = adapter_client_rx.shared();
378
379 let metrics_registry = config.metrics_registry.clone();
380 let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
381 let mut http_listener_handles = BTreeMap::new();
382 for (name, listener) in self.http {
383 let authenticator_kind = listener.config.authenticator_kind();
384 let source: &'static str = Box::leak(name.clone().into_boxed_str());
385 let tls = if listener.config.enable_tls() {
386 tls_reloading_context.clone()
387 } else {
388 None
389 };
390 let http_config = HttpConfig {
391 adapter_client_rx: adapter_client_rx.clone(),
392 active_connection_counter: active_connection_counter.clone(),
393 helm_chart_version: config.helm_chart_version.clone(),
394 source,
395 tls,
396 authenticator_kind,
397 frontegg: config.frontegg.clone(),
398 oidc_rx: authenticator_oidc_rx.clone(),
399 allowed_origin: config.cors_allowed_origin.clone(),
400 concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
401 metrics: metrics.clone(),
402 metrics_registry: metrics_registry.clone(),
403 allowed_roles: listener.config.allowed_roles(),
404 internal_route_config: Arc::clone(&internal_route_config),
405 routes_enabled: listener.config.routes.clone(),
406 replica_http_locator: Arc::clone(&config.controller.replica_http_locator),
407 };
408 http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
409 }
410
411 info!(
412 "startup: envd serve: preamble complete in {:?}",
413 serve_start.elapsed()
414 );
415
416 let catalog_init_start = Instant::now();
417 info!("startup: envd serve: catalog init beginning");
418
419 let boot_ts = (config.now)().into();
421
422 let persist_client = config
423 .catalog_config
424 .persist_clients
425 .open(config.controller.persist_location.clone())
426 .await
427 .context("opening persist client")?;
428 let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
429 persist_client.clone(),
430 config.environment_id.organization_id(),
431 BUILD_INFO.semver_version(),
432 Some(config.controller.deploy_generation),
433 Arc::clone(&config.catalog_config.metrics),
434 )
435 .await?;
436
437 info!(
438 "startup: envd serve: catalog init complete in {:?}",
439 catalog_init_start.elapsed()
440 );
441
442 let system_param_sync_start = Instant::now();
443 info!("startup: envd serve: system parameter sync beginning");
444 let system_parameter_sync_config =
446 match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
447 (None, None) => None,
448 (None, Some(f)) => {
449 info!("Using config file path {:?}", f);
450 Some(SystemParameterSyncConfig::new(
451 config.environment_id.clone(),
452 &BUILD_INFO,
453 &config.metrics_registry,
454 config.launchdarkly_key_map,
455 SystemParameterSyncClientConfig::File { path: f },
456 ))
457 }
458 (Some(key), None) => Some(SystemParameterSyncConfig::new(
459 config.environment_id.clone(),
460 &BUILD_INFO,
461 &config.metrics_registry,
462 config.launchdarkly_key_map,
463 SystemParameterSyncClientConfig::LaunchDarkly {
464 sdk_key: key,
465 now_fn: config.now.clone(),
466 },
467 )),
468
469 (Some(_), Some(_)) => {
470 panic!("Cannot configure both file and Launchdarkly based config syncing")
471 }
472 };
473
474 let remote_system_parameters = load_remote_system_parameters(
475 &mut openable_adapter_storage,
476 system_parameter_sync_config.clone(),
477 config.config_sync_timeout,
478 )
479 .await?;
480 info!(
481 "startup: envd serve: system parameter sync complete in {:?}",
482 system_param_sync_start.elapsed()
483 );
484
485 let preflight_checks_start = Instant::now();
486 info!("startup: envd serve: preflight checks beginning");
487
488 let with_0dt_deployment_max_wait = {
490 let cli_default = config
491 .system_parameter_defaults
492 .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
493 .map(|x| {
494 Duration::parse(VarInput::Flat(x)).map_err(|err| {
495 anyhow!(
496 "failed to parse default for {}: {:?}",
497 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
498 err
499 )
500 })
501 })
502 .transpose()?;
503 let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
504 let ld = get_ld_value(
505 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
506 &remote_system_parameters,
507 |x| {
508 Duration::parse(VarInput::Flat(x)).map_err(|err| {
509 format!(
510 "failed to parse LD value {} for {}: {:?}",
511 x,
512 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
513 err
514 )
515 })
516 },
517 )?;
518 let catalog = openable_adapter_storage
519 .get_0dt_deployment_max_wait()
520 .await?;
521 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
522 info!(
523 ?computed,
524 ?ld,
525 ?catalog,
526 ?cli_default,
527 ?compiled_default,
528 "determined value for {} system parameter",
529 WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
530 );
531 computed
532 };
533 let with_0dt_deployment_ddl_check_interval = {
535 let cli_default = config
536 .system_parameter_defaults
537 .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
538 .map(|x| {
539 Duration::parse(VarInput::Flat(x)).map_err(|err| {
540 anyhow!(
541 "failed to parse default for {}: {:?}",
542 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
543 err
544 )
545 })
546 })
547 .transpose()?;
548 let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
549 let ld = get_ld_value(
550 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
551 &remote_system_parameters,
552 |x| {
553 Duration::parse(VarInput::Flat(x)).map_err(|err| {
554 format!(
555 "failed to parse LD value {} for {}: {:?}",
556 x,
557 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
558 err
559 )
560 })
561 },
562 )?;
563 let catalog = openable_adapter_storage
564 .get_0dt_deployment_ddl_check_interval()
565 .await?;
566 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
567 info!(
568 ?computed,
569 ?ld,
570 ?catalog,
571 ?cli_default,
572 ?compiled_default,
573 "determined value for {} system parameter",
574 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
575 );
576 computed
577 };
578
579 let enable_0dt_deployment_panic_after_timeout = {
582 let cli_default = config
583 .system_parameter_defaults
584 .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
585 .map(|x| {
586 strconv::parse_bool(x).map_err(|err| {
587 anyhow!(
588 "failed to parse default for {}: {}",
589 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
590 err
591 )
592 })
593 })
594 .transpose()?;
595 let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
596 let ld = get_ld_value(
597 "enable_0dt_deployment_panic_after_timeout",
598 &remote_system_parameters,
599 |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
600 )?;
601 let catalog = openable_adapter_storage
602 .get_enable_0dt_deployment_panic_after_timeout()
603 .await?;
604 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
605 info!(
606 %computed,
607 ?ld,
608 ?catalog,
609 ?cli_default,
610 ?compiled_default,
611 "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
612 );
613 computed
614 };
615
616 let bootstrap_args = BootstrapArgs {
620 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
621 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
622 bootstrap_role: config.bootstrap_role.clone(),
623 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
624 };
625 let preflight_config = PreflightInput {
626 boot_ts,
627 environment_id: config.environment_id.clone(),
628 persist_client,
629 deploy_generation: config.controller.deploy_generation,
630 deployment_state: deployment_state.clone(),
631 openable_adapter_storage,
632 catalog_metrics: Arc::clone(&config.catalog_config.metrics),
633 caught_up_max_wait: with_0dt_deployment_max_wait,
634 panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
635 bootstrap_args,
636 ddl_check_interval: with_0dt_deployment_ddl_check_interval,
637 };
638 let PreflightOutput {
639 openable_adapter_storage,
640 read_only,
641 caught_up_trigger,
642 } = deployment::preflight::preflight_0dt(preflight_config).await?;
643
644 info!(
645 "startup: envd serve: preflight checks complete in {:?}",
646 preflight_checks_start.elapsed()
647 );
648
649 let catalog_open_start = Instant::now();
650 info!("startup: envd serve: durable catalog open beginning");
651
652 let bootstrap_args = BootstrapArgs {
653 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
654 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
655 bootstrap_role: config.bootstrap_role,
656 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
657 };
658
659 let (adapter_storage, audit_logs_iterator) = if read_only {
661 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
664 .open_savepoint(boot_ts, &bootstrap_args)
665 .await?;
666 (adapter_storage, audit_logs_iterator)
671 } else {
672 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
673 .open(boot_ts, &bootstrap_args)
674 .await?;
675
676 deployment_state.set_is_leader();
680
681 (adapter_storage, audit_logs_iterator)
682 };
683
684 if !read_only {
686 config.controller.persist_clients.cfg().enable_compaction();
687 }
688
689 info!(
690 "startup: envd serve: durable catalog open complete in {:?}",
691 catalog_open_start.elapsed()
692 );
693
694 let coord_init_start = Instant::now();
695 info!("startup: envd serve: coordinator init beginning");
696
697 if !config
698 .cluster_replica_sizes
699 .0
700 .contains_key(&config.bootstrap_default_cluster_replica_size)
701 {
702 return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
703 }
704 let envd_epoch = adapter_storage.epoch();
705
706 let storage_usage_client = StorageUsageClient::open(
708 config
709 .controller
710 .persist_clients
711 .open(config.controller.persist_location.clone())
712 .await
713 .context("opening storage usage client")?,
714 );
715
716 let segment_client = config.segment_api_key.map(|api_key| {
718 mz_segment::Client::new(mz_segment::Config {
719 api_key,
720 client_side: config.segment_client_side,
721 })
722 });
723 let connection_limiter = active_connection_counter.clone();
724 let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
725 connection_limiter.update_limit(limit);
726 connection_limiter.update_superuser_reserved(superuser_reserved);
727 });
728
729 let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
730 connection_context: config.controller.connection_context.clone(),
731 connection_limit_callback,
732 controller_config: config.controller,
733 controller_envd_epoch: envd_epoch,
734 storage: adapter_storage,
735 audit_logs_iterator,
736 timestamp_oracle_url: config.timestamp_oracle_url,
737 unsafe_mode: config.unsafe_mode,
738 all_features: config.all_features,
739 build_info: &BUILD_INFO,
740 environment_id: config.environment_id.clone(),
741 metrics_registry: config.metrics_registry.clone(),
742 now: config.now,
743 secrets_controller: config.secrets_controller,
744 cloud_resource_controller: config.cloud_resource_controller,
745 cluster_replica_sizes: config.cluster_replica_sizes,
746 builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
747 builtin_catalog_server_cluster_config: config
748 .bootstrap_builtin_catalog_server_cluster_config,
749 builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
750 builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
751 builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
752 availability_zones: config.availability_zones,
753 system_parameter_defaults: config.system_parameter_defaults,
754 storage_usage_client,
755 storage_usage_collection_interval: config.storage_usage_collection_interval,
756 storage_usage_retention_period: config.storage_usage_retention_period,
757 segment_client: segment_client.clone(),
758 egress_addresses: config.egress_addresses,
759 remote_system_parameters,
760 aws_account_id: config.aws_account_id,
761 aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
762 webhook_concurrency_limit: webhook_concurrency_limit.clone(),
763 http_host_name: config.http_host_name,
764 tracing_handle: config.tracing_handle,
765 read_only_controllers: read_only,
766 caught_up_trigger,
767 helm_chart_version: config.helm_chart_version.clone(),
768 license_key: config.license_key,
769 external_login_password_mz_system: config.external_login_password_mz_system,
770 force_builtin_schema_migration: config.force_builtin_schema_migration,
771 })
772 .instrument(info_span!("adapter::serve"))
773 .await?;
774
775 let oidc = GenericOidcAuthenticator::new(adapter_client.clone());
777
778 info!(
779 "startup: envd serve: coordinator init complete in {:?}",
780 coord_init_start.elapsed()
781 );
782
783 let serve_postamble_start = Instant::now();
784 info!("startup: envd serve: postamble beginning");
785
786 authenticator_oidc_tx
788 .send(oidc.clone())
789 .expect("rx known to be live");
790 adapter_client_tx
791 .send(adapter_client.clone())
792 .expect("internal HTTP server should not drop first");
793
794 let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
795
796 let mut sql_listener_handles = BTreeMap::new();
798 for (name, listener) in self.sql {
799 sql_listener_handles.insert(
800 name.clone(),
801 listener
802 .serve_sql(
803 name,
804 active_connection_counter.clone(),
805 tls_reloading_context.clone(),
806 config.frontegg.clone(),
807 adapter_client.clone(),
808 oidc.clone(),
809 metrics.clone(),
810 config.helm_chart_version.clone(),
811 )
812 .await,
813 );
814 }
815
816 if let Some(segment_client) = segment_client {
818 telemetry::start_reporting(telemetry::Config {
819 segment_client,
820 adapter_client: adapter_client.clone(),
821 environment_id: config.environment_id,
822 report_interval: Duration::from_secs(3600),
823 });
824 } else if config.test_only_dummy_segment_client {
825 tracing::debug!("starting telemetry reporting with a dummy segment client");
831 let segment_client = mz_segment::Client::new_dummy_client();
832 telemetry::start_reporting(telemetry::Config {
833 segment_client,
834 adapter_client: adapter_client.clone(),
835 environment_id: config.environment_id,
836 report_interval: Duration::from_secs(180),
837 });
838 }
839
840 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
843 task::spawn(
844 || "system_parameter_sync",
845 AssertUnwindSafe(system_parameter_sync(
846 system_parameter_sync_config,
847 adapter_client.clone(),
848 config.config_sync_loop_interval,
849 ))
850 .ore_catch_unwind(),
851 );
852 }
853
854 info!(
855 "startup: envd serve: postamble complete in {:?}",
856 serve_postamble_start.elapsed()
857 );
858 info!(
859 "startup: envd serve: complete in {:?}",
860 serve_start.elapsed()
861 );
862
863 Ok(Server {
864 sql_listener_handles,
865 http_listener_handles,
866 _adapter_handle: adapter_handle,
867 })
868 }
869}
870
871fn get_ld_value<V>(
872 name: &str,
873 remote_system_parameters: &Option<BTreeMap<String, String>>,
874 parse: impl Fn(&str) -> Result<V, String>,
875) -> Result<Option<V>, anyhow::Error> {
876 remote_system_parameters
877 .as_ref()
878 .and_then(|params| params.get(name))
879 .map(|x| {
880 parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
881 })
882 .transpose()
883}
884
/// A running `environmentd` server, as returned by `Listeners::serve`.
pub struct Server {
    /// Handles to the serving SQL listeners, keyed by listener name.
    pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handles to the serving HTTP listeners, keyed by listener name.
    pub http_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handle returned by `mz_adapter::serve`, held for the lifetime of the
    /// server. NOTE(review): presumably dropping it shuts the adapter down —
    /// confirm against `mz_adapter::Handle`.
    _adapter_handle: mz_adapter::Handle,
}