1use std::collections::BTreeSet;
11
12use mz_cluster_client::ReplicaId;
13use mz_compute_types::ComputeInstanceId;
14use mz_repr::CatalogItemId;
15use mz_sql::rbac::UnauthorizedError;
16use mz_sql::session::user::RoleMetadata;
17
18use crate::AdapterError;
19use crate::catalog::Catalog;
20
21#[derive(Debug, Clone)]
34pub struct PlanValidity {
35 transient_revision: u64,
37 dependency_ids: BTreeSet<CatalogItemId>,
39 cluster_id: Option<ComputeInstanceId>,
40 replica_id: Option<ReplicaId>,
41 role_metadata: RoleMetadata,
42}
43
44impl PlanValidity {
45 pub fn new(
46 transient_revision: u64,
47 dependency_ids: BTreeSet<CatalogItemId>,
48 cluster_id: Option<ComputeInstanceId>,
49 replica_id: Option<ReplicaId>,
50 role_metadata: RoleMetadata,
51 ) -> Self {
52 PlanValidity {
53 transient_revision,
54 dependency_ids,
55 cluster_id,
56 replica_id,
57 role_metadata,
58 }
59 }
60
61 pub fn extend_dependencies(&mut self, ids: impl Iterator<Item = CatalogItemId>) {
62 self.dependency_ids.extend(ids);
63 }
64
65 pub fn check(&mut self, catalog: &Catalog) -> Result<(), AdapterError> {
67 if self.transient_revision == catalog.transient_revision() {
68 return Ok(());
69 }
70 if let Some(cluster_id) = self.cluster_id {
73 let Some(cluster) = catalog.try_get_cluster(cluster_id) else {
74 return Err(AdapterError::ConcurrentDependencyDrop {
75 dependency_kind: "cluster",
76 dependency_id: cluster_id.to_string(),
77 });
78 };
79
80 if let Some(replica_id) = self.replica_id {
81 if cluster.replica(replica_id).is_none() {
82 return Err(AdapterError::ConcurrentDependencyDrop {
83 dependency_kind: "cluster replica",
84 dependency_id: format!("{replica_id} of cluster {cluster_id}"),
85 });
86 }
87 }
88 }
89 for id in self.dependency_ids.iter() {
94 if catalog.try_get_entry(id).is_none() {
95 return Err(AdapterError::ConcurrentDependencyDrop {
96 dependency_kind: "catalog item",
97 dependency_id: id.to_string(),
98 });
99 }
100 }
101 if catalog
102 .try_get_role(&self.role_metadata.current_role)
103 .is_none()
104 {
105 return Err(AdapterError::Unauthorized(
106 UnauthorizedError::ConcurrentRoleDrop(self.role_metadata.current_role.clone()),
107 ));
108 }
109 if catalog
110 .try_get_role(&self.role_metadata.session_role)
111 .is_none()
112 {
113 return Err(AdapterError::Unauthorized(
114 UnauthorizedError::ConcurrentRoleDrop(self.role_metadata.session_role.clone()),
115 ));
116 }
117
118 if catalog
119 .try_get_role(&self.role_metadata.authenticated_role)
120 .is_none()
121 {
122 return Err(AdapterError::Unauthorized(
123 UnauthorizedError::ConcurrentRoleDrop(
124 self.role_metadata.authenticated_role.clone(),
125 ),
126 ));
127 }
128 self.transient_revision = catalog.transient_revision();
129 Ok(())
130 }
131}
132
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use mz_adapter_types::connection::ConnectionId;
    use mz_auth::AuthenticatorKind;
    use mz_cluster_client::ReplicaId;
    use mz_controller_types::ClusterId;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::{assert_contains, assert_ok};
    use mz_repr::CatalogItemId;
    use mz_repr::role_id::RoleId;
    use mz_sql::catalog::RoleAttributesRaw;
    use mz_sql::session::metadata::SessionMetadata;
    use uuid::Uuid;

    use crate::AdapterError;
    use crate::catalog::{Catalog, Op};
    use crate::coord::validity::PlanValidity;
    use crate::metrics::Metrics;
    use crate::session::{Session, SessionConfig};

    /// Unwraps the error in `res` and asserts that its rendered message
    /// contains `needle`.
    fn assert_err_contains(res: Result<(), AdapterError>, needle: &str) {
        assert_contains!(res.expect_err("must err").to_string(), needle);
    }

    /// Table-driven test: each case tweaks a copy of an otherwise valid
    /// `PlanValidity` and asserts on the outcome of `check`.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_plan_validity() {
        Catalog::with_debug(|mut catalog| async move {
            let role_name = "validity_role";
            let registry = MetricsRegistry::new();
            let metrics = Metrics::register_into(&registry);

            // Create a real role so the baseline validity refers to roles
            // that exist in the catalog.
            let commit_ts = catalog.current_upper().await;
            let create_role = Op::CreateRole {
                name: role_name.into(),
                attributes: RoleAttributesRaw::new(),
            };
            catalog
                .transact(None, commit_ts, None, vec![create_role])
                .await
                .expect("is ok");
            let role = catalog
                .try_get_role_by_name(role_name)
                .expect("must exist");

            let session_config = SessionConfig {
                conn_id: ConnectionId::Static(1),
                uuid: Uuid::new_v4(),
                user: String::from("validity_user"),
                client_ip: None,
                external_metadata_rx: None,
                helm_chart_version: None,
                authenticator_kind: AuthenticatorKind::None,
                groups: None,
            };
            let mut session = Session::new(
                &mz_build_info::DUMMY_BUILD_INFO,
                session_config,
                metrics.session_metrics(),
            );
            session.initialize_role_metadata(role.id);

            // Start one revision behind the catalog so that `check` cannot
            // return early and has to perform every lookup.
            let stale_revision = catalog
                .transient_revision()
                .checked_sub(1)
                .expect("must subtract");
            let baseline = PlanValidity::new(
                stale_revision,
                BTreeSet::new(),
                None,
                None,
                session.role_metadata().clone(),
            );

            let system_cluster = catalog
                .clusters()
                .find(|cluster| matches!(cluster.id, ClusterId::System(_)))
                .expect("must exist");

            // Each case is (mutation applied to a baseline copy, assertion on
            // the `check` result). The closure types are spelled out here, in
            // the `let`, because the closures borrow `system_cluster`.
            let cases: &[(
                Box<dyn Fn(&mut PlanValidity)>,
                Box<dyn Fn(Result<(), AdapterError>)>,
            )] = &[
                // Untouched baseline passes.
                (Box::new(|_validity| {}), Box::new(|res| assert_ok!(res))),
                // Cluster id that is not in the catalog.
                (
                    Box::new(|validity| {
                        validity.cluster_id = Some(ClusterId::user(3).expect("3 is a valid ID"));
                    }),
                    Box::new(|res| assert_err_contains(res, "cluster 'u3' was dropped")),
                ),
                // Existing cluster, replica id that is not on it.
                (
                    Box::new(|validity| {
                        validity.cluster_id = Some(system_cluster.id);
                        validity.replica_id = Some(ReplicaId::User(4));
                    }),
                    Box::new(|res| {
                        let expected = format!(
                            "cluster replica 'u4 of cluster {}' was dropped",
                            system_cluster.id
                        );
                        assert_err_contains(res, &expected)
                    }),
                ),
                // Dependency that has no catalog entry.
                (
                    Box::new(|validity| {
                        validity.extend_dependencies(vec![CatalogItemId::User(6)].into_iter());
                    }),
                    Box::new(|res| assert_err_contains(res, "catalog item 'u6' was dropped")),
                ),
                // Each of the three role slots pointing at a missing role.
                (
                    Box::new(|validity| {
                        validity.role_metadata.current_role = RoleId::User(5);
                    }),
                    Box::new(|res| assert_err_contains(res, "role u5 was concurrently dropped")),
                ),
                (
                    Box::new(|validity| {
                        validity.role_metadata.session_role = RoleId::User(5);
                    }),
                    Box::new(|res| assert_err_contains(res, "role u5 was concurrently dropped")),
                ),
                (
                    Box::new(|validity| {
                        validity.role_metadata.authenticated_role = RoleId::User(5);
                    }),
                    Box::new(|res| assert_err_contains(res, "role u5 was concurrently dropped")),
                ),
            ];

            for (mutate, verify) in cases.iter() {
                let mut candidate = baseline.clone();
                mutate(&mut candidate);
                verify(candidate.check(&catalog));
            }
        })
        .await
    }
}