1use std::collections::BTreeMap;
17use std::net::{IpAddr, Ipv4Addr, SocketAddr};
18use std::panic::AssertUnwindSafe;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use std::{env, io};
23
24use anyhow::{Context, anyhow};
25use derivative::Derivative;
26use ipnet::IpNet;
27use mz_adapter::config::{SystemParameterSyncConfig, system_parameter_sync};
28use mz_adapter::webhook::WebhookConcurrencyLimiter;
29use mz_adapter::{AdapterError, load_remote_system_parameters};
30use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
31use mz_adapter_types::dyncfgs::{
32 ENABLE_0DT_DEPLOYMENT, ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT,
33 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
34};
35use mz_build_info::{BuildInfo, build_info};
36use mz_catalog::config::ClusterReplicaSizeMap;
37use mz_catalog::durable::BootstrapArgs;
38use mz_cloud_resources::CloudResourceController;
39use mz_controller::ControllerConfig;
40use mz_frontegg_auth::Authenticator as FronteggAuthentication;
41use mz_license_keys::ValidatedLicenseKey;
42use mz_ore::future::OreFutureExt;
43use mz_ore::metrics::MetricsRegistry;
44use mz_ore::now::NowFn;
45use mz_ore::tracing::TracingHandle;
46use mz_ore::url::SensitiveUrl;
47use mz_ore::{instrument, task};
48use mz_persist_client::cache::PersistClientCache;
49use mz_persist_client::usage::StorageUsageClient;
50use mz_pgwire_common::ConnectionCounter;
51use mz_repr::strconv;
52use mz_secrets::SecretsController;
53use mz_server_core::{ConnectionStream, ListenerHandle, ReloadTrigger, ServeConfig, TlsCertConfig};
54use mz_sql::catalog::EnvironmentId;
55use mz_sql::session::vars::{Value, VarInput};
56use tokio::sync::oneshot;
57use tower_http::cors::AllowOrigin;
58use tracing::{Instrument, info, info_span};
59
60use crate::deployment::preflight::{PreflightInput, PreflightOutput};
61use crate::deployment::state::DeploymentState;
62use crate::http::{HttpConfig, HttpServer, InternalHttpConfig, InternalHttpServer};
63
64pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
65
66mod deployment;
67pub mod environmentd;
68pub mod http;
69mod telemetry;
70#[cfg(feature = "test")]
71pub mod test_util;
72
/// Build metadata for this crate, captured at compile time by `build_info!`.
///
/// `Listeners::serve` uses it for the catalog version
/// (`BUILD_INFO.semver_version()`) and hands a reference to the adapter and to
/// the system parameter sync configuration.
pub const BUILD_INFO: BuildInfo = build_info!();
74
75#[derive(Derivative)]
77#[derivative(Debug)]
78pub struct Config {
79 pub unsafe_mode: bool,
83 pub all_features: bool,
86
87 pub tls: Option<TlsCertConfig>,
90 #[derivative(Debug = "ignore")]
92 pub tls_reload_certs: ReloadTrigger,
93 pub frontegg: Option<FronteggAuthentication>,
95 pub cors_allowed_origin: AllowOrigin,
98 pub egress_addresses: Vec<IpNet>,
101 pub http_host_name: Option<String>,
107 pub internal_console_redirect_url: Option<String>,
110 pub self_hosted_auth: bool,
112 pub self_hosted_auth_internal: bool,
114
115 pub controller: ControllerConfig,
118 pub secrets_controller: Arc<dyn SecretsController>,
120 pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
122
123 pub storage_usage_collection_interval: Duration,
126 pub storage_usage_retention_period: Option<Duration>,
128
129 pub catalog_config: CatalogConfig,
132 pub availability_zones: Vec<String>,
135 pub cluster_replica_sizes: ClusterReplicaSizeMap,
137 pub timestamp_oracle_url: Option<SensitiveUrl>,
139 pub segment_api_key: Option<String>,
141 pub segment_client_side: bool,
144 pub launchdarkly_sdk_key: Option<String>,
147 pub launchdarkly_key_map: BTreeMap<String, String>,
150 pub config_sync_timeout: Duration,
152 pub config_sync_loop_interval: Option<Duration>,
154
155 pub environment_id: EnvironmentId,
158 pub bootstrap_role: Option<String>,
160 pub bootstrap_default_cluster_replica_size: String,
162 pub bootstrap_default_cluster_replication_factor: u32,
164 pub bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
166 pub bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
168 pub bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
170 pub bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
172 pub bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
174 pub system_parameter_defaults: BTreeMap<String, String>,
177 pub helm_chart_version: Option<String>,
179 pub license_key: ValidatedLicenseKey,
181
182 pub aws_account_id: Option<String>,
186 pub aws_privatelink_availability_zones: Option<Vec<String>>,
188
189 pub metrics_registry: MetricsRegistry,
192 pub tracing_handle: TracingHandle,
194
195 pub now: NowFn,
198}
199
/// The four socket addresses that `Listeners::bind` listens on.
pub struct ListenersConfig {
    /// Address of the external SQL (pgwire) listener.
    pub sql_listen_addr: SocketAddr,
    /// Address of the external HTTP listener.
    pub http_listen_addr: SocketAddr,
    /// Address of the internal SQL (pgwire) listener.
    pub internal_sql_listen_addr: SocketAddr,
    /// Address of the internal HTTP listener.
    pub internal_http_listen_addr: SocketAddr,
}
212
213#[derive(Debug, Clone)]
215pub struct CatalogConfig {
216 pub persist_clients: Arc<PersistClientCache>,
218 pub metrics: Arc<mz_catalog::durable::Metrics>,
220}
221
222pub struct Listeners {
224 sql: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
226 http: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
227 internal_sql: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
228 internal_http: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
229}
230
231impl Listeners {
232 pub async fn bind(
244 ListenersConfig {
245 sql_listen_addr,
246 http_listen_addr,
247 internal_sql_listen_addr,
248 internal_http_listen_addr,
249 }: ListenersConfig,
250 ) -> Result<Listeners, io::Error> {
251 let sql = mz_server_core::listen(&sql_listen_addr).await?;
252 let http = mz_server_core::listen(&http_listen_addr).await?;
253 let internal_sql = mz_server_core::listen(&internal_sql_listen_addr).await?;
254 let internal_http = mz_server_core::listen(&internal_http_listen_addr).await?;
255 Ok(Listeners {
256 sql,
257 http,
258 internal_sql,
259 internal_http,
260 })
261 }
262
263 pub async fn bind_any_local() -> Result<Listeners, io::Error> {
266 Listeners::bind(ListenersConfig {
267 sql_listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
268 http_listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
269 internal_sql_listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
270 internal_http_listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
271 })
272 .await
273 }
274
275 #[instrument(name = "environmentd::serve")]
279 pub async fn serve(self, config: Config) -> Result<Server, AdapterError> {
280 let serve_start = Instant::now();
281 info!("startup: envd serve: beginning");
282 info!("startup: envd serve: preamble beginning");
283
284 let Listeners {
285 sql: (sql_listener, sql_conns),
286 http: (http_listener, http_conns),
287 internal_sql: (internal_sql_listener, internal_sql_conns),
288 internal_http: (internal_http_listener, internal_http_conns),
289 } = self;
290
291 let (pgwire_tls, http_tls) = match &config.tls {
293 None => (None, None),
294 Some(tls_config) => {
295 let context = tls_config.reloading_context(config.tls_reload_certs)?;
296 let pgwire_tls = mz_server_core::ReloadingTlsConfig {
297 context: context.clone(),
298 mode: mz_server_core::TlsMode::Require,
299 };
300 let http_tls = http::ReloadingTlsConfig {
301 context,
302 mode: http::TlsMode::Require,
303 };
304 (Some(pgwire_tls), Some(http_tls))
305 }
306 };
307
308 let active_connection_counter = ConnectionCounter::default();
309 let (deployment_state, deployment_state_handle) = DeploymentState::new();
310
311 let (internal_http_adapter_client_tx, internal_http_adapter_client_rx) = oneshot::channel();
318 task::spawn(|| "internal_http_server", {
319 let internal_http_server = InternalHttpServer::new(InternalHttpConfig {
320 metrics_registry: config.metrics_registry.clone(),
321 adapter_client_rx: internal_http_adapter_client_rx,
322 active_connection_counter: active_connection_counter.clone(),
323 helm_chart_version: config.helm_chart_version.clone(),
324 deployment_state_handle,
325 internal_console_redirect_url: config.internal_console_redirect_url,
326 });
327 mz_server_core::serve(ServeConfig {
328 server: internal_http_server,
329 conns: internal_http_conns,
330 dyncfg: None,
333 })
334 });
335
336 info!(
337 "startup: envd serve: preamble complete in {:?}",
338 serve_start.elapsed()
339 );
340
341 let catalog_init_start = Instant::now();
342 info!("startup: envd serve: catalog init beginning");
343
344 let boot_ts = (config.now)().into();
346
347 let persist_client = config
348 .catalog_config
349 .persist_clients
350 .open(config.controller.persist_location.clone())
351 .await
352 .context("opening persist client")?;
353 let mut openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
354 persist_client.clone(),
355 config.environment_id.organization_id(),
356 BUILD_INFO.semver_version(),
357 Some(config.controller.deploy_generation),
358 Arc::clone(&config.catalog_config.metrics),
359 )
360 .await?;
361
362 info!(
363 "startup: envd serve: catalog init complete in {:?}",
364 catalog_init_start.elapsed()
365 );
366
367 let system_param_sync_start = Instant::now();
368 info!("startup: envd serve: system parameter sync beginning");
369 let system_parameter_sync_config = if let Some(ld_sdk_key) = config.launchdarkly_sdk_key {
371 Some(SystemParameterSyncConfig::new(
372 config.environment_id.clone(),
373 &BUILD_INFO,
374 &config.metrics_registry,
375 config.now.clone(),
376 ld_sdk_key,
377 config.launchdarkly_key_map,
378 ))
379 } else {
380 None
381 };
382 let remote_system_parameters = load_remote_system_parameters(
383 &mut openable_adapter_storage,
384 system_parameter_sync_config.clone(),
385 config.config_sync_timeout,
386 )
387 .await?;
388 info!(
389 "startup: envd serve: system parameter sync complete in {:?}",
390 system_param_sync_start.elapsed()
391 );
392
393 let preflight_checks_start = Instant::now();
394 info!("startup: envd serve: preflight checks beginning");
395
396 let enable_0dt_deployment = {
398 let cli_default = config
399 .system_parameter_defaults
400 .get(ENABLE_0DT_DEPLOYMENT.name())
401 .map(|x| {
402 strconv::parse_bool(x).map_err(|err| {
403 anyhow!(
404 "failed to parse default for {}: {}",
405 ENABLE_0DT_DEPLOYMENT.name(),
406 err
407 )
408 })
409 })
410 .transpose()?;
411 let compiled_default = ENABLE_0DT_DEPLOYMENT.default().clone();
412 let ld = get_ld_value("enable_0dt_deployment", &remote_system_parameters, |x| {
413 strconv::parse_bool(x).map_err(|x| x.to_string())
414 })?;
415 let catalog = openable_adapter_storage.get_enable_0dt_deployment().await?;
416 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
417 info!(
418 %computed,
419 ?ld,
420 ?catalog,
421 ?cli_default,
422 ?compiled_default,
423 "determined value for enable_0dt_deployment system parameter",
424 );
425 computed
426 };
427 let with_0dt_deployment_max_wait = {
429 let cli_default = config
430 .system_parameter_defaults
431 .get(WITH_0DT_DEPLOYMENT_MAX_WAIT.name())
432 .map(|x| {
433 Duration::parse(VarInput::Flat(x)).map_err(|err| {
434 anyhow!(
435 "failed to parse default for {}: {:?}",
436 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
437 err
438 )
439 })
440 })
441 .transpose()?;
442 let compiled_default = WITH_0DT_DEPLOYMENT_MAX_WAIT.default().clone();
443 let ld = get_ld_value(
444 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
445 &remote_system_parameters,
446 |x| {
447 Duration::parse(VarInput::Flat(x)).map_err(|err| {
448 format!(
449 "failed to parse LD value {} for {}: {:?}",
450 x,
451 WITH_0DT_DEPLOYMENT_MAX_WAIT.name(),
452 err
453 )
454 })
455 },
456 )?;
457 let catalog = openable_adapter_storage
458 .get_0dt_deployment_max_wait()
459 .await?;
460 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
461 info!(
462 ?computed,
463 ?ld,
464 ?catalog,
465 ?cli_default,
466 ?compiled_default,
467 "determined value for {} system parameter",
468 WITH_0DT_DEPLOYMENT_MAX_WAIT.name()
469 );
470 computed
471 };
472 let with_0dt_deployment_ddl_check_interval = {
474 let cli_default = config
475 .system_parameter_defaults
476 .get(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name())
477 .map(|x| {
478 Duration::parse(VarInput::Flat(x)).map_err(|err| {
479 anyhow!(
480 "failed to parse default for {}: {:?}",
481 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
482 err
483 )
484 })
485 })
486 .transpose()?;
487 let compiled_default = WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.default().clone();
488 let ld = get_ld_value(
489 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
490 &remote_system_parameters,
491 |x| {
492 Duration::parse(VarInput::Flat(x)).map_err(|err| {
493 format!(
494 "failed to parse LD value {} for {}: {:?}",
495 x,
496 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name(),
497 err
498 )
499 })
500 },
501 )?;
502 let catalog = openable_adapter_storage
503 .get_0dt_deployment_ddl_check_interval()
504 .await?;
505 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
506 info!(
507 ?computed,
508 ?ld,
509 ?catalog,
510 ?cli_default,
511 ?compiled_default,
512 "determined value for {} system parameter",
513 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.name()
514 );
515 computed
516 };
517
518 let enable_0dt_deployment_panic_after_timeout = {
521 let cli_default = config
522 .system_parameter_defaults
523 .get(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name())
524 .map(|x| {
525 strconv::parse_bool(x).map_err(|err| {
526 anyhow!(
527 "failed to parse default for {}: {}",
528 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.name(),
529 err
530 )
531 })
532 })
533 .transpose()?;
534 let compiled_default = ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.default().clone();
535 let ld = get_ld_value(
536 "enable_0dt_deployment_panic_after_timeout",
537 &remote_system_parameters,
538 |x| strconv::parse_bool(x).map_err(|x| x.to_string()),
539 )?;
540 let catalog = openable_adapter_storage
541 .get_enable_0dt_deployment_panic_after_timeout()
542 .await?;
543 let computed = ld.or(catalog).or(cli_default).unwrap_or(compiled_default);
544 info!(
545 %computed,
546 ?ld,
547 ?catalog,
548 ?cli_default,
549 ?compiled_default,
550 "determined value for enable_0dt_deployment_panic_after_timeout system parameter",
551 );
552 computed
553 };
554
555 let mut read_only = false;
559 let mut caught_up_trigger = None;
560 let bootstrap_args = BootstrapArgs {
561 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
562 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
563 bootstrap_role: config.bootstrap_role.clone(),
564 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
565 };
566 let preflight_config = PreflightInput {
567 boot_ts,
568 environment_id: config.environment_id.clone(),
569 persist_client,
570 deploy_generation: config.controller.deploy_generation,
571 deployment_state: deployment_state.clone(),
572 openable_adapter_storage,
573 catalog_metrics: Arc::clone(&config.catalog_config.metrics),
574 caught_up_max_wait: with_0dt_deployment_max_wait,
575 panic_after_timeout: enable_0dt_deployment_panic_after_timeout,
576 bootstrap_args,
577 ddl_check_interval: with_0dt_deployment_ddl_check_interval,
578 };
579 if enable_0dt_deployment {
580 PreflightOutput {
581 openable_adapter_storage,
582 read_only,
583 caught_up_trigger,
584 } = deployment::preflight::preflight_0dt(preflight_config).await?;
585 } else {
586 openable_adapter_storage =
587 deployment::preflight::preflight_legacy(preflight_config).await?;
588 };
589
590 info!(
591 "startup: envd serve: preflight checks complete in {:?}",
592 preflight_checks_start.elapsed()
593 );
594
595 let catalog_open_start = Instant::now();
596 info!("startup: envd serve: durable catalog open beginning");
597
598 let bootstrap_args = BootstrapArgs {
599 default_cluster_replica_size: config.bootstrap_default_cluster_replica_size.clone(),
600 default_cluster_replication_factor: config.bootstrap_default_cluster_replication_factor,
601 bootstrap_role: config.bootstrap_role,
602 cluster_replica_size_map: config.cluster_replica_sizes.clone(),
603 };
604
605 let (adapter_storage, audit_logs_iterator) = if read_only {
607 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
610 .open_savepoint(boot_ts, &bootstrap_args)
611 .await?;
612 (adapter_storage, audit_logs_iterator)
617 } else {
618 let (adapter_storage, audit_logs_iterator) = openable_adapter_storage
619 .open(boot_ts, &bootstrap_args)
620 .await?;
621
622 deployment_state.set_is_leader();
626
627 (adapter_storage, audit_logs_iterator)
628 };
629
630 if !read_only {
632 config.controller.persist_clients.cfg().enable_compaction();
633 }
634
635 info!(
636 "startup: envd serve: durable catalog open complete in {:?}",
637 catalog_open_start.elapsed()
638 );
639
640 let coord_init_start = Instant::now();
641 info!("startup: envd serve: coordinator init beginning");
642
643 if !config
644 .cluster_replica_sizes
645 .0
646 .contains_key(&config.bootstrap_default_cluster_replica_size)
647 {
648 return Err(anyhow!("bootstrap default cluster replica size is unknown").into());
649 }
650 let envd_epoch = adapter_storage.epoch();
651
652 let storage_usage_client = StorageUsageClient::open(
654 config
655 .controller
656 .persist_clients
657 .open(config.controller.persist_location.clone())
658 .await
659 .context("opening storage usage client")?,
660 );
661
662 let segment_client = config.segment_api_key.map(|api_key| {
664 mz_segment::Client::new(mz_segment::Config {
665 api_key,
666 client_side: config.segment_client_side,
667 })
668 });
669 let connection_limiter = active_connection_counter.clone();
670 let connection_limit_callback = Box::new(move |limit, superuser_reserved| {
671 connection_limiter.update_limit(limit);
672 connection_limiter.update_superuser_reserved(superuser_reserved);
673 });
674
675 let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
676 let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
677 connection_context: config.controller.connection_context.clone(),
678 connection_limit_callback,
679 controller_config: config.controller,
680 controller_envd_epoch: envd_epoch,
681 storage: adapter_storage,
682 audit_logs_iterator,
683 timestamp_oracle_url: config.timestamp_oracle_url,
684 unsafe_mode: config.unsafe_mode,
685 all_features: config.all_features,
686 build_info: &BUILD_INFO,
687 environment_id: config.environment_id.clone(),
688 metrics_registry: config.metrics_registry.clone(),
689 now: config.now,
690 secrets_controller: config.secrets_controller,
691 cloud_resource_controller: config.cloud_resource_controller,
692 cluster_replica_sizes: config.cluster_replica_sizes,
693 builtin_system_cluster_config: config.bootstrap_builtin_system_cluster_config,
694 builtin_catalog_server_cluster_config: config
695 .bootstrap_builtin_catalog_server_cluster_config,
696 builtin_probe_cluster_config: config.bootstrap_builtin_probe_cluster_config,
697 builtin_support_cluster_config: config.bootstrap_builtin_support_cluster_config,
698 builtin_analytics_cluster_config: config.bootstrap_builtin_analytics_cluster_config,
699 availability_zones: config.availability_zones,
700 system_parameter_defaults: config.system_parameter_defaults,
701 storage_usage_client,
702 storage_usage_collection_interval: config.storage_usage_collection_interval,
703 storage_usage_retention_period: config.storage_usage_retention_period,
704 segment_client: segment_client.clone(),
705 egress_addresses: config.egress_addresses,
706 remote_system_parameters,
707 aws_account_id: config.aws_account_id,
708 aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
709 webhook_concurrency_limit: webhook_concurrency_limit.clone(),
710 http_host_name: config.http_host_name,
711 tracing_handle: config.tracing_handle,
712 read_only_controllers: read_only,
713 enable_0dt_deployment,
714 caught_up_trigger,
715 helm_chart_version: config.helm_chart_version.clone(),
716 license_key: config.license_key,
717 })
718 .instrument(info_span!("adapter::serve"))
719 .await?;
720
721 info!(
722 "startup: envd serve: coordinator init complete in {:?}",
723 coord_init_start.elapsed()
724 );
725
726 let serve_postamble_start = Instant::now();
727 info!("startup: envd serve: postamble beginning");
728
729 internal_http_adapter_client_tx
731 .send(adapter_client.clone())
732 .expect("internal HTTP server should not drop first");
733
734 let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
735 task::spawn(|| "sql_server", {
737 let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
738 label: "external_pgwire",
739 tls: pgwire_tls.clone(),
740 adapter_client: adapter_client.clone(),
741 frontegg: config.frontegg.clone(),
742 use_self_hosted_auth: config.self_hosted_auth,
743 metrics: metrics.clone(),
744 internal: false,
745 active_connection_counter: active_connection_counter.clone(),
746 helm_chart_version: config.helm_chart_version.clone(),
747 });
748 mz_server_core::serve(ServeConfig {
749 conns: sql_conns,
750 server: sql_server,
751 dyncfg: None,
754 })
755 });
756
757 task::spawn(|| "internal_sql_server", {
759 let internal_sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
760 label: "internal_pgwire",
761 tls: pgwire_tls.map(|mut pgwire_tls| {
762 pgwire_tls.mode = mz_server_core::TlsMode::Allow;
769 pgwire_tls
770 }),
771 adapter_client: adapter_client.clone(),
772 frontegg: None,
773 use_self_hosted_auth: config.self_hosted_auth_internal,
774 metrics: metrics.clone(),
775 internal: true,
776 active_connection_counter: active_connection_counter.clone(),
777 helm_chart_version: config.helm_chart_version.clone(),
778 });
779 mz_server_core::serve(ServeConfig {
780 conns: internal_sql_conns,
781 server: internal_sql_server,
782 dyncfg: None,
785 })
786 });
787
788 let http_metrics = http::Metrics::register_into(&config.metrics_registry, "mz_http");
790 task::spawn(|| "http_server", {
791 let http_server = HttpServer::new(HttpConfig {
792 source: "external",
793 tls: http_tls,
794 frontegg: config.frontegg.clone(),
795 adapter_client: adapter_client.clone(),
796 allowed_origin: config.cors_allowed_origin.clone(),
797 active_connection_counter: active_connection_counter.clone(),
798 helm_chart_version: config.helm_chart_version.clone(),
799 concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
800 metrics: http_metrics.clone(),
801 });
802 mz_server_core::serve(ServeConfig {
803 conns: http_conns,
804 server: http_server,
805 dyncfg: None,
808 })
809 });
810
811 if let Some(segment_client) = segment_client {
813 telemetry::start_reporting(telemetry::Config {
814 segment_client,
815 adapter_client: adapter_client.clone(),
816 environment_id: config.environment_id,
817 });
818 }
819
820 if let Some(system_parameter_sync_config) = system_parameter_sync_config {
823 task::spawn(
824 || "system_parameter_sync",
825 AssertUnwindSafe(system_parameter_sync(
826 system_parameter_sync_config,
827 adapter_client,
828 config.config_sync_loop_interval,
829 ))
830 .ore_catch_unwind(),
831 );
832 }
833
834 info!(
835 "startup: envd serve: postamble complete in {:?}",
836 serve_postamble_start.elapsed()
837 );
838 info!(
839 "startup: envd serve: complete in {:?}",
840 serve_start.elapsed()
841 );
842
843 Ok(Server {
844 sql_listener,
845 http_listener,
846 internal_sql_listener,
847 internal_http_listener,
848 _adapter_handle: adapter_handle,
849 })
850 }
851
852 pub fn sql_local_addr(&self) -> SocketAddr {
853 self.sql.0.local_addr()
854 }
855
856 pub fn http_local_addr(&self) -> SocketAddr {
857 self.http.0.local_addr()
858 }
859
860 pub fn internal_sql_local_addr(&self) -> SocketAddr {
861 self.internal_sql.0.local_addr()
862 }
863
864 pub fn internal_http_local_addr(&self) -> SocketAddr {
865 self.internal_http.0.local_addr()
866 }
867}
868
869fn get_ld_value<V>(
870 name: &str,
871 remote_system_parameters: &Option<BTreeMap<String, String>>,
872 parse: impl Fn(&str) -> Result<V, String>,
873) -> Result<Option<V>, anyhow::Error> {
874 remote_system_parameters
875 .as_ref()
876 .and_then(|params| params.get(name))
877 .map(|x| {
878 parse(x).map_err(|err| anyhow!("failed to parse remote value for {}: {}", name, err))
879 })
880 .transpose()
881}
882
883pub struct Server {
885 sql_listener: ListenerHandle,
887 http_listener: ListenerHandle,
888 internal_sql_listener: ListenerHandle,
889 internal_http_listener: ListenerHandle,
890 _adapter_handle: mz_adapter::Handle,
891}
892
893impl Server {
894 pub fn sql_local_addr(&self) -> SocketAddr {
895 self.sql_listener.local_addr()
896 }
897
898 pub fn http_local_addr(&self) -> SocketAddr {
899 self.http_listener.local_addr()
900 }
901
902 pub fn internal_sql_local_addr(&self) -> SocketAddr {
903 self.internal_sql_listener.local_addr()
904 }
905
906 pub fn internal_http_local_addr(&self) -> SocketAddr {
907 self.internal_http_listener.local_addr()
908 }
909}