mz_adapter/coord/validity.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use mz_cluster_client::ReplicaId;
13use mz_compute_types::ComputeInstanceId;
14use mz_repr::CatalogItemId;
15use mz_sql::rbac::UnauthorizedError;
16use mz_sql::session::user::RoleMetadata;
17
18use crate::AdapterError;
19use crate::catalog::Catalog;
20
21// The inner fields of PlanValidity are not pub to prevent callers from using them in SQL logic.
22// Callers are responsible for tracking their own needed IDs explicitly and not using
23// PlanValidity as a logic sidecar.
24
25/// A struct to hold information about the validity of plans and if they should be abandoned after
26/// doing work off of the Coordinator thread.
27#[derive(Debug, Clone)]
28pub enum PlanValidity {
29 /// Requires a specific transient revision.
30 RequireRevision { required_revision: u64 },
31 /// Checks various catalog IDs. Uses the transient revision only as a cache marker.
32 Checks {
33 /// The most recent revision at which this plan was verified as valid.
34 transient_revision: u64,
35 /// Objects on which the plan depends.
36 dependency_ids: BTreeSet<CatalogItemId>,
37 cluster_id: Option<ComputeInstanceId>,
38 replica_id: Option<ReplicaId>,
39 role_metadata: RoleMetadata,
40 },
41}
42
43impl PlanValidity {
44 pub fn new(
45 transient_revision: u64,
46 dependency_ids: BTreeSet<CatalogItemId>,
47 cluster_id: Option<ComputeInstanceId>,
48 replica_id: Option<ReplicaId>,
49 role_metadata: RoleMetadata,
50 ) -> Self {
51 PlanValidity::Checks {
52 transient_revision,
53 dependency_ids,
54 cluster_id,
55 replica_id,
56 role_metadata,
57 }
58 }
59
60 /// WARNING: This is currently a no-op and `check` will always succeed.
61 ///
62 /// Sets the required `transient_revision` of the catalog. Should only be used by serialized
63 /// statements (and thus should never fail for users), but here as an internal failsafe against
64 /// programming errors.
65 pub fn require_transient_revision(required_revision: u64) -> Self {
66 PlanValidity::RequireRevision { required_revision }
67 }
68
69 /// Panics if not called on a Checks variant.
70 pub fn extend_dependencies(&mut self, ids: impl Iterator<Item = CatalogItemId>) {
71 let Self::Checks { dependency_ids, .. } = self else {
72 unreachable!();
73 };
74 dependency_ids.extend(ids);
75 }
76
77 /// Returns an error if the current catalog no longer has all dependencies.
78 pub fn check(&mut self, catalog: &Catalog) -> Result<(), AdapterError> {
79 match self {
80 PlanValidity::RequireRevision { required_revision } => {
81 if catalog.transient_revision() != *required_revision {
82 // TODO: We would like to use this as a programming check that no catalog
83 // revisions were made as a double-check that all DDLs are serialized. However,
84 // since only *most* DDLs are serialized (see `must_serialize_ddl()` for those
85 // that aren't), it is possible for two DDLs to run concurrently and the catalog
86 // revision to increment during the off-thread work from this DDL. For example,
87 // a CREATE VIEW could be off-thread optimizing while an ALTER SECRET runs and
88 // increments the revision. For now we assume this check is not strictly needed
89 // because we have thought medium hard about the DDLs that do not require
90 // serialization, so they do not pose a correctness problem when executing
91 // concurrently with any other DDL.
92 //
93 // If we want to remove the need to even think at all about DDL ordering
94 // correctness we would need to refactor all calls to catalog_transact to
95 // require passing the serialized DDL lock. Statements would be responsible for
96 // getting the lock at the latest possible correct time. ALTER SECRET for
97 // example could acquire the lock after interacting with k8s, but most other
98 // DDLs would get the lock for their entire sequencing duration.
99
100 //soft_panic_or_log!("another DDL executed while this assumed it was serial");
101 }
102 Ok(())
103 }
104 PlanValidity::Checks {
105 transient_revision,
106 dependency_ids,
107 cluster_id,
108 replica_id,
109 role_metadata,
110 } => {
111 if *transient_revision == catalog.transient_revision() {
112 return Ok(());
113 }
114 // If the transient revision changed, we have to recheck. If successful, bump the revision
115 // so next check uses the above fast path.
116 if let Some(cluster_id) = cluster_id {
117 let Some(cluster) = catalog.try_get_cluster(*cluster_id) else {
118 return Err(AdapterError::ChangedPlan(format!(
119 "cluster {} was removed",
120 cluster_id
121 )));
122 };
123
124 if let Some(replica_id) = replica_id {
125 if cluster.replica(*replica_id).is_none() {
126 return Err(AdapterError::ChangedPlan(format!(
127 "replica {} of cluster {} was removed",
128 replica_id, cluster_id
129 )));
130 }
131 }
132 }
133 // It is sufficient to check that all the dependency_ids still exist because we assume:
134 // - Ids do not mutate.
135 // - Ids are not reused.
136 // - If an id was dropped, this will detect it and error.
137 for id in dependency_ids.iter() {
138 if catalog.try_get_entry(id).is_none() {
139 return Err(AdapterError::ChangedPlan(format!(
140 "dependency was removed: {id}",
141 )));
142 }
143 }
144 if catalog.try_get_role(&role_metadata.current_role).is_none() {
145 return Err(AdapterError::Unauthorized(
146 UnauthorizedError::ConcurrentRoleDrop(role_metadata.current_role.clone()),
147 ));
148 }
149 if catalog.try_get_role(&role_metadata.session_role).is_none() {
150 return Err(AdapterError::Unauthorized(
151 UnauthorizedError::ConcurrentRoleDrop(role_metadata.session_role.clone()),
152 ));
153 }
154
155 if catalog
156 .try_get_role(&role_metadata.authenticated_role)
157 .is_none()
158 {
159 return Err(AdapterError::Unauthorized(
160 UnauthorizedError::ConcurrentRoleDrop(
161 role_metadata.authenticated_role.clone(),
162 ),
163 ));
164 }
165 *transient_revision = catalog.transient_revision();
166 Ok(())
167 }
168 }
169 }
170}
171
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use mz_adapter_types::connection::ConnectionId;
    use mz_cluster_client::ReplicaId;
    use mz_controller_types::ClusterId;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::{assert_contains, assert_ok};
    use mz_repr::role_id::RoleId;
    use mz_repr::{CatalogItemId, Timestamp};
    use mz_sql::catalog::RoleAttributes;
    use mz_sql::session::metadata::SessionMetadata;
    use uuid::Uuid;

    use crate::AdapterError;
    use crate::catalog::{Catalog, Op};
    use crate::coord::validity::PlanValidity;
    use crate::metrics::Metrics;
    use crate::session::{Session, SessionConfig};

    /// Table-driven test of `PlanValidity::check`: each case mutates a copy of a baseline
    /// validity and then asserts on the result of checking it against a debug catalog.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn test_plan_validity() {
        Catalog::with_debug(|mut catalog| async move {
            let conn_id = ConnectionId::Static(1);
            let user = String::from("validity_user");
            let role = "validity_role";
            let metrics_registry = MetricsRegistry::new();
            let metrics = Metrics::register_into(&metrics_registry);

            // Create a real role so the baseline validity passes the role checks.
            let commit_ts = catalog.current_upper().await;
            catalog
                .transact(
                    None,
                    commit_ts,
                    None,
                    vec![Op::CreateRole {
                        name: role.into(),
                        attributes: RoleAttributes::new(),
                    }],
                )
                .await
                .expect("is ok");
            let role = catalog.try_get_role_by_name(role).expect("must exist");
            // Can't use a dummy session because we need a valid role for the validity check.
            let mut session = Session::<Timestamp>::new(
                &mz_build_info::DUMMY_BUILD_INFO,
                SessionConfig {
                    conn_id,
                    uuid: Uuid::new_v4(),
                    user,
                    client_ip: None,
                    external_metadata_rx: None,
                    internal_user_metadata: None,
                    helm_chart_version: None,
                },
                metrics.session_metrics(),
            );
            session.initialize_role_metadata(role.id);
            // Baseline validity: no dependencies, no cluster, no replica, valid roles.
            let empty = PlanValidity::new(
                // Set the transient rev 1 down so the check logic runs.
                catalog
                    .transient_revision()
                    .checked_sub(1)
                    .expect("must subtract"),
                BTreeSet::new(),
                None,
                None,
                session.role_metadata().clone(),
            );
            let some_system_cluster = catalog
                .clusters()
                .find(|c| matches!(c.id, ClusterId::System(_)))
                .expect("must exist");

            // Plan generation and result assertion closures.
            let tests: &[(
                Box<dyn Fn(&mut PlanValidity)>,
                Box<dyn Fn(Result<(), AdapterError>)>,
            )] = &[
                // Unmodified baseline: must succeed.
                (Box::new(|_validity| {}), Box::new(|res| assert_ok!(res))),
                // Cluster ID expected to be absent from the catalog.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { cluster_id, .. } = validity else {
                            panic!();
                        };
                        *cluster_id = Some(ClusterId::user(3).expect("3 is a valid ID"));
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "cluster u3 was removed"
                        )
                    }),
                ),
                // Existing cluster, but a replica ID expected to be absent from it.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks {
                            cluster_id,
                            replica_id,
                            ..
                        } = validity
                        else {
                            panic!();
                        };
                        *cluster_id = Some(some_system_cluster.id);
                        *replica_id = Some(ReplicaId::User(4));
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            format!(
                                "replica u4 of cluster {} was removed",
                                some_system_cluster.id
                            ),
                        )
                    }),
                ),
                // Dependency on an item ID expected to be absent from the catalog.
                (
                    Box::new(|validity| {
                        validity.extend_dependencies(vec![CatalogItemId::User(6)].into_iter());
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "dependency was removed: u6"
                        )
                    }),
                ),
                // Current role replaced by a role ID expected to be absent.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!();
                        };
                        role_metadata.current_role = RoleId::User(5);
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "role u5 was concurrently dropped"
                        )
                    }),
                ),
                // Session role replaced by a role ID expected to be absent.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!();
                        };
                        role_metadata.session_role = RoleId::User(5);
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "role u5 was concurrently dropped"
                        )
                    }),
                ),
                // Authenticated role replaced by a role ID expected to be absent.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!();
                        };
                        role_metadata.authenticated_role = RoleId::User(5);
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "role u5 was concurrently dropped"
                        )
                    }),
                ),
                // Required revision that differs from the catalog's.
                (
                    Box::new(|validity| {
                        *validity = PlanValidity::require_transient_revision(100);
                    }),
                    Box::new(|res| {
                        // This check is a no-op.
                        assert_ok!(res);
                    }),
                ),
                // Required revision that matches the catalog's.
                (
                    Box::new(|validity| {
                        *validity =
                            PlanValidity::require_transient_revision(catalog.transient_revision());
                    }),
                    Box::new(|res| {
                        assert_ok!(res);
                    }),
                ),
            ];
            // Every case starts from a fresh copy of the baseline so cases do not interact.
            for (get_validity, check_res) in tests {
                let mut validity = empty.clone();
                get_validity(&mut validity);
                let res = validity.check(&catalog);
                check_res(res);
            }
        })
        .await
    }
}