use mz_expr::CollectionPlan;
use mz_repr::namespaces::is_system_schema;
use mz_repr::GlobalId;
use mz_sql::catalog::SessionCatalog;
use mz_sql::plan::{
ExplainPlanPlan, ExplainTimestampPlan, Explainee, ExplaineeStatement, Plan, SubscribeFrom,
};
use smallvec::SmallVec;
use crate::catalog::ConnCatalog;
use crate::coord::TargetCluster;
use crate::notice::AdapterNotice;
use crate::session::Session;
use crate::AdapterError;
use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
/// Chooses the cluster that `plan` should be executed on.
///
/// Returns [`TargetCluster::CatalogServer`] when the plan is a read-style
/// statement (`Select`, `ShowColumns`, `Subscribe`, `EXPLAIN` of a select, or
/// `EXPLAIN TIMESTAMP`) and either:
///
/// * it references at least one catalog item, and every referenced item lives
///   in a schema accepted by `is_system_schema_specifier` and has no
///   introspection dependencies; or
/// * it references no catalog items and could not run an expensive function.
///
/// In every other case, including when the session has turned off
/// `auto_route_catalog_queries` or has set `cluster_replica`, the result is
/// [`TargetCluster::Active`].
///
/// When the plan is routed to the catalog server and the name of that builtin
/// cluster differs from the session's `cluster` variable, an
/// [`AdapterNotice::AutoRunOnCatalogServerCluster`] notice is added to
/// `session`.
pub fn auto_run_on_catalog_server<'a, 's, 'p>(
    catalog: &'a ConnCatalog<'a>,
    session: &'s Session,
    plan: &'p Plan,
) -> TargetCluster {
    // Pull out the global ids the plan reads from and whether it might invoke
    // an expensive function. Plans that are not candidates for re-routing
    // return immediately. The list is spelled out (no `_` arm) so that adding
    // a new `Plan` variant forces an explicit decision here.
    let (referenced_gids, maybe_expensive) = match plan {
        Plan::Select(select) => {
            let source = &select.source;
            (source.depends_on(), source.could_run_expensive_function())
        }
        Plan::ShowColumns(show_columns) => {
            let source = &show_columns.select_plan.source;
            (source.depends_on(), source.could_run_expensive_function())
        }
        Plan::Subscribe(subscribe) => {
            let from = &subscribe.from;
            let gids = from.depends_on();
            // Subscribing directly to an id involves no query expression, so
            // there is nothing that could be expensive to evaluate.
            let expensive = match from {
                SubscribeFrom::Query { expr, desc: _ } => expr.could_run_expensive_function(),
                SubscribeFrom::Id(_) => false,
            };
            (gids, expensive)
        }
        Plan::ExplainPlan(ExplainPlanPlan {
            explainee: Explainee::Statement(ExplaineeStatement::Select { plan: select, .. }),
            ..
        }) => {
            let source = &select.source;
            (source.depends_on(), source.could_run_expensive_function())
        }
        Plan::ExplainTimestamp(ExplainTimestampPlan { raw_plan, .. }) => {
            (raw_plan.depends_on(), raw_plan.could_run_expensive_function())
        }
        Plan::CreateConnection(_)
        | Plan::CreateDatabase(_)
        | Plan::CreateSchema(_)
        | Plan::CreateRole(_)
        | Plan::CreateNetworkPolicy(_)
        | Plan::CreateCluster(_)
        | Plan::CreateClusterReplica(_)
        | Plan::CreateContinualTask(_)
        | Plan::CreateSource(_)
        | Plan::CreateSources(_)
        | Plan::CreateSecret(_)
        | Plan::CreateSink(_)
        | Plan::CreateTable(_)
        | Plan::CreateView(_)
        | Plan::CreateMaterializedView(_)
        | Plan::CreateIndex(_)
        | Plan::CreateType(_)
        | Plan::Comment(_)
        | Plan::DiscardTemp
        | Plan::DiscardAll
        | Plan::DropObjects(_)
        | Plan::DropOwned(_)
        | Plan::EmptyQuery
        | Plan::ShowAllVariables
        | Plan::ShowCreate(_)
        | Plan::ShowVariable(_)
        | Plan::InspectShard(_)
        | Plan::SetVariable(_)
        | Plan::ResetVariable(_)
        | Plan::SetTransaction(_)
        | Plan::StartTransaction(_)
        | Plan::CommitTransaction(_)
        | Plan::AbortTransaction(_)
        | Plan::CopyFrom(_)
        | Plan::CopyTo(_)
        | Plan::ExplainPlan(_)
        | Plan::ExplainPushdown(_)
        | Plan::ExplainSinkSchema(_)
        | Plan::Insert(_)
        | Plan::AlterNetworkPolicy(_)
        | Plan::AlterNoop(_)
        | Plan::AlterClusterRename(_)
        | Plan::AlterClusterSwap(_)
        | Plan::AlterClusterReplicaRename(_)
        | Plan::AlterCluster(_)
        | Plan::AlterConnection(_)
        | Plan::AlterSource(_)
        | Plan::AlterSetCluster(_)
        | Plan::AlterItemRename(_)
        | Plan::AlterRetainHistory(_)
        | Plan::AlterSchemaRename(_)
        | Plan::AlterSchemaSwap(_)
        | Plan::AlterSecret(_)
        | Plan::AlterSink(_)
        | Plan::AlterSystemSet(_)
        | Plan::AlterSystemReset(_)
        | Plan::AlterSystemResetAll(_)
        | Plan::AlterRole(_)
        | Plan::AlterOwner(_)
        | Plan::AlterTableAddColumn(_)
        | Plan::Declare(_)
        | Plan::Fetch(_)
        | Plan::Close(_)
        | Plan::ReadThenWrite(_)
        | Plan::Prepare(_)
        | Plan::Execute(_)
        | Plan::Deallocate(_)
        | Plan::Raise(_)
        | Plan::GrantRole(_)
        | Plan::RevokeRole(_)
        | Plan::GrantPrivileges(_)
        | Plan::RevokePrivileges(_)
        | Plan::AlterDefaultPrivileges(_)
        | Plan::ReassignOwned(_)
        | Plan::ValidateConnection(_)
        | Plan::SideEffectingFunc(_) => return TargetCluster::Active,
    };

    // Respect the session: stay put if automatic routing is switched off, or
    // if a specific replica has been requested via `cluster_replica`.
    if !session.vars().auto_route_catalog_queries() || session.vars().cluster_replica().is_some() {
        return TargetCluster::Active;
    }

    // Lazily translate the referenced global ids into catalog item ids.
    // `peekable` lets us test for emptiness without consuming the first item.
    let mut item_ids = referenced_gids
        .into_iter()
        .map(|gid| catalog.resolve_item_id(&gid))
        .peekable();

    let route_to_catalog_server = if item_ids.peek().is_none() {
        // Nothing is referenced: only re-route queries that cannot be
        // expensive to evaluate.
        !maybe_expensive
    } else {
        // Something is referenced: every item must pass both checks.
        // NOTE(review): an item with introspection dependencies presumably
        // needs to run on a specific replica; confirm against
        // `introspection_dependencies`.
        item_ids.all(|item_id| {
            let state = catalog.state();
            let schema_spec = state.get_entry(&item_id).name().qualifiers.schema_spec;
            let in_system_schema = state.is_system_schema_specifier(schema_spec);
            let no_introspection_deps = state.introspection_dependencies(item_id).is_empty();
            in_system_schema && no_introspection_deps
        })
    };

    if !route_to_catalog_server {
        return TargetCluster::Active;
    }

    let catalog_server = catalog
        .state()
        .resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER);
    tracing::debug!("Running on '{}' cluster", MZ_CATALOG_SERVER_CLUSTER.name);

    // Tell the user when the query ends up somewhere other than the cluster
    // named by their session's `cluster` variable.
    if catalog_server.name != session.vars().cluster() {
        session.add_notice(AdapterNotice::AutoRunOnCatalogServerCluster);
    }
    TargetCluster::CatalogServer
}
/// Enforces the restrictions that apply to plans run on the catalog server
/// cluster.
///
/// The check applies only when `cluster` equals the name of
/// `MZ_CATALOG_SERVER_CLUSTER` and `plan` is a `ReadThenWrite`, `Subscribe`,
/// or `Select`. Any other cluster or plan kind is accepted unconditionally.
///
/// # Errors
///
/// Returns [`AdapterError::UnallowedOnCluster`] if the plan depends on one or
/// more objects whose schema is not a system schema (per `is_system_schema`).
/// The error carries the fully qualified names of all such objects and the
/// name of the catalog server cluster.
pub fn check_cluster_restrictions(
    cluster: &str,
    catalog: &impl SessionCatalog,
    plan: &Plan,
) -> Result<(), AdapterError> {
    // Every cluster other than the catalog server is unrestricted.
    if cluster != MZ_CATALOG_SERVER_CLUSTER.name {
        return Ok(());
    }

    // Gather the global ids the plan reads from. Plan kinds not listed here
    // are not subject to the restriction. The arms produce different concrete
    // iterator types, hence the boxed trait object.
    let referenced_ids: Box<dyn Iterator<Item = GlobalId>> = match plan {
        Plan::Select(select) => Box::new(select.source.depends_on().into_iter()),
        Plan::Subscribe(subscribe) => match &subscribe.from {
            SubscribeFrom::Query { expr, .. } => Box::new(expr.depends_on().into_iter()),
            SubscribeFrom::Id(id) => Box::new(std::iter::once(*id)),
        },
        Plan::ReadThenWrite(rtw) => Box::new(rtw.selection.depends_on().into_iter()),
        _ => return Ok(()),
    };

    // Keep the fully qualified name of each dependency that lives outside the
    // system schemas.
    let offending: SmallVec<[String; 2]> = referenced_ids
        .filter_map(|id| {
            let item = catalog.get_item_by_global_id(&id);
            let full_name = catalog.resolve_full_name(item.name());
            let outside_system_schema = !is_system_schema(&full_name.schema);
            outside_system_schema.then(|| full_name.to_string())
        })
        .collect();

    if offending.is_empty() {
        return Ok(());
    }

    Err(AdapterError::UnallowedOnCluster {
        depends_on: offending,
        cluster: MZ_CATALOG_SERVER_CLUSTER.name.to_string(),
    })
}