mz_adapter/coord/
validity.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use mz_cluster_client::ReplicaId;
13use mz_compute_types::ComputeInstanceId;
14use mz_repr::CatalogItemId;
15use mz_sql::rbac::UnauthorizedError;
16use mz_sql::session::user::RoleMetadata;
17
18use crate::AdapterError;
19use crate::catalog::Catalog;
20
// PlanValidity is an enum, so its variant fields are as visible as the type itself; Rust has no
// way to make them private. By convention, callers must still not use them in SQL logic.
// Callers are responsible for tracking their own needed IDs explicitly and not using
// PlanValidity as a logic sidecar.
24
/// Information about the validity of a plan, used to decide whether the plan should be abandoned
/// after doing work off of the Coordinator thread.
///
/// Validity is evaluated against a catalog by `PlanValidity::check`.
#[derive(Debug, Clone)]
pub enum PlanValidity {
    /// Requires a specific transient revision.
    ///
    /// NOTE: `check` currently returns `Ok(())` for this variant even when the catalog's
    /// transient revision differs from `required_revision`.
    RequireRevision { required_revision: u64 },
    /// Checks various catalog IDs. Uses the transient revision only as a cache marker.
    Checks {
        /// The most recent revision at which this plan was verified as valid.
        transient_revision: u64,
        /// Objects on which the plan depends.
        dependency_ids: BTreeSet<CatalogItemId>,
        /// Cluster that must still exist in the catalog. When `None`, neither the cluster nor
        /// the replica is checked.
        cluster_id: Option<ComputeInstanceId>,
        /// Replica of `cluster_id` that must still exist. Only checked when `cluster_id` is
        /// `Some`.
        replica_id: Option<ReplicaId>,
        /// Roles (current, session, and authenticated) that must all still exist in the catalog.
        role_metadata: RoleMetadata,
    },
}
42
43impl PlanValidity {
44    pub fn new(
45        transient_revision: u64,
46        dependency_ids: BTreeSet<CatalogItemId>,
47        cluster_id: Option<ComputeInstanceId>,
48        replica_id: Option<ReplicaId>,
49        role_metadata: RoleMetadata,
50    ) -> Self {
51        PlanValidity::Checks {
52            transient_revision,
53            dependency_ids,
54            cluster_id,
55            replica_id,
56            role_metadata,
57        }
58    }
59
60    /// WARNING: This is currently a no-op and `check` will always succeed.
61    ///
62    /// Sets the required `transient_revision` of the catalog. Should only be used by serialized
63    /// statements (and thus should never fail for users), but here as an internal failsafe against
64    /// programming errors.
65    pub fn require_transient_revision(required_revision: u64) -> Self {
66        PlanValidity::RequireRevision { required_revision }
67    }
68
69    /// Panics if not called on a Checks variant.
70    pub fn extend_dependencies(&mut self, ids: impl Iterator<Item = CatalogItemId>) {
71        let Self::Checks { dependency_ids, .. } = self else {
72            unreachable!();
73        };
74        dependency_ids.extend(ids);
75    }
76
77    /// Returns an error if the current catalog no longer has all dependencies.
78    pub fn check(&mut self, catalog: &Catalog) -> Result<(), AdapterError> {
79        match self {
80            PlanValidity::RequireRevision { required_revision } => {
81                if catalog.transient_revision() != *required_revision {
82                    // TODO: We would like to use this as a programming check that no catalog
83                    // revisions were made as a double-check that all DDLs are serialized. However,
84                    // since only *most* DDLs are serialized (see `must_serialize_ddl()` for those
85                    // that aren't), it is possible for two DDLs to run concurrently and the catalog
86                    // revision to increment during the off-thread work from this DDL. For example,
87                    // a CREATE VIEW could be off-thread optimizing while an ALTER SECRET runs and
88                    // increments the revision. For now we assume this check is not strictly needed
89                    // because we have thought medium hard about the DDLs that do not require
90                    // serialization, so they do not pose a correctness problem when executing
91                    // concurrently with any other DDL.
92                    //
93                    // If we want to remove the need to even think at all about DDL ordering
94                    // correctness we would need to refactor all calls to catalog_transact to
95                    // require passing the serialized DDL lock. Statements would be responsible for
96                    // getting the lock at the latest possible correct time. ALTER SECRET for
97                    // example could acquire the lock after interacting with k8s, but most other
98                    // DDLs would get the lock for their entire sequencing duration.
99
100                    //soft_panic_or_log!("another DDL executed while this assumed it was serial");
101                }
102                Ok(())
103            }
104            PlanValidity::Checks {
105                transient_revision,
106                dependency_ids,
107                cluster_id,
108                replica_id,
109                role_metadata,
110            } => {
111                if *transient_revision == catalog.transient_revision() {
112                    return Ok(());
113                }
114                // If the transient revision changed, we have to recheck. If successful, bump the revision
115                // so next check uses the above fast path.
116                if let Some(cluster_id) = cluster_id {
117                    let Some(cluster) = catalog.try_get_cluster(*cluster_id) else {
118                        return Err(AdapterError::ChangedPlan(format!(
119                            "cluster {} was removed",
120                            cluster_id
121                        )));
122                    };
123
124                    if let Some(replica_id) = replica_id {
125                        if cluster.replica(*replica_id).is_none() {
126                            return Err(AdapterError::ChangedPlan(format!(
127                                "replica {} of cluster {} was removed",
128                                replica_id, cluster_id
129                            )));
130                        }
131                    }
132                }
133                // It is sufficient to check that all the dependency_ids still exist because we assume:
134                // - Ids do not mutate.
135                // - Ids are not reused.
136                // - If an id was dropped, this will detect it and error.
137                for id in dependency_ids.iter() {
138                    if catalog.try_get_entry(id).is_none() {
139                        return Err(AdapterError::ChangedPlan(format!(
140                            "dependency was removed: {id}",
141                        )));
142                    }
143                }
144                if catalog.try_get_role(&role_metadata.current_role).is_none() {
145                    return Err(AdapterError::Unauthorized(
146                        UnauthorizedError::ConcurrentRoleDrop(role_metadata.current_role.clone()),
147                    ));
148                }
149                if catalog.try_get_role(&role_metadata.session_role).is_none() {
150                    return Err(AdapterError::Unauthorized(
151                        UnauthorizedError::ConcurrentRoleDrop(role_metadata.session_role.clone()),
152                    ));
153                }
154
155                if catalog
156                    .try_get_role(&role_metadata.authenticated_role)
157                    .is_none()
158                {
159                    return Err(AdapterError::Unauthorized(
160                        UnauthorizedError::ConcurrentRoleDrop(
161                            role_metadata.authenticated_role.clone(),
162                        ),
163                    ));
164                }
165                *transient_revision = catalog.transient_revision();
166                Ok(())
167            }
168        }
169    }
170}
171
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use mz_adapter_types::connection::ConnectionId;
    use mz_cluster_client::ReplicaId;
    use mz_controller_types::ClusterId;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::{assert_contains, assert_ok};
    use mz_repr::role_id::RoleId;
    use mz_repr::{CatalogItemId, Timestamp};
    use mz_sql::catalog::RoleAttributes;
    use mz_sql::session::metadata::SessionMetadata;
    use uuid::Uuid;

    use crate::AdapterError;
    use crate::catalog::{Catalog, Op};
    use crate::coord::validity::PlanValidity;
    use crate::metrics::Metrics;
    use crate::session::{Session, SessionConfig};

    /// Table-driven test of `PlanValidity::check`.
    ///
    /// Each case starts from a clone of a baseline `Checks` validity whose cached revision is one
    /// behind the catalog (so the full check logic runs), mutates it, runs `check` against the
    /// debug catalog, and asserts on the result.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn test_plan_validity() {
        Catalog::with_debug(|mut catalog| async move {
            let conn_id = ConnectionId::Static(1);
            let user = String::from("validity_user");
            let role = "validity_role";
            let metrics_registry = MetricsRegistry::new();
            let metrics = Metrics::register_into(&metrics_registry);

            let commit_ts = catalog.current_upper().await;
            catalog
                .transact(
                    None,
                    commit_ts,
                    None,
                    vec![Op::CreateRole {
                        name: role.into(),
                        attributes: RoleAttributes::new(),
                    }],
                )
                .await
                .expect("is ok");
            let role = catalog.try_get_role_by_name(role).expect("must exist");
            // Can't use a dummy session because we need a valid role for the validity check.
            let mut session = Session::<Timestamp>::new(
                &mz_build_info::DUMMY_BUILD_INFO,
                SessionConfig {
                    conn_id,
                    uuid: Uuid::new_v4(),
                    user,
                    client_ip: None,
                    external_metadata_rx: None,
                    internal_user_metadata: None,
                    helm_chart_version: None,
                },
                metrics.session_metrics(),
            );
            session.initialize_role_metadata(role.id);
            let empty = PlanValidity::new(
                // Set the transient rev 1 down so the check logic runs.
                catalog
                    .transient_revision()
                    .checked_sub(1)
                    .expect("must subtract"),
                BTreeSet::new(),
                None,
                None,
                session.role_metadata().clone(),
            );
            let some_system_cluster = catalog
                .clusters()
                .find(|c| matches!(c.id, ClusterId::System(_)))
                .expect("must exist");

            // Plan generation and result assertion closures.
            let tests: &[(
                Box<dyn Fn(&mut PlanValidity)>,
                Box<dyn Fn(Result<(), AdapterError>)>,
            )] = &[
                // Baseline: nothing the plan depends on is missing.
                (Box::new(|_validity| {}), Box::new(|res| assert_ok!(res))),
                // Missing cluster.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { cluster_id, .. } = validity else {
                            panic!("expected PlanValidity::Checks when setting cluster_id");
                        };
                        *cluster_id = Some(ClusterId::user(3).expect("3 is a valid ID"));
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "cluster u3 was removed"
                        )
                    }),
                ),
                // Existing cluster, missing replica.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks {
                            cluster_id,
                            replica_id,
                            ..
                        } = validity
                        else {
                            panic!("expected PlanValidity::Checks when setting replica_id");
                        };
                        *cluster_id = Some(some_system_cluster.id);
                        *replica_id = Some(ReplicaId::User(4));
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            format!(
                                "replica u4 of cluster {} was removed",
                                some_system_cluster.id
                            ),
                        )
                    }),
                ),
                // Missing dependency.
                (
                    Box::new(|validity| {
                        validity.extend_dependencies(vec![CatalogItemId::User(6)].into_iter());
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "dependency was removed: u6"
                        )
                    }),
                ),
                // Missing current role.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!("expected PlanValidity::Checks when setting current_role");
                        };
                        role_metadata.current_role = RoleId::User(5);
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "role u5 was concurrently dropped"
                        )
                    }),
                ),
                // Missing session role.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!("expected PlanValidity::Checks when setting session_role");
                        };
                        role_metadata.session_role = RoleId::User(5);
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "role u5 was concurrently dropped"
                        )
                    }),
                ),
                // Missing authenticated role.
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!("expected PlanValidity::Checks when setting authenticated_role");
                        };
                        role_metadata.authenticated_role = RoleId::User(5);
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "role u5 was concurrently dropped"
                        )
                    }),
                ),
                // Required revision that does not match the catalog.
                (
                    Box::new(|validity| {
                        *validity = PlanValidity::require_transient_revision(100);
                    }),
                    Box::new(|res| {
                        // This check is a no-op.
                        assert_ok!(res);
                    }),
                ),
                // Required revision that matches the catalog.
                (
                    Box::new(|validity| {
                        *validity =
                            PlanValidity::require_transient_revision(catalog.transient_revision());
                    }),
                    Box::new(|res| {
                        assert_ok!(res);
                    }),
                ),
            ];
            for (get_validity, check_res) in tests {
                let mut validity = empty.clone();
                get_validity(&mut validity);
                let res = validity.check(&catalog);
                check_res(res);
            }
        })
        .await
    }
}