mz_adapter/coord/
catalog_serving.rs1use mz_expr::CollectionPlan;
22use mz_repr::GlobalId;
23use mz_repr::namespaces::is_system_schema;
24use mz_sql::catalog::SessionCatalog;
25use mz_sql::plan::{
26 ExplainPlanPlan, ExplainTimestampPlan, Explainee, ExplaineeStatement, Plan, SubscribeFrom,
27 SubscribePlan,
28};
29use smallvec::SmallVec;
30
31use crate::AdapterError;
32use crate::catalog::ConnCatalog;
33use crate::coord::TargetCluster;
34use crate::notice::AdapterNotice;
35use crate::session::Session;
36use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
37
38pub fn auto_run_on_catalog_server<'a, 's, 'p>(
41 catalog: &'a ConnCatalog<'a>,
42 session: &'s Session,
43 plan: &'p Plan,
44) -> TargetCluster {
45 let inspect_subscribe = |plan: &SubscribePlan| {
46 (
47 plan.from.depends_on(),
48 match &plan.from {
49 SubscribeFrom::Id(_) => false,
50 SubscribeFrom::Query { expr, desc: _ } => expr.could_run_expensive_function(),
51 },
52 )
53 };
54
55 let (depends_on, could_run_expensive_function) = match plan {
56 Plan::Select(plan) => (
57 plan.source.depends_on(),
58 plan.source.could_run_expensive_function(),
59 ),
60 Plan::ShowColumns(plan) => (
61 plan.select_plan.source.depends_on(),
62 plan.select_plan.source.could_run_expensive_function(),
63 ),
64 Plan::Subscribe(plan) => inspect_subscribe(plan),
65 Plan::ExplainPlan(ExplainPlanPlan {
66 explainee: Explainee::Statement(ExplaineeStatement::Select { plan, .. }),
67 ..
68 }) => (
69 plan.source.depends_on(),
70 plan.source.could_run_expensive_function(),
71 ),
72 Plan::ExplainPlan(ExplainPlanPlan {
73 explainee: Explainee::Statement(ExplaineeStatement::Subscribe { plan, .. }),
74 ..
75 }) => inspect_subscribe(plan),
76 Plan::ExplainTimestamp(ExplainTimestampPlan { raw_plan, .. }) => (
77 raw_plan.depends_on(),
78 raw_plan.could_run_expensive_function(),
79 ),
80 Plan::CreateConnection(_)
81 | Plan::CreateDatabase(_)
82 | Plan::CreateSchema(_)
83 | Plan::CreateRole(_)
84 | Plan::CreateNetworkPolicy(_)
85 | Plan::CreateCluster(_)
86 | Plan::CreateClusterReplica(_)
87 | Plan::CreateContinualTask(_)
88 | Plan::CreateSource(_)
89 | Plan::CreateSources(_)
90 | Plan::CreateSecret(_)
91 | Plan::CreateSink(_)
92 | Plan::CreateTable(_)
93 | Plan::CreateView(_)
94 | Plan::CreateMaterializedView(_)
95 | Plan::CreateIndex(_)
96 | Plan::CreateType(_)
97 | Plan::Comment(_)
98 | Plan::DiscardTemp
99 | Plan::DiscardAll
100 | Plan::DropObjects(_)
101 | Plan::DropOwned(_)
102 | Plan::EmptyQuery
103 | Plan::ShowAllVariables
104 | Plan::ShowCreate(_)
105 | Plan::ShowVariable(_)
106 | Plan::InspectShard(_)
107 | Plan::SetVariable(_)
108 | Plan::ResetVariable(_)
109 | Plan::SetTransaction(_)
110 | Plan::StartTransaction(_)
111 | Plan::CommitTransaction(_)
112 | Plan::AbortTransaction(_)
113 | Plan::CopyFrom(_)
114 | Plan::CopyTo(_)
115 | Plan::ExplainPlan(ExplainPlanPlan {
116 explainee:
117 Explainee::Statement(
118 ExplaineeStatement::CreateView { .. }
121 | ExplaineeStatement::CreateMaterializedView { .. }
122 | ExplaineeStatement::CreateIndex { .. },
123 ),
124 ..
125 })
126 | Plan::ExplainPlan(ExplainPlanPlan { explainee: _, .. })
127 | Plan::ExplainPushdown(_)
128 | Plan::ExplainSinkSchema(_)
129 | Plan::Insert(_)
130 | Plan::AlterNetworkPolicy(_)
131 | Plan::AlterNoop(_)
132 | Plan::AlterClusterRename(_)
133 | Plan::AlterClusterSwap(_)
134 | Plan::AlterClusterReplicaRename(_)
135 | Plan::AlterCluster(_)
136 | Plan::AlterConnection(_)
137 | Plan::AlterSource(_)
138 | Plan::AlterSetCluster(_)
139 | Plan::AlterItemRename(_)
140 | Plan::AlterRetainHistory(_)
141 | Plan::AlterSourceTimestampInterval(_)
142 | Plan::AlterSchemaRename(_)
143 | Plan::AlterSchemaSwap(_)
144 | Plan::AlterSecret(_)
145 | Plan::AlterSink(_)
146 | Plan::AlterSystemSet(_)
147 | Plan::AlterSystemReset(_)
148 | Plan::AlterSystemResetAll(_)
149 | Plan::AlterRole(_)
150 | Plan::AlterOwner(_)
151 | Plan::AlterTableAddColumn(_)
152 | Plan::AlterMaterializedViewApplyReplacement(_)
153 | Plan::Declare(_)
154 | Plan::Fetch(_)
155 | Plan::Close(_)
156 | Plan::ReadThenWrite(_)
157 | Plan::Prepare(_)
158 | Plan::Execute(_)
159 | Plan::Deallocate(_)
160 | Plan::Raise(_)
161 | Plan::GrantRole(_)
162 | Plan::RevokeRole(_)
163 | Plan::GrantPrivileges(_)
164 | Plan::RevokePrivileges(_)
165 | Plan::AlterDefaultPrivileges(_)
166 | Plan::ReassignOwned(_)
167 | Plan::ValidateConnection(_)
168 | Plan::SideEffectingFunc(_) => return TargetCluster::Active,
169 };
170
171 if !session.vars().auto_route_catalog_queries() {
173 return TargetCluster::Active;
174 }
175
176 if session.vars().cluster_replica().is_some() {
178 return TargetCluster::Active;
179 }
180
181 let mut depends_on = depends_on
183 .into_iter()
184 .map(|gid| catalog.resolve_item_id(&gid))
185 .peekable();
186 let has_dependencies = depends_on.peek().is_some();
187
188 let valid_dependencies = depends_on.all(|id| {
191 let entry = catalog.state().get_entry(&id);
192 let schema = entry.name().qualifiers.schema_spec;
193
194 let system_only = catalog.state().is_system_schema_specifier(schema);
195 let non_replica = catalog.state().introspection_dependencies(id).is_empty();
196
197 system_only && non_replica
198 });
199
200 if (has_dependencies && valid_dependencies)
201 || (!has_dependencies && !could_run_expensive_function)
202 {
203 let intros_cluster = catalog
204 .state()
205 .resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER);
206 tracing::debug!("Running on '{}' cluster", MZ_CATALOG_SERVER_CLUSTER.name);
207
208 if intros_cluster.name != session.vars().cluster() {
210 session.add_notice(AdapterNotice::AutoRunOnCatalogServerCluster);
211 }
212 TargetCluster::CatalogServer
213 } else {
214 TargetCluster::Active
215 }
216}
217
218pub fn check_cluster_restrictions(
221 cluster: &str,
222 catalog: &impl SessionCatalog,
223 plan: &Plan,
224) -> Result<(), AdapterError> {
225 if cluster != MZ_CATALOG_SERVER_CLUSTER.name {
227 return Ok(());
228 }
229
230 let depends_on: Box<dyn Iterator<Item = GlobalId>> = match plan {
238 Plan::ReadThenWrite(plan) => Box::new(plan.selection.depends_on().into_iter()),
239 Plan::Subscribe(plan) => match plan.from {
240 SubscribeFrom::Id(id) => Box::new(std::iter::once(id)),
241 SubscribeFrom::Query { ref expr, .. } => Box::new(expr.depends_on().into_iter()),
242 },
243 Plan::Select(plan) => Box::new(plan.source.depends_on().into_iter()),
244 _ => return Ok(()),
245 };
246
247 let unallowed_dependents: SmallVec<[String; 2]> = depends_on
249 .filter_map(|id| {
250 let item = catalog.get_item_by_global_id(&id);
251 let full_name = catalog.resolve_full_name(item.name());
252
253 if !is_system_schema(&full_name.schema) {
254 Some(full_name.to_string())
255 } else {
256 None
257 }
258 })
259 .collect();
260
261 if !unallowed_dependents.is_empty() {
263 Err(AdapterError::UnallowedOnCluster {
264 depends_on: unallowed_dependents,
265 cluster: MZ_CATALOG_SERVER_CLUSTER.name.to_string(),
266 })
267 } else {
268 Ok(())
269 }
270}