Skip to main content

mz_sql/
rbac.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26    CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29    CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30    SchemaSpecifier, SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34    DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40/// Common checks that need to be performed before we can start checking a role's privileges.
41fn rbac_check_preamble(
42    catalog: &impl SessionCatalog,
43    session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45    // PostgreSQL allows users that have their role dropped to perform some actions,
46    // such as `SET ROLE` and certain `SELECT` queries. We haven't implemented
47    // `SET ROLE` and feel it's safer to force to user to re-authenticate if their
48    // role is dropped.
49    if catalog
50        .try_get_role(&session_meta.role_metadata().current_role)
51        .is_none()
52    {
53        return Err(UnauthorizedError::ConcurrentRoleDrop(
54            session_meta.role_metadata().current_role.clone(),
55        ));
56    };
57    if catalog
58        .try_get_role(&session_meta.role_metadata().session_role)
59        .is_none()
60    {
61        return Err(UnauthorizedError::ConcurrentRoleDrop(
62            session_meta.role_metadata().session_role.clone(),
63        ));
64    };
65    if catalog
66        .try_get_role(&session_meta.role_metadata().authenticated_role)
67        .is_none()
68    {
69        return Err(UnauthorizedError::ConcurrentRoleDrop(
70            session_meta.role_metadata().authenticated_role.clone(),
71        ));
72    };
73
74    Ok(())
75}
76
77/// Filters `RbacRequirements` based on the session role metadata and RBAC related feature flags.
78fn filter_requirements(
79    catalog: &impl SessionCatalog,
80    session_meta: &dyn SessionMetadata,
81    rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83    // Skip RBAC non-mandatory checks if RBAC is disabled. However, we never skip RBAC checks for
84    // system roles. This allows us to limit access of system users even when RBAC is off.
85    let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86        && !session_meta.role_metadata().current_role.is_system()
87        && !session_meta.role_metadata().session_role.is_system();
88    // Skip RBAC checks on user items if the session is a superuser.
89    let is_superuser = session_meta.is_superuser();
90    if is_rbac_disabled || is_superuser {
91        return rbac_requirements.filter_to_mandatory_requirements();
92    }
93
94    rbac_requirements
95}
96
97// The default item types that most statements require USAGE privileges for.
98static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99    btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101// CREATE statements require USAGE privileges on the default item types and USAGE privileges on
102// Types.
103pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104    let mut items = DEFAULT_ITEM_USAGE.clone();
105    items.insert(CatalogItemType::Type);
106    items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
110/// System catalog objects exempted from `check_restrict_to_user_objects`.
111///
112/// The `mz_mcp_data_product*` views are how the MCP agent endpoint
113/// discovers data products; blocking them defeats the isolation model.
114/// `mz_show_my_cluster_privileges` is joined by `read_data_product` to
115/// check cluster USAGE (it replaces a `has_cluster_privilege` call whose
116/// body referenced `mz_roles`) and only exposes the session role's own
117/// privileges.
118static RESTRICT_TO_USER_OBJECTS_ALLOWED_OIDS: LazyLock<BTreeSet<u32>> = LazyLock::new(|| {
119    use mz_pgrepr::oid;
120    btreeset! {
121        oid::VIEW_MZ_MCP_DATA_PRODUCTS_OID,
122        oid::VIEW_MZ_MCP_DATA_PRODUCT_DETAILS_OID,
123        oid::VIEW_MZ_SHOW_MY_CLUSTER_PRIVILEGES_OID,
124    }
125});
126
127/// Errors that can occur due to an unauthorized action.
128#[derive(Debug, thiserror::Error)]
129pub enum UnauthorizedError {
130    /// The action can only be performed by a superuser.
131    #[error("permission denied to {action}")]
132    Superuser { action: String },
133    /// The action requires ownership of an object.
134    #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
135    Ownership { objects: Vec<(ObjectType, String)> },
136    /// Altering an owner requires membership of the new owner role.
137    #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
138    RoleMembership { role_names: Vec<String> },
139    /// The action requires one or more privileges.
140    #[error("permission denied for {object_description}")]
141    Privilege {
142        object_description: ErrorMessageObjectDescription,
143        role_name: String,
144        privileges: String,
145    },
146    // TODO(jkosh44) When we implement parameter privileges, this can be replaced with a regular
147    //  privilege error.
148    /// The action can only be performed by the mz_system role.
149    #[error("permission denied to {action}")]
150    MzSystem { action: String },
151    /// The action cannot be performed by the mz_support role.
152    #[error("permission denied to {action}")]
153    MzSupport { action: String },
154    /// The active role was dropped while a user was logged in.
155    #[error("role {0} was concurrently dropped")]
156    ConcurrentRoleDrop(RoleId),
157    /// Access to system objects is restricted by the restrict_to_user_objects session variable.
158    #[error("access to system object {object_name} is restricted")]
159    RestrictedSystemObject { object_name: String },
160}
161
162impl UnauthorizedError {
163    pub fn detail(&self) -> Option<String> {
164        match &self {
165            UnauthorizedError::Superuser { action } => {
166                Some(format!("You must be a superuser to {}", action))
167            }
168            UnauthorizedError::Privilege {
169                object_description,
170                role_name,
171                privileges,
172            } => Some(format!(
173                "The '{role_name}' role needs {privileges} privileges on {object_description}"
174            )),
175            UnauthorizedError::MzSystem { .. } => {
176                Some(format!("You must be the '{}' role", SYSTEM_USER.name))
177            }
178            UnauthorizedError::MzSupport { .. } => Some(format!(
179                "The '{}' role has very limited privileges",
180                SUPPORT_USER.name
181            )),
182            UnauthorizedError::ConcurrentRoleDrop(_) => {
183                Some("Please disconnect and re-connect with a valid role.".into())
184            }
185            UnauthorizedError::RestrictedSystemObject { .. } => Some(
186                "Access to system catalog objects is restricted for this role. \
187                Contact your administrator if you need access."
188                    .into(),
189            ),
190            UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
191        }
192    }
193}
194
195/// RBAC requirements for executing a given plan.
196#[derive(Debug)]
197struct RbacRequirements {
198    /// The role memberships required.
199    role_membership: BTreeSet<RoleId>,
200    /// The object ownerships required.
201    ownership: Vec<ObjectId>,
202    /// The privileges required. The tuples are of the form:
203    /// (What object the privilege is on, What privilege is required, Who must possess the privilege).
204    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
205    /// The types of catalog items that this plan requires USAGE privileges on.
206    ///
207    /// Most plans will require USAGE on secrets and connections but some plans, like SHOW CREATE,
208    /// can reference an item without requiring any privileges on that item.
209    item_usage: &'static BTreeSet<CatalogItemType>,
210    /// Some action if superuser is required to perform that action, None otherwise.
211    superuser_action: Option<String>,
212}
213
214impl RbacRequirements {
215    fn empty() -> RbacRequirements {
216        RbacRequirements {
217            role_membership: BTreeSet::new(),
218            ownership: Vec::new(),
219            privileges: Vec::new(),
220            item_usage: &EMPTY_ITEM_USAGE,
221            superuser_action: None,
222        }
223    }
224
225    fn validate(
226        self,
227        catalog: &impl SessionCatalog,
228        session: &dyn SessionMetadata,
229        resolved_ids: &ResolvedIds,
230    ) -> Result<(), UnauthorizedError> {
231        // Obtain all roles that the current session is a member of.
232        let role_membership =
233            catalog.collect_role_membership(&session.role_metadata().current_role);
234
235        check_usage(catalog, session, resolved_ids, self.item_usage)?;
236
237        // Validate that the current session has the required role membership to execute the provided
238        // plan.
239        let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
240        if !unheld_membership.is_empty() {
241            let role_names = unheld_membership
242                .into_iter()
243                .map(|role_id| {
244                    // Some role references may no longer exist due to concurrent drops.
245                    catalog
246                        .try_get_role(role_id)
247                        .map(|role| role.name().to_string())
248                        .unwrap_or_else(|| role_id.to_string())
249                })
250                .collect();
251            return Err(UnauthorizedError::RoleMembership { role_names });
252        }
253
254        // Validate that the current session has the required object ownership to execute the provided
255        // plan.
256        let unheld_ownership = self
257            .ownership
258            .into_iter()
259            .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
260            .collect();
261        ownership_err(unheld_ownership, catalog)?;
262
263        check_object_privileges(
264            catalog,
265            self.privileges,
266            role_membership,
267            session.role_metadata().current_role,
268        )?;
269
270        if let Some(action) = self.superuser_action {
271            return Err(UnauthorizedError::Superuser { action });
272        }
273
274        Ok(())
275    }
276
277    fn filter_to_mandatory_requirements(self) -> RbacRequirements {
278        let RbacRequirements {
279            role_membership,
280            ownership,
281            privileges,
282            item_usage,
283            superuser_action: _,
284        } = self;
285        let role_membership = role_membership
286            .into_iter()
287            .filter(|id| id.is_system())
288            .collect();
289        let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
290        let privileges = privileges
291            .into_iter()
292            .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
293            // We allow reading objects for superusers and when RBAC is off.
294            .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
295            .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
296            .collect();
297        let superuser_action = None;
298        RbacRequirements {
299            role_membership,
300            ownership,
301            privileges,
302            item_usage,
303            superuser_action,
304        }
305    }
306}
307
308impl Default for RbacRequirements {
309    fn default() -> Self {
310        RbacRequirements {
311            role_membership: BTreeSet::new(),
312            ownership: Vec::new(),
313            privileges: Vec::new(),
314            item_usage: &DEFAULT_ITEM_USAGE,
315            superuser_action: None,
316        }
317    }
318}
319
320/// When `restrict_to_user_objects` is active, rejects access to system catalog objects.
321///
322/// Functions and types are allowed through because they are needed for query execution.
323/// All other system items (tables, views, sources, sinks, etc.) are blocked. This is an
324/// allow-list — new catalog item types are blocked by default.
325///
326/// See: doc/developer/design/20260508_restrict_to_user_objects.md
327fn check_restrict_to_user_objects(
328    catalog: &impl SessionCatalog,
329    session: &dyn SessionMetadata,
330    resolved_ids: &ResolvedIds,
331) -> Result<(), UnauthorizedError> {
332    if !session.restrict_to_user_objects() {
333        return Ok(());
334    }
335    for item_id in resolved_ids.items() {
336        if item_id.is_system() {
337            if let Some(item) = catalog.try_get_item(item_id) {
338                match item.item_type() {
339                    CatalogItemType::Func | CatalogItemType::Type => {}
340                    _ => {
341                        if RESTRICT_TO_USER_OBJECTS_ALLOWED_OIDS.contains(&item.oid()) {
342                            continue;
343                        }
344                        return Err(UnauthorizedError::RestrictedSystemObject {
345                            object_name: item.name().item.clone(),
346                        });
347                    }
348                }
349            }
350        }
351    }
352    Ok(())
353}
354
355/// Checks if a `session` is authorized to use `resolved_ids`. If not, an error is returned.
356pub fn check_usage(
357    catalog: &impl SessionCatalog,
358    session: &dyn SessionMetadata,
359    resolved_ids: &ResolvedIds,
360    item_types: &BTreeSet<CatalogItemType>,
361) -> Result<(), UnauthorizedError> {
362    rbac_check_preamble(catalog, session)?;
363
364    // See: doc/developer/design/20260508_restrict_to_user_objects.md
365    check_restrict_to_user_objects(catalog, session, resolved_ids)?;
366
367    // Obtain all roles that the current session is a member of.
368    let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
369
370    // Certain statements depend on objects that haven't been created yet, like sub-sources, so we
371    // need to filter those out.
372    let existing_resolved_ids =
373        resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
374
375    let required_privileges = generate_usage_privileges(
376        catalog,
377        &existing_resolved_ids,
378        session.role_metadata().current_role,
379        item_types,
380    )
381    .into_iter()
382    .collect();
383
384    let mut rbac_requirements = RbacRequirements::empty();
385    rbac_requirements.privileges = required_privileges;
386    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
387    let required_privileges = rbac_requirements.privileges;
388
389    check_object_privileges(
390        catalog,
391        required_privileges,
392        role_membership,
393        session.role_metadata().current_role,
394    )?;
395
396    Ok(())
397}
398
399/// Checks if a session is authorized to execute a plan. If not, an error is returned.
400///
401/// `sql_impl_resolved_ids` contains resolved IDs discovered inside SQL-implemented function
402/// bodies during planning. These are kept separate from `resolved_ids` because they are
403/// implementation details of the functions, not dependencies of the statement. They are
404/// only checked by the `restrict_to_user_objects` restriction.
405pub fn check_plan(
406    catalog: &impl SessionCatalog,
407    // Function mapping a connection ID to an authenticated role. The roles may have been dropped concurrently.
408    // Only required for Plan::SideEffectingFunc; can be None for other plan types.
409    // TODO(peek-seq): Remove this when deleting the old peek sequencing. The logic here that uses
410    // `active_conns` is mirrored in `execute_side_effecting_func`, which is what the frontend peek
411    // sequencing uses.
412    active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
413    session: &dyn SessionMetadata,
414    plan: &Plan,
415    target_cluster_id: Option<ClusterId>,
416    resolved_ids: &ResolvedIds,
417    sql_impl_resolved_ids: &ResolvedIds,
418) -> Result<(), UnauthorizedError> {
419    rbac_check_preamble(catalog, session)?;
420
421    // Check sql_impl function body dependencies against restrict_to_user_objects.
422    // These are checked separately from the main resolved_ids because they are
423    // implementation details that should not affect dependency tracking.
424    check_restrict_to_user_objects(catalog, session, sql_impl_resolved_ids)?;
425
426    let rbac_requirements = generate_rbac_requirements(
427        catalog,
428        plan,
429        active_conns,
430        target_cluster_id,
431        session.role_metadata().current_role,
432    );
433    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
434    debug!(
435        "rbac requirements {rbac_requirements:?} for plan {:?}",
436        PlanKind::from(plan)
437    );
438    rbac_requirements.validate(catalog, session, resolved_ids)
439}
440
441/// Returns true if RBAC is turned on for a session, false otherwise.
442pub fn is_rbac_enabled_for_session(
443    system_vars: &SystemVars,
444    session: &dyn SessionMetadata,
445) -> bool {
446    let server_enabled = system_vars.enable_rbac_checks();
447    let session_enabled = session.enable_session_rbac_checks();
448
449    // The session flag allows users to turn RBAC on for just their session while the server flag
450    // allows users to turn RBAC on for everyone.
451    server_enabled || session_enabled
452}
453
454/// Generates all requirements needed to execute a given plan.
455fn generate_rbac_requirements(
456    catalog: &impl SessionCatalog,
457    plan: &Plan,
458    active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
459    target_cluster_id: Option<ClusterId>,
460    role_id: RoleId,
461) -> RbacRequirements {
462    match plan {
463        Plan::CreateConnection(plan::CreateConnectionPlan {
464            name,
465            if_not_exists: _,
466            connection: _,
467            validate: _,
468        }) => RbacRequirements {
469            privileges: vec![(
470                SystemObjectId::Object(name.qualifiers.clone().into()),
471                AclMode::CREATE,
472                role_id,
473            )],
474            item_usage: &CREATE_ITEM_USAGE,
475            ..Default::default()
476        },
477        Plan::CreateDatabase(plan::CreateDatabasePlan {
478            name: _,
479            if_not_exists: _,
480        }) => RbacRequirements {
481            privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
482            item_usage: &CREATE_ITEM_USAGE,
483            ..Default::default()
484        },
485        Plan::CreateSchema(plan::CreateSchemaPlan {
486            database_spec,
487            schema_name: _,
488            if_not_exists: _,
489        }) => {
490            let privileges = match database_spec {
491                ResolvedDatabaseSpecifier::Ambient => Vec::new(),
492                ResolvedDatabaseSpecifier::Id(database_id) => {
493                    vec![(
494                        SystemObjectId::Object(database_id.into()),
495                        AclMode::CREATE,
496                        role_id,
497                    )]
498                }
499            };
500            RbacRequirements {
501                privileges,
502                item_usage: &CREATE_ITEM_USAGE,
503                ..Default::default()
504            }
505        }
506        Plan::CreateRole(plan::CreateRolePlan {
507            name: _,
508            attributes,
509        }) => {
510            if attributes.superuser.unwrap_or(false) {
511                RbacRequirements {
512                    superuser_action: Some("create superuser role".to_string()),
513                    ..Default::default()
514                }
515            } else {
516                RbacRequirements {
517                    privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
518                    item_usage: &CREATE_ITEM_USAGE,
519                    ..Default::default()
520                }
521            }
522        }
523        Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
524            privileges: vec![(
525                SystemObjectId::System,
526                AclMode::CREATE_NETWORK_POLICY,
527                role_id,
528            )],
529            item_usage: &CREATE_ITEM_USAGE,
530            ..Default::default()
531        },
532        Plan::CreateCluster(plan::CreateClusterPlan {
533            name: _,
534            variant: _,
535            workload_class: _,
536        }) => RbacRequirements {
537            privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
538            item_usage: &CREATE_ITEM_USAGE,
539            ..Default::default()
540        },
541        Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
542            cluster_id,
543            name: _,
544            config: _,
545        }) => RbacRequirements {
546            ownership: vec![ObjectId::Cluster(*cluster_id)],
547            item_usage: &CREATE_ITEM_USAGE,
548            ..Default::default()
549        },
550        Plan::CreateSource(plan::CreateSourcePlan {
551            name,
552            source,
553            if_not_exists: _,
554            timeline: _,
555            in_cluster,
556        }) => RbacRequirements {
557            privileges: generate_required_source_privileges(
558                name,
559                &source.data_source,
560                *in_cluster,
561                role_id,
562            ),
563            item_usage: &CREATE_ITEM_USAGE,
564            ..Default::default()
565        },
566        Plan::CreateSources(plans) => RbacRequirements {
567            privileges: plans
568                .iter()
569                .flat_map(
570                    |plan::CreateSourcePlanBundle {
571                         item_id: _,
572                         global_id: _,
573                         plan:
574                             plan::CreateSourcePlan {
575                                 name,
576                                 source,
577                                 if_not_exists: _,
578                                 timeline: _,
579                                 in_cluster,
580                             },
581                         resolved_ids: _,
582                         available_source_references: _,
583                     }| {
584                        generate_required_source_privileges(
585                            name,
586                            &source.data_source,
587                            *in_cluster,
588                            role_id,
589                        )
590                        .into_iter()
591                    },
592                )
593                .collect(),
594            item_usage: &CREATE_ITEM_USAGE,
595            ..Default::default()
596        },
597        Plan::CreateSecret(plan::CreateSecretPlan {
598            name,
599            secret: _,
600            if_not_exists: _,
601        }) => RbacRequirements {
602            privileges: vec![(
603                SystemObjectId::Object(name.qualifiers.clone().into()),
604                AclMode::CREATE,
605                role_id,
606            )],
607            item_usage: &CREATE_ITEM_USAGE,
608            ..Default::default()
609        },
610        Plan::CreateSink(plan::CreateSinkPlan {
611            name,
612            sink,
613            with_snapshot: _,
614            if_not_exists: _,
615            in_cluster,
616        }) => {
617            let mut privileges = vec![(
618                SystemObjectId::Object(name.qualifiers.clone().into()),
619                AclMode::CREATE,
620                role_id,
621            )];
622            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
623            privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
624            privileges.push((
625                SystemObjectId::Object(in_cluster.into()),
626                AclMode::CREATE,
627                role_id,
628            ));
629            RbacRequirements {
630                privileges,
631                item_usage: &CREATE_ITEM_USAGE,
632                ..Default::default()
633            }
634        }
635        Plan::CreateTable(plan::CreateTablePlan {
636            name,
637            table: _,
638            if_not_exists: _,
639        }) => RbacRequirements {
640            privileges: vec![(
641                SystemObjectId::Object(name.qualifiers.clone().into()),
642                AclMode::CREATE,
643                role_id,
644            )],
645            item_usage: &CREATE_ITEM_USAGE,
646            ..Default::default()
647        },
648        Plan::CreateView(plan::CreateViewPlan {
649            name,
650            view: _,
651            replace,
652            drop_ids: _,
653            if_not_exists: _,
654            ambiguous_columns: _,
655        }) => RbacRequirements {
656            ownership: replace
657                .map(|id| vec![ObjectId::Item(id)])
658                .unwrap_or_default(),
659            privileges: vec![(
660                SystemObjectId::Object(name.qualifiers.clone().into()),
661                AclMode::CREATE,
662                role_id,
663            )],
664            item_usage: &CREATE_ITEM_USAGE,
665            ..Default::default()
666        },
667        Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
668            name,
669            materialized_view,
670            replace,
671            drop_ids: _,
672            if_not_exists: _,
673            ambiguous_columns: _,
674        }) => RbacRequirements {
675            ownership: replace
676                .map(|id| vec![ObjectId::Item(id)])
677                .unwrap_or_default(),
678            privileges: vec![
679                (
680                    SystemObjectId::Object(name.qualifiers.clone().into()),
681                    AclMode::CREATE,
682                    role_id,
683                ),
684                (
685                    SystemObjectId::Object(materialized_view.cluster_id.into()),
686                    AclMode::CREATE,
687                    role_id,
688                ),
689            ],
690            item_usage: &CREATE_ITEM_USAGE,
691            ..Default::default()
692        },
693        Plan::CreateIndex(plan::CreateIndexPlan {
694            name,
695            index,
696            if_not_exists: _,
697        }) => {
698            let index_on_item = catalog.resolve_item_id(&index.on);
699            RbacRequirements {
700                ownership: vec![ObjectId::Item(index_on_item)],
701                privileges: vec![
702                    (
703                        SystemObjectId::Object(name.qualifiers.clone().into()),
704                        AclMode::CREATE,
705                        role_id,
706                    ),
707                    (
708                        SystemObjectId::Object(index.cluster_id.into()),
709                        AclMode::CREATE,
710                        role_id,
711                    ),
712                ],
713                item_usage: &CREATE_ITEM_USAGE,
714                ..Default::default()
715            }
716        }
717        Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
718            privileges: vec![(
719                SystemObjectId::Object(name.qualifiers.clone().into()),
720                AclMode::CREATE,
721                role_id,
722            )],
723            item_usage: &CREATE_ITEM_USAGE,
724            ..Default::default()
725        },
726        Plan::Comment(plan::CommentPlan {
727            object_id,
728            sub_component: _,
729            comment: _,
730        }) => {
731            let (ownership, privileges) = match object_id {
732                // Roles don't have owners, instead we require the current session to have the
733                // `CREATEROLE` privilege.
734                CommentObjectId::Role(_) => (
735                    Vec::new(),
736                    vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
737                ),
738                _ => (vec![ObjectId::from(*object_id)], Vec::new()),
739            };
740            RbacRequirements {
741                ownership,
742                privileges,
743                ..Default::default()
744            }
745        }
746        Plan::DropObjects(plan::DropObjectsPlan {
747            referenced_ids,
748            drop_ids: _,
749            object_type,
750        }) => {
751            let privileges = if object_type == &ObjectType::Role {
752                vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
753            } else {
754                referenced_ids
755                    .iter()
756                    .filter_map(|id| match id {
757                        ObjectId::ClusterReplica((cluster_id, _)) => Some((
758                            SystemObjectId::Object(cluster_id.into()),
759                            AclMode::USAGE,
760                            role_id,
761                        )),
762                        ObjectId::Schema((database_spec, _)) => match database_spec {
763                            ResolvedDatabaseSpecifier::Ambient => None,
764                            ResolvedDatabaseSpecifier::Id(database_id) => Some((
765                                SystemObjectId::Object(database_id.into()),
766                                AclMode::USAGE,
767                                role_id,
768                            )),
769                        },
770                        ObjectId::Item(item_id) => {
771                            let item = catalog.get_item(item_id);
772                            Some((
773                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
774                                AclMode::USAGE,
775                                role_id,
776                            ))
777                        }
778                        ObjectId::Cluster(_)
779                        | ObjectId::Database(_)
780                        | ObjectId::Role(_)
781                        | ObjectId::NetworkPolicy(_) => None,
782                    })
783                    .collect()
784            };
785            RbacRequirements {
786                // Do not need ownership of descendant objects.
787                ownership: referenced_ids.clone(),
788                privileges,
789                ..Default::default()
790            }
791        }
792        Plan::DropOwned(plan::DropOwnedPlan {
793            role_ids,
794            drop_ids: _,
795            privilege_revokes: _,
796            default_privilege_revokes: _,
797        }) => RbacRequirements {
798            role_membership: role_ids.into_iter().cloned().collect(),
799            ..Default::default()
800        },
801        Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
802            let container_id = match id {
803                ObjectId::Item(id) => Some(SystemObjectId::Object(
804                    catalog.get_item(id).name().qualifiers.clone().into(),
805                )),
806                ObjectId::Schema((database_id, _schema_id)) => match database_id {
807                    ResolvedDatabaseSpecifier::Ambient => None,
808                    ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
809                },
810                ObjectId::Cluster(_)
811                | ObjectId::ClusterReplica(_)
812                | ObjectId::Database(_)
813                | ObjectId::Role(_)
814                | ObjectId::NetworkPolicy(_) => None,
815            };
816            let privileges = match container_id {
817                Some(id) => vec![(id, AclMode::USAGE, role_id)],
818                None => Vec::new(),
819            };
820            RbacRequirements {
821                privileges,
822                item_usage: &EMPTY_ITEM_USAGE,
823                ..Default::default()
824            }
825        }
826        Plan::ShowColumns(plan::ShowColumnsPlan {
827            id,
828            select_plan,
829            new_resolved_ids: _,
830        }) => {
831            let mut privileges = vec![(
832                SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
833                AclMode::USAGE,
834                role_id,
835            )];
836
837            for privilege in generate_rbac_requirements(
838                catalog,
839                &Plan::Select(select_plan.clone()),
840                active_conns,
841                target_cluster_id,
842                role_id,
843            )
844            .privileges
845            {
846                privileges.push(privilege);
847            }
848            RbacRequirements {
849                privileges,
850                ..Default::default()
851            }
852        }
853        Plan::Select(plan::SelectPlan {
854            source,
855            select: _,
856            when: _,
857            finishing: _,
858            copy_to: _,
859        }) => {
860            let items = source
861                .depends_on()
862                .into_iter()
863                .map(|gid| catalog.resolve_item_id(&gid));
864            let mut privileges = generate_read_privileges(catalog, items, role_id);
865            if let Some(privilege) = generate_cluster_usage_privileges(
866                source.as_const().is_some(),
867                target_cluster_id,
868                role_id,
869            ) {
870                privileges.push(privilege);
871            }
872            RbacRequirements {
873                privileges,
874                ..Default::default()
875            }
876        }
877        Plan::Subscribe(plan::SubscribePlan {
878            from,
879            with_snapshot: _,
880            when: _,
881            up_to: _,
882            copy_to: _,
883            emit_progress: _,
884            output: _,
885        }) => {
886            let items = from
887                .depends_on()
888                .into_iter()
889                .map(|gid| catalog.resolve_item_id(&gid));
890            let mut privileges = generate_read_privileges(catalog, items, role_id);
891            if let Some(cluster_id) = target_cluster_id {
892                privileges.push((
893                    SystemObjectId::Object(cluster_id.into()),
894                    AclMode::USAGE,
895                    role_id,
896                ));
897            }
898            RbacRequirements {
899                privileges,
900                ..Default::default()
901            }
902        }
903        Plan::CopyFrom(plan::CopyFromPlan {
904            target_name: _,
905            target_id,
906            source: _,
907            columns: _,
908            source_desc: _,
909            mfp: _,
910            params: _,
911            filter: _,
912        }) => RbacRequirements {
913            privileges: vec![
914                (
915                    SystemObjectId::Object(
916                        catalog.get_item(target_id).name().qualifiers.clone().into(),
917                    ),
918                    AclMode::USAGE,
919                    role_id,
920                ),
921                (
922                    SystemObjectId::Object(target_id.into()),
923                    AclMode::INSERT,
924                    role_id,
925                ),
926            ],
927            ..Default::default()
928        },
929        Plan::CopyTo(plan::CopyToPlan {
930            select_plan,
931            desc: _,
932            to: _,
933            connection: _,
934            connection_id: _,
935            format: _,
936            max_file_size: _,
937        }) => {
938            let items = select_plan
939                .source
940                .depends_on()
941                .into_iter()
942                .map(|gid| catalog.resolve_item_id(&gid));
943            let mut privileges = generate_read_privileges(catalog, items, role_id);
944            if let Some(cluster_id) = target_cluster_id {
945                privileges.push((
946                    SystemObjectId::Object(cluster_id.into()),
947                    AclMode::USAGE,
948                    role_id,
949                ));
950            }
951            RbacRequirements {
952                privileges,
953                ..Default::default()
954            }
955        }
956        Plan::ExplainPlan(plan::ExplainPlanPlan {
957            stage: _,
958            format: _,
959            config: _,
960            explainee,
961        })
962        | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
963            privileges: match explainee {
964                Explainee::View(id)
965                | Explainee::MaterializedView(id)
966                | Explainee::Index(id)
967                | Explainee::ReplanView(id)
968                | Explainee::ReplanMaterializedView(id)
969                | Explainee::ReplanIndex(id) => {
970                    let item = catalog.get_item(id);
971                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
972                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
973                }
974                Explainee::Statement(stmt) => stmt
975                    .depends_on()
976                    .into_iter()
977                    .map(|id| {
978                        let item = catalog.get_item_by_global_id(&id);
979                        let schema_id: ObjectId = item.name().qualifiers.clone().into();
980                        (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
981                    })
982                    .collect(),
983            },
984            item_usage: match explainee {
985                Explainee::View(..)
986                | Explainee::MaterializedView(..)
987                | Explainee::Index(..)
988                | Explainee::ReplanView(..)
989                | Explainee::ReplanMaterializedView(..)
990                | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
991                Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
992            },
993            ..Default::default()
994        },
995        Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
996            RbacRequirements {
997                privileges: {
998                    let item = catalog.get_item_by_global_id(sink_from);
999                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
1000                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
1001                },
1002                item_usage: &EMPTY_ITEM_USAGE,
1003                ..Default::default()
1004            }
1005        }
1006        Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
1007            format: _,
1008            raw_plan,
1009            when: _,
1010        }) => RbacRequirements {
1011            privileges: raw_plan
1012                .depends_on()
1013                .into_iter()
1014                .map(|id| {
1015                    let item = catalog.get_item_by_global_id(&id);
1016                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
1017                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
1018                })
1019                .collect(),
1020            ..Default::default()
1021        },
1022        Plan::Insert(plan::InsertPlan {
1023            id,
1024            values,
1025            returning,
1026        }) => {
1027            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1028            let mut privileges = vec![
1029                (
1030                    SystemObjectId::Object(schema_id.clone()),
1031                    AclMode::USAGE,
1032                    role_id,
1033                ),
1034                (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
1035            ];
1036            let mut seen = BTreeSet::from([(schema_id, role_id)]);
1037
1038            // We don't allow arbitrary sub-queries in `returning`. So either it
1039            // contains a column reference to the outer table or it's constant.
1040            if returning
1041                .iter()
1042                .any(|assignment| assignment.contains_column())
1043            {
1044                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1045                seen.insert((id.into(), role_id));
1046            }
1047
1048            let items = values
1049                .depends_on()
1050                .into_iter()
1051                .map(|gid| catalog.resolve_item_id(&gid));
1052            privileges.extend_from_slice(&generate_read_privileges_inner(
1053                catalog, items, role_id, &mut seen,
1054            ));
1055
1056            if let Some(privilege) = generate_cluster_usage_privileges(
1057                values.as_const().is_some(),
1058                target_cluster_id,
1059                role_id,
1060            ) {
1061                privileges.push(privilege);
1062            } else if !returning.is_empty() {
1063                // TODO(jkosh44) returning may be a constant, but for now we are overly protective
1064                //  and require cluster privileges for all returning.
1065                if let Some(cluster_id) = target_cluster_id {
1066                    privileges.push((
1067                        SystemObjectId::Object(cluster_id.into()),
1068                        AclMode::USAGE,
1069                        role_id,
1070                    ));
1071                }
1072            }
1073            RbacRequirements {
1074                privileges,
1075                ..Default::default()
1076            }
1077        }
1078        Plan::AlterCluster(plan::AlterClusterPlan {
1079            id,
1080            name: _,
1081            options: _,
1082            strategy: _,
1083        }) => RbacRequirements {
1084            ownership: vec![ObjectId::Cluster(*id)],
1085            item_usage: &CREATE_ITEM_USAGE,
1086            ..Default::default()
1087        },
1088        Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1089            ownership: vec![ObjectId::Item(*id)],
1090            privileges: vec![(
1091                SystemObjectId::Object(set_cluster.into()),
1092                AclMode::CREATE,
1093                role_id,
1094            )],
1095            item_usage: &CREATE_ITEM_USAGE,
1096            ..Default::default()
1097        },
1098        Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1099            id,
1100            window: _,
1101            value: _,
1102            object_type: _,
1103        }) => RbacRequirements {
1104            ownership: vec![ObjectId::Item(*id)],
1105            item_usage: &CREATE_ITEM_USAGE,
1106            ..Default::default()
1107        },
1108        Plan::AlterSourceTimestampInterval(plan::AlterSourceTimestampIntervalPlan {
1109            id,
1110            value: _,
1111            interval: _,
1112        }) => RbacRequirements {
1113            ownership: vec![ObjectId::Item(*id)],
1114            item_usage: &CREATE_ITEM_USAGE,
1115            ..Default::default()
1116        },
1117        Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1118            ownership: vec![ObjectId::Item(*id)],
1119            ..Default::default()
1120        },
1121        Plan::AlterSource(plan::AlterSourcePlan {
1122            item_id,
1123            ingestion_id: _,
1124            action: _,
1125        }) => RbacRequirements {
1126            ownership: vec![ObjectId::Item(*item_id)],
1127            item_usage: &CREATE_ITEM_USAGE,
1128            ..Default::default()
1129        },
1130        Plan::AlterSink(plan::AlterSinkPlan {
1131            item_id,
1132            global_id: _,
1133            sink,
1134            with_snapshot: _,
1135            in_cluster,
1136        }) => {
1137            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1138            let mut privileges = generate_read_privileges(catalog, items, role_id);
1139            privileges.push((
1140                SystemObjectId::Object(in_cluster.into()),
1141                AclMode::CREATE,
1142                role_id,
1143            ));
1144            RbacRequirements {
1145                ownership: vec![ObjectId::Item(*item_id)],
1146                privileges,
1147                item_usage: &CREATE_ITEM_USAGE,
1148                ..Default::default()
1149            }
1150        }
1151        Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1152            id,
1153            name: _,
1154            to_name: _,
1155        }) => RbacRequirements {
1156            ownership: vec![ObjectId::Cluster(*id)],
1157            ..Default::default()
1158        },
1159        Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1160            id_a,
1161            id_b,
1162            name_a: _,
1163            name_b: _,
1164            name_temp: _,
1165        }) => RbacRequirements {
1166            ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1167            ..Default::default()
1168        },
1169        Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1170            cluster_id,
1171            replica_id,
1172            name: _,
1173            to_name: _,
1174        }) => RbacRequirements {
1175            ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1176            ..Default::default()
1177        },
1178        Plan::AlterItemRename(plan::AlterItemRenamePlan {
1179            id,
1180            current_full_name: _,
1181            to_name: _,
1182            object_type: _,
1183        }) => RbacRequirements {
1184            ownership: vec![ObjectId::Item(*id)],
1185            ..Default::default()
1186        },
1187        Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1188            cur_schema_spec,
1189            new_schema_name: _,
1190        }) => {
1191            let privileges = match cur_schema_spec.0 {
1192                ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1193                    SystemObjectId::Object(ObjectId::Database(db_id)),
1194                    AclMode::CREATE,
1195                    role_id,
1196                )],
1197                ResolvedDatabaseSpecifier::Ambient => vec![],
1198            };
1199
1200            RbacRequirements {
1201                ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1202                privileges,
1203                ..Default::default()
1204            }
1205        }
1206        Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1207            schema_a_spec,
1208            schema_a_name: _,
1209            schema_b_spec,
1210            schema_b_name: _,
1211            name_temp: _,
1212        }) => {
1213            let mut privileges = vec![];
1214            if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1215                privileges.push((
1216                    SystemObjectId::Object(ObjectId::Database(id_a)),
1217                    AclMode::CREATE,
1218                    role_id,
1219                ));
1220            }
1221            if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1222                privileges.push((
1223                    SystemObjectId::Object(ObjectId::Database(id_b)),
1224                    AclMode::CREATE,
1225                    role_id,
1226                ));
1227            }
1228
1229            RbacRequirements {
1230                ownership: vec![
1231                    ObjectId::Schema(*schema_a_spec),
1232                    ObjectId::Schema(*schema_b_spec),
1233                ],
1234                privileges,
1235                ..Default::default()
1236            }
1237        }
1238        Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1239            ownership: vec![ObjectId::Item(*id)],
1240            item_usage: &CREATE_ITEM_USAGE,
1241            ..Default::default()
1242        },
1243        Plan::AlterRole(plan::AlterRolePlan {
1244            id,
1245            name: _,
1246            option,
1247        }) => match option {
1248            // Only superusers can alter the superuserness of a role.
1249            plan::PlannedAlterRoleOption::Attributes(attributes)
1250                if attributes.superuser.is_some() =>
1251            {
1252                RbacRequirements {
1253                    superuser_action: Some("alter superuser role".to_string()),
1254                    ..Default::default()
1255                }
1256            }
1257            // Roles are allowed to change their own password, but only if
1258            // password is the sole attribute being changed.
1259            plan::PlannedAlterRoleOption::Attributes(plan::PlannedRoleAttributes {
1260                password,
1261                // scram_iterations and nopassword are password-related, so
1262                // they're fine to change alongside the password.
1263                scram_iterations: _,
1264                nopassword: _,
1265                // superuser is already handled by the match arm above, so it
1266                // will always be None here.
1267                superuser: None,
1268                inherit: None,
1269                login: None,
1270            }) if password.is_some() && role_id == *id => RbacRequirements::default(),
1271            // But no one else's...
1272            plan::PlannedAlterRoleOption::Attributes(attributes)
1273                if attributes.password.is_some() && role_id != *id =>
1274            {
1275                RbacRequirements {
1276                    superuser_action: Some("alter password of role".to_string()),
1277                    ..Default::default()
1278                }
1279            }
1280            // restrict_to_user_objects can only be set by superuser.
1281            // SECURITY: This must use case-insensitive comparison because
1282            // var.name() comes from Ident::to_string() which preserves the
1283            // original casing for quoted identifiers.
1284            plan::PlannedAlterRoleOption::Variable(var)
1285                if var.name().eq_ignore_ascii_case("restrict_to_user_objects") =>
1286            {
1287                RbacRequirements {
1288                    superuser_action: Some("set restrict_to_user_objects".to_string()),
1289                    ..Default::default()
1290                }
1291            }
1292            // Roles are allowed to change their own other variables.
1293            plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1294                RbacRequirements::default()
1295            }
1296            // Otherwise to ALTER a role, you need to have the CREATE_ROLE privilege.
1297            _ => RbacRequirements {
1298                privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1299                item_usage: &CREATE_ITEM_USAGE,
1300                ..Default::default()
1301            },
1302        },
1303        Plan::AlterOwner(plan::AlterOwnerPlan {
1304            id,
1305            object_type: _,
1306            new_owner,
1307        }) => {
1308            let privileges = match id {
1309                ObjectId::ClusterReplica((cluster_id, _)) => {
1310                    vec![(
1311                        SystemObjectId::Object(cluster_id.into()),
1312                        AclMode::CREATE,
1313                        role_id,
1314                    )]
1315                }
1316                ObjectId::Schema((database_spec, _)) => match database_spec {
1317                    ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1318                    ResolvedDatabaseSpecifier::Id(database_id) => {
1319                        vec![(
1320                            SystemObjectId::Object(database_id.into()),
1321                            AclMode::CREATE,
1322                            role_id,
1323                        )]
1324                    }
1325                },
1326                ObjectId::Item(item_id) => {
1327                    let item = catalog.get_item(item_id);
1328                    vec![(
1329                        SystemObjectId::Object(item.name().qualifiers.clone().into()),
1330                        AclMode::CREATE,
1331                        role_id,
1332                    )]
1333                }
1334                ObjectId::Cluster(_)
1335                | ObjectId::Database(_)
1336                | ObjectId::Role(_)
1337                | ObjectId::NetworkPolicy(_) => Vec::new(),
1338            };
1339            RbacRequirements {
1340                role_membership: BTreeSet::from([*new_owner]),
1341                ownership: vec![id.clone()],
1342                privileges,
1343                ..Default::default()
1344            }
1345        }
1346        Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1347            ownership: vec![ObjectId::Item(*relation_id)],
1348            item_usage: &CREATE_ITEM_USAGE,
1349            ..Default::default()
1350        },
1351        Plan::AlterMaterializedViewApplyReplacement(
1352            plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1353        ) => RbacRequirements {
1354            ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1355            item_usage: &CREATE_ITEM_USAGE,
1356            ..Default::default()
1357        },
1358        Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1359            ownership: vec![ObjectId::NetworkPolicy(*id)],
1360            item_usage: &CREATE_ITEM_USAGE,
1361            ..Default::default()
1362        },
1363        Plan::ReadThenWrite(plan::ReadThenWritePlan {
1364            id,
1365            selection,
1366            finishing: _,
1367            assignments,
1368            kind,
1369            returning,
1370        }) => {
1371            let acl_mode = match kind {
1372                MutationKind::Insert => AclMode::INSERT,
1373                MutationKind::Update => AclMode::UPDATE,
1374                MutationKind::Delete => AclMode::DELETE,
1375            };
1376            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1377            let mut privileges = vec![
1378                (
1379                    SystemObjectId::Object(schema_id.clone()),
1380                    AclMode::USAGE,
1381                    role_id,
1382                ),
1383                (SystemObjectId::Object(id.into()), acl_mode, role_id),
1384            ];
1385            let mut seen = BTreeSet::from([(schema_id, role_id)]);
1386
1387            // We don't allow arbitrary sub-queries in `assignments` or `returning`. So either they
1388            // contain a column reference to the outer table or they're constant.
1389            if assignments
1390                .values()
1391                .chain(returning.iter())
1392                .any(|assignment| assignment.contains_column())
1393            {
1394                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1395                seen.insert((id.into(), role_id));
1396            }
1397
1398            // TODO(jkosh44) It's fairly difficult to determine what part of `selection` is from a
1399            //  user specified read and what part is from the implementation of the read then write.
1400            //  instead we are overly protective and always require SELECT privileges even though
1401            //  PostgreSQL doesn't always do this.
1402            //  As a concrete example, we require SELECT and UPDATE privileges to execute
1403            //  `UPDATE t SET a = 42;`, while PostgreSQL only requires UPDATE privileges.
1404            let items = selection
1405                .depends_on()
1406                .into_iter()
1407                .map(|gid| catalog.resolve_item_id(&gid));
1408            privileges.extend_from_slice(&generate_read_privileges_inner(
1409                catalog, items, role_id, &mut seen,
1410            ));
1411
1412            if let Some(privilege) = generate_cluster_usage_privileges(
1413                selection.as_const().is_some(),
1414                target_cluster_id,
1415                role_id,
1416            ) {
1417                privileges.push(privilege);
1418            }
1419            RbacRequirements {
1420                privileges,
1421                ..Default::default()
1422            }
1423        }
1424        Plan::GrantRole(plan::GrantRolePlan {
1425            role_ids: _,
1426            member_ids: _,
1427            grantor_id: _,
1428        })
1429        | Plan::RevokeRole(plan::RevokeRolePlan {
1430            role_ids: _,
1431            member_ids: _,
1432            grantor_id: _,
1433        }) => RbacRequirements {
1434            privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1435            ..Default::default()
1436        },
1437        Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1438            update_privileges,
1439            grantees: _,
1440        })
1441        | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1442            update_privileges,
1443            revokees: _,
1444        }) => {
1445            let mut privileges = Vec::with_capacity(update_privileges.len());
1446            for UpdatePrivilege { target_id, .. } in update_privileges {
1447                match target_id {
1448                    SystemObjectId::Object(object_id) => match object_id {
1449                        ObjectId::ClusterReplica((cluster_id, _)) => {
1450                            privileges.push((
1451                                SystemObjectId::Object(cluster_id.into()),
1452                                AclMode::USAGE,
1453                                role_id,
1454                            ));
1455                        }
1456                        ObjectId::Schema((database_spec, _)) => match database_spec {
1457                            ResolvedDatabaseSpecifier::Ambient => {}
1458                            ResolvedDatabaseSpecifier::Id(database_id) => {
1459                                privileges.push((
1460                                    SystemObjectId::Object(database_id.into()),
1461                                    AclMode::USAGE,
1462                                    role_id,
1463                                ));
1464                            }
1465                        },
1466                        ObjectId::Item(item_id) => {
1467                            let item = catalog.get_item(item_id);
1468                            privileges.push((
1469                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
1470                                AclMode::USAGE,
1471                                role_id,
1472                            ))
1473                        }
1474                        ObjectId::Cluster(_)
1475                        | ObjectId::Database(_)
1476                        | ObjectId::Role(_)
1477                        | ObjectId::NetworkPolicy(_) => {}
1478                    },
1479                    SystemObjectId::System => {}
1480                }
1481            }
1482            RbacRequirements {
1483                ownership: update_privileges
1484                    .iter()
1485                    .filter_map(|update_privilege| update_privilege.target_id.object_id())
1486                    .cloned()
1487                    .collect(),
1488                privileges,
1489                // To grant/revoke a privilege on some object, generally the grantor/revoker must be the
1490                // owner of that object (or have a grant option on that object which isn't implemented in
1491                // Materialize yet). There is no owner of the entire system, so it's only reasonable to
1492                // restrict granting/revoking system privileges to superusers.
1493                superuser_action: if update_privileges
1494                    .iter()
1495                    .any(|update_privilege| update_privilege.target_id.is_system())
1496                {
1497                    Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1498                } else {
1499                    None
1500                },
1501                ..Default::default()
1502            }
1503        }
1504        Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1505            privilege_objects,
1506            privilege_acl_items: _,
1507            is_grant: _,
1508        }) => RbacRequirements {
1509            role_membership: privilege_objects
1510                .iter()
1511                .map(|privilege_object| privilege_object.role_id)
1512                .collect(),
1513            privileges: privilege_objects
1514                .into_iter()
1515                .filter_map(|privilege_object| {
1516                    if let (Some(database_id), Some(_)) =
1517                        (privilege_object.database_id, privilege_object.schema_id)
1518                    {
1519                        Some((
1520                            SystemObjectId::Object(database_id.into()),
1521                            AclMode::USAGE,
1522                            role_id,
1523                        ))
1524                    } else {
1525                        None
1526                    }
1527                })
1528                .collect(),
1529            // Altering the default privileges for the PUBLIC role (aka ALL ROLES) will affect all roles
1530            // that currently exist and roles that will exist in the future. It's impossible for an existing
1531            // role to be a member of a role that doesn't exist yet, so no current role could possibly have
1532            // the privileges required to alter default privileges for the PUBLIC role. Therefore
1533            // only superusers can alter default privileges for the PUBLIC role.
1534            superuser_action: if privilege_objects
1535                .iter()
1536                .any(|privilege_object| privilege_object.role_id.is_public())
1537            {
1538                Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1539            } else {
1540                None
1541            },
1542            ..Default::default()
1543        },
1544        Plan::ReassignOwned(plan::ReassignOwnedPlan {
1545            old_roles,
1546            new_role,
1547            reassign_ids: _,
1548        }) => RbacRequirements {
1549            role_membership: old_roles
1550                .into_iter()
1551                .cloned()
1552                .chain(iter::once(*new_role))
1553                .collect(),
1554            ..Default::default()
1555        },
1556        Plan::SideEffectingFunc(func) => {
1557            let role_membership = match func {
1558                // A `NULL` argument cancels no connection (the function returns
1559                // `NULL`), so there is no role membership to require.
1560                SideEffectingFunc::PgCancelBackend {
1561                    connection_id: None,
1562                } => BTreeSet::new(),
1563                SideEffectingFunc::PgCancelBackend {
1564                    connection_id: Some(connection_id),
1565                } => active_conns.expect("active_conns is required for Plan::SideEffectingFunc")(
1566                    *connection_id,
1567                )
1568                .map(|x| [x].into())
1569                .unwrap_or_default(),
1570            };
1571            RbacRequirements {
1572                role_membership,
1573                ..Default::default()
1574            }
1575        }
1576        Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1577            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1578            RbacRequirements {
1579                privileges: vec![
1580                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1581                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1582                ],
1583                ..Default::default()
1584            }
1585        }
1586        Plan::DiscardTemp
1587        | Plan::DiscardAll
1588        | Plan::EmptyQuery
1589        | Plan::ShowAllVariables
1590        | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1591        | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1592        | Plan::SetVariable(plan::SetVariablePlan {
1593            name: _,
1594            value: _,
1595            local: _,
1596        })
1597        | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1598        | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1599        | Plan::StartTransaction(plan::StartTransactionPlan {
1600            access: _,
1601            isolation_level: _,
1602        })
1603        | Plan::CommitTransaction(plan::CommitTransactionPlan {
1604            transaction_type: _,
1605        })
1606        | Plan::AbortTransaction(plan::AbortTransactionPlan {
1607            transaction_type: _,
1608        })
1609        | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1610        | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1611        | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1612        | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1613        | Plan::Declare(plan::DeclarePlan {
1614            name: _,
1615            stmt: _,
1616            sql: _,
1617            params: _,
1618        })
1619        | Plan::Fetch(plan::FetchPlan {
1620            name: _,
1621            count: _,
1622            timeout: _,
1623        })
1624        | Plan::Close(plan::ClosePlan { name: _ })
1625        | Plan::Prepare(plan::PreparePlan {
1626            name: _,
1627            stmt: _,
1628            desc: _,
1629            sql: _,
1630        })
1631        | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1632        | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1633        | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1634    }
1635}
1636
1637/// Reports whether any role has ownership over an object.
1638fn check_owner_roles(
1639    object_id: &ObjectId,
1640    role_ids: &BTreeSet<RoleId>,
1641    catalog: &impl SessionCatalog,
1642) -> bool {
1643    if let Some(owner_id) = catalog.get_owner_id(object_id) {
1644        role_ids.contains(&owner_id)
1645    } else {
1646        true
1647    }
1648}
1649
1650fn ownership_err(
1651    unheld_ownership: Vec<ObjectId>,
1652    catalog: &impl SessionCatalog,
1653) -> Result<(), UnauthorizedError> {
1654    if !unheld_ownership.is_empty() {
1655        let objects = unheld_ownership
1656            .into_iter()
1657            .map(|ownership| match ownership {
1658                ObjectId::Cluster(id) => (
1659                    ObjectType::Cluster,
1660                    catalog.get_cluster(id).name().to_string(),
1661                ),
1662                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1663                    let cluster = catalog.get_cluster(cluster_id);
1664                    let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1665                    // Note: using unchecked here is okay because the values are coming from an
1666                    // already existing name.
1667                    let name = QualifiedReplica {
1668                        cluster: Ident::new_unchecked(cluster.name()),
1669                        replica: Ident::new_unchecked(replica.name()),
1670                    };
1671                    (ObjectType::ClusterReplica, name.to_string())
1672                }
1673                ObjectId::Database(id) => (
1674                    ObjectType::Database,
1675                    catalog.get_database(&id).name().to_string(),
1676                ),
1677                ObjectId::Schema((database_spec, schema_spec)) => {
1678                    let schema = catalog.get_schema(&database_spec, &schema_spec);
1679                    let name = catalog.resolve_full_schema_name(schema.name());
1680                    (ObjectType::Schema, name.to_string())
1681                }
1682                ObjectId::Item(id) => {
1683                    let item = catalog.get_item(&id);
1684                    let name = catalog.resolve_full_name(item.name());
1685                    (item.item_type().into(), name.to_string())
1686                }
1687                ObjectId::NetworkPolicy(id) => (
1688                    ObjectType::NetworkPolicy,
1689                    catalog.get_network_policy(&id).name().to_string(),
1690                ),
1691                ObjectId::Role(_) => unreachable!("roles have no owner"),
1692            })
1693            .collect();
1694        Err(UnauthorizedError::Ownership { objects })
1695    } else {
1696        Ok(())
1697    }
1698}
1699
1700fn generate_required_source_privileges(
1701    name: &QualifiedItemName,
1702    data_source: &DataSourceDesc,
1703    in_cluster: Option<ClusterId>,
1704    role_id: RoleId,
1705) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1706    let mut privileges = vec![(
1707        SystemObjectId::Object(name.qualifiers.clone().into()),
1708        AclMode::CREATE,
1709        role_id,
1710    )];
1711    match (data_source, in_cluster) {
1712        (_, Some(id)) => {
1713            privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1714        }
1715        (DataSourceDesc::Ingestion(_), None) => {
1716            privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1717        }
1718        // Non-ingestion data-sources have meaningless cluster config's (for now...) and they need
1719        // to be ignored.
1720        // This feels very brittle, but there's not much we can do until the UNDEFINED cluster
1721        // config is removed.
1722        (_, None) => {}
1723    }
1724    privileges
1725}
1726
1727/// Generates all the privileges required to execute a read that includes the objects in `ids`.
1728///
1729/// Not only do we need to validate that `role_id` has read privileges on all relations in `ids`,
1730/// but if any object is a view or materialized view then we need to validate that the owner of
1731/// that view has all of the privileges required to execute the query within the view.
1732///
1733/// For more details see: <https://www.postgresql.org/docs/15/rules-privileges.html>
1734fn generate_read_privileges(
1735    catalog: &impl SessionCatalog,
1736    ids: impl Iterator<Item = CatalogItemId>,
1737    role_id: RoleId,
1738) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1739    generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1740}
1741
1742fn generate_read_privileges_inner(
1743    catalog: &impl SessionCatalog,
1744    ids: impl Iterator<Item = CatalogItemId>,
1745    role_id: RoleId,
1746    seen: &mut BTreeSet<(ObjectId, RoleId)>,
1747) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1748    let mut privileges = Vec::new();
1749    let mut views = Vec::new();
1750
1751    for id in ids {
1752        if seen.insert((id.into(), role_id)) {
1753            let item = catalog.get_item(&id);
1754            let schema_id: ObjectId = item.name().qualifiers.clone().into();
1755            if seen.insert((schema_id.clone(), role_id)) {
1756                privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1757            }
1758            match item.item_type() {
1759                CatalogItemType::View | CatalogItemType::MaterializedView => {
1760                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1761                    views.push((item.references().items().copied(), item.owner_id()));
1762                }
1763                CatalogItemType::Table | CatalogItemType::Source => {
1764                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1765                }
1766                CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1767                    privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1768                }
1769                CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1770            }
1771        }
1772    }
1773
1774    for (view_ids, view_owner) in views {
1775        privileges.extend_from_slice(&generate_read_privileges_inner(
1776            catalog, view_ids, view_owner, seen,
1777        ));
1778    }
1779
1780    privileges
1781}
1782
1783fn generate_usage_privileges(
1784    catalog: &impl SessionCatalog,
1785    ids: &ResolvedIds,
1786    role_id: RoleId,
1787    item_types: &BTreeSet<CatalogItemType>,
1788) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1789    // Use a `BTreeSet` to remove duplicate privileges.
1790    ids.items()
1791        .filter_map(move |id| {
1792            let item = catalog.get_item(id);
1793            if item_types.contains(&item.item_type()) {
1794                let schema_id = item.name().qualifiers.clone().into();
1795                Some([
1796                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1797                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1798                ])
1799            } else {
1800                None
1801            }
1802        })
1803        .flatten()
1804        .collect()
1805}
1806
1807fn generate_cluster_usage_privileges(
1808    expr_is_const: bool,
1809    target_cluster_id: Option<ClusterId>,
1810    role_id: RoleId,
1811) -> Option<(SystemObjectId, AclMode, RoleId)> {
1812    // TODO(jkosh44) expr hasn't been fully optimized yet, so it might actually be a constant,
1813    //  but we mistakenly think that it's not. For now it's ok to be overly protective.
1814    if !expr_is_const {
1815        if let Some(cluster_id) = target_cluster_id {
1816            return Some((
1817                SystemObjectId::Object(cluster_id.into()),
1818                AclMode::USAGE,
1819                role_id,
1820            ));
1821        }
1822    }
1823
1824    None
1825}
1826
1827fn check_object_privileges(
1828    catalog: &impl SessionCatalog,
1829    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1830    role_membership: BTreeSet<RoleId>,
1831    current_role_id: RoleId,
1832) -> Result<(), UnauthorizedError> {
1833    let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1834    role_memberships.insert(current_role_id, role_membership);
1835    for (object_id, acl_mode, role_id) in privileges {
1836        // Temporary schemas are owned by the connection that created them,
1837        // so users implicitly have all privileges on their own temp schema.
1838        // The schema may not exist yet (lazy creation), so we skip the check.
1839        if matches!(
1840            &object_id,
1841            SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary)))
1842        ) {
1843            continue;
1844        }
1845
1846        let role_membership = role_memberships
1847            .entry(role_id)
1848            .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1849        let object_privileges = catalog
1850            .get_privileges(&object_id)
1851            .expect("only object types with privileges will generate required privileges");
1852        let role_privileges = role_membership
1853            .iter()
1854            .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1855            .map(|mz_acl_item| mz_acl_item.acl_mode)
1856            .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1857        if !role_privileges.contains(acl_mode) {
1858            let role_name = catalog.get_role(&role_id).name().to_string();
1859            let privileges = acl_mode.to_error_string();
1860            return Err(UnauthorizedError::Privilege {
1861                object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1862                role_name,
1863                privileges,
1864            });
1865        }
1866    }
1867
1868    Ok(())
1869}
1870
1871pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1872    const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1873        .union(AclMode::SELECT)
1874        .union(AclMode::UPDATE)
1875        .union(AclMode::DELETE);
1876    const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1877    const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1878        .union(AclMode::CREATE_DB)
1879        .union(AclMode::CREATE_CLUSTER)
1880        .union(AclMode::CREATE_NETWORK_POLICY);
1881
1882    const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1883    match object_type {
1884        SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1885        SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1886        SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1887        SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1888        SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1889        SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1890        SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1891        SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1892        SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1893        SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1894        SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1895        SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1896        SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1897        SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1898        SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1899        SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1900        SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1901    }
1902}
1903
1904pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1905    MzAclItem {
1906        grantee: owner_id,
1907        grantor: owner_id,
1908        acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1909    }
1910}
1911
1912const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1913    match object_type {
1914        ObjectType::Table
1915        | ObjectType::View
1916        | ObjectType::MaterializedView
1917        | ObjectType::Source => AclMode::SELECT,
1918        ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1919        ObjectType::Sink
1920        | ObjectType::Index
1921        | ObjectType::Role
1922        | ObjectType::Cluster
1923        | ObjectType::ClusterReplica
1924        | ObjectType::Secret
1925        | ObjectType::Connection
1926        | ObjectType::Database
1927        | ObjectType::Func
1928        | ObjectType::NetworkPolicy => AclMode::empty(),
1929    }
1930}
1931
1932pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1933    let acl_mode = default_builtin_object_acl_mode(object_type);
1934    MzAclItem {
1935        grantee: MZ_SUPPORT_ROLE_ID,
1936        grantor: MZ_SYSTEM_ROLE_ID,
1937        acl_mode,
1938    }
1939}
1940
1941pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1942    let acl_mode = default_builtin_object_acl_mode(object_type);
1943    MzAclItem {
1944        grantee: RoleId::Public,
1945        grantor: MZ_SYSTEM_ROLE_ID,
1946        acl_mode,
1947    }
1948}