#![allow(unknown_lints)]
#![allow(clippy::style)]
#![allow(clippy::complexity)]
#![allow(clippy::large_enum_variant)]
#![allow(clippy::mutable_key_type)]
#![allow(clippy::stable_sort_primitive)]
#![allow(clippy::map_entry)]
#![allow(clippy::box_default)]
#![allow(clippy::drain_collect)]
#![warn(clippy::bool_comparison)]
#![warn(clippy::clone_on_ref_ptr)]
#![warn(clippy::no_effect)]
#![warn(clippy::unnecessary_unwrap)]
#![warn(clippy::dbg_macro)]
#![warn(clippy::todo)]
#![warn(clippy::wildcard_dependencies)]
#![warn(clippy::zero_prefixed_literal)]
#![warn(clippy::borrowed_box)]
#![warn(clippy::deref_addrof)]
#![warn(clippy::double_must_use)]
#![warn(clippy::double_parens)]
#![warn(clippy::extra_unused_lifetimes)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_question_mark)]
#![warn(clippy::needless_return)]
#![warn(clippy::redundant_pattern)]
#![warn(clippy::redundant_slicing)]
#![warn(clippy::redundant_static_lifetimes)]
#![warn(clippy::single_component_path_imports)]
#![warn(clippy::unnecessary_cast)]
#![warn(clippy::useless_asref)]
#![warn(clippy::useless_conversion)]
#![warn(clippy::builtin_type_shadow)]
#![warn(clippy::duplicate_underscore_argument)]
#![warn(clippy::double_neg)]
#![warn(clippy::unnecessary_mut_passed)]
#![warn(clippy::wildcard_in_or_patterns)]
#![warn(clippy::crosspointer_transmute)]
#![warn(clippy::excessive_precision)]
#![warn(clippy::overflow_check_conditional)]
#![warn(clippy::as_conversions)]
#![warn(clippy::match_overlapping_arm)]
#![warn(clippy::zero_divided_by_zero)]
#![warn(clippy::must_use_unit)]
#![warn(clippy::suspicious_assignment_formatting)]
#![warn(clippy::suspicious_else_formatting)]
#![warn(clippy::suspicious_unary_op_formatting)]
#![warn(clippy::mut_mutex_lock)]
#![warn(clippy::print_literal)]
#![warn(clippy::same_item_push)]
#![warn(clippy::useless_format)]
#![warn(clippy::write_literal)]
#![warn(clippy::redundant_closure)]
#![warn(clippy::redundant_closure_call)]
#![warn(clippy::unnecessary_lazy_evaluations)]
#![warn(clippy::partialeq_ne_impl)]
#![warn(clippy::redundant_field_names)]
#![warn(clippy::transmutes_expressible_as_ptr_casts)]
#![warn(clippy::unused_async)]
#![warn(clippy::disallowed_methods)]
#![warn(clippy::disallowed_macros)]
#![warn(clippy::disallowed_types)]
#![warn(clippy::from_over_into)]
use std::collections::BTreeMap;
use std::env;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::panic::AssertUnwindSafe;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::{anyhow, bail, Context};
use futures::FutureExt;
use mz_adapter::catalog::ClusterReplicaSizeMap;
use mz_adapter::config::{system_parameter_sync, SystemParameterSyncConfig};
use mz_adapter::webhook::WebhookConcurrencyLimiter;
use mz_build_info::{build_info, BuildInfo};
use mz_catalog::durable::{BootstrapArgs, OpenableDurableCatalogState, StashConfig};
use mz_cloud_resources::CloudResourceController;
use mz_controller::ControllerConfig;
use mz_frontegg_auth::Authentication as FronteggAuthentication;
use mz_ore::future::OreFutureExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::task;
use mz_ore::tracing::TracingHandle;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::usage::StorageUsageClient;
use mz_secrets::SecretsController;
use mz_server_core::{ConnectionStream, ListenerHandle, TlsCertConfig};
use mz_sql::catalog::EnvironmentId;
use mz_sql::session::vars::ConnectionCounter;
use mz_storage_types::controller::PersistTxnTablesImpl;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::RecvError;
use tower_http::cors::AllowOrigin;
use tracing::info;
use crate::http::{HttpConfig, HttpServer, InternalHttpConfig, InternalHttpServer};
pub mod http;
mod telemetry;
#[cfg(feature = "test")]
pub mod test_util;
pub use crate::http::{SqlResponse, WebSocketAuth, WebSocketResponse};
/// Build information (version, git SHA, etc.) for this binary, generated by
/// the `build_info!` macro and reported to the adapter and system parameter
/// sync below.
pub const BUILD_INFO: BuildInfo = build_info!();
/// Configuration for an `environmentd` server, consumed by
/// [`Listeners::serve`].
#[derive(Debug, Clone)]
pub struct Config {
// Forwarded to `mz_adapter::Config::unsafe_mode`.
pub unsafe_mode: bool,
// Forwarded to `mz_adapter::Config::all_features`.
pub all_features: bool,
// Allowed CORS origin for the external and balancer HTTP servers.
pub cors_allowed_origin: AllowOrigin,
// TLS certificate configuration. When `Some`, the external pgwire and
// HTTP servers *require* TLS; the internal pgwire server merely allows it.
pub tls: Option<TlsCertConfig>,
// Optional Frontegg authentication, used by the external SQL/HTTP servers.
pub frontegg: Option<FronteggAuthentication>,
// Configuration for the storage/compute controller; also supplies the
// persist clients and stash metrics used when opening the catalog.
pub controller: ControllerConfig,
pub secrets_controller: Arc<dyn SecretsController>,
pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
// CLI override for persist txn tables; takes precedence over the value
// read from the catalog (see `serve`).
pub persist_txn_tables_cli: Option<PersistTxnTablesImpl>,
// Which durable catalog backend to use (stash, persist, or shadow).
pub catalog_config: CatalogConfig,
pub timestamp_oracle_url: Option<String>,
pub environment_id: EnvironmentId,
pub availability_zones: Vec<String>,
// Known replica sizes; must contain `bootstrap_default_cluster_replica_size`
// (validated in `serve`).
pub cluster_replica_sizes: ClusterReplicaSizeMap,
pub default_storage_cluster_size: Option<String>,
pub bootstrap_default_cluster_replica_size: String,
pub bootstrap_builtin_cluster_replica_size: String,
pub system_parameter_defaults: BTreeMap<String, String>,
pub storage_usage_collection_interval: Duration,
pub storage_usage_retention_period: Option<Duration>,
// When `Some`, enables Segment-based telemetry reporting.
pub segment_api_key: Option<String>,
pub egress_ips: Vec<Ipv4Addr>,
pub aws_account_id: Option<String>,
pub aws_privatelink_availability_zones: Option<Vec<String>>,
// When `Some`, enables LaunchDarkly-backed system parameter sync.
pub launchdarkly_sdk_key: Option<String>,
pub config_sync_loop_interval: Option<Duration>,
pub launchdarkly_key_map: BTreeMap<String, String>,
// Role granted at bootstrap, forwarded via `BootstrapArgs`.
pub bootstrap_role: Option<String>,
// When `Some`, `serve` runs the leader-promotion protocol before opening
// the catalog for writes.
pub deploy_generation: Option<u64>,
pub http_host_name: Option<String>,
// Redirect target exposed by the internal HTTP server's console endpoint.
pub internal_console_redirect_url: Option<String>,
pub metrics_registry: MetricsRegistry,
pub tracing_handle: TracingHandle,
// Clock used for the boot timestamp and forwarded to the adapter.
pub now: NowFn,
}
/// Addresses on which [`Listeners::bind`] will listen.
///
/// Derives `Debug` and `Clone` like the sibling `Config` type; all fields are
/// plain `SocketAddr`s, so both derives are free.
#[derive(Debug, Clone)]
pub struct ListenersConfig {
    /// External pgwire (SQL) address.
    pub sql_listen_addr: SocketAddr,
    /// External HTTP address.
    pub http_listen_addr: SocketAddr,
    /// pgwire address for connections arriving via the balancer (served
    /// without TLS; see `serve`).
    pub balancer_sql_listen_addr: SocketAddr,
    /// HTTP address for connections arriving via the balancer (served
    /// without TLS; see `serve`).
    pub balancer_http_listen_addr: SocketAddr,
    /// Internal pgwire address (no Frontegg auth; TLS optional).
    pub internal_sql_listen_addr: SocketAddr,
    /// Internal HTTP address (leader promotion endpoints, metrics).
    pub internal_http_listen_addr: SocketAddr,
}
/// Selects the durable catalog backend; consumed by `catalog_opener`.
#[derive(Debug, Clone)]
pub enum CatalogConfig {
/// Catalog state lives in a stash reachable at the given Postgres URL.
Stash {
url: String,
},
/// Catalog state lives in persist.
Persist {
persist_clients: Arc<PersistClientCache>,
metrics: Arc<mz_catalog::durable::Metrics>,
},
/// Catalog state layered over both a stash (at `url`) and persist
/// (presumably for validating a stash-to-persist migration — confirm).
Shadow {
url: String,
persist_clients: Arc<PersistClientCache>,
},
}
/// The set of bound network listeners, each paired with the stream of
/// incoming connections it accepts. Produced by [`Listeners::bind`]; the
/// handles are retained by the returned [`Server`] while the connection
/// streams are consumed by per-listener serving tasks in `serve`.
pub struct Listeners {
sql: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
http: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
balancer_sql: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
balancer_http: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
internal_sql: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
internal_http: (ListenerHandle, Pin<Box<dyn ConnectionStream>>),
}
impl Listeners {
pub async fn bind(
ListenersConfig {
sql_listen_addr,
http_listen_addr,
balancer_sql_listen_addr,
balancer_http_listen_addr,
internal_sql_listen_addr,
internal_http_listen_addr,
}: ListenersConfig,
) -> Result<Listeners, anyhow::Error> {
let sql = mz_server_core::listen(&sql_listen_addr).await?;
let http = mz_server_core::listen(&http_listen_addr).await?;
let balancer_sql = mz_server_core::listen(&balancer_sql_listen_addr).await?;
let balancer_http = mz_server_core::listen(&balancer_http_listen_addr).await?;
let internal_sql = mz_server_core::listen(&internal_sql_listen_addr).await?;
let internal_http = mz_server_core::listen(&internal_http_listen_addr).await?;
Ok(Listeners {
sql,
http,
balancer_sql,
balancer_http,
internal_sql,
internal_http,
})
}
/// Binds every listener to an OS-assigned port on the IPv4 loopback
/// interface. Useful for tests, where concrete ports don't matter.
pub async fn bind_any_local() -> Result<Listeners, anyhow::Error> {
    // Port 0 asks the OS to choose any free port. `SocketAddr` is `Copy`,
    // so one value serves all six fields.
    let any_local = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
    Listeners::bind(ListenersConfig {
        sql_listen_addr: any_local,
        http_listen_addr: any_local,
        balancer_sql_listen_addr: any_local,
        balancer_http_listen_addr: any_local,
        internal_sql_listen_addr: any_local,
        internal_http_listen_addr: any_local,
    })
    .await
}
/// Boots the environment: runs the leader-promotion protocol (if a deploy
/// generation was requested), opens the durable catalog, starts the adapter
/// (coordinator), and spawns one serving task per bound listener.
///
/// Consumes `self`; the listener handles are moved into the returned
/// [`Server`]. Errors abort startup; a catalog generation *regression*
/// halts the process instead (see below).
#[tracing::instrument(name = "environmentd::serve", level = "info", skip_all)]
pub async fn serve(self, config: Config) -> Result<Server, anyhow::Error> {
// Split every listener into its OS handle (retained in the returned
// `Server` for local-addr queries) and its incoming-connection stream
// (handed off to a dedicated serving task below).
let Listeners {
sql: (sql_listener, sql_conns),
http: (http_listener, http_conns),
balancer_sql: (balancer_sql_listener, balancer_sql_conns),
balancer_http: (balancer_http_listener, balancer_http_conns),
internal_sql: (internal_sql_listener, internal_sql_conns),
internal_http: (internal_http_listener, internal_http_conns),
} = self;
// Derive pgwire and HTTP TLS configs from the single certificate config.
// Both external servers *require* TLS when a certificate is provided.
let (pgwire_tls, http_tls) = match &config.tls {
None => (None, None),
Some(tls_config) => {
let context = tls_config.context()?;
let pgwire_tls = mz_server_core::TlsConfig {
context: context.clone(),
mode: mz_server_core::TlsMode::Require,
};
let http_tls = http::TlsConfig {
context,
mode: http::TlsMode::Require,
};
(Some(pgwire_tls), Some(http_tls))
}
};
// Connection counter shared by every server spawned below.
let active_connection_count = Arc::new(Mutex::new(ConnectionCounter::new(0)));
// Channels wiring this function to the internal HTTP server:
// - `ready_to_promote`: we signal that pre-flight checks passed;
// - `promote_leader`: the operator (via the internal HTTP server)
//   signals that this process should become the leader;
// - `internal_http_adapter_client`: delivers the adapter client once the
//   adapter is up.
let (ready_to_promote_tx, ready_to_promote_rx) = oneshot::channel();
let (promote_leader_tx, promote_leader_rx) = oneshot::channel();
let (internal_http_adapter_client_tx, internal_http_adapter_client_rx) = oneshot::channel();
// Start the internal HTTP server *before* any catalog work so that the
// leader-promotion endpoints are reachable during startup.
task::spawn(|| "internal_http_server", {
let internal_http_server = InternalHttpServer::new(InternalHttpConfig {
metrics_registry: config.metrics_registry.clone(),
adapter_client_rx: internal_http_adapter_client_rx,
active_connection_count: Arc::clone(&active_connection_count),
promote_leader: promote_leader_tx,
ready_to_promote: ready_to_promote_rx,
internal_console_redirect_url: config.internal_console_redirect_url,
});
mz_server_core::serve(internal_http_conns, internal_http_server)
});
// Boot timestamp, captured once and used for both the pre-flight
// savepoint open and the real catalog open below.
let boot_ts = (config.now)();
// Leader-promotion protocol. Only runs when a deploy generation was
// requested (i.e. during a managed upgrade/rollout).
'leader_promotion: {
let Some(deploy_generation) = config.deploy_generation else {
break 'leader_promotion;
};
tracing::info!("Requested deploy generation {deploy_generation}");
let mut openable_adapter_storage = catalog_opener(
&config.catalog_config,
&config.controller,
&config.environment_id,
)
.boxed()
.await?;
// A brand-new environment has no recorded generation, so there is no
// incumbent leader to wait on.
if !openable_adapter_storage.is_initialized().await? {
tracing::info!("Catalog storage doesn't exist so there's no current deploy generation. We won't wait to be leader");
openable_adapter_storage.expire().await;
break 'leader_promotion;
}
let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
tracing::info!("Found catalog generation {catalog_generation:?}");
// NB: `None < Some(_)` under `Option`'s `Ord`, so a catalog with no
// recorded generation is treated as older than any requested one.
if catalog_generation < Some(deploy_generation) {
tracing::info!("Catalog generation {catalog_generation:?} is less than deploy generation {deploy_generation}. Performing pre-flight checks");
// Dry-run the catalog upgrade via a savepoint open (presumably a
// transient open whose writes are discarded — confirm), so that an
// upgrade that would fail aborts the deploy *before* promotion.
match openable_adapter_storage
.open_savepoint(
boot_ts.clone(),
&BootstrapArgs {
default_cluster_replica_size: config
.bootstrap_default_cluster_replica_size
.clone(),
bootstrap_role: config.bootstrap_role.clone(),
},
None,
)
.await
{
Ok(adapter_storage) => Box::new(adapter_storage).expire().await,
Err(e) => {
return Err(
anyhow!(e).context("Catalog upgrade would have failed with this error")
)
}
}
// Tell the internal HTTP server pre-flight checks passed, then block
// until the operator promotes this process to leader. Either channel
// erroring means the internal HTTP server went away.
if let Err(()) = ready_to_promote_tx.send(()) {
return Err(anyhow!(
"internal http server closed its end of ready_to_promote"
));
}
tracing::info!("Waiting for user to promote this envd to leader");
if let Err(RecvError { .. }) = promote_leader_rx.await {
return Err(anyhow!(
"internal http server closed its end of promote_leader"
));
}
} else if catalog_generation == Some(deploy_generation) {
// Restart at the same generation: no promotion dance required.
tracing::info!("Server requested generation {deploy_generation} which is equal to catalog's generation");
openable_adapter_storage.expire().await;
} else {
// The catalog is *ahead* of the requested generation: a newer
// deployment already took over. Halt rather than error so the
// orchestrator can deal with this stale process.
openable_adapter_storage.expire().await;
mz_ore::halt!("Server started with requested generation {deploy_generation} but catalog was already at {catalog_generation:?}. Deploy generations must increase monotonically");
}
}
// Open the catalog for real. The opener used for pre-flight checks (if
// any) was expired above, so a fresh one is required.
let openable_adapter_storage = catalog_opener(
&config.catalog_config,
&config.controller,
&config.environment_id,
)
.boxed()
.await?;
let mut adapter_storage = openable_adapter_storage
.open(
boot_ts,
&BootstrapArgs {
default_cluster_replica_size: config
.bootstrap_default_cluster_replica_size
.clone(),
bootstrap_role: config.bootstrap_role,
},
config.deploy_generation,
)
.await?;
// Catalog-recorded (stash/LaunchDarkly-sourced) persist-txn-tables value;
// may be overridden by the CLI flag below.
let persist_txn_tables_stash_ld = adapter_storage.get_persist_txn_tables().await?;
// Fail fast if the configured default replica size isn't a known size.
if !config
.cluster_replica_sizes
.0
.contains_key(&config.bootstrap_default_cluster_replica_size)
{
bail!("bootstrap default cluster replica size is unknown");
}
let envd_epoch = adapter_storage.epoch();
let storage_usage_client = StorageUsageClient::open(
config
.controller
.persist_clients
.open(config.controller.persist_location.clone())
.await
.context("opening storage usage client")?,
);
// Precedence: CLI flag > catalog value > `Off`.
let mut persist_txn_tables =
persist_txn_tables_stash_ld.unwrap_or(PersistTxnTablesImpl::Off);
if let Some(value) = config.persist_txn_tables_cli {
persist_txn_tables = value;
}
info!(
"persist_txn_tables value of {} computed from catalog {:?} and flag {:?}",
persist_txn_tables, persist_txn_tables_stash_ld, config.persist_txn_tables_cli,
);
let controller =
mz_controller::Controller::new(config.controller, envd_epoch, persist_txn_tables)
.boxed()
.await;
// LaunchDarkly-backed system parameter sync is enabled only when an SDK
// key was provided.
let system_parameter_sync_config = if let Some(ld_sdk_key) = config.launchdarkly_sdk_key {
Some(SystemParameterSyncConfig::new(
config.environment_id.clone(),
&BUILD_INFO,
&config.metrics_registry,
config.now.clone(),
ld_sdk_key,
config.launchdarkly_key_map,
))
} else {
None
};
let segment_client = config.segment_api_key.map(mz_segment::Client::new);
let webhook_concurrency_limit = WebhookConcurrencyLimiter::default();
// Boot the adapter (coordinator) itself.
let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
dataflow_client: controller,
storage: adapter_storage,
timestamp_oracle_url: config.timestamp_oracle_url,
unsafe_mode: config.unsafe_mode,
all_features: config.all_features,
build_info: &BUILD_INFO,
environment_id: config.environment_id.clone(),
metrics_registry: config.metrics_registry.clone(),
now: config.now,
secrets_controller: config.secrets_controller,
cloud_resource_controller: config.cloud_resource_controller,
cluster_replica_sizes: config.cluster_replica_sizes,
default_storage_cluster_size: config.default_storage_cluster_size,
builtin_cluster_replica_size: config.bootstrap_builtin_cluster_replica_size,
availability_zones: config.availability_zones,
system_parameter_defaults: config.system_parameter_defaults,
storage_usage_client,
storage_usage_collection_interval: config.storage_usage_collection_interval,
storage_usage_retention_period: config.storage_usage_retention_period,
segment_client: segment_client.clone(),
egress_ips: config.egress_ips,
system_parameter_sync_config: system_parameter_sync_config.clone(),
aws_account_id: config.aws_account_id,
aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
active_connection_count: Arc::clone(&active_connection_count),
webhook_concurrency_limit: webhook_concurrency_limit.clone(),
http_host_name: config.http_host_name,
tracing_handle: config.tracing_handle,
})
.await?;
// Deliver the adapter client to the already-running internal HTTP server.
internal_http_adapter_client_tx
.send(adapter_client.clone())
.expect("internal HTTP server should not drop first");
// Launch the remaining servers, one task per listener.
let metrics = mz_pgwire::MetricsConfig::register_into(&config.metrics_registry);
// External pgwire: TLS required when configured, Frontegg auth enabled.
task::spawn(|| "sql_server", {
let sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
tls: pgwire_tls.clone(),
adapter_client: adapter_client.clone(),
frontegg: config.frontegg.clone(),
metrics: metrics.clone(),
internal: false,
active_connection_count: Arc::clone(&active_connection_count),
});
mz_server_core::serve(sql_conns, sql_server)
});
// Internal pgwire: no Frontegg auth; TLS downgraded from Require to Allow.
task::spawn(|| "internal_sql_server", {
let internal_sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
tls: pgwire_tls.map(|mut pgwire_tls| {
pgwire_tls.mode = mz_server_core::TlsMode::Allow;
pgwire_tls
}),
adapter_client: adapter_client.clone(),
frontegg: None,
metrics: metrics.clone(),
internal: true,
active_connection_count: Arc::clone(&active_connection_count),
});
mz_server_core::serve(internal_sql_conns, internal_sql_server)
});
let http_metrics = http::Metrics::register_into(&config.metrics_registry, "mz_http");
task::spawn(|| "http_server", {
let http_server = HttpServer::new(HttpConfig {
tls: http_tls,
frontegg: config.frontegg.clone(),
adapter_client: adapter_client.clone(),
allowed_origin: config.cors_allowed_origin.clone(),
active_connection_count: Arc::clone(&active_connection_count),
concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
metrics: http_metrics.clone(),
});
mz_server_core::serve(http_conns, http_server)
});
// Balancer-facing HTTP: served without TLS (presumably the balancer
// terminates TLS upstream — confirm).
task::spawn(|| "balancer_http_server", {
let balancer_http_server = HttpServer::new(HttpConfig {
tls: None,
frontegg: config.frontegg.clone(),
adapter_client: adapter_client.clone(),
allowed_origin: config.cors_allowed_origin,
active_connection_count: Arc::clone(&active_connection_count),
concurrent_webhook_req: webhook_concurrency_limit.semaphore(),
metrics: http_metrics,
});
mz_server_core::serve(balancer_http_conns, balancer_http_server)
});
// Balancer-facing pgwire: likewise served without TLS.
task::spawn(|| "balancer_sql_server", {
let balancer_sql_server = mz_pgwire::Server::new(mz_pgwire::Config {
tls: None,
adapter_client: adapter_client.clone(),
frontegg: config.frontegg.clone(),
metrics,
internal: false,
active_connection_count: Arc::clone(&active_connection_count),
});
mz_server_core::serve(balancer_sql_conns, balancer_sql_server)
});
// Telemetry reporting, only when a Segment API key was provided.
if let Some(segment_client) = segment_client {
telemetry::start_reporting(telemetry::Config {
segment_client,
adapter_client: adapter_client.clone(),
environment_id: config.environment_id,
});
}
// Background system parameter sync loop. The `AssertUnwindSafe` +
// `ore_catch_unwind` wrapper presumably converts panics in the loop into
// a catchable result rather than aborting — confirm against mz_ore docs.
if let Some(system_parameter_sync_config) = system_parameter_sync_config {
task::spawn(
|| "system_parameter_sync",
AssertUnwindSafe(system_parameter_sync(
system_parameter_sync_config,
adapter_client,
config.config_sync_loop_interval,
))
.ore_catch_unwind(),
);
}
// Hand the listener handles (and the adapter handle, which is retained
// only to keep the adapter alive) to the caller.
Ok(Server {
sql_listener,
http_listener,
balancer_sql_listener,
balancer_http_listener,
internal_sql_listener,
internal_http_listener,
_adapter_handle: adapter_handle,
})
}
/// Local address of the external pgwire (SQL) listener.
pub fn sql_local_addr(&self) -> SocketAddr {
    // `.0` is the listener handle; `.1` is the connection stream.
    let (handle, _) = &self.sql;
    handle.local_addr()
}
/// Local address of the external HTTP listener.
pub fn http_local_addr(&self) -> SocketAddr {
    // `.0` is the listener handle; `.1` is the connection stream.
    let (handle, _) = &self.http;
    handle.local_addr()
}
/// Local address of the internal pgwire (SQL) listener.
pub fn internal_sql_local_addr(&self) -> SocketAddr {
    // `.0` is the listener handle; `.1` is the connection stream.
    let (handle, _) = &self.internal_sql;
    handle.local_addr()
}
/// Local address of the internal HTTP listener.
pub fn internal_http_local_addr(&self) -> SocketAddr {
    // `.0` is the listener handle; `.1` is the connection stream.
    let (handle, _) = &self.internal_http;
    handle.local_addr()
}
}
/// Constructs (but does not open) a durable catalog state for the backend
/// selected by `catalog_config`. Callers must later `open`/`open_savepoint`
/// the result, or `expire` it when done.
async fn catalog_opener(
catalog_config: &CatalogConfig,
controller_config: &ControllerConfig,
environment_id: &EnvironmentId,
) -> Result<Box<dyn OpenableDurableCatalogState>, anyhow::Error> {
Ok(match catalog_config {
CatalogConfig::Stash { url } => {
info!("Using stash backed catalog");
let stash_factory =
mz_stash::StashFactory::from_metrics(Arc::clone(&controller_config.stash_metrics));
// The stash URL is a Postgres connection string; derive its TLS
// configuration from it.
let tls = mz_tls_util::make_tls(&tokio_postgres::config::Config::from_str(url)?)?;
Box::new(mz_catalog::durable::stash_backed_catalog_state(
StashConfig {
stash_factory,
stash_url: url.clone(),
schema: None,
tls,
},
))
}
CatalogConfig::Persist {
persist_clients,
metrics,
} => {
info!("Using persist backed catalog");
let persist_client = persist_clients
.open(controller_config.persist_location.clone())
.await?;
Box::new(
mz_catalog::durable::persist_backed_catalog_state(
persist_client,
environment_id.organization_id(),
Arc::clone(metrics),
)
.await,
)
}
CatalogConfig::Shadow {
url,
persist_clients,
} => {
// Shadow catalog: layers the stash and persist backends together
// (presumably for validating a migration between them — confirm).
info!("Using shadow catalog");
let stash_factory =
mz_stash::StashFactory::from_metrics(Arc::clone(&controller_config.stash_metrics));
let tls = mz_tls_util::make_tls(&tokio_postgres::config::Config::from_str(url)?)?;
let persist_client = persist_clients
.open(controller_config.persist_location.clone())
.await?;
Box::new(
mz_catalog::durable::shadow_catalog_state(
StashConfig {
stash_factory,
stash_url: url.clone(),
schema: None,
tls,
},
persist_client,
environment_id.organization_id(),
)
.await,
)
}
})
}
/// A running `environmentd` server, returned by [`Listeners::serve`]: the
/// bound listener handles plus the adapter handle.
pub struct Server {
sql_listener: ListenerHandle,
http_listener: ListenerHandle,
balancer_sql_listener: ListenerHandle,
balancer_http_listener: ListenerHandle,
internal_sql_listener: ListenerHandle,
internal_http_listener: ListenerHandle,
// Never read (note the leading underscore); presumably held so the
// adapter stays alive for the server's lifetime — confirm.
_adapter_handle: mz_adapter::Handle,
}
impl Server {
/// Local address of the external pgwire (SQL) listener.
pub fn sql_local_addr(&self) -> SocketAddr {
self.sql_listener.local_addr()
}
/// Local address of the external HTTP listener.
pub fn http_local_addr(&self) -> SocketAddr {
self.http_listener.local_addr()
}
/// Local address of the balancer-facing pgwire (SQL) listener.
pub fn balancer_sql_local_addr(&self) -> SocketAddr {
self.balancer_sql_listener.local_addr()
}
/// Local address of the balancer-facing HTTP listener.
pub fn balancer_http_local_addr(&self) -> SocketAddr {
self.balancer_http_listener.local_addr()
}
/// Local address of the internal pgwire (SQL) listener.
pub fn internal_sql_local_addr(&self) -> SocketAddr {
self.internal_sql_listener.local_addr()
}
/// Local address of the internal HTTP listener.
pub fn internal_http_local_addr(&self) -> SocketAddr {
self.internal_http_listener.local_addr()
}
}