Skip to main content

mz_adapter/coord/
validity.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use mz_cluster_client::ReplicaId;
13use mz_compute_types::ComputeInstanceId;
14use mz_repr::CatalogItemId;
15use mz_sql::rbac::UnauthorizedError;
16use mz_sql::session::user::RoleMetadata;
17
18use crate::AdapterError;
19use crate::catalog::Catalog;
20
21// The inner fields of PlanValidity are not pub to prevent callers from using them in SQL logic.
22// Callers are responsible for tracking their own needed IDs explicitly and not using
23// PlanValidity as a logic sidecar.
24
25/// A struct to hold information about the validity of plans and if they should be abandoned after
26/// doing work off of the Coordinator thread.
27///
28/// Concurrent DDLs (those returning `false` from `must_serialize_ddl()`) can mutate the catalog
29/// while a staged statement is being optimized off-thread. `check` is called at every off-thread
30/// → on-thread hop in `sequence_staged` so that dropped dependencies / clusters / replicas /
31/// roles surface as a user-facing error instead of panicking later when the persisted SQL is
32/// re-parsed during catalog application.
33#[derive(Debug, Clone)]
34pub struct PlanValidity {
35    /// The most recent revision at which this plan was verified as valid.
36    transient_revision: u64,
37    /// Objects on which the plan depends.
38    dependency_ids: BTreeSet<CatalogItemId>,
39    cluster_id: Option<ComputeInstanceId>,
40    replica_id: Option<ReplicaId>,
41    role_metadata: RoleMetadata,
42}
43
44impl PlanValidity {
45    pub fn new(
46        transient_revision: u64,
47        dependency_ids: BTreeSet<CatalogItemId>,
48        cluster_id: Option<ComputeInstanceId>,
49        replica_id: Option<ReplicaId>,
50        role_metadata: RoleMetadata,
51    ) -> Self {
52        PlanValidity {
53            transient_revision,
54            dependency_ids,
55            cluster_id,
56            replica_id,
57            role_metadata,
58        }
59    }
60
61    pub fn extend_dependencies(&mut self, ids: impl Iterator<Item = CatalogItemId>) {
62        self.dependency_ids.extend(ids);
63    }
64
65    /// Returns an error if the current catalog no longer has all dependencies.
66    pub fn check(&mut self, catalog: &Catalog) -> Result<(), AdapterError> {
67        if self.transient_revision == catalog.transient_revision() {
68            return Ok(());
69        }
70        // If the transient revision changed, we have to recheck. If successful, bump the revision
71        // so next check uses the above fast path.
72        if let Some(cluster_id) = self.cluster_id {
73            let Some(cluster) = catalog.try_get_cluster(cluster_id) else {
74                return Err(AdapterError::ConcurrentDependencyDrop {
75                    dependency_kind: "cluster",
76                    dependency_id: cluster_id.to_string(),
77                });
78            };
79
80            if let Some(replica_id) = self.replica_id {
81                if cluster.replica(replica_id).is_none() {
82                    return Err(AdapterError::ConcurrentDependencyDrop {
83                        dependency_kind: "cluster replica",
84                        dependency_id: format!("{replica_id} of cluster {cluster_id}"),
85                    });
86                }
87            }
88        }
89        // It is sufficient to check that all the dependency_ids still exist because we assume:
90        // - Ids do not mutate.
91        // - Ids are not reused.
92        // - If an id was dropped, this will detect it and error.
93        for id in self.dependency_ids.iter() {
94            if catalog.try_get_entry(id).is_none() {
95                return Err(AdapterError::ConcurrentDependencyDrop {
96                    dependency_kind: "catalog item",
97                    dependency_id: id.to_string(),
98                });
99            }
100        }
101        if catalog
102            .try_get_role(&self.role_metadata.current_role)
103            .is_none()
104        {
105            return Err(AdapterError::Unauthorized(
106                UnauthorizedError::ConcurrentRoleDrop(self.role_metadata.current_role.clone()),
107            ));
108        }
109        if catalog
110            .try_get_role(&self.role_metadata.session_role)
111            .is_none()
112        {
113            return Err(AdapterError::Unauthorized(
114                UnauthorizedError::ConcurrentRoleDrop(self.role_metadata.session_role.clone()),
115            ));
116        }
117
118        if catalog
119            .try_get_role(&self.role_metadata.authenticated_role)
120            .is_none()
121        {
122            return Err(AdapterError::Unauthorized(
123                UnauthorizedError::ConcurrentRoleDrop(
124                    self.role_metadata.authenticated_role.clone(),
125                ),
126            ));
127        }
128        self.transient_revision = catalog.transient_revision();
129        Ok(())
130    }
131}
132
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use mz_adapter_types::connection::ConnectionId;
    use mz_auth::AuthenticatorKind;
    use mz_cluster_client::ReplicaId;
    use mz_controller_types::ClusterId;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::{assert_contains, assert_ok};
    use mz_repr::CatalogItemId;
    use mz_repr::role_id::RoleId;
    use mz_sql::catalog::RoleAttributesRaw;
    use mz_sql::session::metadata::SessionMetadata;
    use uuid::Uuid;

    use crate::AdapterError;
    use crate::catalog::{Catalog, Op};
    use crate::coord::validity::PlanValidity;
    use crate::metrics::Metrics;
    use crate::session::{Session, SessionConfig};

    /// Asserts that `outcome` is an error whose rendered message contains `needle`.
    fn assert_err_contains(outcome: Result<(), AdapterError>, needle: &str) {
        assert_contains!(outcome.expect_err("must err").to_string(), needle);
    }

    /// Runs `PlanValidity::check` against a debug catalog for a valid plan and for plans that
    /// reference a missing cluster, replica, catalog item, or role.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn test_plan_validity() {
        Catalog::with_debug(|mut catalog| async move {
            let role_name = "validity_role";
            let registry = MetricsRegistry::new();
            let adapter_metrics = Metrics::register_into(&registry);

            // Create a real role in the catalog so the role checks have something to find.
            let commit_ts = catalog.current_upper().await;
            catalog
                .transact(
                    None,
                    commit_ts,
                    None,
                    vec![Op::CreateRole {
                        name: role_name.into(),
                        attributes: RoleAttributesRaw::new(),
                    }],
                )
                .await
                .expect("is ok");
            let role = catalog
                .try_get_role_by_name(role_name)
                .expect("must exist");

            // Can't use a dummy session because we need a valid role for the validity check.
            let mut session = Session::new(
                &mz_build_info::DUMMY_BUILD_INFO,
                SessionConfig {
                    conn_id: ConnectionId::Static(1),
                    uuid: Uuid::new_v4(),
                    user: String::from("validity_user"),
                    client_ip: None,
                    external_metadata_rx: None,
                    helm_chart_version: None,
                    authenticator_kind: AuthenticatorKind::None,
                    groups: None,
                },
                adapter_metrics.session_metrics(),
            );
            session.initialize_role_metadata(role.id);

            // Start one revision behind the catalog so `check` skips its fast path and actually
            // runs the lookups.
            let stale_revision = catalog
                .transient_revision()
                .checked_sub(1)
                .expect("must subtract");
            let baseline = PlanValidity::new(
                stale_revision,
                BTreeSet::new(),
                None,
                None,
                session.role_metadata().clone(),
            );
            let system_cluster = catalog
                .clusters()
                .find(|c| matches!(c.id, ClusterId::System(_)))
                .expect("must exist");

            // Each case pairs a mutation of the baseline plan with an assertion on the result of
            // checking the mutated plan.
            let cases: &[(
                Box<dyn Fn(&mut PlanValidity)>,
                Box<dyn Fn(Result<(), AdapterError>)>,
            )] = &[
                // Untouched baseline: everything it references exists.
                (Box::new(|_plan| {}), Box::new(|outcome| assert_ok!(outcome))),
                // Cluster that does not exist.
                (
                    Box::new(|plan| {
                        plan.cluster_id = Some(ClusterId::user(3).expect("3 is a valid ID"));
                    }),
                    Box::new(|outcome| assert_err_contains(outcome, "cluster 'u3' was dropped")),
                ),
                // Existing cluster, replica that does not exist on it.
                (
                    Box::new(|plan| {
                        plan.cluster_id = Some(system_cluster.id);
                        plan.replica_id = Some(ReplicaId::User(4));
                    }),
                    Box::new(|outcome| {
                        assert_err_contains(
                            outcome,
                            &format!(
                                "cluster replica 'u4 of cluster {}' was dropped",
                                system_cluster.id
                            ),
                        )
                    }),
                ),
                // Catalog item that does not exist.
                (
                    Box::new(|plan| {
                        plan.extend_dependencies(std::iter::once(CatalogItemId::User(6)));
                    }),
                    Box::new(|outcome| {
                        assert_err_contains(outcome, "catalog item 'u6' was dropped")
                    }),
                ),
                // Current role that does not exist.
                (
                    Box::new(|plan| {
                        plan.role_metadata.current_role = RoleId::User(5);
                    }),
                    Box::new(|outcome| {
                        assert_err_contains(outcome, "role u5 was concurrently dropped")
                    }),
                ),
                // Session role that does not exist.
                (
                    Box::new(|plan| {
                        plan.role_metadata.session_role = RoleId::User(5);
                    }),
                    Box::new(|outcome| {
                        assert_err_contains(outcome, "role u5 was concurrently dropped")
                    }),
                ),
                // Authenticated role that does not exist.
                (
                    Box::new(|plan| {
                        plan.role_metadata.authenticated_role = RoleId::User(5);
                    }),
                    Box::new(|outcome| {
                        assert_err_contains(outcome, "role u5 was concurrently dropped")
                    }),
                ),
            ];
            for (mutate, verify) in cases.iter() {
                let mut plan = baseline.clone();
                mutate(&mut plan);
                verify(plan.check(&catalog));
            }
        })
        .await
    }
}