mz_adapter/coord/validity.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use mz_cluster_client::ReplicaId;
13use mz_compute_types::ComputeInstanceId;
14use mz_repr::CatalogItemId;
15use mz_sql::rbac::UnauthorizedError;
16use mz_sql::session::user::RoleMetadata;
17
18use crate::AdapterError;
19use crate::catalog::Catalog;
20
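// The variant fields of `PlanValidity` must not be read by callers to drive SQL logic. Rust cannot
// make enum variant fields private (they share the enum's visibility), so this is a convention
// rather than something the compiler enforces. Callers are responsible for tracking their own
// needed IDs explicitly and must not use `PlanValidity` as a logic sidecar.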
24
25/// A struct to hold information about the validity of plans and if they should be abandoned after
26/// doing work off of the Coordinator thread.
27#[derive(Debug, Clone)]
28pub enum PlanValidity {
29 /// Requires a specific transient revision.
30 RequireRevision { required_revision: u64 },
31 /// Checks various catalog IDs. Uses the transient revision only as a cache marker.
32 Checks {
33 /// The most recent revision at which this plan was verified as valid.
34 transient_revision: u64,
35 /// Objects on which the plan depends.
36 dependency_ids: BTreeSet<CatalogItemId>,
37 cluster_id: Option<ComputeInstanceId>,
38 replica_id: Option<ReplicaId>,
39 role_metadata: RoleMetadata,
40 },
41}
42
43impl PlanValidity {
44 pub fn new(
45 transient_revision: u64,
46 dependency_ids: BTreeSet<CatalogItemId>,
47 cluster_id: Option<ComputeInstanceId>,
48 replica_id: Option<ReplicaId>,
49 role_metadata: RoleMetadata,
50 ) -> Self {
51 PlanValidity::Checks {
52 transient_revision,
53 dependency_ids,
54 cluster_id,
55 replica_id,
56 role_metadata,
57 }
58 }
59
60 /// WARNING: This is currently a no-op and `check` will always succeed.
61 ///
62 /// Sets the required `transient_revision` of the catalog. Should only be used by serialized
63 /// statements (and thus should never fail for users), but here as an internal failsafe against
64 /// programming errors.
65 pub fn require_transient_revision(required_revision: u64) -> Self {
66 PlanValidity::RequireRevision { required_revision }
67 }
68
69 /// Panics if not called on a Checks variant.
70 pub fn extend_dependencies(&mut self, ids: impl Iterator<Item = CatalogItemId>) {
71 let Self::Checks { dependency_ids, .. } = self else {
72 unreachable!();
73 };
74 dependency_ids.extend(ids);
75 }
76
77 /// Returns an error if the current catalog no longer has all dependencies.
78 pub fn check(&mut self, catalog: &Catalog) -> Result<(), AdapterError> {
79 match self {
80 PlanValidity::RequireRevision { required_revision } => {
81 if catalog.transient_revision() != *required_revision {
82 // TODO: We would like to use this as a programming check that no catalog
83 // revisions were made as a double-check that all DDLs are serialized. However,
84 // since only *most* DDLs are serialized (see `must_serialize_ddl()` for those
85 // that aren't), it is possible for two DDLs to run concurrently and the catalog
86 // revision to increment during the off-thread work from this DDL. For example,
87 // a CREATE VIEW could be off-thread optimizing while an ALTER SECRET runs and
88 // increments the revision. For now we assume this check is not strictly needed
89 // because we have thought medium hard about the DDLs that do not require
90 // serialization, so they do not pose a correctness problem when executing
91 // concurrently with any other DDL.
92 //
93 // If we want to remove the need to even think at all about DDL ordering
94 // correctness we would need to refactor all calls to catalog_transact to
95 // require passing the serialized DDL lock. Statements would be responsible for
96 // getting the lock at the latest possible correct time. ALTER SECRET for
97 // example could acquire the lock after interacting with k8s, but most other
98 // DDLs would get the lock for their entire sequencing duration.
99
100 //soft_panic_or_log!("another DDL executed while this assumed it was serial");
101 }
102 Ok(())
103 }
104 PlanValidity::Checks {
105 transient_revision,
106 dependency_ids,
107 cluster_id,
108 replica_id,
109 role_metadata,
110 } => {
111 if *transient_revision == catalog.transient_revision() {
112 return Ok(());
113 }
114 // If the transient revision changed, we have to recheck. If successful, bump the revision
115 // so next check uses the above fast path.
116 if let Some(cluster_id) = cluster_id {
117 let Some(cluster) = catalog.try_get_cluster(*cluster_id) else {
118 return Err(AdapterError::ChangedPlan(format!(
119 "cluster {} was removed",
120 cluster_id
121 )));
122 };
123
124 if let Some(replica_id) = replica_id {
125 if cluster.replica(*replica_id).is_none() {
126 return Err(AdapterError::ChangedPlan(format!(
127 "replica {} of cluster {} was removed",
128 replica_id, cluster_id
129 )));
130 }
131 }
132 }
133 // It is sufficient to check that all the dependency_ids still exist because we assume:
134 // - Ids do not mutate.
135 // - Ids are not reused.
136 // - If an id was dropped, this will detect it and error.
137 for id in dependency_ids.iter() {
138 if catalog.try_get_entry(id).is_none() {
139 return Err(AdapterError::ChangedPlan(format!(
140 "dependency was removed: {id}",
141 )));
142 }
143 }
144 if catalog.try_get_role(&role_metadata.current_role).is_none() {
145 return Err(AdapterError::Unauthorized(
146 UnauthorizedError::ConcurrentRoleDrop(role_metadata.current_role.clone()),
147 ));
148 }
149 if catalog.try_get_role(&role_metadata.session_role).is_none() {
150 return Err(AdapterError::Unauthorized(
151 UnauthorizedError::ConcurrentRoleDrop(role_metadata.session_role.clone()),
152 ));
153 }
154
155 if catalog
156 .try_get_role(&role_metadata.authenticated_role)
157 .is_none()
158 {
159 return Err(AdapterError::Unauthorized(
160 UnauthorizedError::ConcurrentRoleDrop(
161 role_metadata.authenticated_role.clone(),
162 ),
163 ));
164 }
165 *transient_revision = catalog.transient_revision();
166 Ok(())
167 }
168 }
169 }
170}
171
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use mz_adapter_types::connection::ConnectionId;
    use mz_cluster_client::ReplicaId;
    use mz_controller_types::ClusterId;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::{assert_contains, assert_ok};
    use mz_repr::role_id::RoleId;
    use mz_repr::{CatalogItemId, Timestamp};
    use mz_sql::catalog::RoleAttributesRaw;
    use mz_sql::session::metadata::SessionMetadata;
    use uuid::Uuid;

    use crate::AdapterError;
    use crate::catalog::{Catalog, Op};
    use crate::coord::validity::PlanValidity;
    use crate::metrics::Metrics;
    use crate::session::{Session, SessionConfig};

    /// Exercises each failure mode of `PlanValidity::check`, the success paths of both variants,
    /// and the revision cache that lets `PlanValidity::Checks` skip re-validation.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn test_plan_validity() {
        Catalog::with_debug(|mut catalog| async move {
            let conn_id = ConnectionId::Static(1);
            let user = String::from("validity_user");
            let role = "validity_role";
            let metrics_registry = MetricsRegistry::new();
            let metrics = Metrics::register_into(&metrics_registry);

            let commit_ts = catalog.current_upper().await;
            catalog
                .transact(
                    None,
                    commit_ts,
                    None,
                    vec![Op::CreateRole {
                        name: role.into(),
                        attributes: RoleAttributesRaw::new(),
                    }],
                )
                .await
                .expect("is ok");
            let role = catalog.try_get_role_by_name(role).expect("must exist");
            // Can't use a dummy session because we need a valid role for the validity check.
            let mut session = Session::<Timestamp>::new(
                &mz_build_info::DUMMY_BUILD_INFO,
                SessionConfig {
                    conn_id,
                    uuid: Uuid::new_v4(),
                    user,
                    client_ip: None,
                    external_metadata_rx: None,
                    helm_chart_version: None,
                },
                metrics.session_metrics(),
            );
            session.initialize_role_metadata(role.id);
            let empty = PlanValidity::new(
                // Set the transient rev 1 down so the check logic runs.
                catalog
                    .transient_revision()
                    .checked_sub(1)
                    .expect("must subtract"),
                BTreeSet::new(),
                None,
                None,
                session.role_metadata().clone(),
            );
            let some_system_cluster = catalog
                .clusters()
                .find(|c| matches!(c.id, ClusterId::System(_)))
                .expect("must exist");

            // Plan generation and result assertion closures.
            let tests: &[(
                Box<dyn Fn(&mut PlanValidity)>,
                Box<dyn Fn(Result<(), AdapterError>)>,
            )] = &[
                (Box::new(|_validity| {}), Box::new(|res| assert_ok!(res))),
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { cluster_id, .. } = validity else {
                            panic!();
                        };
                        *cluster_id = Some(ClusterId::user(3).expect("3 is a valid ID"));
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "cluster u3 was removed"
                        )
                    }),
                ),
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks {
                            cluster_id,
                            replica_id,
                            ..
                        } = validity
                        else {
                            panic!();
                        };
                        *cluster_id = Some(some_system_cluster.id);
                        *replica_id = Some(ReplicaId::User(4));
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            format!(
                                "replica u4 of cluster {} was removed",
                                some_system_cluster.id
                            ),
                        )
                    }),
                ),
                (
                    Box::new(|validity| {
                        validity.extend_dependencies(vec![CatalogItemId::User(6)].into_iter());
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "dependency was removed: u6"
                        )
                    }),
                ),
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!();
                        };
                        role_metadata.current_role = RoleId::User(5);
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "role u5 was concurrently dropped"
                        )
                    }),
                ),
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!();
                        };
                        role_metadata.session_role = RoleId::User(5);
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "role u5 was concurrently dropped"
                        )
                    }),
                ),
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!();
                        };
                        role_metadata.authenticated_role = RoleId::User(5);
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "role u5 was concurrently dropped"
                        )
                    }),
                ),
                (
                    Box::new(|validity| {
                        *validity = PlanValidity::require_transient_revision(100);
                    }),
                    Box::new(|res| {
                        // This check is a no-op.
                        assert_ok!(res);
                    }),
                ),
                (
                    Box::new(|validity| {
                        *validity =
                            PlanValidity::require_transient_revision(catalog.transient_revision());
                    }),
                    Box::new(|res| {
                        assert_ok!(res);
                    }),
                ),
            ];
            for (get_validity, check_res) in tests {
                let mut validity = empty.clone();
                get_validity(&mut validity);
                let res = validity.check(&catalog);
                check_res(res);
            }

            // A successful check records the catalog's current revision. Later checks therefore
            // take the fast path and do not re-validate, even for dependencies added afterwards.
            let mut validity = empty.clone();
            assert_ok!(validity.check(&catalog));
            validity.extend_dependencies(std::iter::once(CatalogItemId::User(6)));
            assert_ok!(validity.check(&catalog));
        })
        .await
    }
}