use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::Duration;
use fail::fail_point;
use futures::Future;
use maplit::{btreemap, btreeset};
use mz_adapter_types::compaction::SINCE_GRANULARITY;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::VersionedEvent;
use mz_catalog::memory::objects::{CatalogItem, Connection, DataSourceDesc, Sink};
use mz_catalog::SYSTEM_CONN_ID;
use mz_compute_client::protocol::response::PeekResponse;
use mz_controller::clusters::ReplicaLocation;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::instrument;
use mz_ore::now::to_datetime;
use mz_ore::retry::Retry;
use mz_ore::str::StrExt;
use mz_ore::task;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_repr::adt::numeric::Numeric;
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica, CatalogSchema};
use mz_sql::names::ResolvedDatabaseSpecifier;
use mz_sql::plan::ConnectionDetails;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::{
self, SystemVars, Var, MAX_AWS_PRIVATELINK_CONNECTIONS, MAX_CLUSTERS, MAX_CONTINUAL_TASKS,
MAX_CREDIT_CONSUMPTION_RATE, MAX_DATABASES, MAX_KAFKA_CONNECTIONS, MAX_MATERIALIZED_VIEWS,
MAX_MYSQL_CONNECTIONS, MAX_NETWORK_POLICIES, MAX_OBJECTS_PER_SCHEMA, MAX_POSTGRES_CONNECTIONS,
MAX_REPLICAS_PER_CLUSTER, MAX_ROLES, MAX_SCHEMAS_PER_DATABASE, MAX_SECRETS, MAX_SINKS,
MAX_SOURCES, MAX_TABLES,
};
use mz_storage_client::controller::ExportDescription;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::connections::PostgresConnection;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sources::GenericSourceConnection;
use serde_json::json;
use tracing::{event, info_span, warn, Instrument, Level};
use crate::active_compute_sink::{ActiveComputeSink, ActiveComputeSinkRetireReason};
use crate::catalog::{DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult};
use crate::coord::appends::BuiltinTableAppendNotify;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::{Coordinator, ReplicaMetadata};
use crate::session::{Session, Transaction, TransactionOps};
use crate::statement_logging::StatementEndedExecutionReason;
use crate::telemetry::{EventDetails, SegmentClientExt};
use crate::util::ResultExt;
use crate::{catalog, flags, AdapterError, TimestampProvider};
impl Coordinator {
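    /// Applies `ops` to the catalog on behalf of `session`, delegating to
    /// [`Self::catalog_transact_conn`] with the session's connection ID, if any.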
#[instrument(name = "coord::catalog_transact")]
pub(crate) async fn catalog_transact(
&mut self,
session: Option<&Session>,
ops: Vec<catalog::Op>,
) -> Result<(), AdapterError> {
self.catalog_transact_conn(session.map(|session| session.conn_id()), ops)
.await
}
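
    /// Like [`Self::catalog_transact`], but additionally runs the caller-provided
    /// `side_effect` future concurrently with the builtin table updates produced
    /// by the transaction, returning once both have completed.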
#[instrument(name = "coord::catalog_transact_with_side_effects")]
pub(crate) async fn catalog_transact_with_side_effects<'c, F, Fut>(
&'c mut self,
session: Option<&Session>,
ops: Vec<catalog::Op>,
side_effect: F,
) -> Result<(), AdapterError>
where
F: FnOnce(&'c mut Coordinator) -> Fut,
Fut: Future<Output = ()>,
{
let table_updates = self
.catalog_transact_inner(session.map(|session| session.conn_id()), ops)
.await?;
let side_effects_fut = side_effect(self);
let ((), ()) = futures::future::join(
side_effects_fut.instrument(info_span!(
"coord::catalog_transact_with_side_effects::side_effects_fut"
)),
table_updates.instrument(info_span!(
"coord::catalog_transact_with_side_effects::table_updates"
)),
)
.await;
Ok(())
}
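
    /// Applies `ops` to the catalog on behalf of `conn_id` and waits for the
    /// resulting builtin table updates to be written before returning.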
#[instrument(name = "coord::catalog_transact_conn")]
pub(crate) async fn catalog_transact_conn(
&mut self,
conn_id: Option<&ConnectionId>,
ops: Vec<catalog::Op>,
) -> Result<(), AdapterError> {
let table_updates = self.catalog_transact_inner(conn_id, ops).await?;
table_updates
.instrument(info_span!("coord::catalog_transact_conn::table_updates"))
.await;
Ok(())
}
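
    /// Applies `ops` in the context of the session's open DDL transaction, if any.
    /// The transaction's staged ops and `ops` are validated together via a dry
    /// run; on success the combined op set is staged back onto the transaction
    /// rather than committed. Returns [`AdapterError::DDLTransactionRace`] if the
    /// catalog has changed since the transaction recorded its revision.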
#[instrument(name = "coord::catalog_transact_with_ddl_transaction")]
pub(crate) async fn catalog_transact_with_ddl_transaction(
&mut self,
session: &mut Session,
ops: Vec<catalog::Op>,
) -> Result<(), AdapterError> {
let Some(Transaction {
ops:
TransactionOps::DDL {
ops: txn_ops,
revision: txn_revision,
state: _,
},
..
}) = session.transaction().inner()
else {
return self.catalog_transact(Some(session), ops).await;
};
if self.catalog().transient_revision() != *txn_revision {
return Err(AdapterError::DDLTransactionRace);
}
let mut all_ops = Vec::with_capacity(ops.len() + txn_ops.len() + 1);
all_ops.extend(txn_ops.iter().cloned());
all_ops.extend(ops.clone());
all_ops.push(Op::TransactionDryRun);
let result = self.catalog_transact(Some(session), all_ops).await;
match result {
Err(AdapterError::TransactionDryRun { new_ops, new_state }) => {
session.transaction_mut().add_ops(TransactionOps::DDL {
ops: new_ops,
state: new_state,
revision: self.catalog().transient_revision(),
})?;
Ok(())
}
Ok(_) => unreachable!("unexpected success!"),
Err(e) => Err(e),
}
}
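
    /// Performs a catalog transaction and all of its follow-on work: it scans
    /// `ops` for affected objects, applies the ops to the catalog, cancels
    /// dependent compute sinks and pending peeks, drops the corresponding
    /// controller collections and cloud resources, and refreshes any system
    /// configuration the ops touched. Returns a future that resolves once the
    /// resulting builtin table updates have been written, or
    /// [`AdapterError::ReadOnly`] if the controller is in read-only mode.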
#[instrument(name = "coord::catalog_transact_inner")]
pub(crate) async fn catalog_transact_inner<'a>(
&mut self,
conn_id: Option<&ConnectionId>,
ops: Vec<catalog::Op>,
) -> Result<BuiltinTableAppendNotify, AdapterError> {
if self.controller.read_only() {
return Err(AdapterError::ReadOnly);
}
event!(Level::TRACE, ops = format!("{:?}", ops));
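
        // Pre-scan the ops to collect the follow-on work this transaction will
        // require: objects whose controller state must be dropped, clusters and
        // replicas being created, and system configuration classes that need to
        // be refreshed afterwards.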
let mut sources_to_drop = vec![];
let mut webhook_sources_to_restart = BTreeSet::new();
let mut table_gids_to_drop = vec![];
let mut storage_sink_gids_to_drop = vec![];
let mut indexes_to_drop = vec![];
let mut materialized_views_to_drop = vec![];
let mut continual_tasks_to_drop = vec![];
let mut views_to_drop = vec![];
let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
let mut secrets_to_drop = vec![];
let mut vpc_endpoints_to_drop = vec![];
let mut clusters_to_drop = vec![];
let mut cluster_replicas_to_drop = vec![];
let mut compute_sinks_to_drop = BTreeMap::new();
let mut peeks_to_drop = vec![];
let mut clusters_to_create = vec![];
let mut cluster_replicas_to_create = vec![];
let mut update_tracing_config = false;
let mut update_compute_config = false;
let mut update_storage_config = false;
let mut update_pg_timestamp_oracle_config = false;
let mut update_metrics_retention = false;
let mut update_secrets_caching_config = false;
let mut update_cluster_scheduling_config = false;
let mut update_arrangement_exert_proportionality = false;
let mut update_http_config = false;
for op in &ops {
match op {
catalog::Op::DropObjects(drop_object_infos) => {
for drop_object_info in drop_object_infos {
match &drop_object_info {
catalog::DropObjectInfo::Item(id) => {
match self.catalog().get_entry(id).item() {
CatalogItem::Table(table) => {
table_gids_to_drop.extend(table.global_ids());
}
CatalogItem::Source(source) => {
sources_to_drop.push((*id, source.global_id()));
if let DataSourceDesc::Ingestion {
ingestion_desc, ..
} = &source.data_source
{
match &ingestion_desc.desc.connection {
GenericSourceConnection::Postgres(conn) => {
let conn = conn.clone().into_inline_connection(
self.catalog().state(),
);
let pending_drop = (
conn.connection.clone(),
conn.publication_details.slot.clone(),
);
replication_slots_to_drop.push(pending_drop);
}
_ => {}
}
}
}
CatalogItem::Sink(sink) => {
storage_sink_gids_to_drop.push(sink.global_id());
}
CatalogItem::Index(index) => {
indexes_to_drop.push((index.cluster_id, index.global_id()));
}
CatalogItem::MaterializedView(mv) => {
materialized_views_to_drop
.push((mv.cluster_id, mv.global_id()));
}
CatalogItem::View(view) => {
views_to_drop.push((*id, view.clone()))
}
CatalogItem::ContinualTask(ct) => {
continual_tasks_to_drop.push((
*id,
ct.cluster_id,
ct.global_id(),
));
}
CatalogItem::Secret(_) => {
secrets_to_drop.push(*id);
}
CatalogItem::Connection(Connection { details, .. }) => {
match details {
ConnectionDetails::Ssh { .. } => {
secrets_to_drop.push(*id);
}
ConnectionDetails::AwsPrivatelink(_) => {
vpc_endpoints_to_drop.push(*id);
}
_ => (),
}
}
_ => (),
}
}
catalog::DropObjectInfo::Cluster(id) => {
clusters_to_drop.push(*id);
}
catalog::DropObjectInfo::ClusterReplica((
cluster_id,
replica_id,
_reason,
)) => {
cluster_replicas_to_drop.push((*cluster_id, *replica_id));
}
_ => (),
}
}
}
catalog::Op::ResetSystemConfiguration { name }
| catalog::Op::UpdateSystemConfiguration { name, .. } => {
update_tracing_config |= vars::is_tracing_var(name);
update_compute_config |= self
.catalog
.state()
.system_config()
.is_compute_config_var(name);
update_storage_config |= self
.catalog
.state()
.system_config()
.is_storage_config_var(name);
update_pg_timestamp_oracle_config |=
vars::is_pg_timestamp_oracle_config_var(name);
update_metrics_retention |= name == vars::METRICS_RETENTION.name();
update_secrets_caching_config |= vars::is_secrets_caching_var(name);
update_cluster_scheduling_config |= vars::is_cluster_scheduling_var(name);
update_arrangement_exert_proportionality |=
name == vars::ARRANGEMENT_EXERT_PROPORTIONALITY.name();
update_http_config |= vars::is_http_config_var(name);
}
catalog::Op::ResetAllSystemConfiguration => {
update_tracing_config = true;
update_compute_config = true;
update_storage_config = true;
update_pg_timestamp_oracle_config = true;
update_metrics_retention = true;
update_secrets_caching_config = true;
update_cluster_scheduling_config = true;
update_arrangement_exert_proportionality = true;
update_http_config = true;
}
catalog::Op::RenameItem { id, .. } => {
let item = self.catalog().get_entry(id);
let is_webhook_source = item
.source()
.map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
.unwrap_or(false);
if is_webhook_source {
webhook_sources_to_restart.insert(*id);
}
}
catalog::Op::RenameSchema {
database_spec,
schema_spec,
..
} => {
let schema = self.catalog().get_schema(
database_spec,
schema_spec,
conn_id.unwrap_or(&SYSTEM_CONN_ID),
);
let webhook_sources = schema.item_ids().filter(|id| {
let item = self.catalog().get_entry(id);
item.source()
.map(|s| matches!(s.data_source, DataSourceDesc::Webhook { .. }))
.unwrap_or(false)
});
webhook_sources_to_restart.extend(webhook_sources);
}
catalog::Op::CreateCluster { id, .. } => {
clusters_to_create.push(*id);
}
catalog::Op::CreateClusterReplica {
cluster_id,
name,
config,
..
} => {
cluster_replicas_to_create.push((
*cluster_id,
name.clone(),
config.location.num_processes(),
));
}
_ => (),
}
}
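
        // Collect the global IDs of every collection being dropped so that
        // compute sinks and pending peeks that depend on them (or on a dropped
        // cluster) can be cancelled with an informative reason.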
let collections_to_drop: BTreeSet<GlobalId> = sources_to_drop
.iter()
.map(|(_, gid)| *gid)
.chain(table_gids_to_drop.iter().copied())
.chain(storage_sink_gids_to_drop.iter().copied())
.chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
.chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
.chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid))
.chain(views_to_drop.iter().map(|(_id, view)| view.global_id()))
.collect();
for (sink_id, sink) in &self.active_compute_sinks {
let cluster_id = sink.cluster_id();
let conn_id = &sink.connection_id();
if let Some(id) = sink
.depends_on()
.iter()
.find(|id| collections_to_drop.contains(id))
{
let entry = self.catalog().get_entry_by_global_id(id);
let name = self
.catalog()
.resolve_full_name(entry.name(), Some(conn_id))
.to_string();
compute_sinks_to_drop.insert(
*sink_id,
ActiveComputeSinkRetireReason::DependencyDropped(format!(
"relation {}",
name.quoted()
)),
);
} else if clusters_to_drop.contains(&cluster_id) {
let name = self.catalog().get_cluster(cluster_id).name();
compute_sinks_to_drop.insert(
*sink_id,
ActiveComputeSinkRetireReason::DependencyDropped(format!(
"cluster {}",
name.quoted()
)),
);
}
}
for (uuid, pending_peek) in &self.pending_peeks {
if let Some(id) = pending_peek
.depends_on
.iter()
.find(|id| collections_to_drop.contains(id))
{
let entry = self.catalog().get_entry_by_global_id(id);
let name = self
.catalog()
.resolve_full_name(entry.name(), Some(&pending_peek.conn_id));
peeks_to_drop.push((
format!("relation {}", name.to_string().quoted()),
uuid.clone(),
));
} else if clusters_to_drop.contains(&pending_peek.cluster_id) {
let name = self.catalog().get_cluster(pending_peek.cluster_id).name();
peeks_to_drop.push((format!("cluster {}", name.quoted()), uuid.clone()));
}
}
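
        // Bundle the dropped storage and compute collections by timeline so
        // their read holds can be released after the transaction commits.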
let storage_ids_to_drop = sources_to_drop
.iter()
.map(|(_, gid)| *gid)
.chain(storage_sink_gids_to_drop.iter().copied())
.chain(table_gids_to_drop.iter().copied())
.chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
.chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid));
let compute_ids_to_drop = indexes_to_drop
.iter()
.copied()
.chain(materialized_views_to_drop.iter().copied())
.chain(
continual_tasks_to_drop
.iter()
.map(|(_, cluster_id, gid)| (*cluster_id, *gid)),
);
let collection_id_bundle = self.build_collection_id_bundle(
storage_ids_to_drop,
compute_ids_to_drop,
clusters_to_drop.clone(),
);
let timeline_associations: BTreeMap<_, _> = self
.partition_ids_by_timeline_context(&collection_id_bundle)
.filter_map(|(context, bundle)| {
let TimelineContext::TimelineDependent(timeline) = context else {
return None;
};
let TimelineState { read_holds, .. } = self
.global_timelines
.get(&timeline)
.expect("all timeslines have a timestamp oracle");
let empty = read_holds.id_bundle().difference(&bundle).is_empty();
Some((timeline, (empty, bundle)))
})
.collect();
self.validate_resource_limits(&ops, conn_id.unwrap_or(&SYSTEM_CONN_ID))?;
let oracle_write_ts = self.get_local_write_ts().await.timestamp;
let Coordinator {
catalog,
active_conns,
controller,
cluster_replica_statuses,
..
} = self;
let catalog = Arc::make_mut(catalog);
let conn = conn_id.map(|id| active_conns.get(id).expect("connection must exist"));
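
        // Apply the ops to the catalog in a single transaction, producing the
        // builtin table updates and audit events to process below.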
let TransactionResult {
mut builtin_table_updates,
audit_events,
} = catalog
.transact(Some(&mut *controller.storage), oracle_write_ts, conn, ops)
.await?;
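
        // Retract builtin status rows for replicas of dropped clusters and for
        // individually dropped replicas, and record initial statuses for newly
        // created clusters and replicas.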
for (cluster_id, replica_id) in &cluster_replicas_to_drop {
let replica_statuses =
cluster_replica_statuses.remove_cluster_replica_statuses(cluster_id, replica_id);
for (process_id, status) in replica_statuses {
let builtin_table_update = catalog.state().pack_cluster_replica_status_update(
*replica_id,
process_id,
&status,
-1,
);
let builtin_table_update = catalog
.state()
.resolve_builtin_table_update(builtin_table_update);
builtin_table_updates.push(builtin_table_update);
}
}
for cluster_id in &clusters_to_drop {
let cluster_statuses = cluster_replica_statuses.remove_cluster_statuses(cluster_id);
for (replica_id, replica_statuses) in cluster_statuses {
for (process_id, status) in replica_statuses {
let builtin_table_update = catalog
.state()
.pack_cluster_replica_status_update(replica_id, process_id, &status, -1);
let builtin_table_update = catalog
.state()
.resolve_builtin_table_update(builtin_table_update);
builtin_table_updates.push(builtin_table_update);
}
}
}
for cluster_id in clusters_to_create {
cluster_replica_statuses.initialize_cluster_statuses(cluster_id);
for (replica_id, replica_statuses) in
cluster_replica_statuses.get_cluster_statuses(cluster_id)
{
for (process_id, status) in replica_statuses {
let builtin_table_update = catalog.state().pack_cluster_replica_status_update(
*replica_id,
*process_id,
status,
1,
);
let builtin_table_update = catalog
.state()
.resolve_builtin_table_update(builtin_table_update);
builtin_table_updates.push(builtin_table_update);
}
}
}
let now = to_datetime((catalog.config().now)());
for (cluster_id, replica_name, num_processes) in cluster_replicas_to_create {
let replica_id = catalog
.resolve_replica_in_cluster(&cluster_id, &replica_name)
.expect("just created")
.replica_id();
cluster_replica_statuses.initialize_cluster_replica_statuses(
cluster_id,
replica_id,
num_processes,
now,
);
for (process_id, status) in
cluster_replica_statuses.get_cluster_replica_statuses(cluster_id, replica_id)
{
let builtin_table_update = catalog.state().pack_cluster_replica_status_update(
replica_id,
*process_id,
status,
1,
);
let builtin_table_update = catalog
.state()
.resolve_builtin_table_update(builtin_table_update);
builtin_table_updates.push(builtin_table_update);
}
}
let (builtin_update_notify, _) = self
.builtin_table_update()
.execute(builtin_table_updates)
.await;
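
        // Apply the remaining side effects against the controller: release
        // timeline read holds, drop the affected collections, cancel dependent
        // sinks and peeks, and refresh any configuration the ops touched.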
let _: () = async {
if !timeline_associations.is_empty() {
for (timeline, (should_be_empty, id_bundle)) in timeline_associations {
let became_empty =
self.remove_resources_associated_with_timeline(timeline, id_bundle);
assert_eq!(should_be_empty, became_empty, "emptiness did not match!");
}
}
if !table_gids_to_drop.is_empty() {
let ts = self.get_local_write_ts().await;
self.drop_tables(table_gids_to_drop, ts.timestamp);
}
if !sources_to_drop.is_empty() {
self.drop_sources(sources_to_drop);
}
if !webhook_sources_to_restart.is_empty() {
self.restart_webhook_sources(webhook_sources_to_restart);
}
if !storage_sink_gids_to_drop.is_empty() {
self.drop_storage_sinks(storage_sink_gids_to_drop);
}
if !compute_sinks_to_drop.is_empty() {
self.retire_compute_sinks(compute_sinks_to_drop).await;
}
if !peeks_to_drop.is_empty() {
for (dropped_name, uuid) in peeks_to_drop {
if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
let cancel_reason = PeekResponse::Error(format!(
"query could not complete because {dropped_name} was dropped"
));
self.controller
.compute
.cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
.unwrap_or_terminate("unable to cancel peek");
self.retire_execution(
StatementEndedExecutionReason::Canceled,
pending_peek.ctx_extra,
);
}
}
}
if !indexes_to_drop.is_empty() {
self.drop_indexes(indexes_to_drop);
}
if !materialized_views_to_drop.is_empty() {
self.drop_materialized_views(materialized_views_to_drop);
}
if !continual_tasks_to_drop.is_empty() {
self.drop_continual_tasks(continual_tasks_to_drop);
}
if !vpc_endpoints_to_drop.is_empty() {
self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
}
if !cluster_replicas_to_drop.is_empty() {
fail::fail_point!("after_catalog_drop_replica");
for (cluster_id, replica_id) in cluster_replicas_to_drop {
self.drop_replica(cluster_id, replica_id);
}
}
if !clusters_to_drop.is_empty() {
for cluster_id in clusters_to_drop {
self.controller.drop_cluster(cluster_id);
}
}
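
            // Drop replication slots and secrets in a background task, retrying
            // slot drops against the upstream Postgres for up to a minute.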
task::spawn(|| "drop_replication_slots_and_secrets", {
let ssh_tunnel_manager = self.connection_context().ssh_tunnel_manager.clone();
let secrets_controller = Arc::clone(&self.secrets_controller);
let secrets_reader = Arc::clone(self.secrets_reader());
let storage_config = self.controller.storage.config().clone();
async move {
for (connection, replication_slot_name) in replication_slots_to_drop {
tracing::info!(?replication_slot_name, "dropping replication slot");
let result: Result<(), anyhow::Error> = Retry::default()
.max_duration(Duration::from_secs(60))
.retry_async(|_state| async {
let config = connection
.config(&secrets_reader, &storage_config, InTask::No)
.await
.map_err(|e| {
anyhow::anyhow!(
"error creating Postgres client for \
dropping acquired slots: {}",
e.display_with_causes()
)
})?;
let should_wait = match connection.flavor {
PostgresFlavor::Vanilla => true,
PostgresFlavor::Yugabyte => false,
};
mz_postgres_util::drop_replication_slots(
&ssh_tunnel_manager,
config.clone(),
&[(&replication_slot_name, should_wait)],
)
.await?;
Ok(())
})
.await;
if let Err(err) = result {
tracing::warn!(
?replication_slot_name,
?err,
"failed to drop replication slot"
);
}
}
fail_point!("drop_secrets");
for secret in secrets_to_drop {
if let Err(e) = secrets_controller.delete(secret).await {
warn!("Dropping secrets has encountered an error: {}", e);
}
}
}
});
if update_compute_config {
self.update_compute_config();
}
if update_storage_config {
self.update_storage_config();
}
if update_pg_timestamp_oracle_config {
self.update_pg_timestamp_oracle_config();
}
if update_metrics_retention {
self.update_metrics_retention();
}
if update_tracing_config {
self.update_tracing_config();
}
if update_secrets_caching_config {
self.update_secrets_caching_config();
}
if update_cluster_scheduling_config {
self.update_cluster_scheduling_config();
}
if update_arrangement_exert_proportionality {
self.update_arrangement_exert_proportionality();
}
if update_http_config {
self.update_http_config();
}
}
.instrument(info_span!("coord::catalog_transact_with::finalize"))
.await;
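
        // If a telemetry client is configured, forward the audit events
        // generated by this transaction to Segment.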
let conn = conn_id.and_then(|id| self.active_conns.get(id));
if let Some(segment_client) = &self.segment_client {
for VersionedEvent::V1(event) in audit_events {
let event_type = format!(
"{} {}",
event.object_type.as_title_case(),
event.event_type.as_title_case()
);
segment_client.environment_track(
&self.catalog().config().environment_id,
event_type,
json!({ "details": event.details.as_json() }),
EventDetails {
user_id: conn
.and_then(|c| c.user().external_metadata.as_ref())
.map(|m| m.user_id),
application_name: conn.map(|c| c.application_name()),
..Default::default()
},
);
}
}
mz_ore::soft_assert_eq_no_log!(
self.check_consistency(),
Ok(()),
"coordinator inconsistency detected"
);
Ok(builtin_update_notify)
}
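
    /// Drops a cluster replica from the controller, first retracting any
    /// transient replica metrics and dropping its introspection subscribes.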
fn drop_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
if let Some(Some(ReplicaMetadata { metrics })) =
self.transient_replica_metadata.insert(replica_id, None)
{
let mut updates = vec![];
if let Some(metrics) = metrics {
let retractions = self
.catalog()
.state()
.pack_replica_metric_updates(replica_id, &metrics, -1);
let retractions = self
.catalog()
.state()
.resolve_builtin_table_updates(retractions);
updates.extend(retractions);
}
self.builtin_table_update().background(updates);
}
self.drop_introspection_subscribes(replica_id);
self.controller
.drop_replica(cluster_id, replica_id)
.expect("dropping replica must not fail");
}
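
    /// Drops the given sources from the storage controller and discards any
    /// active webhook state associated with them.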
fn drop_sources(&mut self, sources: Vec<(CatalogItemId, GlobalId)>) {
for (item_id, _gid) in &sources {
self.active_webhooks.remove(item_id);
}
let storage_metadata = self.catalog.state().storage_metadata();
let source_gids = sources.into_iter().map(|(_id, gid)| gid).collect();
self.controller
.storage
.drop_sources(storage_metadata, source_gids)
.unwrap_or_terminate("cannot fail to drop sources");
}
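
    /// Drops the given tables from the storage controller at timestamp `ts`.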
fn drop_tables(&mut self, table_gids: Vec<GlobalId>, ts: Timestamp) {
let storage_metadata = self.catalog.state().storage_metadata();
self.controller
.storage
.drop_tables(storage_metadata, table_gids, ts)
.unwrap_or_terminate("cannot fail to drop tables");
}
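
    /// Discards the coordinator's active webhook state for the given sources.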
fn restart_webhook_sources(&mut self, sources: impl IntoIterator<Item = CatalogItemId>) {
for id in sources {
self.active_webhooks.remove(&id);
}
}
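
    /// Drops a single active compute sink, returning its state if it existed.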
#[must_use]
pub async fn drop_compute_sink(&mut self, sink_id: GlobalId) -> Option<ActiveComputeSink> {
self.drop_compute_sinks([sink_id]).await.remove(&sink_id)
}
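
    /// Removes the given sinks from the set of active compute sinks and drops
    /// their collections from the compute controller, returning the removed
    /// sinks keyed by ID. Unknown sink IDs are logged and skipped.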
#[must_use]
pub async fn drop_compute_sinks(
&mut self,
sink_ids: impl IntoIterator<Item = GlobalId>,
) -> BTreeMap<GlobalId, ActiveComputeSink> {
let mut by_id = BTreeMap::new();
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
for sink_id in sink_ids {
let sink = match self.remove_active_compute_sink(sink_id).await {
None => {
tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
continue;
}
Some(sink) => sink,
};
by_cluster
.entry(sink.cluster_id())
.or_default()
.push(sink_id);
by_id.insert(sink_id, sink);
}
for (cluster_id, ids) in by_cluster {
let compute = &mut self.controller.compute;
if compute.instance_exists(cluster_id) {
compute
.drop_collections(cluster_id, ids)
.unwrap_or_terminate("cannot fail to drop collections");
}
}
by_id
}
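
    /// Drops the given compute sinks and retires each one with its associated
    /// retirement reason.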
pub async fn retire_compute_sinks(
&mut self,
mut reasons: BTreeMap<GlobalId, ActiveComputeSinkRetireReason>,
) {
let sink_ids = reasons.keys().cloned();
for (id, sink) in self.drop_compute_sinks(sink_ids).await {
let reason = reasons
.remove(&id)
.expect("all returned IDs are in `reasons`");
sink.retire(reason);
}
}
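
    /// Drops all pending (reconfiguration) replicas of the given clusters via a
    /// single catalog transaction.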
pub async fn drop_reconfiguration_replicas(
&mut self,
cluster_ids: BTreeSet<ClusterId>,
) -> Result<(), AdapterError> {
let pending_cluster_ops: Vec<Op> = cluster_ids
.iter()
.map(|c| {
self.catalog()
.get_cluster(c.clone())
.replicas()
.filter_map(|r| match r.config.location {
ReplicaLocation::Managed(ref l) if l.pending => {
Some(DropObjectInfo::ClusterReplica((
c.clone(),
r.replica_id,
ReplicaCreateDropReason::Manual,
)))
}
_ => None,
})
.collect::<Vec<DropObjectInfo>>()
})
.filter_map(|pending_replica_drop_ops_by_cluster| {
match pending_replica_drop_ops_by_cluster.len() {
0 => None,
_ => Some(Op::DropObjects(pending_replica_drop_ops_by_cluster)),
}
})
.collect();
if !pending_cluster_ops.is_empty() {
self.catalog_transact(None, pending_cluster_ops).await?;
}
Ok(())
}
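
    /// Cancels all active compute sinks for the identified client connection.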
#[mz_ore::instrument(level = "debug")]
pub(crate) async fn cancel_compute_sinks_for_conn(&mut self, conn_id: &ConnectionId) {
self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Canceled)
.await
}
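
    /// Cancels any in-progress cluster reconfigurations initiated by the
    /// identified client connection.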
#[mz_ore::instrument(level = "debug")]
pub(crate) async fn cancel_cluster_reconfigurations_for_conn(
&mut self,
conn_id: &ConnectionId,
) {
self.retire_cluster_reconfigurations_for_conn(conn_id).await
}
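
    /// Retires all active compute sinks for the identified client connection
    /// with the given reason.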
#[mz_ore::instrument(level = "debug")]
pub(crate) async fn retire_compute_sinks_for_conn(
&mut self,
conn_id: &ConnectionId,
reason: ActiveComputeSinkRetireReason,
) {
let drop_sinks = self
.active_conns
.get_mut(conn_id)
.expect("must exist for active session")
.drop_sinks
.iter()
.map(|sink_id| (*sink_id, reason.clone()))
.collect();
self.retire_compute_sinks(drop_sinks).await;
}
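
    /// Drops any pending reconfiguration replicas for clusters the identified
    /// client connection was altering, then clears the connection's pending
    /// cluster alters.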
#[mz_ore::instrument(level = "debug")]
pub(crate) async fn retire_cluster_reconfigurations_for_conn(
&mut self,
conn_id: &ConnectionId,
) {
let reconfiguring_clusters = self
.active_conns
.get(conn_id)
.expect("must exist for active session")
.pending_cluster_alters
.clone();
self.drop_reconfiguration_replicas(reconfiguring_clusters)
.await
.unwrap_or_terminate("cannot fail to drop reconfiguration replicas");
self.active_conns
.get_mut(conn_id)
.expect("must exist for active session")
.pending_cluster_alters
.clear();
}
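
    /// Drops the given sinks from the storage controller.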
pub(crate) fn drop_storage_sinks(&mut self, sink_gids: Vec<GlobalId>) {
self.controller
.storage
.drop_sinks(sink_gids)
.unwrap_or_terminate("cannot fail to drop sinks");
}
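
    /// Drops the given indexes from their compute instances, skipping instances
    /// that no longer exist.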
pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>) {
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
for (cluster_id, gid) in indexes {
by_cluster.entry(cluster_id).or_default().push(gid);
}
for (cluster_id, gids) in by_cluster {
let compute = &mut self.controller.compute;
if compute.instance_exists(cluster_id) {
compute
.drop_collections(cluster_id, gids)
.unwrap_or_terminate("cannot fail to drop collections");
}
}
}
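
    /// Drops the given materialized views from the compute instances that
    /// maintain them and from the storage collections that persist them.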
fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>) {
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
let mut mv_gids = Vec::new();
for (cluster_id, gid) in mviews {
by_cluster.entry(cluster_id).or_default().push(gid);
mv_gids.push(gid);
}
for (cluster_id, ids) in by_cluster {
let compute = &mut self.controller.compute;
if compute.instance_exists(cluster_id) {
compute
.drop_collections(cluster_id, ids)
.unwrap_or_terminate("cannot fail to drop collections");
}
}
let storage_metadata = self.catalog.state().storage_metadata();
self.controller
.storage
.drop_sources(storage_metadata, mv_gids)
.unwrap_or_terminate("cannot fail to drop sources");
}
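
    /// Drops the given continual tasks from their compute instances and then
    /// drops their backing storage collections via [`Self::drop_sources`].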
fn drop_continual_tasks(&mut self, cts: Vec<(CatalogItemId, ClusterId, GlobalId)>) {
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
let mut source_ids = Vec::new();
for (item_id, cluster_id, gid) in cts {
by_cluster.entry(cluster_id).or_default().push(gid);
source_ids.push((item_id, gid));
}
for (cluster_id, ids) in by_cluster {
let compute = &mut self.controller.compute;
if compute.instance_exists(cluster_id) {
compute
.drop_collections(cluster_id, ids)
.unwrap_or_terminate("cannot fail to drop collections");
}
}
self.drop_sources(source_ids)
}
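
    /// Spawns a background task that deletes the VPC endpoints for the given
    /// AWS PrivateLink connections, retrying each deletion for up to a minute.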
fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
        let cloud_resource_controller = Arc::clone(
            self.cloud_resource_controller
                .as_ref()
                .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))
                .expect("vpc endpoints should only be dropped in CLOUD, where `cloud_resource_controller` is `Some`"),
        );
task::spawn(
|| "drop_vpc_endpoints",
async move {
for vpc_endpoint in vpc_endpoints {
let _ = Retry::default()
.max_duration(Duration::from_secs(60))
.retry_async(|_state| async {
fail_point!("drop_vpc_endpoint", |r| {
Err(anyhow::anyhow!("Fail point error {:?}", r))
});
match cloud_resource_controller
.delete_vpc_endpoint(vpc_endpoint)
.await
{
Ok(_) => Ok(()),
Err(e) => {
warn!("Dropping VPC Endpoints has encountered an error: {}", e);
Err(e)
}
}
})
.await;
}
}
.instrument(info_span!(
"coord::catalog_transact_inner::drop_vpc_endpoints"
)),
);
}
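
    /// Drops all temporary items owned by the given connection, along with any
    /// objects that depend on them. This is a no-op if the connection created no
    /// temporary items.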
pub(crate) async fn drop_temp_items(&mut self, conn_id: &ConnectionId) {
let temp_items = self.catalog().state().get_temp_items(conn_id).collect();
let all_items = self.catalog().object_dependents(&temp_items, conn_id);
if all_items.is_empty() {
return;
}
let op = Op::DropObjects(
all_items
.into_iter()
.map(DropObjectInfo::manual_drop_from_object_id)
.collect(),
);
self.catalog_transact_conn(Some(conn_id), vec![op])
.await
.expect("unable to drop temporary items for conn_id");
}
fn update_cluster_scheduling_config(&self) {
let config = flags::orchestrator_scheduling_config(self.catalog.system_config());
self.controller
.update_orchestrator_scheduling_config(config);
}
fn update_secrets_caching_config(&self) {
let config = flags::caching_config(self.catalog.system_config());
self.caching_secrets_reader.set_policy(config);
}
fn update_tracing_config(&self) {
let tracing = flags::tracing_config(self.catalog().system_config());
tracing.apply(&self.tracing_handle);
}
fn update_compute_config(&mut self) {
let config_params = flags::compute_config(self.catalog().system_config());
self.controller.compute.update_configuration(config_params);
}
fn update_storage_config(&mut self) {
let config_params = flags::storage_config(self.catalog().system_config());
self.controller.storage.update_parameters(config_params);
}
fn update_pg_timestamp_oracle_config(&self) {
let config_params = flags::pg_timstamp_oracle_config(self.catalog().system_config());
if let Some(config) = self.pg_timestamp_oracle_config.as_ref() {
config_params.apply(config)
}
}
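
    /// Recomputes read policies for retained-metrics objects from the current
    /// `metrics_retention` setting and applies them to the storage and compute
    /// controllers.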
fn update_metrics_retention(&mut self) {
let duration = self.catalog().system_config().metrics_retention();
let policy = ReadPolicy::lag_writes_by(
Timestamp::new(u64::try_from(duration.as_millis()).unwrap_or_else(|_e| {
tracing::error!("Absurd metrics retention duration: {duration:?}.");
u64::MAX
})),
SINCE_GRANULARITY,
);
let storage_policies = self
.catalog()
.entries()
.filter(|entry| {
entry.item().is_retained_metrics_object()
&& entry.item().is_compute_object_on_cluster().is_none()
})
.map(|entry| (entry.id(), policy.clone()))
.collect::<Vec<_>>();
let compute_policies = self
.catalog()
.entries()
.filter_map(|entry| {
if let (true, Some(cluster_id)) = (
entry.item().is_retained_metrics_object(),
entry.item().is_compute_object_on_cluster(),
) {
Some((cluster_id, entry.id(), policy.clone()))
} else {
None
}
})
.collect::<Vec<_>>();
self.update_storage_read_policies(storage_policies);
self.update_compute_read_policies(compute_policies);
}
fn update_arrangement_exert_proportionality(&mut self) {
let prop = self
.catalog()
.system_config()
.arrangement_exert_proportionality();
self.controller
.compute
.set_arrangement_exert_proportionality(prop);
}
fn update_http_config(&mut self) {
let webhook_request_limit = self
.catalog()
.system_config()
.webhook_concurrent_request_limit();
self.webhook_concurrency_limit
.set_limit(webhook_request_limit);
}
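
    /// Creates a storage export for `sink` in the storage controller, using read
    /// holds on the sink's input collection to choose its `as_of`.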
pub(crate) async fn create_storage_export(
&mut self,
id: GlobalId,
sink: &Sink,
) -> Result<(), AdapterError> {
self.controller.storage.check_exists(sink.from)?;
let status_id = self
.catalog()
.resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SINK_STATUS_HISTORY);
let status_id = Some(self.catalog().get_entry(&status_id).latest_global_id());
let id_bundle = crate::CollectionIdBundle {
storage_ids: btreeset! {sink.from},
compute_ids: btreemap! {},
};
let read_holds = self.acquire_read_holds(&id_bundle);
let as_of = self.least_valid_read(&read_holds);
let storage_sink_from_entry = self.catalog().get_entry_by_global_id(&sink.from);
let storage_sink_desc = mz_storage_types::sinks::StorageSinkDesc {
from: sink.from,
from_desc: storage_sink_from_entry
.desc(&self.catalog().resolve_full_name(
storage_sink_from_entry.name(),
storage_sink_from_entry.conn_id(),
))
.expect("indexes can only be built on items with descs")
.into_owned(),
connection: sink
.connection
.clone()
.into_inline_connection(self.catalog().state()),
partition_strategy: sink.partition_strategy.clone(),
envelope: sink.envelope,
as_of,
with_snapshot: sink.with_snapshot,
version: sink.version,
status_id,
from_storage_metadata: (),
};
let res = self
.controller
.storage
.create_exports(vec![(
id,
ExportDescription {
sink: storage_sink_desc,
instance_id: sink.cluster_id,
},
)])
.await;
drop(read_holds);
Ok(res?)
}
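
    /// Validates that applying `ops` would not push any limited resource
    /// (connections, tables, sources, sinks, materialized views, clusters,
    /// replicas, credit consumption rate, databases, schemas, objects per
    /// schema, secrets, roles, continual tasks, network policies) past its
    /// configured system limit.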
fn validate_resource_limits(
&self,
ops: &Vec<catalog::Op>,
conn_id: &ConnectionId,
) -> Result<(), AdapterError> {
let mut new_kafka_connections = 0;
let mut new_postgres_connections = 0;
let mut new_mysql_connections = 0;
let mut new_aws_privatelink_connections = 0;
let mut new_tables = 0;
let mut new_sources = 0;
let mut new_sinks = 0;
let mut new_materialized_views = 0;
let mut new_clusters = 0;
let mut new_replicas_per_cluster = BTreeMap::new();
let mut new_credit_consumption_rate = Numeric::zero();
let mut new_databases = 0;
let mut new_schemas_per_database = BTreeMap::new();
let mut new_objects_per_schema = BTreeMap::new();
let mut new_secrets = 0;
let mut new_roles = 0;
let mut new_continual_tasks = 0;
let mut new_network_policies = 0;
for op in ops {
match op {
Op::CreateDatabase { .. } => {
new_databases += 1;
}
Op::CreateSchema { database_id, .. } => {
if let ResolvedDatabaseSpecifier::Id(database_id) = database_id {
*new_schemas_per_database.entry(database_id).or_insert(0) += 1;
}
}
Op::CreateRole { .. } => {
new_roles += 1;
}
Op::CreateNetworkPolicy { .. } => {
new_network_policies += 1;
}
Op::CreateCluster { .. } => {
new_clusters += 1;
}
Op::CreateClusterReplica {
cluster_id, config, ..
} => {
*new_replicas_per_cluster.entry(*cluster_id).or_insert(0) += 1;
if let ReplicaLocation::Managed(location) = &config.location {
let replica_allocation = self
.catalog()
.cluster_replica_sizes()
.0
.get(location.size_for_billing())
.expect("location size is validated against the cluster replica sizes");
new_credit_consumption_rate += replica_allocation.credits_per_hour
}
}
Op::CreateItem { name, item, .. } => {
*new_objects_per_schema
.entry((
name.qualifiers.database_spec.clone(),
name.qualifiers.schema_spec.clone(),
))
.or_insert(0) += 1;
match item {
CatalogItem::Connection(connection) => match connection.details {
ConnectionDetails::Kafka(_) => new_kafka_connections += 1,
ConnectionDetails::Postgres(_) => new_postgres_connections += 1,
ConnectionDetails::MySql(_) => new_mysql_connections += 1,
ConnectionDetails::AwsPrivatelink(_) => {
new_aws_privatelink_connections += 1
}
ConnectionDetails::Csr(_)
| ConnectionDetails::Ssh { .. }
| ConnectionDetails::Aws(_) => {}
},
CatalogItem::Table(_) => {
new_tables += 1;
}
CatalogItem::Source(source) => {
new_sources += source.user_controllable_persist_shard_count()
}
CatalogItem::Sink(_) => new_sinks += 1,
CatalogItem::MaterializedView(_) => {
new_materialized_views += 1;
}
CatalogItem::Secret(_) => {
new_secrets += 1;
}
CatalogItem::ContinualTask(_) => {
new_continual_tasks += 1;
}
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_) => {}
}
}
Op::DropObjects(drop_object_infos) => {
for drop_object_info in drop_object_infos {
match drop_object_info {
DropObjectInfo::Cluster(_) => {
new_clusters -= 1;
}
DropObjectInfo::ClusterReplica((cluster_id, replica_id, _reason)) => {
*new_replicas_per_cluster.entry(*cluster_id).or_insert(0) -= 1;
let cluster =
self.catalog().get_cluster_replica(*cluster_id, *replica_id);
if let ReplicaLocation::Managed(location) = &cluster.config.location
{
let replica_allocation = self
.catalog()
.cluster_replica_sizes()
.0
.get(location.size_for_billing())
.expect(
"location size is validated against the cluster replica sizes",
);
new_credit_consumption_rate -=
replica_allocation.credits_per_hour
}
}
DropObjectInfo::Database(_) => {
new_databases -= 1;
}
DropObjectInfo::Schema((database_spec, _)) => {
if let ResolvedDatabaseSpecifier::Id(database_id) = database_spec {
*new_schemas_per_database.entry(database_id).or_insert(0) -= 1;
}
}
DropObjectInfo::Role(_) => {
new_roles -= 1;
}
DropObjectInfo::NetworkPolicy(_) => {
new_network_policies -= 1;
}
DropObjectInfo::Item(id) => {
let entry = self.catalog().get_entry(id);
*new_objects_per_schema
.entry((
entry.name().qualifiers.database_spec.clone(),
entry.name().qualifiers.schema_spec.clone(),
))
.or_insert(0) -= 1;
match entry.item() {
CatalogItem::Connection(connection) => match connection.details
{
ConnectionDetails::AwsPrivatelink(_) => {
new_aws_privatelink_connections -= 1;
}
_ => (),
},
CatalogItem::Table(_) => {
new_tables -= 1;
}
CatalogItem::Source(source) => {
new_sources -=
source.user_controllable_persist_shard_count()
}
CatalogItem::Sink(_) => new_sinks -= 1,
CatalogItem::MaterializedView(_) => {
new_materialized_views -= 1;
}
CatalogItem::Secret(_) => {
new_secrets -= 1;
}
CatalogItem::ContinualTask(_) => {
new_continual_tasks -= 1;
}
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_) => {}
}
}
}
}
}
Op::UpdateItem {
name: _,
id,
to_item,
} => match to_item {
CatalogItem::Source(source) => {
let current_source = self
.catalog()
.get_entry(id)
.source()
.expect("source update is for source item");
new_sources += source.user_controllable_persist_shard_count()
- current_source.user_controllable_persist_shard_count();
}
CatalogItem::Connection(_)
| CatalogItem::Table(_)
| CatalogItem::Sink(_)
| CatalogItem::MaterializedView(_)
| CatalogItem::Secret(_)
| CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::ContinualTask(_) => {}
},
Op::AlterRole { .. }
| Op::AlterRetainHistory { .. }
| Op::AlterNetworkPolicy { .. }
| Op::UpdatePrivilege { .. }
| Op::UpdateDefaultPrivilege { .. }
| Op::GrantRole { .. }
| Op::RenameCluster { .. }
| Op::RenameClusterReplica { .. }
| Op::RenameItem { .. }
| Op::RenameSchema { .. }
| Op::UpdateOwner { .. }
| Op::RevokeRole { .. }
| Op::UpdateClusterConfig { .. }
| Op::UpdateClusterReplicaConfig { .. }
| Op::UpdateSourceReferences { .. }
| Op::UpdateSystemConfiguration { .. }
| Op::ResetSystemConfiguration { .. }
| Op::ResetAllSystemConfiguration { .. }
| Op::Comment { .. }
| Op::WeirdStorageUsageUpdates { .. }
| Op::TransactionDryRun => {}
}
}
let mut current_aws_privatelink_connections = 0;
let mut current_postgres_connections = 0;
let mut current_mysql_connections = 0;
let mut current_kafka_connections = 0;
for c in self.catalog().user_connections() {
let connection = c
.connection()
.expect("`user_connections()` only returns connection objects");
match connection.details {
ConnectionDetails::AwsPrivatelink(_) => current_aws_privatelink_connections += 1,
ConnectionDetails::Postgres(_) => current_postgres_connections += 1,
ConnectionDetails::MySql(_) => current_mysql_connections += 1,
ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
ConnectionDetails::Csr(_)
| ConnectionDetails::Ssh { .. }
| ConnectionDetails::Aws(_) => {}
}
}
self.validate_resource_limit(
current_kafka_connections,
new_kafka_connections,
SystemVars::max_kafka_connections,
"Kafka Connection",
MAX_KAFKA_CONNECTIONS.name(),
)?;
self.validate_resource_limit(
current_postgres_connections,
new_postgres_connections,
SystemVars::max_postgres_connections,
"PostgreSQL Connection",
MAX_POSTGRES_CONNECTIONS.name(),
)?;
self.validate_resource_limit(
current_mysql_connections,
new_mysql_connections,
SystemVars::max_mysql_connections,
"MySQL Connection",
MAX_MYSQL_CONNECTIONS.name(),
)?;
self.validate_resource_limit(
current_aws_privatelink_connections,
new_aws_privatelink_connections,
SystemVars::max_aws_privatelink_connections,
"AWS PrivateLink Connection",
MAX_AWS_PRIVATELINK_CONNECTIONS.name(),
)?;
self.validate_resource_limit(
self.catalog().user_tables().count(),
new_tables,
SystemVars::max_tables,
"table",
MAX_TABLES.name(),
)?;
let current_sources: usize = self
.catalog()
.user_sources()
.filter_map(|source| source.source())
.map(|source| source.user_controllable_persist_shard_count())
.sum::<i64>()
.try_into()
.expect("non-negative sum of sources");
self.validate_resource_limit(
current_sources,
new_sources,
SystemVars::max_sources,
"source",
MAX_SOURCES.name(),
)?;
self.validate_resource_limit(
self.catalog().user_sinks().count(),
new_sinks,
SystemVars::max_sinks,
"sink",
MAX_SINKS.name(),
)?;
self.validate_resource_limit(
self.catalog().user_materialized_views().count(),
new_materialized_views,
SystemVars::max_materialized_views,
"materialized view",
MAX_MATERIALIZED_VIEWS.name(),
)?;
self.validate_resource_limit(
self.catalog().user_clusters().count(),
new_clusters,
SystemVars::max_clusters,
"cluster",
MAX_CLUSTERS.name(),
)?;
for (cluster_id, new_replicas) in new_replicas_per_cluster {
let current_amount = self
.catalog()
.try_get_cluster(cluster_id)
.map(|instance| instance.replicas().count())
.unwrap_or(0);
self.validate_resource_limit(
current_amount,
new_replicas,
SystemVars::max_replicas_per_cluster,
"cluster replica",
MAX_REPLICAS_PER_CLUSTER.name(),
)?;
}
let current_credit_consumption_rate = self
.catalog()
.user_cluster_replicas()
.filter_map(|replica| match &replica.config.location {
ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
ReplicaLocation::Unmanaged(_) => None,
})
.map(|size| {
self.catalog()
.cluster_replica_sizes()
.0
.get(size)
.expect("location size is validated against the cluster replica sizes")
.credits_per_hour
})
.sum();
self.validate_resource_limit_numeric(
current_credit_consumption_rate,
new_credit_consumption_rate,
SystemVars::max_credit_consumption_rate,
"cluster replica",
MAX_CREDIT_CONSUMPTION_RATE.name(),
)?;
self.validate_resource_limit(
self.catalog().databases().count(),
new_databases,
SystemVars::max_databases,
"database",
MAX_DATABASES.name(),
)?;
for (database_id, new_schemas) in new_schemas_per_database {
self.validate_resource_limit(
self.catalog().get_database(database_id).schemas_by_id.len(),
new_schemas,
SystemVars::max_schemas_per_database,
"schema",
MAX_SCHEMAS_PER_DATABASE.name(),
)?;
}
for ((database_spec, schema_spec), new_objects) in new_objects_per_schema {
self.validate_resource_limit(
self.catalog()
.get_schema(&database_spec, &schema_spec, conn_id)
.items
.len(),
new_objects,
SystemVars::max_objects_per_schema,
"object",
MAX_OBJECTS_PER_SCHEMA.name(),
)?;
}
self.validate_resource_limit(
self.catalog().user_secrets().count(),
new_secrets,
SystemVars::max_secrets,
"secret",
MAX_SECRETS.name(),
)?;
self.validate_resource_limit(
self.catalog().user_roles().count(),
new_roles,
SystemVars::max_roles,
"role",
MAX_ROLES.name(),
)?;
self.validate_resource_limit(
self.catalog().user_continual_tasks().count(),
new_continual_tasks,
SystemVars::max_continual_tasks,
"continual_task",
MAX_CONTINUAL_TASKS.name(),
)?;
self.validate_resource_limit(
self.catalog().user_continual_tasks().count(),
new_network_policies,
SystemVars::max_network_policies,
"network_policy",
MAX_NETWORK_POLICIES.name(),
)?;
Ok(())
}
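
    /// Returns an error if adding `new_instances` objects of `resource_type` on
    /// top of `current_amount` would exceed the limit returned by
    /// `resource_limit`. Changes that do not increase the count
    /// (`new_instances <= 0`) are always allowed.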
pub(crate) fn validate_resource_limit<F>(
&self,
current_amount: usize,
new_instances: i64,
resource_limit: F,
resource_type: &str,
limit_name: &str,
) -> Result<(), AdapterError>
where
F: Fn(&SystemVars) -> u32,
{
if new_instances <= 0 {
return Ok(());
}
let limit: i64 = resource_limit(self.catalog().system_config()).into();
let current_amount: Option<i64> = current_amount.try_into().ok();
let desired =
current_amount.and_then(|current_amount| current_amount.checked_add(new_instances));
let exceeds_limit = if let Some(desired) = desired {
desired > limit
} else {
true
};
let desired = desired
.map(|desired| desired.to_string())
.unwrap_or_else(|| format!("more than {}", i64::MAX));
let current = current_amount
.map(|current| current.to_string())
.unwrap_or_else(|| format!("more than {}", i64::MAX));
if exceeds_limit {
Err(AdapterError::ResourceExhaustion {
resource_type: resource_type.to_string(),
limit_name: limit_name.to_string(),
desired,
limit: limit.to_string(),
current,
})
} else {
Ok(())
}
}
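
    /// Like [`Self::validate_resource_limit`], but for fractional resources such
    /// as the credit consumption rate.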
fn validate_resource_limit_numeric<F>(
&self,
current_amount: Numeric,
new_amount: Numeric,
resource_limit: F,
resource_type: &str,
limit_name: &str,
) -> Result<(), AdapterError>
where
F: Fn(&SystemVars) -> Numeric,
{
if new_amount <= Numeric::zero() {
return Ok(());
}
let limit = resource_limit(self.catalog().system_config());
let desired = current_amount + new_amount;
if desired > limit {
Err(AdapterError::ResourceExhaustion {
resource_type: resource_type.to_string(),
limit_name: limit_name.to_string(),
desired: desired.to_string(),
limit: limit.to_string(),
current: current_amount.to_string(),
})
} else {
Ok(())
}
}
}