1use std::collections::BTreeMap;
17use std::panic::AssertUnwindSafe;
18use std::path::PathBuf;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use std::{env, io};
23
24use anyhow::{Context, anyhow};
25use derivative::Derivative;
26use futures::FutureExt;
27use ipnet::IpNet;
28use mz_adapter::config::{
29 SystemParameterSyncClientConfig, SystemParameterSyncConfig, system_parameter_sync,
30};
31use mz_adapter::webhook::WebhookConcurrencyLimiter;
32use mz_adapter::{AdapterError, Client as AdapterClient, load_remote_system_parameters};
33use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
34use mz_adapter_types::dyncfgs::{
35 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL,
36 WITH_0DT_DEPLOYMENT_MAX_WAIT,
37};
38use mz_auth::password::Password;
39use mz_authenticator::Authenticator;
40use mz_build_info::{BuildInfo, build_info};
41use mz_catalog::config::ClusterReplicaSizeMap;
42use mz_catalog::durable::BootstrapArgs;
43use mz_cloud_resources::CloudResourceController;
44use mz_controller::ControllerConfig;
45use mz_frontegg_auth::Authenticator as FronteggAuthenticator;
46use mz_license_keys::ValidatedLicenseKey;
47use mz_ore::future::OreFutureExt;
48use mz_ore::metrics::MetricsRegistry;
49use mz_ore::now::NowFn;
50use mz_ore::tracing::TracingHandle;
51use mz_ore::url::SensitiveUrl;
52use mz_ore::{instrument, task};
53use mz_persist_client::cache::PersistClientCache;
54use mz_persist_client::usage::StorageUsageClient;
55use mz_pgwire::MetricsConfig;
56use mz_pgwire_common::ConnectionCounter;
57use mz_repr::strconv;
58use mz_secrets::SecretsController;
59use mz_server_core::listeners::{
60 AuthenticatorKind, HttpListenerConfig, ListenerConfig, ListenersConfig, SqlListenerConfig,
61};
62use mz_server_core::{
63 ConnectionStream, ListenerHandle, ReloadTrigger, ReloadingSslContext, ServeConfig,
64 TlsCertConfig, TlsMode,
65};
66use mz_sql::catalog::EnvironmentId;
67use mz_sql::session::vars::{Value, VarInput};
68use tokio::sync::oneshot;
69use tower_http::cors::AllowOrigin;
70use tracing::{Instrument, info, info_span};
71
72use crate::deployment::preflight::{PreflightInput, PreflightOutput};
73use crate::deployment::state::DeploymentState;
74use crate::http::{HttpConfig, HttpServer, InternalRouteConfig};
75
76pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
77
// Deployment handling: provides the preflight checks and the `DeploymentState`
// used while serving.
mod deployment;
pub mod environmentd;
// HTTP server implementation (`HttpConfig`, `HttpServer`, `InternalRouteConfig`).
pub mod http;
// Telemetry reporting, started from `Listeners::serve` when a Segment client is
// available.
mod telemetry;
// Only compiled when the `test` feature is enabled.
#[cfg(feature = "test")]
pub mod test_util;

/// Build information for this crate, produced by the `build_info!` macro.
///
/// Its semver version is handed to the durable catalog and a reference to it is
/// passed on to the adapter and to system parameter synchronization.
pub const BUILD_INFO: BuildInfo = build_info!();
86
/// Configuration consumed by [`Listeners::serve`].
///
/// Most fields are forwarded verbatim to the adapter (`mz_adapter::Config`);
/// the per-field notes below describe how `serve` uses each one.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Config {
    /// Forwarded to the adapter as `unsafe_mode`.
    pub unsafe_mode: bool,
    /// Forwarded to the adapter as `all_features`.
    pub all_features: bool,

    /// Optional TLS certificate configuration. When present, `serve` builds a
    /// reloading TLS context from it and shares it with the listeners.
    pub tls: Option<TlsCertConfig>,
    /// Trigger handed to `TlsCertConfig::reloading_context`. Excluded from the
    /// derived `Debug` output.
    #[derivative(Debug = "ignore")]
    pub tls_reload_certs: ReloadTrigger,
    /// Forwarded to the adapter as `external_login_password_mz_system`.
    pub external_login_password_mz_system: Option<Password>,
    /// Frontegg authenticator. When present it is made available to HTTP
    /// listeners and passed to every SQL listener.
    pub frontegg: Option<FronteggAuthenticator>,
    /// Passed to every HTTP listener as its `allowed_origin`.
    pub cors_allowed_origin: AllowOrigin,
    /// Forwarded to the adapter as `egress_addresses`.
    pub egress_addresses: Vec<IpNet>,
    /// Forwarded to the adapter as `http_host_name`.
    pub http_host_name: Option<String>,
    /// Stored in the `InternalRouteConfig` shared by all HTTP listeners.
    pub internal_console_redirect_url: Option<String>,

    /// Controller configuration. `serve` reads its persist location, deploy
    /// generation, persist clients and connection context, then forwards the
    /// whole value to the adapter.
    pub controller: ControllerConfig,
    /// Forwarded to the adapter as `secrets_controller`.
    pub secrets_controller: Arc<dyn SecretsController>,
    /// Forwarded to the adapter as `cloud_resource_controller`.
    pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,

    /// Forwarded to the adapter as `storage_usage_collection_interval`.
    pub storage_usage_collection_interval: Duration,
    /// Forwarded to the adapter as `storage_usage_retention_period`.
    pub storage_usage_retention_period: Option<Duration>,

    /// Persist clients and metrics used to open the durable catalog.
    pub catalog_config: CatalogConfig,
    /// Forwarded to the adapter as `availability_zones`.
    pub availability_zones: Vec<String>,
    /// Known cluster replica sizes. Used in the bootstrap arguments, to
    /// validate `bootstrap_default_cluster_replica_size`, and forwarded to the
    /// adapter.
    pub cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Forwarded to the adapter as `timestamp_oracle_url`.
    pub timestamp_oracle_url: Option<SensitiveUrl>,
    /// When set, a Segment client is created with this key and telemetry
    /// reporting is started.
    pub segment_api_key: Option<String>,
    /// Passed to the Segment client configuration as `client_side`.
    pub segment_client_side: bool,
    /// When no `segment_api_key` is set and this is `true`, telemetry
    /// reporting is started with a dummy Segment client.
    pub test_only_dummy_segment_client: bool,
    /// When set, system parameters are synced from LaunchDarkly. Setting both
    /// this and `config_sync_file_path` makes `serve` panic.
    pub launchdarkly_sdk_key: Option<String>,
    /// Passed to `SystemParameterSyncConfig::new` as the key map.
    pub launchdarkly_key_map: BTreeMap<String, String>,
    /// Timeout passed to `load_remote_system_parameters`.
    pub config_sync_timeout: Duration,
    /// Interval passed to the background `system_parameter_sync` task.
    pub config_sync_loop_interval: Option<Duration>,
    /// When set, system parameters are synced from this file. Setting both
    /// this and `launchdarkly_sdk_key` makes `serve` panic.
    pub config_sync_file_path: Option<PathBuf>,

    /// Environment identifier. Supplies the organization id for the durable
    /// catalog and is passed to parameter sync, preflight, the adapter and
    /// telemetry.
    pub environment_id: EnvironmentId,
    /// Included in the catalog bootstrap arguments.
    pub bootstrap_role: Option<String>,
    /// Included in the catalog bootstrap arguments. `serve` returns an error
    /// if this is not a key of `cluster_replica_sizes`.
    pub bootstrap_default_cluster_replica_size: String,
    /// Included in the catalog bootstrap arguments.
    pub bootstrap_default_cluster_replication_factor: u32,
    /// Forwarded to the adapter as `builtin_system_cluster_config`.
    pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_catalog_server_cluster_config`.
    pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_probe_cluster_config`.
    pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_support_cluster_config`.
    pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// Forwarded to the adapter as `builtin_analytics_cluster_config`.
    pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
    /// Default values for system parameters, keyed by parameter name.
    /// Consulted when resolving the 0dt deployment parameters and forwarded to
    /// the adapter.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Passed to HTTP listeners, SQL listeners and the adapter.
    pub helm_chart_version: Option<String>,
    /// Forwarded to the adapter as `license_key`.
    pub license_key: ValidatedLicenseKey,

    /// Forwarded to the adapter as `aws_account_id`.
    pub aws_account_id: Option<String>,
    /// Forwarded to the adapter as `aws_privatelink_availability_zones`.
    pub aws_privatelink_availability_zones: Option<Vec<String>>,

    /// Registry into which HTTP and pgwire metrics are registered; also passed
    /// to parameter sync and the adapter.
    pub metrics_registry: MetricsRegistry,
    /// Forwarded to the adapter as `tracing_handle`.
    pub tracing_handle: TracingHandle,

    /// Clock function. Called once to obtain the boot timestamp, cloned into
    /// the LaunchDarkly sync configuration, and forwarded to the adapter.
    pub now: NowFn,
    /// Forwarded to the adapter as `force_builtin_schema_migration`.
    pub force_builtin_schema_migration: Option<String>,
}
216
/// Handles needed to open the durable catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Persist client cache from which the catalog's persist client is opened.
    pub persist_clients: Arc<PersistClientCache>,
    /// Metrics handed to the durable catalog and to the preflight checks.
    pub metrics: Arc<mz_catalog::durable::Metrics>,
}
225
/// A bound network listener that has not started serving yet.
///
/// `C` is the listener's configuration type (SQL or HTTP).
pub struct Listener<C> {
    /// Handle returned by `mz_server_core::listen` for this listener.
    pub handle: ListenerHandle,
    /// Stream of incoming connections, consumed when the listener is served.
    connection_stream: Pin<Box<dyn ConnectionStream>>,
    /// The configuration this listener was bound with.
    config: C,
}
231impl<C> Listener<C>
232where
233 C: ListenerConfig,
234{
235 async fn bind(config: C) -> Result<Self, io::Error> {
247 let (handle, connection_stream) = mz_server_core::listen(&config.addr()).await?;
248 Ok(Self {
249 handle,
250 connection_stream,
251 config,
252 })
253 }
254}
255
256impl Listener<SqlListenerConfig> {
257 #[instrument(name = "environmentd::serve_sql")]
258 pub async fn serve_sql(
259 self,
260 name: String,
261 active_connection_counter: ConnectionCounter,
262 tls_reloading_context: Option<ReloadingSslContext>,
263 frontegg: Option<FronteggAuthenticator>,
264 adapter_client: AdapterClient,
265 metrics: MetricsConfig,
266 helm_chart_version: Option<String>,
267 ) -> ListenerHandle {
268 let label: &'static str = Box::leak(name.into_boxed_str());
269 let tls = tls_reloading_context.map(|context| mz_server_core::ReloadingTlsConfig {
270 context,
271 mode: if self.config.enable_tls {
272 TlsMode::Require
273 } else {
274 TlsMode::Allow
275 },
276 });
277 let authenticator = match self.config.authenticator_kind {
278 AuthenticatorKind::Frontegg => Authenticator::Frontegg(
279 frontegg.expect("Frontegg args are required with AuthenticatorKind::Frontegg"),
280 ),
281 AuthenticatorKind::Password => Authenticator::Password(adapter_client.clone()),
282 AuthenticatorKind::Sasl => Authenticator::Sasl(adapter_client.clone()),
283 AuthenticatorKind::None => Authenticator::None,
284 };
285
286 task::spawn(|| format!("{}_sql_server", label), {
287 let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
288 label,
289 tls,
290 adapter_client,
291 authenticator,
292 metrics,
293 active_connection_counter,
294 helm_chart_version,
295 allowed_roles: self.config.allowed_roles,
296 });
297 mz_server_core::serve(ServeConfig {
298 conns: self.connection_stream,
299 server: sql_server,
300 dyncfg: None,
303 })
304 });
305 self.handle
306 }
307}
308
309impl Listener<HttpListenerConfig> {
310 #[instrument(name = "environmentd::serve_http")]
311 pub async fn serve_http(self, config: HttpConfig) -> ListenerHandle {
312 let task_name = format!("{}_http_server", &config.source);
313 task::spawn(|| task_name, {
314 let http_server = HttpServer::new(config);
315 mz_server_core::serve(ServeConfig {
316 conns: self.connection_stream,
317 server: http_server,
318 dyncfg: None,
321 })
322 });
323 self.handle
324 }
325}
326
/// The full set of bound, not-yet-serving listeners, keyed by listener name.
pub struct Listeners {
    /// Bound HTTP listeners.
    pub http: BTreeMap<String, Listener<HttpListenerConfig>>,
    /// Bound SQL (pgwire) listeners.
    pub sql: BTreeMap<String, Listener<SqlListenerConfig>>,
}
331
332impl Listeners {
333 pub async fn bind(config: ListenersConfig) -> Result<Self, io::Error> {
334 let mut sql = BTreeMap::new();
335 for (name, config) in config.sql {
336 sql.insert(name, Listener::bind(config).await?);
337 }
338
339 let mut http = BTreeMap::new();
340 for (name, config) in config.http {
341 http.insert(name, Listener::bind(config).await?);
342 }
343
344 Ok(Listeners { http, sql })
345 }
346
347 #[instrument(name = "environmentd::serve")]
351 pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
352 let serve_start = Instant::now();
353 info!("startup: envd serve: beginning");
354 info!("startup: envd serve: preamble beginning");
355
356 let tls_reloading_context = match config.tls {
358 Some(tls_config) => Some(tls_config.reloading_context(config.tls_reload_certs)?),
359 None => None,
360 };
361
362 let active_connection_counter = ConnectionCounter::default();
363 let (deployment_state, deployment_state_handle) = DeploymentState::new();
364
365 let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
374 let internal_route_config = Arc::new(InternalRouteConfig {
375 deployment_state_handle,
376 internal_console_redirect_url: config.internal_console_redirect_url,
377 });
378
379 let (authenticator_frontegg_tx, authenticator_frontegg_rx) = oneshot::channel();
380 let authenticator_frontegg_rx = authenticator_frontegg_rx.shared();
381 let (authenticator_password_tx, authenticator_password_rx) = oneshot::channel();
382 let authenticator_password_rx = authenticator_password_rx.shared();
383 let (authenticator_none_tx, authenticator_none_rx) = oneshot::channel();
384 let authenticator_none_rx = authenticator_none_rx.shared();
385
386 if let Some(frontegg) = &config.frontegg {
389 authenticator_frontegg_tx
390 .send(Arc::new(Authenticator::Frontegg(frontegg.clone())))
391 .expect("rx known to be live");
392 }
393 authenticator_none_tx
394 .send(Arc::new(Authenticator::None))
395 .expect("rx known to be live");
396
397 let (adapter_client_tx, adapter_client_rx) = oneshot::channel();
398 let adapter_client_rx = adapter_client_rx.shared();
399
400 let metrics_registry = config.metrics_registry.clone();
401 let metrics = http::Metrics::register_into(&metrics_registry, "mz_http");
402 let mut http_listener_handles = BTreeMap::new();
403 for (name, listener) in self.http {
404 let authenticator_kind = listener.config.authenticator_kind();
405 let authenticator_rx = match authenticator_kind {
406 AuthenticatorKind::Frontegg => authenticator_frontegg_rx.clone(),
407 AuthenticatorKind::Password => authenticator_password_rx.clone(),
408 AuthenticatorKind::Sasl => authenticator_password_rx.clone(),
409 AuthenticatorKind::None => authenticator_none_rx.clone(),
410 };
411 let source: &'static str = Box::leak(name.clone().into_boxed_str());
412 let tls = if listener.config.enable_tls() {
413 tls_reloading_context.clone()
414 } else {
415 None
416 };
417 let http_config = HttpConfig {
418 adapter_client_rx: adapter_client_rx.clone(),
419 active_connection_counter: active_connection_counter.clone(),
420 helm_chart_version: config.helm_chart_version.clone(),
421 source,
422 tls,
423 authenticator_kind,
424 authenticator_rx,
425 allowed_origin: config.cors_allowed_origin.clone(),
426 concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
427 metrics: metrics.clone(),
428 metrics_registry: metrics_registry.clone(),
429 allowed_roles: listener.config.allowed_roles(),
430 internal_route_config: Arc::clone(&internal_route_config),
431 routes_enabled: listener.config.routes.clone(),
432 };
433 http_listener_handles.insert(name.clone(), listener.serve_http(http_config).await);
434 }
435
436 info!(
437 "startup: envd serve: preamble complete in {:?}",
438 serve_start.elapsed()
439 );
440
441 let catalog_init_start = Instant::now();
442 info!("startup: envd serve: catalog init beginning");
443
444 let boot_ts = (config.now)().into();
446
447 let persist_client = config
448 .catalog_config
449 .persist_clients
450 .open(config.controller.persist_location.clone())
451 .await
452 .context("opening persist client")?;
453 let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
454 persist_client.clone(),
455 config.environment_id.organization_id(),
456 BUILD_INFO.semver_version(),
457 Some(config.controller.deploy_generation),
458 Arc::clone(&config.catalog_config.metrics),
459 )
460 .await?;
461
462 info!(
463 "startup: envd serve: catalog init complete in {:?}",
464 catalog_init_start.elapsed()
465 );
466
467 let system_param_sync_start = Instant::now();
468 info!("startup: envd serve: system parameter sync beginning");
469 let system_parameter_sync_config =
471 match (config.launchdarkly_sdk_key, config.config_sync_file_path) {
472 (None, None) => None,
473 (None, Some(f)) => Some(SystemParameterSyncConfig::new(
474 config.environment_id.clone(),
475 &BUILD_INFO,
476 &config.metrics_registry,
477 config.launchdarkly_key_map,
478 SystemParameterSyncClientConfig::File { path: f },
479 )),
480 (Some(key), None) => Some(SystemParameterSyncConfig::new(
481 config.environment_id.clone(),
482 &BUILD_INFO,
483 &config.metrics_registry,
484 config.launchdarkly_key_map,
485 SystemParameterSyncClientConfig::LaunchDarkly {
486 sdk_key: key,
487 now_fn: config.now.clone(),
488 },
489 )),
490
491 (Some(_), Some(_)) => {
492 panic!("Cannot configure both file and Launchdarkly based config syncing")
493 }
494 };
495
496 let remote_system_parameters = load_remote_system_parameters(
497 &mut openable_adapter_storage,
498 system_parameter_sync_config.clone(),
499 config.config_sync_timeout,
500 )
501 .await?;
502 info!(
503 "startup: envd serve: system parameter sync complete in {:?}",
504 system_param_sync_start.elapsed()
505 );
506
507 let preflight_checks_start = Instant::now();
508 info!("startup: envd serve: preflight checks beginning");
509
510 let with_0dt_deployment_max_wait = {
512 let cli_default = config
513 .system_parameter_defaults
514 .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
515 .map(|x| {
516 Duration::parse(VarInput::Flat(x)).map_err(|err| {
517 anyhow!(
518 "failed to parse default for {}: {:?}",
519 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
520 err
521 )
522 })
523 })
524 .transpose()?;
525 let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
526 let ld = get_ld_value(
527 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
528 &remote_system_parameters,
529 |x| {
530 Duration::parse(VarInput::Flat(x)).map_err(|err| {
531 format!(
532 "failed to parse LD value {} for {}: {:?}",
533 x,
534 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
535 err
536 )
537 })
538 },
539 )?;
540 let catalog = openable_adapter_storage
541 .get_0dt_deployment_max_wait()
542 .await?;
543 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
544 info!(
545 ?computed,
546 ?ld,
547 ?catalog,
548 ?cli_default,
549 ?compiled_default,
550 "determined value for {} system parameter",
551 WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
552 );
553 computed
554 };
555 let with_0dt_deployment_ddl_check_interval = {
557 let cli_default = config
558 .system_parameter_defaults
559 .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
560 .map(|x| {
561 Duration::parse(VarInput::Flat(x)).map_err(|err| {
562 anyhow!(
563 "failed to parse default for {}: {:?}",
564 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
565 err
566 )
567 })
568 })
569 .transpose()?;
570 let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
571 let ld = get_ld_value(
572 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
573 &remote_system_parameters,
574 |x| {
575 Duration::parse(VarInput::Flat(x)).map_err(|err| {
576 format!(
577 "failed to parse LD value {} for {}: {:?}",
578 x,
579 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
580 err
581 )
582 })
583 },
584 )?;
585 let catalog = openable_adapter_storage
586 .get_0dt_deployment_ddl_check_interval()
587 .await?;
588 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
589 info!(
590 ?computed,
591 ?ld,
592 ?catalog,
593 ?cli_default,
594 ?compiled_default,
595 "determined value for {} system parameter",
596 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
597 );
598 computed
599 };
600
601 let enable_0dt_deployment_panic_after_timeout = {
604 let cli_default = config
605 .system_parameter_defaults
606 .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
607 .map(|x| {
608 strconv::parse_bool(x).map_err(|err| {
609 anyhow!(
610 "failed to parse default for {}: {}",
611 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
612 err
613 )
614 })
615 })
616 .transpose()?;
617 let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
618 let ld = get_ld_value(
619 "enable_0dt_deployment_panic_after_timeout",
620 &remote_system_parameters,
621 |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
622 )?;
623 let catalog = openable_adapter_storage
624 .get_enable_0dt_deployment_panic_after_timeout()
625 .await?;
626 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
627 info!(
628 %computed,
629 ?ld,
630 ?catalog,
631 ?cli_default,
632 ?compiled_default,
633 "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
634 );
635 computed
636 };
637
638 let bootstrap_args = BootstrapArgs {
642 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
643 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
644 bootstrap_role: config.bootstrap_role.clone(),
645 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
646 };
647 let preflight_config = PreflightInput {
648 boot_ts,
649 environment_id: config.environment_id.clone(),
650 persist_client,
651 deploy_generation: config.controller.deploy_generation,
652 deployment_state: deployment_state.clone(),
653 openable_adapter_storage,
654 catalog_metrics: Arc::clone(&config.catalog_config.metrics),
655 caught_up_max_wait: with_0dt_deployment_max_wait,
656 panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
657 bootstrap_args,
658 ddl_check_interval: with_0dt_deployment_ddl_check_interval,
659 };
660 let PreflightOutput {
661 openable_adapter_storage,
662 read_only,
663 caught_up_trigger,
664 } = deployment::preflight::preflight_0dt(preflight_config).await?;
665
666 info!(
667 "startup: envd serve: preflight checks complete in {:?}",
668 preflight_checks_start.elapsed()
669 );
670
671 let catalog_open_start = Instant::now();
672 info!("startup: envd serve: durable catalog open beginning");
673
674 let bootstrap_args = BootstrapArgs {
675 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
676 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
677 bootstrap_role: config.bootstrap_role,
678 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
679 };
680
681 let (adapter_storage, audit_logs_iterator) = if read_only {
683 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
686 .open_savepoint(boot_ts, &bootstrap_args)
687 .await?;
688 (adapter_storage, audit_logs_iterator)
693 } else {
694 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
695 .open(boot_ts, &bootstrap_args)
696 .await?;
697
698 deployment_state.set_is_leader();
702
703 (adapter_storage, audit_logs_iterator)
704 };
705
706 if !read_only {
708 config.controller.persist_clients.cfg().enable_compaction();
709 }
710
711 info!(
712 "startup: envd serve: durable catalog open complete in {:?}",
713 catalog_open_start.elapsed()
714 );
715
716 let coord_init_start = Instant::now();
717 info!("startup: envd serve: coordinator init beginning");
718
719 if !config
720 .cluster_replica_sizes
721 .0
722 .contains_key(&config.bootstrap_default_cluster_replica_size)
723 {
724 return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
725 }
726 let envd_epoch = adapter_storage.epoch();
727
728 let storage_usage_client = StorageUsageClient::open(
730 config
731 .controller
732 .persist_clients
733 .open(config.controller.persist_location.clone())
734 .await
735 .context("opening storage usage client")?,
736 );
737
738 let segment_client = config.segment_api_key.map(|api_key| {
740 mz_segment::Client::new(mz_segment::Config {
741 api_key,
742 client_side: config.segment_client_side,
743 })
744 });
745 let connection_limiter = active_connection_counter.clone();
746 let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
747 connection_limiter.update_limit(limit);
748 connection_limiter.update_superuser_reserved(superuser_reserved);
749 });
750
751 let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
752 connection_context: config.controller.connection_context.clone(),
753 connection_limit_callback,
754 controller_config: config.controller,
755 controller_envd_epoch: envd_epoch,
756 storage: adapter_storage,
757 audit_logs_iterator,
758 timestamp_oracle_url: config.timestamp_oracle_url,
759 unsafe_mode: config.unsafe_mode,
760 all_features: config.all_features,
761 build_info: &BUILD_INFO,
762 environment_id: config.environment_id.clone(),
763 metrics_registry: config.metrics_registry.clone(),
764 now: config.now,
765 secrets_controller: config.secrets_controller,
766 cloud_resource_controller: config.cloud_resource_controller,
767 cluster_replica_sizes: config.cluster_replica_sizes,
768 builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
769 builtin_catalog_server_cluster_config: config
770 .bootstrap_builtin_catalog_server_cluster_config,
771 builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
772 builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
773 builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
774 availability_zones: config.availability_zones,
775 system_parameter_defaults: config.system_parameter_defaults,
776 storage_usage_client,
777 storage_usage_collection_interval: config.storage_usage_collection_interval,
778 storage_usage_retention_period: config.storage_usage_retention_period,
779 segment_client: segment_client.clone(),
780 egress_addresses: config.egress_addresses,
781 remote_system_parameters,
782 aws_account_id: config.aws_account_id,
783 aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
784 webhook_concurrency_limit: webhook_concurrency_limit.clone(),
785 http_host_name: config.http_host_name,
786 tracing_handle: config.tracing_handle,
787 read_only_controllers: read_only,
788 caught_up_trigger,
789 helm_chart_version: config.helm_chart_version.clone(),
790 license_key: config.license_key,
791 external_login_password_mz_system: config.external_login_password_mz_system,
792 force_builtin_schema_migration: config.force_builtin_schema_migration,
793 })
794 .instrument(info_span!("adapter::serve"))
795 .await?;
796
797 info!(
798 "startup: envd serve: coordinator init complete in {:?}",
799 coord_init_start.elapsed()
800 );
801
802 let serve_postamble_start = Instant::now();
803 info!("startup: envd serve: postamble beginning");
804
805 authenticator_password_tx
807 .send(Arc::new(Authenticator::Password(adapter_client.clone())))
808 .expect("rx known to be live");
809 adapter_client_tx
810 .send(adapter_client.clone())
811 .expect("internal HTTP server should not drop first");
812
813 let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
814
815 let mut sql_listener_handles = BTreeMap::new();
817 for (name, listener) in self.sql {
818 sql_listener_handles.insert(
819 name.clone(),
820 listener
821 .serve_sql(
822 name,
823 active_connection_counter.clone(),
824 tls_reloading_context.clone(),
825 config.frontegg.clone(),
826 adapter_client.clone(),
827 metrics.clone(),
828 config.helm_chart_version.clone(),
829 )
830 .await,
831 );
832 }
833
834 if let Some(segment_client) = segment_client {
836 telemetry::start_reporting(telemetry::Config {
837 segment_client,
838 adapter_client: adapter_client.clone(),
839 environment_id: config.environment_id,
840 report_interval: Duration::from_secs(3600),
841 });
842 } else if config.test_only_dummy_segment_client {
843 tracing::debug!("starting telemetry reporting with a dummy segment client");
849 let segment_client = mz_segment::Client::new_dummy_client();
850 telemetry::start_reporting(telemetry::Config {
851 segment_client,
852 adapter_client: adapter_client.clone(),
853 environment_id: config.environment_id,
854 report_interval: Duration::from_secs(180),
855 });
856 }
857
858 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
861 task::spawn(
862 || "system_parameter_sync",
863 AssertUnwindSafe(system_parameter_sync(
864 system_parameter_sync_config,
865 adapter_client.clone(),
866 config.config_sync_loop_interval,
867 ))
868 .ore_catch_unwind(),
869 );
870 }
871
872 info!(
873 "startup: envd serve: postamble complete in {:?}",
874 serve_postamble_start.elapsed()
875 );
876 info!(
877 "startup: envd serve: complete in {:?}",
878 serve_start.elapsed()
879 );
880
881 Ok(Server {
882 sql_listener_handles,
883 http_listener_handles,
884 _adapter_handle: adapter_handle,
885 })
886 }
887}
888
889fn get_ld_value<V>(
890 name: &str,
891 remote_system_parameters: &Option<BTreeMap<String, String>>,
892 parse: impl Fn(&str) -> Result<V, String>,
893) -> Result<Option<V>, anyhow::Error> {
894 remote_system_parameters
895 .as_ref()
896 .and_then(|params| params.get(name))
897 .map(|x| {
898 parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
899 })
900 .transpose()
901}
902
/// A running server, as returned by [`Listeners::serve`].
pub struct Server {
    /// Handles of the SQL listeners being served, keyed by listener name.
    pub sql_listener_handles: BTreeMap<String, ListenerHandle>,
    /// Handles of the HTTP listeners being served, keyed by listener name.
    pub http_listener_handles: BTreeMap<String, ListenerHandle>,
    // Never read in this file. NOTE(review): presumably held so the adapter
    // stays alive for as long as the `Server` does — confirm against
    // `mz_adapter::Handle`.
    _adapter_handle: mz_adapter::Handle,
}