mz_adapter/coord/validity.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use mz_cluster_client::ReplicaId;
13use mz_compute_types::ComputeInstanceId;
14use mz_repr::CatalogItemId;
15use mz_sql::rbac::UnauthorizedError;
16use mz_sql::session::user::RoleMetadata;
17
18use crate::AdapterError;
19use crate::catalog::Catalog;
20
// The inner fields of PlanValidity are not meant to be used by callers in SQL logic. Because
// PlanValidity is an enum, its variant fields cannot be made private, so this is enforced by
// convention only. Callers are responsible for tracking their own needed IDs explicitly and not
// using PlanValidity as a logic sidecar.
24
/// A struct to hold information about the validity of plans and if they should be abandoned after
/// doing work off of the Coordinator thread.
#[derive(Debug, Clone)]
pub enum PlanValidity {
    /// Requires a specific transient revision.
    ///
    /// NOTE: `check` currently treats this variant as a no-op and always returns `Ok(())`.
    RequireRevision { required_revision: u64 },
    /// Checks various catalog IDs. Uses the transient revision only as a cache marker.
    Checks {
        /// The most recent revision at which this plan was verified as valid.
        transient_revision: u64,
        /// Objects on which the plan depends.
        dependency_ids: BTreeSet<CatalogItemId>,
        /// Cluster that `check` requires to still exist in the catalog, if any.
        cluster_id: Option<ComputeInstanceId>,
        /// Replica of `cluster_id` that `check` requires to still exist, if any. It is only
        /// looked up when `cluster_id` is also set.
        replica_id: Option<ReplicaId>,
        /// Roles (current, session, and authenticated) that `check` requires to still exist.
        role_metadata: RoleMetadata,
    },
}
42
43impl PlanValidity {
44 pub fn new(
45 transient_revision: u64,
46 dependency_ids: BTreeSet<CatalogItemId>,
47 cluster_id: Option<ComputeInstanceId>,
48 replica_id: Option<ReplicaId>,
49 role_metadata: RoleMetadata,
50 ) -> Self {
51 PlanValidity::Checks {
52 transient_revision,
53 dependency_ids,
54 cluster_id,
55 replica_id,
56 role_metadata,
57 }
58 }
59
60 /// WARNING: This is currently a no-op and `check` will always succeed.
61 ///
62 /// Sets the required `transient_revision` of the catalog. Should only be used by serialized
63 /// statements (and thus should never fail for users), but here as an internal failsafe against
64 /// programming errors.
65 pub fn require_transient_revision(required_revision: u64) -> Self {
66 PlanValidity::RequireRevision { required_revision }
67 }
68
69 /// Panics if not called on a Checks variant.
70 pub fn extend_dependencies(&mut self, ids: impl Iterator<Item = CatalogItemId>) {
71 let Self::Checks { dependency_ids, .. } = self else {
72 unreachable!();
73 };
74 dependency_ids.extend(ids);
75 }
76
77 /// Returns an error if the current catalog no longer has all dependencies.
78 pub fn check(&mut self, catalog: &Catalog) -> Result<(), AdapterError> {
79 match self {
80 PlanValidity::RequireRevision { required_revision } => {
81 if catalog.transient_revision() != *required_revision {
82 // TODO: We would like to use this as a programming check that no catalog
83 // revisions were made as a double-check that all DDLs are serialized. However,
84 // since only *most* DDLs are serialized (see `must_serialize_ddl()` for those
85 // that aren't), it is possible for two DDLs to run concurrently and the catalog
86 // revision to increment during the off-thread work from this DDL. For example,
87 // a CREATE VIEW could be off-thread optimizing while an ALTER SECRET runs and
88 // increments the revision. For now we assume this check is not strictly needed
89 // because we have thought medium hard about the DDLs that do not require
90 // serialization, so they do not pose a correctness problem when executing
91 // concurrently with any other DDL.
92 //
93 // If we want to remove the need to even think at all about DDL ordering
94 // correctness we would need to refactor all calls to catalog_transact to
95 // require passing the serialized DDL lock. Statements would be responsible for
96 // getting the lock at the latest possible correct time. ALTER SECRET for
97 // example could acquire the lock after interacting with k8s, but most other
98 // DDLs would get the lock for their entire sequencing duration.
99
100 //soft_panic_or_log!("another DDL executed while this assumed it was serial");
101 }
102 Ok(())
103 }
104 PlanValidity::Checks {
105 transient_revision,
106 dependency_ids,
107 cluster_id,
108 replica_id,
109 role_metadata,
110 } => {
111 if *transient_revision == catalog.transient_revision() {
112 return Ok(());
113 }
114 // If the transient revision changed, we have to recheck. If successful, bump the revision
115 // so next check uses the above fast path.
116 if let Some(cluster_id) = cluster_id {
117 let Some(cluster) = catalog.try_get_cluster(*cluster_id) else {
118 return Err(AdapterError::ChangedPlan(format!(
119 "cluster {} was removed",
120 cluster_id
121 )));
122 };
123
124 if let Some(replica_id) = replica_id {
125 if cluster.replica(*replica_id).is_none() {
126 return Err(AdapterError::ChangedPlan(format!(
127 "replica {} of cluster {} was removed",
128 replica_id, cluster_id
129 )));
130 }
131 }
132 }
133 // It is sufficient to check that all the dependency_ids still exist because we assume:
134 // - Ids do not mutate.
135 // - Ids are not reused.
136 // - If an id was dropped, this will detect it and error.
137 for id in dependency_ids.iter() {
138 if catalog.try_get_entry(id).is_none() {
139 return Err(AdapterError::ChangedPlan(format!(
140 "dependency was removed: {id}",
141 )));
142 }
143 }
144 if catalog.try_get_role(&role_metadata.current_role).is_none() {
145 return Err(AdapterError::Unauthorized(
146 UnauthorizedError::ConcurrentRoleDrop(role_metadata.current_role.clone()),
147 ));
148 }
149 if catalog.try_get_role(&role_metadata.session_role).is_none() {
150 return Err(AdapterError::Unauthorized(
151 UnauthorizedError::ConcurrentRoleDrop(role_metadata.session_role.clone()),
152 ));
153 }
154
155 if catalog
156 .try_get_role(&role_metadata.authenticated_role)
157 .is_none()
158 {
159 return Err(AdapterError::Unauthorized(
160 UnauthorizedError::ConcurrentRoleDrop(
161 role_metadata.authenticated_role.clone(),
162 ),
163 ));
164 }
165 *transient_revision = catalog.transient_revision();
166 Ok(())
167 }
168 }
169 }
170}
171
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use mz_adapter_types::connection::ConnectionId;
    use mz_auth::AuthenticatorKind;
    use mz_cluster_client::ReplicaId;
    use mz_controller_types::ClusterId;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::{assert_contains, assert_ok};
    use mz_repr::CatalogItemId;
    use mz_repr::role_id::RoleId;
    use mz_sql::catalog::RoleAttributesRaw;
    use mz_sql::session::metadata::SessionMetadata;
    use uuid::Uuid;

    use crate::AdapterError;
    use crate::catalog::{Catalog, Op};
    use crate::coord::validity::PlanValidity;
    use crate::metrics::Metrics;
    use crate::session::{Session, SessionConfig};

    /// Table-driven test of `PlanValidity::check`.
    ///
    /// Each case starts from a baseline `Checks` validity whose cached revision is one behind the
    /// catalog (so the slow path runs), applies one mutation, and asserts on the result of
    /// `check`.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn test_plan_validity() {
        Catalog::with_debug(|mut catalog| async move {
            let conn_id = ConnectionId::Static(1);
            let user = String::from("validity_user");
            let role = "validity_role";
            let metrics_registry = MetricsRegistry::new();
            let metrics = Metrics::register_into(&metrics_registry);

            let commit_ts = catalog.current_upper().await;
            catalog
                .transact(
                    None,
                    commit_ts,
                    None,
                    vec![Op::CreateRole {
                        name: role.into(),
                        attributes: RoleAttributesRaw::new(),
                    }],
                )
                .await
                .expect("is ok");
            let role = catalog.try_get_role_by_name(role).expect("must exist");
            // Can't use a dummy session because we need a valid role for the validity check.
            let mut session = Session::new(
                &mz_build_info::DUMMY_BUILD_INFO,
                SessionConfig {
                    conn_id,
                    uuid: Uuid::new_v4(),
                    user,
                    client_ip: None,
                    external_metadata_rx: None,
                    helm_chart_version: None,
                    authenticator_kind: AuthenticatorKind::None,
                    groups: None,
                },
                metrics.session_metrics(),
            );
            session.initialize_role_metadata(role.id);
            let empty = PlanValidity::new(
                // Set the transient rev 1 down so the check logic runs.
                catalog
                    .transient_revision()
                    .checked_sub(1)
                    .expect("must subtract"),
                BTreeSet::new(),
                None,
                None,
                session.role_metadata().clone(),
            );
            let some_system_cluster = catalog
                .clusters()
                .find(|c| matches!(c.id, ClusterId::System(_)))
                .expect("must exist");

            // Builds a result assertion that requires an error whose message contains `needle`.
            let expect_err_containing =
                |needle: String| -> Box<dyn Fn(Result<(), AdapterError>)> {
                    Box::new(move |res: Result<(), AdapterError>| {
                        assert_contains!(res.expect_err("must err").to_string(), needle.as_str())
                    })
                };

            // Plan generation and result assertion closures.
            let tests: &[(
                Box<dyn Fn(&mut PlanValidity)>,
                Box<dyn Fn(Result<(), AdapterError>)>,
            )] = &[
                // Unmodified baseline: everything it references exists.
                (Box::new(|_validity| {}), Box::new(|res| assert_ok!(res))),
                // Cluster that does not exist.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { cluster_id, .. } = validity else {
                            panic!("baseline validity must be PlanValidity::Checks");
                        };
                        *cluster_id = Some(ClusterId::user(3).expect("3 is a valid ID"));
                    }),
                    expect_err_containing("cluster u3 was removed".to_string()),
                ),
                // Existing cluster, but a replica that does not exist on it.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks {
                            cluster_id,
                            replica_id,
                            ..
                        } = validity
                        else {
                            panic!("baseline validity must be PlanValidity::Checks");
                        };
                        *cluster_id = Some(some_system_cluster.id);
                        *replica_id = Some(ReplicaId::User(4));
                    }),
                    expect_err_containing(format!(
                        "replica u4 of cluster {} was removed",
                        some_system_cluster.id
                    )),
                ),
                // Dependency that does not exist.
                (
                    Box::new(|validity| {
                        validity.extend_dependencies(vec![CatalogItemId::User(6)].into_iter());
                    }),
                    expect_err_containing("dependency was removed: u6".to_string()),
                ),
                // Current role that does not exist.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!("baseline validity must be PlanValidity::Checks");
                        };
                        role_metadata.current_role = RoleId::User(5);
                    }),
                    expect_err_containing("role u5 was concurrently dropped".to_string()),
                ),
                // Session role that does not exist.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!("baseline validity must be PlanValidity::Checks");
                        };
                        role_metadata.session_role = RoleId::User(5);
                    }),
                    expect_err_containing("role u5 was concurrently dropped".to_string()),
                ),
                // Authenticated role that does not exist.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!("baseline validity must be PlanValidity::Checks");
                        };
                        role_metadata.authenticated_role = RoleId::User(5);
                    }),
                    expect_err_containing("role u5 was concurrently dropped".to_string()),
                ),
                // Required revision that does not match the catalog.
                (
                    Box::new(|validity| {
                        *validity = PlanValidity::require_transient_revision(100);
                    }),
                    // This check is a no-op, so even a mismatched revision succeeds.
                    Box::new(|res| assert_ok!(res)),
                ),
                // Required revision that matches the catalog.
                (
                    Box::new(|validity| {
                        *validity =
                            PlanValidity::require_transient_revision(catalog.transient_revision());
                    }),
                    Box::new(|res| assert_ok!(res)),
                ),
            ];
            for (get_validity, check_res) in tests {
                // Every case mutates its own copy of the baseline.
                let mut validity = empty.clone();
                get_validity(&mut validity);
                let res = validity.check(&catalog);
                check_res(res);
            }
        })
        .await
    }
}