Skip to main content

mz_adapter/coord/
validity.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use mz_cluster_client::ReplicaId;
13use mz_compute_types::ComputeInstanceId;
14use mz_repr::CatalogItemId;
15use mz_sql::rbac::UnauthorizedError;
16use mz_sql::session::user::RoleMetadata;
17
18use crate::AdapterError;
19use crate::catalog::Catalog;
20
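// PlanValidity's fields exist only to re-validate plans and must not be used in SQL logic.
// Because PlanValidity is an enum, Rust gives its variant fields the same visibility as the enum
// itself, so this cannot be enforced by visibility; it is a convention. Callers are responsible
// for tracking their own needed IDs explicitly and not using PlanValidity as a logic sidecar.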
24
25/// A struct to hold information about the validity of plans and if they should be abandoned after
26/// doing work off of the Coordinator thread.
27#[derive(Debug, Clone)]
28pub enum PlanValidity {
29    /// Requires a specific transient revision.
30    RequireRevision { required_revision: u64 },
31    /// Checks various catalog IDs. Uses the transient revision only as a cache marker.
32    Checks {
33        /// The most recent revision at which this plan was verified as valid.
34        transient_revision: u64,
35        /// Objects on which the plan depends.
36        dependency_ids: BTreeSet<CatalogItemId>,
37        cluster_id: Option<ComputeInstanceId>,
38        replica_id: Option<ReplicaId>,
39        role_metadata: RoleMetadata,
40    },
41}
42
43impl PlanValidity {
44    pub fn new(
45        transient_revision: u64,
46        dependency_ids: BTreeSet<CatalogItemId>,
47        cluster_id: Option<ComputeInstanceId>,
48        replica_id: Option<ReplicaId>,
49        role_metadata: RoleMetadata,
50    ) -> Self {
51        PlanValidity::Checks {
52            transient_revision,
53            dependency_ids,
54            cluster_id,
55            replica_id,
56            role_metadata,
57        }
58    }
59
60    /// WARNING: This is currently a no-op and `check` will always succeed.
61    ///
62    /// Sets the required `transient_revision` of the catalog. Should only be used by serialized
63    /// statements (and thus should never fail for users), but here as an internal failsafe against
64    /// programming errors.
65    pub fn require_transient_revision(required_revision: u64) -> Self {
66        PlanValidity::RequireRevision { required_revision }
67    }
68
69    /// Panics if not called on a Checks variant.
70    pub fn extend_dependencies(&mut self, ids: impl Iterator<Item = CatalogItemId>) {
71        let Self::Checks { dependency_ids, .. } = self else {
72            unreachable!();
73        };
74        dependency_ids.extend(ids);
75    }
76
77    /// Returns an error if the current catalog no longer has all dependencies.
78    pub fn check(&mut self, catalog: &Catalog) -> Result<(), AdapterError> {
79        match self {
80            PlanValidity::RequireRevision { required_revision } => {
81                if catalog.transient_revision() != *required_revision {
82                    // TODO: We would like to use this as a programming check that no catalog
83                    // revisions were made as a double-check that all DDLs are serialized. However,
84                    // since only *most* DDLs are serialized (see `must_serialize_ddl()` for those
85                    // that aren't), it is possible for two DDLs to run concurrently and the catalog
86                    // revision to increment during the off-thread work from this DDL. For example,
87                    // a CREATE VIEW could be off-thread optimizing while an ALTER SECRET runs and
88                    // increments the revision. For now we assume this check is not strictly needed
89                    // because we have thought medium hard about the DDLs that do not require
90                    // serialization, so they do not pose a correctness problem when executing
91                    // concurrently with any other DDL.
92                    //
93                    // If we want to remove the need to even think at all about DDL ordering
94                    // correctness we would need to refactor all calls to catalog_transact to
95                    // require passing the serialized DDL lock. Statements would be responsible for
96                    // getting the lock at the latest possible correct time. ALTER SECRET for
97                    // example could acquire the lock after interacting with k8s, but most other
98                    // DDLs would get the lock for their entire sequencing duration.
99
100                    //soft_panic_or_log!("another DDL executed while this assumed it was serial");
101                }
102                Ok(())
103            }
104            PlanValidity::Checks {
105                transient_revision,
106                dependency_ids,
107                cluster_id,
108                replica_id,
109                role_metadata,
110            } => {
111                if *transient_revision == catalog.transient_revision() {
112                    return Ok(());
113                }
114                // If the transient revision changed, we have to recheck. If successful, bump the revision
115                // so next check uses the above fast path.
116                if let Some(cluster_id) = cluster_id {
117                    let Some(cluster) = catalog.try_get_cluster(*cluster_id) else {
118                        return Err(AdapterError::ChangedPlan(format!(
119                            "cluster {} was removed",
120                            cluster_id
121                        )));
122                    };
123
124                    if let Some(replica_id) = replica_id {
125                        if cluster.replica(*replica_id).is_none() {
126                            return Err(AdapterError::ChangedPlan(format!(
127                                "replica {} of cluster {} was removed",
128                                replica_id, cluster_id
129                            )));
130                        }
131                    }
132                }
133                // It is sufficient to check that all the dependency_ids still exist because we assume:
134                // - Ids do not mutate.
135                // - Ids are not reused.
136                // - If an id was dropped, this will detect it and error.
137                for id in dependency_ids.iter() {
138                    if catalog.try_get_entry(id).is_none() {
139                        return Err(AdapterError::ChangedPlan(format!(
140                            "dependency was removed: {id}",
141                        )));
142                    }
143                }
144                if catalog.try_get_role(&role_metadata.current_role).is_none() {
145                    return Err(AdapterError::Unauthorized(
146                        UnauthorizedError::ConcurrentRoleDrop(role_metadata.current_role.clone()),
147                    ));
148                }
149                if catalog.try_get_role(&role_metadata.session_role).is_none() {
150                    return Err(AdapterError::Unauthorized(
151                        UnauthorizedError::ConcurrentRoleDrop(role_metadata.session_role.clone()),
152                    ));
153                }
154
155                if catalog
156                    .try_get_role(&role_metadata.authenticated_role)
157                    .is_none()
158                {
159                    return Err(AdapterError::Unauthorized(
160                        UnauthorizedError::ConcurrentRoleDrop(
161                            role_metadata.authenticated_role.clone(),
162                        ),
163                    ));
164                }
165                *transient_revision = catalog.transient_revision();
166                Ok(())
167            }
168        }
169    }
170}
171
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use mz_adapter_types::connection::ConnectionId;
    use mz_cluster_client::ReplicaId;
    use mz_controller_types::ClusterId;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::{assert_contains, assert_ok};
    use mz_repr::role_id::RoleId;
    use mz_repr::{CatalogItemId, Timestamp};
    use mz_sql::catalog::RoleAttributesRaw;
    use mz_sql::session::metadata::SessionMetadata;
    use uuid::Uuid;

    use crate::AdapterError;
    use crate::catalog::{Catalog, Op};
    use crate::coord::validity::PlanValidity;
    use crate::metrics::Metrics;
    use crate::session::{Session, SessionConfig};

    /// Exercises each outcome of `PlanValidity::check`.
    ///
    /// Each case starts from a `PlanValidity::Checks` whose cached transient revision is one
    /// behind the catalog's, so the full re-check runs. The case applies one mutation, then
    /// asserts on the result of `check`.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn test_plan_validity() {
        Catalog::with_debug(|mut catalog| async move {
            let conn_id = ConnectionId::Static(1);
            let user = String::from("validity_user");
            let role = "validity_role";
            let metrics_registry = MetricsRegistry::new();
            let metrics = Metrics::register_into(&metrics_registry);

            let commit_ts = catalog.current_upper().await;
            catalog
                .transact(
                    None,
                    commit_ts,
                    None,
                    vec![Op::CreateRole {
                        name: role.into(),
                        attributes: RoleAttributesRaw::new(),
                    }],
                )
                .await
                .expect("is ok");
            let role = catalog.try_get_role_by_name(role).expect("must exist");
            // Can't use a dummy session because we need a valid role for the validity check.
            let mut session = Session::<Timestamp>::new(
                &mz_build_info::DUMMY_BUILD_INFO,
                SessionConfig {
                    conn_id,
                    uuid: Uuid::new_v4(),
                    user,
                    client_ip: None,
                    external_metadata_rx: None,
                    helm_chart_version: None,
                },
                metrics.session_metrics(),
            );
            session.initialize_role_metadata(role.id);
            let empty = PlanValidity::new(
                // Set the transient rev 1 down so the check logic runs.
                catalog
                    .transient_revision()
                    .checked_sub(1)
                    .expect("must subtract"),
                BTreeSet::new(),
                None,
                None,
                session.role_metadata().clone(),
            );
            let some_system_cluster = catalog
                .clusters()
                .find(|c| matches!(c.id, ClusterId::System(_)))
                .expect("must exist");

            // Plan generation and result assertion closures.
            let tests: &[(
                Box<dyn Fn(&mut PlanValidity)>,
                Box<dyn Fn(Result<(), AdapterError>)>,
            )] = &[
                (Box::new(|_validity| {}), Box::new(|res| assert_ok!(res))),
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { cluster_id, .. } = validity else {
                            panic!("expected PlanValidity::Checks");
                        };
                        *cluster_id = Some(ClusterId::user(3).expect("3 is a valid ID"));
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "cluster u3 was removed"
                        )
                    }),
                ),
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks {
                            cluster_id,
                            replica_id,
                            ..
                        } = validity
                        else {
                            panic!("expected PlanValidity::Checks");
                        };
                        *cluster_id = Some(some_system_cluster.id);
                        *replica_id = Some(ReplicaId::User(4));
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            format!(
                                "replica u4 of cluster {} was removed",
                                some_system_cluster.id
                            ),
                        )
                    }),
                ),
                (
                    Box::new(|validity| {
                        // `iter::once` avoids allocating a Vec just to iterate one ID.
                        validity.extend_dependencies(std::iter::once(CatalogItemId::User(6)));
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "dependency was removed: u6"
                        )
                    }),
                ),
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!("expected PlanValidity::Checks");
                        };
                        role_metadata.current_role = RoleId::User(5);
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "role u5 was concurrently dropped"
                        )
                    }),
                ),
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!("expected PlanValidity::Checks");
                        };
                        role_metadata.session_role = RoleId::User(5);
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "role u5 was concurrently dropped"
                        )
                    }),
                ),
                (
                    Box::new(|validity| {
                        let PlanValidity::Checks { role_metadata, .. } = validity else {
                            panic!("expected PlanValidity::Checks");
                        };
                        role_metadata.authenticated_role = RoleId::User(5);
                    }),
                    Box::new(|res| {
                        assert_contains!(
                            res.expect_err("must err").to_string(),
                            "role u5 was concurrently dropped"
                        )
                    }),
                ),
                (
                    Box::new(|validity| {
                        *validity = PlanValidity::require_transient_revision(100);
                    }),
                    Box::new(|res| {
                        // This check is a no-op. `assert_ok!` reports the error if it ever fails.
                        assert_ok!(res);
                    }),
                ),
                (
                    Box::new(|validity| {
                        *validity =
                            PlanValidity::require_transient_revision(catalog.transient_revision());
                    }),
                    Box::new(|res| {
                        assert_ok!(res);
                    }),
                ),
            ];
            for (get_validity, check_res) in tests {
                let mut validity = empty.clone();
                get_validity(&mut validity);
                let res = validity.check(&catalog);
                check_res(res);
            }
        })
        .await
    }
}