1use std::collections::BTreeMap;
17use std::panic::AssertUnwindSafe;
18use std::path::PathBuf;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use std::{env, io};
23
24use anyhow::{Context, anyhow};
25use derivative::Derivative;
26use futures::FutureExt;
27use ipnet::IpNet;
28use mz_adapter::config::{
29 SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
30};
31use mz_adapter::webhook::WebhookConcurrencyLimiter;
32use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
33use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
34use mz_adapter_types::dyncfgs::{
35 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
36 WITH_0DT_DEPLOYMENT_MAX_WAIT,
37};
38use mz_auth::password::Password;
39use mz_authenticator::Authenticator;
40use mz_build_info::{BuildInfo, build_info};
41use mz_catalog::config::ClusterReplicaSizeMap;
42use mz_catalog::durable::BootstrapArgs;
43use mz_cloud_resources::CloudResourceController;
44use mz_controller::ControllerConfig;
45use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
46use mz_license_keys::ValidatedLicenseKey;
47use mz_ore::future::OreFutureExt;
48use mz_ore::metrics::MetricsRegistry;
49use mz_ore::now::NowFn;
50use mz_ore::tracing::TracingHandle;
51use mz_ore::url::SensitiveUrl;
52use mz_ore::{instrument, task};
53use mz_persist_client::cache::PersistClientCache;
54use mz_persist_client::usage::StorageUsageClient;
55use mz_pgwire::MetricsConfig;
56use mz_pgwire_common::ConnectionCounter;
57use mz_repr::strconv;
58use mz_secrets::SecretsController;
59use mz_server_core::listeners::{
60 AuthenticatorKind, HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
61};
62use mz_server_core::{
63 ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
64 TlsCertConfig, TlsMode,
65};
66use mz_sql::catalog::EnvironmentId;
67use mz_sql::session::vars::{Value, VarInput};
68use tokio::sync::oneshot;
69use tower_http::cors::AllowOrigin;
70use tracing::{Instrument, info, info_span};
71
72use crate::deployment::preflight::{PreflightInput, PreflightOutput};
73use crate::deployment::state::DeploymentState;
74use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
75
76pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
77
78mod deployment;
79pub mod environmentd;
80pub mod http;
81mod telemetry;
82#[cfg(feature = "test")]
83pub mod test_util;
84
85pub const BUILD_INFO: BuildInfo = build_info!();
86
87#[derive(Derivative)]
89#[derivative(Debug)]
90pub struct Config {
91 pub unsafe_mode: bool,
95 pub all_features: bool,
98
99 pub tls: Option<TlsCertConfig>,
102 #[derivative(Debug = "ignore")]
104 pub tls_reload_certs: ReloadTrigger,
105 pub external_login_password_mz_system: Option<Password>,
107 pub frontegg: Option<FronteggAuthenticator>,
109 pub cors_allowed_origin: AllowOrigin,
112 pub egress_addresses: Vec<IpNet>,
115 pub http_host_name: Option<String>,
121 pub internal_console_redirect_url: Option<String>,
124
125 pub controller: ControllerConfig,
128 pub secrets_controller: Arc<dyn SecretsController>,
130 pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
132
133 pub storage_usage_collection_interval: Duration,
136 pub storage_usage_retention_period: Option<Duration>,
138
139 pub catalog_config: CatalogConfig,
142 pub availability_zones: Vec<String>,
145 pub cluster_replica_sizes: ClusterReplicaSizeMap,
147 pub timestamp_oracle_url: Option<SensitiveUrl>,
149 pub segment_api_key: Option<String>,
151 pub segment_client_side: bool,
154 pub test_only_dummy_segment_client: bool,
156 pub launchdarkly_sdk_key: Option<String>,
159 pub launchdarkly_key_map: BTreeMap<String, String>,
162 pub config_sync_timeout: Duration,
164 pub config_sync_loop_interval: Option<Duration>,
166 pub config_sync_file_path: Option<PathBuf>,
168
169 pub environment_id: EnvironmentId,
172 pub bootstrap_role: Option<String>,
174 pub bootstrap_default_cluster_replica_size: String,
176 pub bootstrap_default_cluster_replication_factor: u32,
178 pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
180 pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
182 pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
184 pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
186 pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
188 pub system_parameter_defaults: BTreeMap<String, String>,
191 pub helm_chart_version: Option<String>,
193 pub license_key: ValidatedLicenseKey,
195
196 pub aws_account_id: Option<String>,
200 pub aws_privatelink_availability_zones: Option<Vec<String>>,
202
203 pub metrics_registry: MetricsRegistry,
206 pub tracing_handle: TracingHandle,
208
209 pub now: NowFn,
212}
213
214#[derive(Debug, Clone)]
216pub struct CatalogConfig {
217 pub persist_clients: Arc<PersistClientCache>,
219 pub metrics: Arc<mz_catalog::durable::Metrics>,
221}
222
223pub struct Listener<C> {
224 pub handle: ListenerHandle,
225 connection_stream: Pin<Box<dyn ConnectionStream>>,
226 config: C,
227}
228impl<C> Listener<C>
229where
230 C: ListenerConfig,
231{
232 async fn bind(config: C) -> Result<Self, io::Error> {
244 let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
245 Ok(Self {
246 handle,
247 connection_stream,
248 config,
249 })
250 }
251}
252
253impl Listener<SqlListenerConfig> {
254 #[instrument(name = "environmentd::serve_sql")]
255 pub async fn serve_sql(
256 self,
257 name: String,
258 active_connection_counter: ConnectionCounter,
259 tls_reloading_context: Option<ReloadingSslContext>,
260 frontegg: Option<FronteggAuthenticator>,
261 adapter_client: AdapterClient,
262 metrics: MetricsConfig,
263 helm_chart_version: Option<String>,
264 ) -> ListenerHandle {
265 let label: &'static str = Box::leak(name.into_boxed_str());
266 let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
267 context,
268 mode: if self.config.enable_tls {
269 TlsMode::Require
270 } else {
271 TlsMode::Allow
272 },
273 });
274 let authenticator = match self.config.authenticator_kind {
275 AuthenticatorKind::Frontegg => Authenticator::Frontegg(
276 frontegg.expect("Frontegg args are required with AuthenticatorKind::Frontegg"),
277 ),
278 AuthenticatorKind::Password => Authenticator::Password(adapter_client.clone()),
279 AuthenticatorKind::Sasl => Authenticator::Sasl(adapter_client.clone()),
280 AuthenticatorKind::None => Authenticator::None,
281 };
282
283 task::spawn(|| format!("{}_sql_server", label), {
284 let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
285 label,
286 tls,
287 adapter_client,
288 authenticator,
289 metrics,
290 active_connection_counter,
291 helm_chart_version,
292 allowed_roles: self.config.allowed_roles,
293 });
294 mz_server_core::serve(ServeConfig {
295 conns: self.connection_stream,
296 server: sql_server,
297 dyncfg: None,
300 })
301 });
302 self.handle
303 }
304}
305
306impl Listener<HttpListenerConfig> {
307 #[instrument(name = "environmentd::serve_http")]
308 pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
309 let task_name = format!("{}_http_server", &config.source);
310 task::spawn(|| task_name, {
311 let http_server = HttpServer::new(config);
312 mz_server_core::serve(ServeConfig {
313 conns: self.connection_stream,
314 server: http_server,
315 dyncfg: None,
318 })
319 });
320 self.handle
321 }
322}
323
324pub struct Listeners {
325 pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
326 pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
327}
328
329impl Listeners {
330 pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
331 let mut sql = BTreeMap::new();
332 for (name, config) in config.sql {
333 sql.insert(name, Listener::bind(config).await?);
334 }
335
336 let mut http = BTreeMap::new();
337 for (name, config) in config.http {
338 http.insert(name, Listener::bind(config).await?);
339 }
340
341 Ok(Listeners { http, sql })
342 }
343
344 #[instrument(name = "environmentd::serve")]
348 pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
349 let serve_start = Instant::now();
350 info!("startup: envd serve: beginning");
351 info!("startup: envd serve: preamble beginning");
352
353 let tls_reloading_context = match config.tls {
355 Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
356 None => None,
357 };
358
359 let active_connection_counter = ConnectionCounter::default();
360 let (deployment_state, deployment_state_handle) = DeploymentState::new();
361
362 let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
371 let internal_route_config = Arc::new(InternalRouteConfig {
372 deployment_state_handle,
373 internal_console_redirect_url: config.internal_console_redirect_url,
374 });
375
376 let (authenticator_frontegg_tx, authenticator_frontegg_rx) = oneshot::channel();
377 let authenticator_frontegg_rx = authenticator_frontegg_rx.shared();
378 let (authenticator_password_tx, authenticator_password_rx) = oneshot::channel();
379 let authenticator_password_rx = authenticator_password_rx.shared();
380 let (authenticator_none_tx, authenticator_none_rx) = oneshot::channel();
381 let authenticator_none_rx = authenticator_none_rx.shared();
382
383 if let Some(frontegg) = &config.frontegg {
386 authenticator_frontegg_tx
387 .send(Arc::new(Authenticator::Frontegg(frontegg.clone())))
388 .expect("rx known to be live");
389 }
390 authenticator_none_tx
391 .send(Arc::new(Authenticator::None))
392 .expect("rx known to be live");
393
394 let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
395 let adapter_client_rx = adapter_client_rx.shared();
396
397 let metrics_registry = config.metrics_registry.clone();
398 let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
399 let mut http_listener_handles = BTreeMap::new();
400 for (name, listener) in self.http {
401 let authenticator_kind = listener.config.authenticator_kind();
402 let authenticator_rx = match authenticator_kind {
403 AuthenticatorKind::Frontegg => authenticator_frontegg_rx.clone(),
404 AuthenticatorKind::Password => authenticator_password_rx.clone(),
405 AuthenticatorKind::Sasl => authenticator_password_rx.clone(),
406 AuthenticatorKind::None => authenticator_none_rx.clone(),
407 };
408 let source: &'static str = Box::leak(name.clone().into_boxed_str());
409 let tls = if listener.config.enable_tls() {
410 tls_reloading_context.clone()
411 } else {
412 None
413 };
414 let http_config = HttpConfig {
415 adapter_client_rx: adapter_client_rx.clone(),
416 active_connection_counter: active_connection_counter.clone(),
417 helm_chart_version: config.helm_chart_version.clone(),
418 source,
419 tls,
420 authenticator_kind,
421 authenticator_rx,
422 allowed_origin: config.cors_allowed_origin.clone(),
423 concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
424 metrics: metrics.clone(),
425 metrics_registry: metrics_registry.clone(),
426 allowed_roles: listener.config.allowed_roles(),
427 internal_route_config: Arc::clone(&internal_route_config),
428 routes_enabled: listener.config.routes.clone(),
429 };
430 http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
431 }
432
433 info!(
434 "startup: envd serve: preamble complete in {:?}",
435 serve_start.elapsed()
436 );
437
438 let catalog_init_start = Instant::now();
439 info!("startup: envd serve: catalog init beginning");
440
441 let boot_ts = (config.now)().into();
443
444 let persist_client = config
445 .catalog_config
446 .persist_clients
447 .open(config.controller.persist_location.clone())
448 .await
449 .context("opening persist client")?;
450 let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
451 persist_client.clone(),
452 config.environment_id.organization_id(),
453 BUILD_INFO.semver_version(),
454 Some(config.controller.deploy_generation),
455 Arc::clone(&config.catalog_config.metrics),
456 )
457 .await?;
458
459 info!(
460 "startup: envd serve: catalog init complete in {:?}",
461 catalog_init_start.elapsed()
462 );
463
464 let system_param_sync_start = Instant::now();
465 info!("startup: envd serve: system parameter sync beginning");
466 let system_parameter_sync_config =
468 match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
469 (None, None) => None,
470 (None, Some(f)) => Some(SystemParameterSyncConfig::new(
471 config.environment_id.clone(),
472 &BUILD_INFO,
473 &config.metrics_registry,
474 config.launchdarkly_key_map,
475 SystemParameterSyncClientConfig::File { path: f },
476 )),
477 (Some(key), None) => Some(SystemParameterSyncConfig::new(
478 config.environment_id.clone(),
479 &BUILD_INFO,
480 &config.metrics_registry,
481 config.launchdarkly_key_map,
482 SystemParameterSyncClientConfig::LaunchDarkly {
483 sdk_key: key,
484 now_fn: config.now.clone(),
485 },
486 )),
487
488 (Some(_), Some(_)) => {
489 panic!("Cannot configure both file and Launchdarkly based config syncing")
490 }
491 };
492
493 let remote_system_parameters = load_remote_system_parameters(
494 &mut openable_adapter_storage,
495 system_parameter_sync_config.clone(),
496 config.config_sync_timeout,
497 )
498 .await?;
499 info!(
500 "startup: envd serve: system parameter sync complete in {:?}",
501 system_param_sync_start.elapsed()
502 );
503
504 let preflight_checks_start = Instant::now();
505 info!("startup: envd serve: preflight checks beginning");
506
507 let with_0dt_deployment_max_wait = {
509 let cli_default = config
510 .system_parameter_defaults
511 .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
512 .map(|x| {
513 Duration::parse(VarInput::Flat(x)).map_err(|err| {
514 anyhow!(
515 "failed to parse default for {}: {:?}",
516 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
517 err
518 )
519 })
520 })
521 .transpose()?;
522 let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
523 let ld = get_ld_value(
524 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
525 &remote_system_parameters,
526 |x| {
527 Duration::parse(VarInput::Flat(x)).map_err(|err| {
528 format!(
529 "failed to parse LD value {} for {}: {:?}",
530 x,
531 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
532 err
533 )
534 })
535 },
536 )?;
537 let catalog = openable_adapter_storage
538 .get_0dt_deployment_max_wait()
539 .await?;
540 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
541 info!(
542 ?computed,
543 ?ld,
544 ?catalog,
545 ?cli_default,
546 ?compiled_default,
547 "determined value for {} system parameter",
548 WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
549 );
550 computed
551 };
552 let with_0dt_deployment_ddl_check_interval = {
554 let cli_default = config
555 .system_parameter_defaults
556 .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
557 .map(|x| {
558 Duration::parse(VarInput::Flat(x)).map_err(|err| {
559 anyhow!(
560 "failed to parse default for {}: {:?}",
561 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
562 err
563 )
564 })
565 })
566 .transpose()?;
567 let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
568 let ld = get_ld_value(
569 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
570 &remote_system_parameters,
571 |x| {
572 Duration::parse(VarInput::Flat(x)).map_err(|err| {
573 format!(
574 "failed to parse LD value {} for {}: {:?}",
575 x,
576 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
577 err
578 )
579 })
580 },
581 )?;
582 let catalog = openable_adapter_storage
583 .get_0dt_deployment_ddl_check_interval()
584 .await?;
585 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
586 info!(
587 ?computed,
588 ?ld,
589 ?catalog,
590 ?cli_default,
591 ?compiled_default,
592 "determined value for {} system parameter",
593 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
594 );
595 computed
596 };
597
598 let enable_0dt_deployment_panic_after_timeout = {
601 let cli_default = config
602 .system_parameter_defaults
603 .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
604 .map(|x| {
605 strconv::parse_bool(x).map_err(|err| {
606 anyhow!(
607 "failed to parse default for {}: {}",
608 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
609 err
610 )
611 })
612 })
613 .transpose()?;
614 let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
615 let ld = get_ld_value(
616 "enable_0dt_deployment_panic_after_timeout",
617 &remote_system_parameters,
618 |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
619 )?;
620 let catalog = openable_adapter_storage
621 .get_enable_0dt_deployment_panic_after_timeout()
622 .await?;
623 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
624 info!(
625 %computed,
626 ?ld,
627 ?catalog,
628 ?cli_default,
629 ?compiled_default,
630 "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
631 );
632 computed
633 };
634
635 let bootstrap_args = BootstrapArgs {
639 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
640 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
641 bootstrap_role: config.bootstrap_role.clone(),
642 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
643 };
644 let preflight_config = PreflightInput {
645 boot_ts,
646 environment_id: config.environment_id.clone(),
647 persist_client,
648 deploy_generation: config.controller.deploy_generation,
649 deployment_state: deployment_state.clone(),
650 openable_adapter_storage,
651 catalog_metrics: Arc::clone(&config.catalog_config.metrics),
652 caught_up_max_wait: with_0dt_deployment_max_wait,
653 panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
654 bootstrap_args,
655 ddl_check_interval: with_0dt_deployment_ddl_check_interval,
656 };
657 let PreflightOutput {
658 openable_adapter_storage,
659 read_only,
660 caught_up_trigger,
661 } = deployment::preflight::preflight_0dt(preflight_config).await?;
662
663 info!(
664 "startup: envd serve: preflight checks complete in {:?}",
665 preflight_checks_start.elapsed()
666 );
667
668 let catalog_open_start = Instant::now();
669 info!("startup: envd serve: durable catalog open beginning");
670
671 let bootstrap_args = BootstrapArgs {
672 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
673 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
674 bootstrap_role: config.bootstrap_role,
675 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
676 };
677
678 let (adapter_storage, audit_logs_iterator) = if read_only {
680 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
683 .open_savepoint(boot_ts, &bootstrap_args)
684 .await?;
685 (adapter_storage, audit_logs_iterator)
690 } else {
691 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
692 .open(boot_ts, &bootstrap_args)
693 .await?;
694
695 deployment_state.set_is_leader();
699
700 (adapter_storage, audit_logs_iterator)
701 };
702
703 if !read_only {
705 config.controller.persist_clients.cfg().enable_compaction();
706 }
707
708 info!(
709 "startup: envd serve: durable catalog open complete in {:?}",
710 catalog_open_start.elapsed()
711 );
712
713 let coord_init_start = Instant::now();
714 info!("startup: envd serve: coordinator init beginning");
715
716 if !config
717 .cluster_replica_sizes
718 .0
719 .contains_key(&config.bootstrap_default_cluster_replica_size)
720 {
721 return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
722 }
723 let envd_epoch = adapter_storage.epoch();
724
725 let storage_usage_client = StorageUsageClient::open(
727 config
728 .controller
729 .persist_clients
730 .open(config.controller.persist_location.clone())
731 .await
732 .context("opening storage usage client")?,
733 );
734
735 let segment_client = config.segment_api_key.map(|api_key| {
737 mz_segment::Client::new(mz_segment::Config {
738 api_key,
739 client_side: config.segment_client_side,
740 })
741 });
742 let connection_limiter = active_connection_counter.clone();
743 let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
744 connection_limiter.update_limit(limit);
745 connection_limiter.update_superuser_reserved(superuser_reserved);
746 });
747
748 let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
749 connection_context: config.controller.connection_context.clone(),
750 connection_limit_callback,
751 controller_config: config.controller,
752 controller_envd_epoch: envd_epoch,
753 storage: adapter_storage,
754 audit_logs_iterator,
755 timestamp_oracle_url: config.timestamp_oracle_url,
756 unsafe_mode: config.unsafe_mode,
757 all_features: config.all_features,
758 build_info: &BUILD_INFO,
759 environment_id: config.environment_id.clone(),
760 metrics_registry: config.metrics_registry.clone(),
761 now: config.now,
762 secrets_controller: config.secrets_controller,
763 cloud_resource_controller: config.cloud_resource_controller,
764 cluster_replica_sizes: config.cluster_replica_sizes,
765 builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
766 builtin_catalog_server_cluster_config: config
767 .bootstrap_builtin_catalog_server_cluster_config,
768 builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
769 builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
770 builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
771 availability_zones: config.availability_zones,
772 system_parameter_defaults: config.system_parameter_defaults,
773 storage_usage_client,
774 storage_usage_collection_interval: config.storage_usage_collection_interval,
775 storage_usage_retention_period: config.storage_usage_retention_period,
776 segment_client: segment_client.clone(),
777 egress_addresses: config.egress_addresses,
778 remote_system_parameters,
779 aws_account_id: config.aws_account_id,
780 aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
781 webhook_concurrency_limit: webhook_concurrency_limit.clone(),
782 http_host_name: config.http_host_name,
783 tracing_handle: config.tracing_handle,
784 read_only_controllers: read_only,
785 caught_up_trigger,
786 helm_chart_version: config.helm_chart_version.clone(),
787 license_key: config.license_key,
788 external_login_password_mz_system: config.external_login_password_mz_system,
789 })
790 .instrument(info_span!("adapter::serve"))
791 .await?;
792
793 info!(
794 "startup: envd serve: coordinator init complete in {:?}",
795 coord_init_start.elapsed()
796 );
797
798 let serve_postamble_start = Instant::now();
799 info!("startup: envd serve: postamble beginning");
800
801 authenticator_password_tx
803 .send(Arc::new(Authenticator::Password(adapter_client.clone())))
804 .expect("rx known to be live");
805 adapter_client_tx
806 .send(adapter_client.clone())
807 .expect("internal HTTP server should not drop first");
808
809 let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
810
811 let mut sql_listener_handles = BTreeMap::new();
813 for (name, listener) in self.sql {
814 sql_listener_handles.insert(
815 name.clone(),
816 listener
817 .serve_sql(
818 name,
819 active_connection_counter.clone(),
820 tls_reloading_context.clone(),
821 config.frontegg.clone(),
822 adapter_client.clone(),
823 metrics.clone(),
824 config.helm_chart_version.clone(),
825 )
826 .await,
827 );
828 }
829
830 if let Some(segment_client) = segment_client {
832 telemetry::start_reporting(telemetry::Config {
833 segment_client,
834 adapter_client: adapter_client.clone(),
835 environment_id: config.environment_id,
836 report_interval: Duration::from_secs(3600),
837 });
838 } else if config.test_only_dummy_segment_client {
839 tracing::debug!("starting telemetry reporting with a dummy segment client");
845 let segment_client = mz_segment::Client::new_dummy_client();
846 telemetry::start_reporting(telemetry::Config {
847 segment_client,
848 adapter_client: adapter_client.clone(),
849 environment_id: config.environment_id,
850 report_interval: Duration::from_secs(180),
851 });
852 }
853
854 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
857 task::spawn(
858 || "system_parameter_sync",
859 AssertUnwindSafe(system_parameter_sync(
860 system_parameter_sync_config,
861 adapter_client.clone(),
862 config.config_sync_loop_interval,
863 ))
864 .ore_catch_unwind(),
865 );
866 }
867
868 info!(
869 "startup: envd serve: postamble complete in {:?}",
870 serve_postamble_start.elapsed()
871 );
872 info!(
873 "startup: envd serve: complete in {:?}",
874 serve_start.elapsed()
875 );
876
877 Ok(Server {
878 sql_listener_handles,
879 http_listener_handles,
880 _adapter_handle: adapter_handle,
881 })
882 }
883}
884
885fn get_ld_value<V>(
886 name: &str,
887 remote_system_parameters: &Option<BTreeMap<String, String>>,
888 parse: impl Fn(&str) -> Result<V, String>,
889) -> Result<Option<V>, anyhow::Error> {
890 remote_system_parameters
891 .as_ref()
892 .and_then(|params| params.get(name))
893 .map(|x| {
894 parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
895 })
896 .transpose()
897}
898
899pub struct Server {
901 pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
903 pub http_listener_handles: BTreeMap<String, ListenerHandle>,
904 _adapter_handle: mz_adapter::Handle,
905}