mz_adapter/coord/
catalog_serving.rs1use mz_expr::CollectionPlan;
22use mz_repr::GlobalId;
23use mz_repr::namespaces::is_system_schema;
24use mz_sql::catalog::SessionCatalog;
25use mz_sql::plan::{
26 ExplainPlanPlan, ExplainTimestampPlan, Explainee, ExplaineeStatement, Plan, SubscribeFrom,
27};
28use smallvec::SmallVec;
29
30use crate::AdapterError;
31use crate::catalog::ConnCatalog;
32use crate::coord::TargetCluster;
33use crate::notice::AdapterNotice;
34use crate::session::Session;
35use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
36
37pub fn auto_run_on_catalog_server<'a, 's, 'p>(
40 catalog: &'a ConnCatalog<'a>,
41 session: &'s Session,
42 plan: &'p Plan,
43) -> TargetCluster {
44 let (depends_on, could_run_expensive_function) = match plan {
45 Plan::Select(plan) => (
46 plan.source.depends_on(),
47 plan.source.could_run_expensive_function(),
48 ),
49 Plan::ShowColumns(plan) => (
50 plan.select_plan.source.depends_on(),
51 plan.select_plan.source.could_run_expensive_function(),
52 ),
53 Plan::Subscribe(plan) => (
54 plan.from.depends_on(),
55 match &plan.from {
56 SubscribeFrom::Id(_) => false,
57 SubscribeFrom::Query { expr, desc: _ } => expr.could_run_expensive_function(),
58 },
59 ),
60 Plan::ExplainPlan(ExplainPlanPlan {
61 explainee: Explainee::Statement(ExplaineeStatement::Select { plan, .. }),
62 ..
63 }) => (
64 plan.source.depends_on(),
65 plan.source.could_run_expensive_function(),
66 ),
67 Plan::ExplainTimestamp(ExplainTimestampPlan { raw_plan, .. }) => (
68 raw_plan.depends_on(),
69 raw_plan.could_run_expensive_function(),
70 ),
71 Plan::CreateConnection(_)
72 | Plan::CreateDatabase(_)
73 | Plan::CreateSchema(_)
74 | Plan::CreateRole(_)
75 | Plan::CreateNetworkPolicy(_)
76 | Plan::CreateCluster(_)
77 | Plan::CreateClusterReplica(_)
78 | Plan::CreateContinualTask(_)
79 | Plan::CreateSource(_)
80 | Plan::CreateSources(_)
81 | Plan::CreateSecret(_)
82 | Plan::CreateSink(_)
83 | Plan::CreateTable(_)
84 | Plan::CreateView(_)
85 | Plan::CreateMaterializedView(_)
86 | Plan::CreateIndex(_)
87 | Plan::CreateType(_)
88 | Plan::Comment(_)
89 | Plan::DiscardTemp
90 | Plan::DiscardAll
91 | Plan::DropObjects(_)
92 | Plan::DropOwned(_)
93 | Plan::EmptyQuery
94 | Plan::ShowAllVariables
95 | Plan::ShowCreate(_)
96 | Plan::ShowVariable(_)
97 | Plan::InspectShard(_)
98 | Plan::SetVariable(_)
99 | Plan::ResetVariable(_)
100 | Plan::SetTransaction(_)
101 | Plan::StartTransaction(_)
102 | Plan::CommitTransaction(_)
103 | Plan::AbortTransaction(_)
104 | Plan::CopyFrom(_)
105 | Plan::CopyTo(_)
106 | Plan::ExplainPlan(_)
107 | Plan::ExplainPushdown(_)
108 | Plan::ExplainSinkSchema(_)
109 | Plan::Insert(_)
110 | Plan::AlterNetworkPolicy(_)
111 | Plan::AlterNoop(_)
112 | Plan::AlterClusterRename(_)
113 | Plan::AlterClusterSwap(_)
114 | Plan::AlterClusterReplicaRename(_)
115 | Plan::AlterCluster(_)
116 | Plan::AlterConnection(_)
117 | Plan::AlterSource(_)
118 | Plan::AlterSetCluster(_)
119 | Plan::AlterItemRename(_)
120 | Plan::AlterRetainHistory(_)
121 | Plan::AlterSchemaRename(_)
122 | Plan::AlterSchemaSwap(_)
123 | Plan::AlterSecret(_)
124 | Plan::AlterSink(_)
125 | Plan::AlterSystemSet(_)
126 | Plan::AlterSystemReset(_)
127 | Plan::AlterSystemResetAll(_)
128 | Plan::AlterRole(_)
129 | Plan::AlterOwner(_)
130 | Plan::AlterTableAddColumn(_)
131 | Plan::AlterMaterializedViewApplyReplacement(_)
132 | Plan::Declare(_)
133 | Plan::Fetch(_)
134 | Plan::Close(_)
135 | Plan::ReadThenWrite(_)
136 | Plan::Prepare(_)
137 | Plan::Execute(_)
138 | Plan::Deallocate(_)
139 | Plan::Raise(_)
140 | Plan::GrantRole(_)
141 | Plan::RevokeRole(_)
142 | Plan::GrantPrivileges(_)
143 | Plan::RevokePrivileges(_)
144 | Plan::AlterDefaultPrivileges(_)
145 | Plan::ReassignOwned(_)
146 | Plan::ValidateConnection(_)
147 | Plan::SideEffectingFunc(_) => return TargetCluster::Active,
148 };
149
150 if !session.vars().auto_route_catalog_queries() {
152 return TargetCluster::Active;
153 }
154
155 if session.vars().cluster_replica().is_some() {
157 return TargetCluster::Active;
158 }
159
160 let mut depends_on = depends_on
162 .into_iter()
163 .map(|gid| catalog.resolve_item_id(&gid))
164 .peekable();
165 let has_dependencies = depends_on.peek().is_some();
166
167 let valid_dependencies = depends_on.all(|id| {
170 let entry = catalog.state().get_entry(&id);
171 let schema = entry.name().qualifiers.schema_spec;
172
173 let system_only = catalog.state().is_system_schema_specifier(schema);
174 let non_replica = catalog.state().introspection_dependencies(id).is_empty();
175
176 system_only && non_replica
177 });
178
179 if (has_dependencies && valid_dependencies)
180 || (!has_dependencies && !could_run_expensive_function)
181 {
182 let intros_cluster = catalog
183 .state()
184 .resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER);
185 tracing::debug!("Running on '{}' cluster", MZ_CATALOG_SERVER_CLUSTER.name);
186
187 if intros_cluster.name != session.vars().cluster() {
189 session.add_notice(AdapterNotice::AutoRunOnCatalogServerCluster);
190 }
191 TargetCluster::CatalogServer
192 } else {
193 TargetCluster::Active
194 }
195}
196
197pub fn check_cluster_restrictions(
200 cluster: &str,
201 catalog: &impl SessionCatalog,
202 plan: &Plan,
203) -> Result<(), AdapterError> {
204 if cluster != MZ_CATALOG_SERVER_CLUSTER.name {
206 return Ok(());
207 }
208
209 let depends_on: Box<dyn Iterator<Item = GlobalId>> = match plan {
217 Plan::ReadThenWrite(plan) => Box::new(plan.selection.depends_on().into_iter()),
218 Plan::Subscribe(plan) => match plan.from {
219 SubscribeFrom::Id(id) => Box::new(std::iter::once(id)),
220 SubscribeFrom::Query { ref expr, .. } => Box::new(expr.depends_on().into_iter()),
221 },
222 Plan::Select(plan) => Box::new(plan.source.depends_on().into_iter()),
223 _ => return Ok(()),
224 };
225
226 let unallowed_dependents: SmallVec<[String; 2]> = depends_on
228 .filter_map(|id| {
229 let item = catalog.get_item_by_global_id(&id);
230 let full_name = catalog.resolve_full_name(item.name());
231
232 if !is_system_schema(&full_name.schema) {
233 Some(full_name.to_string())
234 } else {
235 None
236 }
237 })
238 .collect();
239
240 if !unallowed_dependents.is_empty() {
242 Err(AdapterError::UnallowedOnCluster {
243 depends_on: unallowed_dependents,
244 cluster: MZ_CATALOG_SERVER_CLUSTER.name.to_string(),
245 })
246 } else {
247 Ok(())
248 }
249}