1use std::collections::BTreeMap;
17use std::panic::AssertUnwindSafe;
18use std::path::PathBuf;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use std::{env, io};
23
24use anyhow::{Context, anyhow};
25use derivative::Derivative;
26use futures::FutureExt;
27use ipnet::IpNet;
28use mz_adapter::config::{
29 SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
30};
31use mz_adapter::webhook::WebhookConcurrencyLimiter;
32use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
33use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
34use mz_adapter_types::dyncfgs::{
35 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
36 WITH_0DT_DEPLOYMENT_MAX_WAIT,
37};
38use mz_auth::password::Password;
39use mz_authenticator::Authenticator;
40use mz_build_info::{BuildInfo, build_info};
41use mz_catalog::config::ClusterReplicaSizeMap;
42use mz_catalog::durable::BootstrapArgs;
43use mz_cloud_resources::CloudResourceController;
44use mz_controller::ControllerConfig;
45use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
46use mz_license_keys::ValidatedLicenseKey;
47use mz_ore::future::OreFutureExt;
48use mz_ore::metrics::MetricsRegistry;
49use mz_ore::now::NowFn;
50use mz_ore::tracing::TracingHandle;
51use mz_ore::url::SensitiveUrl;
52use mz_ore::{instrument, task};
53use mz_persist_client::cache::PersistClientCache;
54use mz_persist_client::usage::StorageUsageClient;
55use mz_pgwire::MetricsConfig;
56use mz_pgwire_common::ConnectionCounter;
57use mz_repr::strconv;
58use mz_secrets::SecretsController;
59use mz_server_core::listeners::{
60 AuthenticatorKind, HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
61};
62use mz_server_core::{
63 ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
64 TlsCertConfig, TlsMode,
65};
66use mz_sql::catalog::EnvironmentId;
67use mz_sql::session::vars::{Value, VarInput};
68use tokio::sync::oneshot;
69use tower_http::cors::AllowOrigin;
70use tracing::{Instrument, info, info_span};
71
72use crate::deployment::preflight::{PreflightInput, PreflightOutput};
73use crate::deployment::state::DeploymentState;
74use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
75
76pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
77
78mod deployment;
79pub mod environmentd;
80pub mod http;
81mod telemetry;
82#[cfg(feature = "test")]
83pub mod test_util;
84
85pub const BUILD_INFO: BuildInfo = build_info!();
86
87#[derive(Derivative)]
89#[derivative(Debug)]
90pub struct Config {
91 pub unsafe_mode: bool,
95 pub all_features: bool,
98
99 pub tls: Option<TlsCertConfig>,
102 #[derivative(Debug = "ignore")]
104 pub tls_reload_certs: ReloadTrigger,
105 pub external_login_password_mz_system: Option<Password>,
107 pub frontegg: Option<FronteggAuthenticator>,
109 pub cors_allowed_origin: AllowOrigin,
112 pub egress_addresses: Vec<IpNet>,
115 pub http_host_name: Option<String>,
121 pub internal_console_redirect_url: Option<String>,
124
125 pub controller: ControllerConfig,
128 pub secrets_controller: Arc<dyn SecretsController>,
130 pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
132
133 pub storage_usage_collection_interval: Duration,
136 pub storage_usage_retention_period: Option<Duration>,
138
139 pub catalog_config: CatalogConfig,
142 pub availability_zones: Vec<String>,
145 pub cluster_replica_sizes: ClusterReplicaSizeMap,
147 pub timestamp_oracle_url: Option<SensitiveUrl>,
149 pub segment_api_key: Option<String>,
151 pub segment_client_side: bool,
154 pub test_only_dummy_segment_client: bool,
156 pub launchdarkly_sdk_key: Option<String>,
159 pub launchdarkly_key_map: BTreeMap<String, String>,
162 pub config_sync_timeout: Duration,
164 pub config_sync_loop_interval: Option<Duration>,
166 pub config_sync_file_path: Option<PathBuf>,
168
169 pub environment_id: EnvironmentId,
172 pub bootstrap_role: Option<String>,
174 pub bootstrap_default_cluster_replica_size: String,
176 pub bootstrap_default_cluster_replication_factor: u32,
178 pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
180 pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
182 pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
184 pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
186 pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
188 pub system_parameter_defaults: BTreeMap<String, String>,
191 pub helm_chart_version: Option<String>,
193 pub license_key: ValidatedLicenseKey,
195
196 pub aws_account_id: Option<String>,
200 pub aws_privatelink_availability_zones: Option<Vec<String>>,
202
203 pub metrics_registry: MetricsRegistry,
206 pub tracing_handle: TracingHandle,
208
209 pub now: NowFn,
212 pub force_builtin_schema_migration: Option<String>,
215}
216
217#[derive(Debug, Clone)]
219pub struct CatalogConfig {
220 pub persist_clients: Arc<PersistClientCache>,
222 pub metrics: Arc<mz_catalog::durable::Metrics>,
224}
225
226pub struct Listener<C> {
227 pub handle: ListenerHandle,
228 connection_stream: Pin<Box<dyn ConnectionStream>>,
229 config: C,
230}
231impl<C> Listener<C>
232where
233 C: ListenerConfig,
234{
235 async fn bind(config: C) -> Result<Self, io::Error> {
247 let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
248 Ok(Self {
249 handle,
250 connection_stream,
251 config,
252 })
253 }
254}
255
256impl Listener<SqlListenerConfig> {
257 #[instrument(name = "environmentd::serve_sql")]
258 pub async fn serve_sql(
259 self,
260 name: String,
261 active_connection_counter: ConnectionCounter,
262 tls_reloading_context: Option<ReloadingSslContext>,
263 frontegg: Option<FronteggAuthenticator>,
264 adapter_client: AdapterClient,
265 metrics: MetricsConfig,
266 helm_chart_version: Option<String>,
267 ) -> ListenerHandle {
268 let label: &'static str = Box::leak(name.into_boxed_str());
269 let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
270 context,
271 mode: if self.config.enable_tls {
272 TlsMode::Require
273 } else {
274 TlsMode::Allow
275 },
276 });
277 let authenticator = match self.config.authenticator_kind {
278 AuthenticatorKind::Frontegg => Authenticator::Frontegg(
279 frontegg.expect("Frontegg args are required with AuthenticatorKind::Frontegg"),
280 ),
281 AuthenticatorKind::Password => Authenticator::Password(adapter_client.clone()),
282 AuthenticatorKind::Sasl => Authenticator::Sasl(adapter_client.clone()),
283 AuthenticatorKind::None => Authenticator::None,
284 };
285
286 task::spawn(|| format!("{}_sql_server", label), {
287 let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
288 label,
289 tls,
290 adapter_client,
291 authenticator,
292 metrics,
293 active_connection_counter,
294 helm_chart_version,
295 allowed_roles: self.config.allowed_roles,
296 });
297 mz_server_core::serve(ServeConfig {
298 conns: self.connection_stream,
299 server: sql_server,
300 dyncfg: None,
303 })
304 });
305 self.handle
306 }
307}
308
309impl Listener<HttpListenerConfig> {
310 #[instrument(name = "environmentd::serve_http")]
311 pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
312 let task_name = format!("{}_http_server", &config.source);
313 task::spawn(|| task_name, {
314 let http_server = HttpServer::new(config);
315 mz_server_core::serve(ServeConfig {
316 conns: self.connection_stream,
317 server: http_server,
318 dyncfg: None,
321 })
322 });
323 self.handle
324 }
325}
326
327pub struct Listeners {
328 pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
329 pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
330}
331
332impl Listeners {
333 pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
334 let mut sql = BTreeMap::new();
335 for (name, config) in config.sql {
336 sql.insert(name, Listener::bind(config).await?);
337 }
338
339 let mut http = BTreeMap::new();
340 for (name, config) in config.http {
341 http.insert(name, Listener::bind(config).await?);
342 }
343
344 Ok(Listeners { http, sql })
345 }
346
347 #[instrument(name = "environmentd::serve")]
351 pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
352 let serve_start = Instant::now();
353 info!("startup: envd serve: beginning");
354 info!("startup: envd serve: preamble beginning");
355
356 let tls_reloading_context = match config.tls {
358 Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
359 None => None,
360 };
361
362 let active_connection_counter = ConnectionCounter::default();
363 let (deployment_state, deployment_state_handle) = DeploymentState::new();
364
365 let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
374 let internal_route_config = Arc::new(InternalRouteConfig {
375 deployment_state_handle,
376 internal_console_redirect_url: config.internal_console_redirect_url,
377 });
378
379 let (authenticator_frontegg_tx, authenticator_frontegg_rx) = oneshot::channel();
380 let authenticator_frontegg_rx = authenticator_frontegg_rx.shared();
381 let (authenticator_password_tx, authenticator_password_rx) = oneshot::channel();
382 let authenticator_password_rx = authenticator_password_rx.shared();
383 let (authenticator_none_tx, authenticator_none_rx) = oneshot::channel();
384 let authenticator_none_rx = authenticator_none_rx.shared();
385
386 if let Some(frontegg) = &config.frontegg {
389 authenticator_frontegg_tx
390 .send(Arc::new(Authenticator::Frontegg(frontegg.clone())))
391 .expect("rx known to be live");
392 }
393 authenticator_none_tx
394 .send(Arc::new(Authenticator::None))
395 .expect("rx known to be live");
396
397 let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
398 let adapter_client_rx = adapter_client_rx.shared();
399
400 let metrics_registry = config.metrics_registry.clone();
401 let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
402 let mut http_listener_handles = BTreeMap::new();
403 for (name, listener) in self.http {
404 let authenticator_kind = listener.config.authenticator_kind();
405 let authenticator_rx = match authenticator_kind {
406 AuthenticatorKind::Frontegg => authenticator_frontegg_rx.clone(),
407 AuthenticatorKind::Password => authenticator_password_rx.clone(),
408 AuthenticatorKind::Sasl => authenticator_password_rx.clone(),
409 AuthenticatorKind::None => authenticator_none_rx.clone(),
410 };
411 let source: &'static str = Box::leak(name.clone().into_boxed_str());
412 let tls = if listener.config.enable_tls() {
413 tls_reloading_context.clone()
414 } else {
415 None
416 };
417 let http_config = HttpConfig {
418 adapter_client_rx: adapter_client_rx.clone(),
419 active_connection_counter: active_connection_counter.clone(),
420 helm_chart_version: config.helm_chart_version.clone(),
421 source,
422 tls,
423 authenticator_kind,
424 authenticator_rx,
425 allowed_origin: config.cors_allowed_origin.clone(),
426 concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
427 metrics: metrics.clone(),
428 metrics_registry: metrics_registry.clone(),
429 allowed_roles: listener.config.allowed_roles(),
430 internal_route_config: Arc::clone(&internal_route_config),
431 routes_enabled: listener.config.routes.clone(),
432 };
433 http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
434 }
435
436 info!(
437 "startup: envd serve: preamble complete in {:?}",
438 serve_start.elapsed()
439 );
440
441 let catalog_init_start = Instant::now();
442 info!("startup: envd serve: catalog init beginning");
443
444 let boot_ts = (config.now)().into();
446
447 let persist_client = config
448 .catalog_config
449 .persist_clients
450 .open(config.controller.persist_location.clone())
451 .await
452 .context("opening persist client")?;
453 let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
454 persist_client.clone(),
455 config.environment_id.organization_id(),
456 BUILD_INFO.semver_version(),
457 Some(config.controller.deploy_generation),
458 Arc::clone(&config.catalog_config.metrics),
459 )
460 .await?;
461
462 info!(
463 "startup: envd serve: catalog init complete in {:?}",
464 catalog_init_start.elapsed()
465 );
466
467 let system_param_sync_start = Instant::now();
468 info!("startup: envd serve: system parameter sync beginning");
469 let system_parameter_sync_config =
471 match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
472 (None, None) => None,
473 (None, Some(f)) => {
474 info!("Using config file path {:?}", f);
475 Some(SystemParameterSyncConfig::new(
476 config.environment_id.clone(),
477 &BUILD_INFO,
478 &config.metrics_registry,
479 config.launchdarkly_key_map,
480 SystemParameterSyncClientConfig::File { path: f },
481 ))
482 }
483 (Some(key), None) => Some(SystemParameterSyncConfig::new(
484 config.environment_id.clone(),
485 &BUILD_INFO,
486 &config.metrics_registry,
487 config.launchdarkly_key_map,
488 SystemParameterSyncClientConfig::LaunchDarkly {
489 sdk_key: key,
490 now_fn: config.now.clone(),
491 },
492 )),
493
494 (Some(_), Some(_)) => {
495 panic!("Cannot configure both file and Launchdarkly based config syncing")
496 }
497 };
498
499 let remote_system_parameters = load_remote_system_parameters(
500 &mut openable_adapter_storage,
501 system_parameter_sync_config.clone(),
502 config.config_sync_timeout,
503 )
504 .await?;
505 info!(
506 "startup: envd serve: system parameter sync complete in {:?}",
507 system_param_sync_start.elapsed()
508 );
509
510 let preflight_checks_start = Instant::now();
511 info!("startup: envd serve: preflight checks beginning");
512
513 let with_0dt_deployment_max_wait = {
515 let cli_default = config
516 .system_parameter_defaults
517 .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
518 .map(|x| {
519 Duration::parse(VarInput::Flat(x)).map_err(|err| {
520 anyhow!(
521 "failed to parse default for {}: {:?}",
522 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
523 err
524 )
525 })
526 })
527 .transpose()?;
528 let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
529 let ld = get_ld_value(
530 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
531 &remote_system_parameters,
532 |x| {
533 Duration::parse(VarInput::Flat(x)).map_err(|err| {
534 format!(
535 "failed to parse LD value {} for {}: {:?}",
536 x,
537 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
538 err
539 )
540 })
541 },
542 )?;
543 let catalog = openable_adapter_storage
544 .get_0dt_deployment_max_wait()
545 .await?;
546 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
547 info!(
548 ?computed,
549 ?ld,
550 ?catalog,
551 ?cli_default,
552 ?compiled_default,
553 "determined value for {} system parameter",
554 WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
555 );
556 computed
557 };
558 let with_0dt_deployment_ddl_check_interval = {
560 let cli_default = config
561 .system_parameter_defaults
562 .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
563 .map(|x| {
564 Duration::parse(VarInput::Flat(x)).map_err(|err| {
565 anyhow!(
566 "failed to parse default for {}: {:?}",
567 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
568 err
569 )
570 })
571 })
572 .transpose()?;
573 let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
574 let ld = get_ld_value(
575 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
576 &remote_system_parameters,
577 |x| {
578 Duration::parse(VarInput::Flat(x)).map_err(|err| {
579 format!(
580 "failed to parse LD value {} for {}: {:?}",
581 x,
582 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
583 err
584 )
585 })
586 },
587 )?;
588 let catalog = openable_adapter_storage
589 .get_0dt_deployment_ddl_check_interval()
590 .await?;
591 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
592 info!(
593 ?computed,
594 ?ld,
595 ?catalog,
596 ?cli_default,
597 ?compiled_default,
598 "determined value for {} system parameter",
599 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
600 );
601 computed
602 };
603
604 let enable_0dt_deployment_panic_after_timeout = {
607 let cli_default = config
608 .system_parameter_defaults
609 .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
610 .map(|x| {
611 strconv::parse_bool(x).map_err(|err| {
612 anyhow!(
613 "failed to parse default for {}: {}",
614 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
615 err
616 )
617 })
618 })
619 .transpose()?;
620 let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
621 let ld = get_ld_value(
622 "enable_0dt_deployment_panic_after_timeout",
623 &remote_system_parameters,
624 |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
625 )?;
626 let catalog = openable_adapter_storage
627 .get_enable_0dt_deployment_panic_after_timeout()
628 .await?;
629 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
630 info!(
631 %computed,
632 ?ld,
633 ?catalog,
634 ?cli_default,
635 ?compiled_default,
636 "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
637 );
638 computed
639 };
640
641 let bootstrap_args = BootstrapArgs {
645 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
646 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
647 bootstrap_role: config.bootstrap_role.clone(),
648 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
649 };
650 let preflight_config = PreflightInput {
651 boot_ts,
652 environment_id: config.environment_id.clone(),
653 persist_client,
654 deploy_generation: config.controller.deploy_generation,
655 deployment_state: deployment_state.clone(),
656 openable_adapter_storage,
657 catalog_metrics: Arc::clone(&config.catalog_config.metrics),
658 caught_up_max_wait: with_0dt_deployment_max_wait,
659 panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
660 bootstrap_args,
661 ddl_check_interval: with_0dt_deployment_ddl_check_interval,
662 };
663 let PreflightOutput {
664 openable_adapter_storage,
665 read_only,
666 caught_up_trigger,
667 } = deployment::preflight::preflight_0dt(preflight_config).await?;
668
669 info!(
670 "startup: envd serve: preflight checks complete in {:?}",
671 preflight_checks_start.elapsed()
672 );
673
674 let catalog_open_start = Instant::now();
675 info!("startup: envd serve: durable catalog open beginning");
676
677 let bootstrap_args = BootstrapArgs {
678 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
679 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
680 bootstrap_role: config.bootstrap_role,
681 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
682 };
683
684 let (adapter_storage, audit_logs_iterator) = if read_only {
686 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
689 .open_savepoint(boot_ts, &bootstrap_args)
690 .await?;
691 (adapter_storage, audit_logs_iterator)
696 } else {
697 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
698 .open(boot_ts, &bootstrap_args)
699 .await?;
700
701 deployment_state.set_is_leader();
705
706 (adapter_storage, audit_logs_iterator)
707 };
708
709 if !read_only {
711 config.controller.persist_clients.cfg().enable_compaction();
712 }
713
714 info!(
715 "startup: envd serve: durable catalog open complete in {:?}",
716 catalog_open_start.elapsed()
717 );
718
719 let coord_init_start = Instant::now();
720 info!("startup: envd serve: coordinator init beginning");
721
722 if !config
723 .cluster_replica_sizes
724 .0
725 .contains_key(&config.bootstrap_default_cluster_replica_size)
726 {
727 return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
728 }
729 let envd_epoch = adapter_storage.epoch();
730
731 let storage_usage_client = StorageUsageClient::open(
733 config
734 .controller
735 .persist_clients
736 .open(config.controller.persist_location.clone())
737 .await
738 .context("opening storage usage client")?,
739 );
740
741 let segment_client = config.segment_api_key.map(|api_key| {
743 mz_segment::Client::new(mz_segment::Config {
744 api_key,
745 client_side: config.segment_client_side,
746 })
747 });
748 let connection_limiter = active_connection_counter.clone();
749 let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
750 connection_limiter.update_limit(limit);
751 connection_limiter.update_superuser_reserved(superuser_reserved);
752 });
753
754 let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
755 connection_context: config.controller.connection_context.clone(),
756 connection_limit_callback,
757 controller_config: config.controller,
758 controller_envd_epoch: envd_epoch,
759 storage: adapter_storage,
760 audit_logs_iterator,
761 timestamp_oracle_url: config.timestamp_oracle_url,
762 unsafe_mode: config.unsafe_mode,
763 all_features: config.all_features,
764 build_info: &BUILD_INFO,
765 environment_id: config.environment_id.clone(),
766 metrics_registry: config.metrics_registry.clone(),
767 now: config.now,
768 secrets_controller: config.secrets_controller,
769 cloud_resource_controller: config.cloud_resource_controller,
770 cluster_replica_sizes: config.cluster_replica_sizes,
771 builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
772 builtin_catalog_server_cluster_config: config
773 .bootstrap_builtin_catalog_server_cluster_config,
774 builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
775 builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
776 builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
777 availability_zones: config.availability_zones,
778 system_parameter_defaults: config.system_parameter_defaults,
779 storage_usage_client,
780 storage_usage_collection_interval: config.storage_usage_collection_interval,
781 storage_usage_retention_period: config.storage_usage_retention_period,
782 segment_client: segment_client.clone(),
783 egress_addresses: config.egress_addresses,
784 remote_system_parameters,
785 aws_account_id: config.aws_account_id,
786 aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
787 webhook_concurrency_limit: webhook_concurrency_limit.clone(),
788 http_host_name: config.http_host_name,
789 tracing_handle: config.tracing_handle,
790 read_only_controllers: read_only,
791 caught_up_trigger,
792 helm_chart_version: config.helm_chart_version.clone(),
793 license_key: config.license_key,
794 external_login_password_mz_system: config.external_login_password_mz_system,
795 force_builtin_schema_migration: config.force_builtin_schema_migration,
796 })
797 .instrument(info_span!("adapter::serve"))
798 .await?;
799
800 info!(
801 "startup: envd serve: coordinator init complete in {:?}",
802 coord_init_start.elapsed()
803 );
804
805 let serve_postamble_start = Instant::now();
806 info!("startup: envd serve: postamble beginning");
807
808 authenticator_password_tx
810 .send(Arc::new(Authenticator::Password(adapter_client.clone())))
811 .expect("rx known to be live");
812 adapter_client_tx
813 .send(adapter_client.clone())
814 .expect("internal HTTP server should not drop first");
815
816 let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
817
818 let mut sql_listener_handles = BTreeMap::new();
820 for (name, listener) in self.sql {
821 sql_listener_handles.insert(
822 name.clone(),
823 listener
824 .serve_sql(
825 name,
826 active_connection_counter.clone(),
827 tls_reloading_context.clone(),
828 config.frontegg.clone(),
829 adapter_client.clone(),
830 metrics.clone(),
831 config.helm_chart_version.clone(),
832 )
833 .await,
834 );
835 }
836
837 if let Some(segment_client) = segment_client {
839 telemetry::start_reporting(telemetry::Config {
840 segment_client,
841 adapter_client: adapter_client.clone(),
842 environment_id: config.environment_id,
843 report_interval: Duration::from_secs(3600),
844 });
845 } else if config.test_only_dummy_segment_client {
846 tracing::debug!("starting telemetry reporting with a dummy segment client");
852 let segment_client = mz_segment::Client::new_dummy_client();
853 telemetry::start_reporting(telemetry::Config {
854 segment_client,
855 adapter_client: adapter_client.clone(),
856 environment_id: config.environment_id,
857 report_interval: Duration::from_secs(180),
858 });
859 }
860
861 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
864 task::spawn(
865 || "system_parameter_sync",
866 AssertUnwindSafe(system_parameter_sync(
867 system_parameter_sync_config,
868 adapter_client.clone(),
869 config.config_sync_loop_interval,
870 ))
871 .ore_catch_unwind(),
872 );
873 }
874
875 info!(
876 "startup: envd serve: postamble complete in {:?}",
877 serve_postamble_start.elapsed()
878 );
879 info!(
880 "startup: envd serve: complete in {:?}",
881 serve_start.elapsed()
882 );
883
884 Ok(Server {
885 sql_listener_handles,
886 http_listener_handles,
887 _adapter_handle: adapter_handle,
888 })
889 }
890}
891
892fn get_ld_value<V>(
893 name: &str,
894 remote_system_parameters: &Option<BTreeMap<String, String>>,
895 parse: impl Fn(&str) -> Result<V, String>,
896) -> Result<Option<V>, anyhow::Error> {
897 remote_system_parameters
898 .as_ref()
899 .and_then(|params| params.get(name))
900 .map(|x| {
901 parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
902 })
903 .transpose()
904}
905
906pub struct Server {
908 pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
910 pub http_listener_handles: BTreeMap<String, ListenerHandle>,
911 _adapter_handle: mz_adapter::Handle,
912}