mz_sql/
rbac.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26    CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29    CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30    SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34    DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40/// Common checks that need to be performed before we can start checking a role's privileges.
41fn rbac_check_preamble(
42    catalog: &impl SessionCatalog,
43    session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45    // PostgreSQL allows users that have their role dropped to perform some actions,
46    // such as `SET ROLE` and certain `SELECT` queries. We haven't implemented
47    // `SET ROLE` and feel it's safer to force to user to re-authenticate if their
48    // role is dropped.
49    if catalog
50        .try_get_role(&session_meta.role_metadata().current_role)
51        .is_none()
52    {
53        return Err(UnauthorizedError::ConcurrentRoleDrop(
54            session_meta.role_metadata().current_role.clone(),
55        ));
56    };
57    if catalog
58        .try_get_role(&session_meta.role_metadata().session_role)
59        .is_none()
60    {
61        return Err(UnauthorizedError::ConcurrentRoleDrop(
62            session_meta.role_metadata().session_role.clone(),
63        ));
64    };
65    if catalog
66        .try_get_role(&session_meta.role_metadata().authenticated_role)
67        .is_none()
68    {
69        return Err(UnauthorizedError::ConcurrentRoleDrop(
70            session_meta.role_metadata().authenticated_role.clone(),
71        ));
72    };
73
74    Ok(())
75}
76
77/// Filters `RbacRequirements` based on the session role metadata and RBAC related feature flags.
78fn filter_requirements(
79    catalog: &impl SessionCatalog,
80    session_meta: &dyn SessionMetadata,
81    rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83    // Skip RBAC non-mandatory checks if RBAC is disabled. However, we never skip RBAC checks for
84    // system roles. This allows us to limit access of system users even when RBAC is off.
85    let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86        && !session_meta.role_metadata().current_role.is_system()
87        && !session_meta.role_metadata().session_role.is_system();
88    // Skip RBAC checks on user items if the session is a superuser.
89    let is_superuser = session_meta.is_superuser();
90    if is_rbac_disabled || is_superuser {
91        return rbac_requirements.filter_to_mandatory_requirements();
92    }
93
94    rbac_requirements
95}
96
97// The default item types that most statements require USAGE privileges for.
98static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99    btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101// CREATE statements require USAGE privileges on the default item types and USAGE privileges on
102// Types.
103pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104    let mut items = DEFAULT_ITEM_USAGE.clone();
105    items.insert(CatalogItemType::Type);
106    items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
/// Errors that can occur due to an unauthorized action.
///
/// The `Display` output of each variant is the short primary message. Supplementary text, where
/// a variant has any, is produced by [`UnauthorizedError::detail`].
#[derive(Debug, thiserror::Error)]
pub enum UnauthorizedError {
    /// The action can only be performed by a superuser.
    ///
    /// `action` is a description of the denied action and is interpolated into the message.
    #[error("permission denied to {action}")]
    Superuser { action: String },
    /// The action requires ownership of an object.
    ///
    /// `objects` lists the (object type, object name) pairs that the role must own; they are
    /// rendered as a comma-separated list in the message.
    #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
    Ownership { objects: Vec<(ObjectType, String)> },
    /// Altering an owner requires membership of the new owner role.
    ///
    /// `role_names` lists the roles the session is not a member of; each name is quoted and the
    /// names are rendered as a comma-separated list in the message.
    #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
    RoleMembership { role_names: Vec<String> },
    /// The action requires one or more privileges.
    ///
    /// Only `object_description` appears in the primary message; `role_name` and `privileges`
    /// are surfaced through [`UnauthorizedError::detail`].
    #[error("permission denied for {object_description}")]
    Privilege {
        object_description: ErrorMessageObjectDescription,
        role_name: String,
        privileges: String,
    },
    // TODO(jkosh44) When we implement parameter privileges, this can be replaced with a regular
    //  privilege error.
    /// The action can only be performed by the mz_system role.
    #[error("permission denied to {action}")]
    MzSystem { action: String },
    /// The action cannot be performed by the mz_support role.
    #[error("permission denied to {action}")]
    MzSupport { action: String },
    /// A role attached to the session (its current, session, or authenticated role) was dropped
    /// while the user was logged in. Carries the ID of the dropped role.
    #[error("role {0} was concurrently dropped")]
    ConcurrentRoleDrop(RoleId),
}
141
142impl UnauthorizedError {
143    pub fn detail(&self) -> Option<String> {
144        match &self {
145            UnauthorizedError::Superuser { action } => {
146                Some(format!("You must be a superuser to {}", action))
147            }
148            UnauthorizedError::Privilege {
149                object_description,
150                role_name,
151                privileges,
152            } => Some(format!(
153                "The '{role_name}' role needs {privileges} privileges on {object_description}"
154            )),
155            UnauthorizedError::MzSystem { .. } => {
156                Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157            }
158            UnauthorizedError::MzSupport { .. } => Some(format!(
159                "The '{}' role has very limited privileges",
160                SUPPORT_USER.name
161            )),
162            UnauthorizedError::ConcurrentRoleDrop(_) => {
163                Some("Please disconnect and re-connect with a valid role.".into())
164            }
165            UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166        }
167    }
168}
169
/// RBAC requirements for executing a given plan.
///
/// A value of this type is a declarative list of everything a session must hold; it is checked
/// against the catalog and session by `RbacRequirements::validate`.
#[derive(Debug)]
struct RbacRequirements {
    /// The role memberships required.
    ///
    /// The session's current role must be a member of every role in this set.
    role_membership: BTreeSet<RoleId>,
    /// The object ownerships required.
    ownership: Vec<ObjectId>,
    /// The privileges required. The tuples are of the form:
    /// (What object the privilege is on, What privilege is required, Who must possess the privilege).
    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
    /// The types of catalog items that this plan requires USAGE privileges on.
    ///
    /// Most plans will require USAGE on secrets and connections but some plans, like SHOW CREATE,
    /// can reference an item without requiring any privileges on that item.
    item_usage: &'static BTreeSet<CatalogItemType>,
    /// Some action if superuser is required to perform that action, None otherwise.
    ///
    /// The string describes the action and is used in the resulting error message.
    superuser_action: Option<String>,
}
188
189impl RbacRequirements {
190    fn empty() -> RbacRequirements {
191        RbacRequirements {
192            role_membership: BTreeSet::new(),
193            ownership: Vec::new(),
194            privileges: Vec::new(),
195            item_usage: &EMPTY_ITEM_USAGE,
196            superuser_action: None,
197        }
198    }
199
200    fn validate(
201        self,
202        catalog: &impl SessionCatalog,
203        session: &dyn SessionMetadata,
204        resolved_ids: &ResolvedIds,
205    ) -> Result<(), UnauthorizedError> {
206        // Obtain all roles that the current session is a member of.
207        let role_membership =
208            catalog.collect_role_membership(&session.role_metadata().current_role);
209
210        check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212        // Validate that the current session has the required role membership to execute the provided
213        // plan.
214        let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215        if !unheld_membership.is_empty() {
216            let role_names = unheld_membership
217                .into_iter()
218                .map(|role_id| {
219                    // Some role references may no longer exist due to concurrent drops.
220                    catalog
221                        .try_get_role(role_id)
222                        .map(|role| role.name().to_string())
223                        .unwrap_or_else(|| role_id.to_string())
224                })
225                .collect();
226            return Err(UnauthorizedError::RoleMembership { role_names });
227        }
228
229        // Validate that the current session has the required object ownership to execute the provided
230        // plan.
231        let unheld_ownership = self
232            .ownership
233            .into_iter()
234            .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235            .collect();
236        ownership_err(unheld_ownership, catalog)?;
237
238        check_object_privileges(
239            catalog,
240            self.privileges,
241            role_membership,
242            session.role_metadata().current_role,
243        )?;
244
245        if let Some(action) = self.superuser_action {
246            return Err(UnauthorizedError::Superuser { action });
247        }
248
249        Ok(())
250    }
251
252    fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253        let RbacRequirements {
254            role_membership,
255            ownership,
256            privileges,
257            item_usage,
258            superuser_action: _,
259        } = self;
260        let role_membership = role_membership
261            .into_iter()
262            .filter(|id| id.is_system())
263            .collect();
264        let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265        let privileges = privileges
266            .into_iter()
267            .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268            // We allow reading objects for superusers and when RBAC is off.
269            .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270            .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271            .collect();
272        let superuser_action = None;
273        RbacRequirements {
274            role_membership,
275            ownership,
276            privileges,
277            item_usage,
278            superuser_action,
279        }
280    }
281}
282
283impl Default for RbacRequirements {
284    fn default() -> Self {
285        RbacRequirements {
286            role_membership: BTreeSet::new(),
287            ownership: Vec::new(),
288            privileges: Vec::new(),
289            item_usage: &DEFAULT_ITEM_USAGE,
290            superuser_action: None,
291        }
292    }
293}
294
295/// Checks if a `session` is authorized to use `resolved_ids`. If not, an error is returned.
296pub fn check_usage(
297    catalog: &impl SessionCatalog,
298    session: &dyn SessionMetadata,
299    resolved_ids: &ResolvedIds,
300    item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302    rbac_check_preamble(catalog, session)?;
303
304    // Obtain all roles that the current session is a member of.
305    let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307    // Certain statements depend on objects that haven't been created yet, like sub-sources, so we
308    // need to filter those out.
309    let existing_resolved_ids =
310        resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312    let required_privileges = generate_usage_privileges(
313        catalog,
314        &existing_resolved_ids,
315        session.role_metadata().current_role,
316        item_types,
317    )
318    .into_iter()
319    .collect();
320
321    let mut rbac_requirements = RbacRequirements::empty();
322    rbac_requirements.privileges = required_privileges;
323    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324    let required_privileges = rbac_requirements.privileges;
325
326    check_object_privileges(
327        catalog,
328        required_privileges,
329        role_membership,
330        session.role_metadata().current_role,
331    )?;
332
333    Ok(())
334}
335
336/// Checks if a session is authorized to execute a plan. If not, an error is returned.
337pub fn check_plan(
338    catalog: &impl SessionCatalog,
339    // Function mapping a connection ID to an authenticated role. The roles may have been dropped concurrently.
340    active_conns: impl FnOnce(u32) -> Option<RoleId>,
341    session: &dyn SessionMetadata,
342    plan: &Plan,
343    target_cluster_id: Option<ClusterId>,
344    resolved_ids: &ResolvedIds,
345) -> Result<(), UnauthorizedError> {
346    rbac_check_preamble(catalog, session)?;
347
348    let rbac_requirements = generate_rbac_requirements(
349        catalog,
350        plan,
351        active_conns,
352        target_cluster_id,
353        session.role_metadata().current_role,
354    );
355    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
356    debug!(
357        "rbac requirements {rbac_requirements:?} for plan {:?}",
358        PlanKind::from(plan)
359    );
360    rbac_requirements.validate(catalog, session, resolved_ids)
361}
362
363/// Returns true if RBAC is turned on for a session, false otherwise.
364pub fn is_rbac_enabled_for_session(
365    system_vars: &SystemVars,
366    session: &dyn SessionMetadata,
367) -> bool {
368    let server_enabled = system_vars.enable_rbac_checks();
369    let session_enabled = session.enable_session_rbac_checks();
370
371    // The session flag allows users to turn RBAC on for just their session while the server flag
372    // allows users to turn RBAC on for everyone.
373    server_enabled || session_enabled
374}
375
376/// Generates all requirements needed to execute a given plan.
377fn generate_rbac_requirements(
378    catalog: &impl SessionCatalog,
379    plan: &Plan,
380    active_conns: impl FnOnce(u32) -> Option<RoleId>,
381    target_cluster_id: Option<ClusterId>,
382    role_id: RoleId,
383) -> RbacRequirements {
384    match plan {
385        Plan::CreateConnection(plan::CreateConnectionPlan {
386            name,
387            if_not_exists: _,
388            connection: _,
389            validate: _,
390        }) => RbacRequirements {
391            privileges: vec![(
392                SystemObjectId::Object(name.qualifiers.clone().into()),
393                AclMode::CREATE,
394                role_id,
395            )],
396            item_usage: &CREATE_ITEM_USAGE,
397            ..Default::default()
398        },
399        Plan::CreateDatabase(plan::CreateDatabasePlan {
400            name: _,
401            if_not_exists: _,
402        }) => RbacRequirements {
403            privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
404            item_usage: &CREATE_ITEM_USAGE,
405            ..Default::default()
406        },
407        Plan::CreateSchema(plan::CreateSchemaPlan {
408            database_spec,
409            schema_name: _,
410            if_not_exists: _,
411        }) => {
412            let privileges = match database_spec {
413                ResolvedDatabaseSpecifier::Ambient => Vec::new(),
414                ResolvedDatabaseSpecifier::Id(database_id) => {
415                    vec![(
416                        SystemObjectId::Object(database_id.into()),
417                        AclMode::CREATE,
418                        role_id,
419                    )]
420                }
421            };
422            RbacRequirements {
423                privileges,
424                item_usage: &CREATE_ITEM_USAGE,
425                ..Default::default()
426            }
427        }
428        Plan::CreateRole(plan::CreateRolePlan {
429            name: _,
430            attributes,
431        }) => {
432            if attributes.superuser.unwrap_or(false) {
433                RbacRequirements {
434                    superuser_action: Some("create superuser role".to_string()),
435                    ..Default::default()
436                }
437            } else {
438                RbacRequirements {
439                    privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
440                    item_usage: &CREATE_ITEM_USAGE,
441                    ..Default::default()
442                }
443            }
444        }
445        Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
446            privileges: vec![(
447                SystemObjectId::System,
448                AclMode::CREATE_NETWORK_POLICY,
449                role_id,
450            )],
451            item_usage: &CREATE_ITEM_USAGE,
452            ..Default::default()
453        },
454        Plan::CreateCluster(plan::CreateClusterPlan {
455            name: _,
456            variant: _,
457            workload_class: _,
458        }) => RbacRequirements {
459            privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
460            item_usage: &CREATE_ITEM_USAGE,
461            ..Default::default()
462        },
463        Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
464            cluster_id,
465            name: _,
466            config: _,
467        }) => RbacRequirements {
468            ownership: vec![ObjectId::Cluster(*cluster_id)],
469            item_usage: &CREATE_ITEM_USAGE,
470            ..Default::default()
471        },
472        Plan::CreateSource(plan::CreateSourcePlan {
473            name,
474            source,
475            if_not_exists: _,
476            timeline: _,
477            in_cluster,
478        }) => RbacRequirements {
479            privileges: generate_required_source_privileges(
480                name,
481                &source.data_source,
482                *in_cluster,
483                role_id,
484            ),
485            item_usage: &CREATE_ITEM_USAGE,
486            ..Default::default()
487        },
488        Plan::CreateSources(plans) => RbacRequirements {
489            privileges: plans
490                .iter()
491                .flat_map(
492                    |plan::CreateSourcePlanBundle {
493                         item_id: _,
494                         global_id: _,
495                         plan:
496                             plan::CreateSourcePlan {
497                                 name,
498                                 source,
499                                 if_not_exists: _,
500                                 timeline: _,
501                                 in_cluster,
502                             },
503                         resolved_ids: _,
504                         available_source_references: _,
505                     }| {
506                        generate_required_source_privileges(
507                            name,
508                            &source.data_source,
509                            *in_cluster,
510                            role_id,
511                        )
512                        .into_iter()
513                    },
514                )
515                .collect(),
516            item_usage: &CREATE_ITEM_USAGE,
517            ..Default::default()
518        },
519        Plan::CreateSecret(plan::CreateSecretPlan {
520            name,
521            secret: _,
522            if_not_exists: _,
523        }) => RbacRequirements {
524            privileges: vec![(
525                SystemObjectId::Object(name.qualifiers.clone().into()),
526                AclMode::CREATE,
527                role_id,
528            )],
529            item_usage: &CREATE_ITEM_USAGE,
530            ..Default::default()
531        },
532        Plan::CreateSink(plan::CreateSinkPlan {
533            name,
534            sink,
535            with_snapshot: _,
536            if_not_exists: _,
537            in_cluster,
538        }) => {
539            let mut privileges = vec![(
540                SystemObjectId::Object(name.qualifiers.clone().into()),
541                AclMode::CREATE,
542                role_id,
543            )];
544            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
545            privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
546            privileges.push((
547                SystemObjectId::Object(in_cluster.into()),
548                AclMode::CREATE,
549                role_id,
550            ));
551            RbacRequirements {
552                privileges,
553                item_usage: &CREATE_ITEM_USAGE,
554                ..Default::default()
555            }
556        }
557        Plan::CreateTable(plan::CreateTablePlan {
558            name,
559            table: _,
560            if_not_exists: _,
561        }) => RbacRequirements {
562            privileges: vec![(
563                SystemObjectId::Object(name.qualifiers.clone().into()),
564                AclMode::CREATE,
565                role_id,
566            )],
567            item_usage: &CREATE_ITEM_USAGE,
568            ..Default::default()
569        },
570        Plan::CreateView(plan::CreateViewPlan {
571            name,
572            view: _,
573            replace,
574            drop_ids: _,
575            if_not_exists: _,
576            ambiguous_columns: _,
577        }) => RbacRequirements {
578            ownership: replace
579                .map(|id| vec![ObjectId::Item(id)])
580                .unwrap_or_default(),
581            privileges: vec![(
582                SystemObjectId::Object(name.qualifiers.clone().into()),
583                AclMode::CREATE,
584                role_id,
585            )],
586            item_usage: &CREATE_ITEM_USAGE,
587            ..Default::default()
588        },
589        Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
590            name,
591            materialized_view,
592            replace,
593            drop_ids: _,
594            if_not_exists: _,
595            ambiguous_columns: _,
596        }) => RbacRequirements {
597            ownership: replace
598                .map(|id| vec![ObjectId::Item(id)])
599                .unwrap_or_default(),
600            privileges: vec![
601                (
602                    SystemObjectId::Object(name.qualifiers.clone().into()),
603                    AclMode::CREATE,
604                    role_id,
605                ),
606                (
607                    SystemObjectId::Object(materialized_view.cluster_id.into()),
608                    AclMode::CREATE,
609                    role_id,
610                ),
611            ],
612            item_usage: &CREATE_ITEM_USAGE,
613            ..Default::default()
614        },
615        Plan::CreateContinualTask(plan::CreateContinualTaskPlan {
616            name,
617            placeholder_id: _,
618            desc: _,
619            input_id: _,
620            with_snapshot: _,
621            continual_task,
622        }) => RbacRequirements {
623            privileges: vec![
624                (
625                    SystemObjectId::Object(name.qualifiers.clone().into()),
626                    AclMode::CREATE,
627                    role_id,
628                ),
629                (
630                    SystemObjectId::Object(continual_task.cluster_id.into()),
631                    AclMode::CREATE,
632                    role_id,
633                ),
634            ],
635            item_usage: &CREATE_ITEM_USAGE,
636            ..Default::default()
637        },
638        Plan::CreateIndex(plan::CreateIndexPlan {
639            name,
640            index,
641            if_not_exists: _,
642        }) => {
643            let index_on_item = catalog.resolve_item_id(&index.on);
644            RbacRequirements {
645                ownership: vec![ObjectId::Item(index_on_item)],
646                privileges: vec![
647                    (
648                        SystemObjectId::Object(name.qualifiers.clone().into()),
649                        AclMode::CREATE,
650                        role_id,
651                    ),
652                    (
653                        SystemObjectId::Object(index.cluster_id.into()),
654                        AclMode::CREATE,
655                        role_id,
656                    ),
657                ],
658                item_usage: &CREATE_ITEM_USAGE,
659                ..Default::default()
660            }
661        }
662        Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
663            privileges: vec![(
664                SystemObjectId::Object(name.qualifiers.clone().into()),
665                AclMode::CREATE,
666                role_id,
667            )],
668            item_usage: &CREATE_ITEM_USAGE,
669            ..Default::default()
670        },
671        Plan::Comment(plan::CommentPlan {
672            object_id,
673            sub_component: _,
674            comment: _,
675        }) => {
676            let (ownership, privileges) = match object_id {
677                // Roles don't have owners, instead we require the current session to have the
678                // `CREATEROLE` privilege.
679                CommentObjectId::Role(_) => (
680                    Vec::new(),
681                    vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
682                ),
683                _ => (vec![ObjectId::from(*object_id)], Vec::new()),
684            };
685            RbacRequirements {
686                ownership,
687                privileges,
688                ..Default::default()
689            }
690        }
691        Plan::DropObjects(plan::DropObjectsPlan {
692            referenced_ids,
693            drop_ids: _,
694            object_type,
695        }) => {
696            let privileges = if object_type == &ObjectType::Role {
697                vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
698            } else {
699                referenced_ids
700                    .iter()
701                    .filter_map(|id| match id {
702                        ObjectId::ClusterReplica((cluster_id, _)) => Some((
703                            SystemObjectId::Object(cluster_id.into()),
704                            AclMode::USAGE,
705                            role_id,
706                        )),
707                        ObjectId::Schema((database_spec, _)) => match database_spec {
708                            ResolvedDatabaseSpecifier::Ambient => None,
709                            ResolvedDatabaseSpecifier::Id(database_id) => Some((
710                                SystemObjectId::Object(database_id.into()),
711                                AclMode::USAGE,
712                                role_id,
713                            )),
714                        },
715                        ObjectId::Item(item_id) => {
716                            let item = catalog.get_item(item_id);
717                            Some((
718                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
719                                AclMode::USAGE,
720                                role_id,
721                            ))
722                        }
723                        ObjectId::Cluster(_)
724                        | ObjectId::Database(_)
725                        | ObjectId::Role(_)
726                        | ObjectId::NetworkPolicy(_) => None,
727                    })
728                    .collect()
729            };
730            RbacRequirements {
731                // Do not need ownership of descendant objects.
732                ownership: referenced_ids.clone(),
733                privileges,
734                ..Default::default()
735            }
736        }
737        Plan::DropOwned(plan::DropOwnedPlan {
738            role_ids,
739            drop_ids: _,
740            privilege_revokes: _,
741            default_privilege_revokes: _,
742        }) => RbacRequirements {
743            role_membership: role_ids.into_iter().cloned().collect(),
744            ..Default::default()
745        },
746        Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
747            let container_id = match id {
748                ObjectId::Item(id) => Some(SystemObjectId::Object(
749                    catalog.get_item(id).name().qualifiers.clone().into(),
750                )),
751                ObjectId::Schema((database_id, _schema_id)) => match database_id {
752                    ResolvedDatabaseSpecifier::Ambient => None,
753                    ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
754                },
755                ObjectId::Cluster(_)
756                | ObjectId::ClusterReplica(_)
757                | ObjectId::Database(_)
758                | ObjectId::Role(_)
759                | ObjectId::NetworkPolicy(_) => None,
760            };
761            let privileges = match container_id {
762                Some(id) => vec![(id, AclMode::USAGE, role_id)],
763                None => Vec::new(),
764            };
765            RbacRequirements {
766                privileges,
767                item_usage: &EMPTY_ITEM_USAGE,
768                ..Default::default()
769            }
770        }
771        Plan::ShowColumns(plan::ShowColumnsPlan {
772            id,
773            select_plan,
774            new_resolved_ids: _,
775        }) => {
776            let mut privileges = vec![(
777                SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
778                AclMode::USAGE,
779                role_id,
780            )];
781
782            for privilege in generate_rbac_requirements(
783                catalog,
784                &Plan::Select(select_plan.clone()),
785                active_conns,
786                target_cluster_id,
787                role_id,
788            )
789            .privileges
790            {
791                privileges.push(privilege);
792            }
793            RbacRequirements {
794                privileges,
795                ..Default::default()
796            }
797        }
798        Plan::Select(plan::SelectPlan {
799            source,
800            select: _,
801            when: _,
802            finishing: _,
803            copy_to: _,
804        }) => {
805            let items = source
806                .depends_on()
807                .into_iter()
808                .map(|gid| catalog.resolve_item_id(&gid));
809            let mut privileges = generate_read_privileges(catalog, items, role_id);
810            if let Some(privilege) = generate_cluster_usage_privileges(
811                source.as_const().is_some(),
812                target_cluster_id,
813                role_id,
814            ) {
815                privileges.push(privilege);
816            }
817            RbacRequirements {
818                privileges,
819                ..Default::default()
820            }
821        }
822        Plan::Subscribe(plan::SubscribePlan {
823            from,
824            with_snapshot: _,
825            when: _,
826            up_to: _,
827            copy_to: _,
828            emit_progress: _,
829            output: _,
830        }) => {
831            let items = from
832                .depends_on()
833                .into_iter()
834                .map(|gid| catalog.resolve_item_id(&gid));
835            let mut privileges = generate_read_privileges(catalog, items, role_id);
836            if let Some(cluster_id) = target_cluster_id {
837                privileges.push((
838                    SystemObjectId::Object(cluster_id.into()),
839                    AclMode::USAGE,
840                    role_id,
841                ));
842            }
843            RbacRequirements {
844                privileges,
845                ..Default::default()
846            }
847        }
848        Plan::CopyFrom(plan::CopyFromPlan {
849            target_name: _,
850            target_id,
851            source: _,
852            columns: _,
853            source_desc: _,
854            mfp: _,
855            params: _,
856            filter: _,
857        }) => RbacRequirements {
858            privileges: vec![
859                (
860                    SystemObjectId::Object(
861                        catalog.get_item(target_id).name().qualifiers.clone().into(),
862                    ),
863                    AclMode::USAGE,
864                    role_id,
865                ),
866                (
867                    SystemObjectId::Object(target_id.into()),
868                    AclMode::INSERT,
869                    role_id,
870                ),
871            ],
872            ..Default::default()
873        },
874        Plan::CopyTo(plan::CopyToPlan {
875            select_plan,
876            desc: _,
877            to: _,
878            connection: _,
879            connection_id: _,
880            format: _,
881            max_file_size: _,
882        }) => {
883            let items = select_plan
884                .source
885                .depends_on()
886                .into_iter()
887                .map(|gid| catalog.resolve_item_id(&gid));
888            let mut privileges = generate_read_privileges(catalog, items, role_id);
889            if let Some(cluster_id) = target_cluster_id {
890                privileges.push((
891                    SystemObjectId::Object(cluster_id.into()),
892                    AclMode::USAGE,
893                    role_id,
894                ));
895            }
896            RbacRequirements {
897                privileges,
898                ..Default::default()
899            }
900        }
901        Plan::ExplainPlan(plan::ExplainPlanPlan {
902            stage: _,
903            format: _,
904            config: _,
905            explainee,
906        })
907        | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
908            privileges: match explainee {
909                Explainee::View(id)
910                | Explainee::MaterializedView(id)
911                | Explainee::Index(id)
912                | Explainee::ReplanView(id)
913                | Explainee::ReplanMaterializedView(id)
914                | Explainee::ReplanIndex(id) => {
915                    let item = catalog.get_item(id);
916                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
917                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
918                }
919                Explainee::Statement(stmt) => stmt
920                    .depends_on()
921                    .into_iter()
922                    .map(|id| {
923                        let item = catalog.get_item_by_global_id(&id);
924                        let schema_id: ObjectId = item.name().qualifiers.clone().into();
925                        (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
926                    })
927                    .collect(),
928            },
929            item_usage: match explainee {
930                Explainee::View(..)
931                | Explainee::MaterializedView(..)
932                | Explainee::Index(..)
933                | Explainee::ReplanView(..)
934                | Explainee::ReplanMaterializedView(..)
935                | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
936                Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
937            },
938            ..Default::default()
939        },
940        Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
941            RbacRequirements {
942                privileges: {
943                    let item = catalog.get_item_by_global_id(sink_from);
944                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
945                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
946                },
947                item_usage: &EMPTY_ITEM_USAGE,
948                ..Default::default()
949            }
950        }
951        Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
952            format: _,
953            raw_plan,
954            when: _,
955        }) => RbacRequirements {
956            privileges: raw_plan
957                .depends_on()
958                .into_iter()
959                .map(|id| {
960                    let item = catalog.get_item_by_global_id(&id);
961                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
962                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
963                })
964                .collect(),
965            ..Default::default()
966        },
967        Plan::Insert(plan::InsertPlan {
968            id,
969            values,
970            returning,
971        }) => {
972            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
973            let mut privileges = vec![
974                (
975                    SystemObjectId::Object(schema_id.clone()),
976                    AclMode::USAGE,
977                    role_id,
978                ),
979                (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
980            ];
981            let mut seen = BTreeSet::from([(schema_id, role_id)]);
982
983            // We don't allow arbitrary sub-queries in `returning`. So either it
984            // contains a column reference to the outer table or it's constant.
985            if returning
986                .iter()
987                .any(|assignment| assignment.contains_column())
988            {
989                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
990                seen.insert((id.into(), role_id));
991            }
992
993            let items = values
994                .depends_on()
995                .into_iter()
996                .map(|gid| catalog.resolve_item_id(&gid));
997            privileges.extend_from_slice(&generate_read_privileges_inner(
998                catalog, items, role_id, &mut seen,
999            ));
1000
1001            if let Some(privilege) = generate_cluster_usage_privileges(
1002                values.as_const().is_some(),
1003                target_cluster_id,
1004                role_id,
1005            ) {
1006                privileges.push(privilege);
1007            } else if !returning.is_empty() {
1008                // TODO(jkosh44) returning may be a constant, but for now we are overly protective
1009                //  and require cluster privileges for all returning.
1010                if let Some(cluster_id) = target_cluster_id {
1011                    privileges.push((
1012                        SystemObjectId::Object(cluster_id.into()),
1013                        AclMode::USAGE,
1014                        role_id,
1015                    ));
1016                }
1017            }
1018            RbacRequirements {
1019                privileges,
1020                ..Default::default()
1021            }
1022        }
1023        Plan::AlterCluster(plan::AlterClusterPlan {
1024            id,
1025            name: _,
1026            options: _,
1027            strategy: _,
1028        }) => RbacRequirements {
1029            ownership: vec![ObjectId::Cluster(*id)],
1030            item_usage: &CREATE_ITEM_USAGE,
1031            ..Default::default()
1032        },
1033        Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1034            ownership: vec![ObjectId::Item(*id)],
1035            privileges: vec![(
1036                SystemObjectId::Object(set_cluster.into()),
1037                AclMode::CREATE,
1038                role_id,
1039            )],
1040            item_usage: &CREATE_ITEM_USAGE,
1041            ..Default::default()
1042        },
1043        Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1044            id,
1045            window: _,
1046            value: _,
1047            object_type: _,
1048        }) => RbacRequirements {
1049            ownership: vec![ObjectId::Item(*id)],
1050            item_usage: &CREATE_ITEM_USAGE,
1051            ..Default::default()
1052        },
1053        Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1054            ownership: vec![ObjectId::Item(*id)],
1055            ..Default::default()
1056        },
1057        Plan::AlterSource(plan::AlterSourcePlan {
1058            item_id,
1059            ingestion_id: _,
1060            action: _,
1061        }) => RbacRequirements {
1062            ownership: vec![ObjectId::Item(*item_id)],
1063            item_usage: &CREATE_ITEM_USAGE,
1064            ..Default::default()
1065        },
1066        Plan::AlterSink(plan::AlterSinkPlan {
1067            item_id,
1068            global_id: _,
1069            sink,
1070            with_snapshot: _,
1071            in_cluster,
1072        }) => {
1073            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1074            let mut privileges = generate_read_privileges(catalog, items, role_id);
1075            privileges.push((
1076                SystemObjectId::Object(in_cluster.into()),
1077                AclMode::CREATE,
1078                role_id,
1079            ));
1080            RbacRequirements {
1081                ownership: vec![ObjectId::Item(*item_id)],
1082                privileges,
1083                item_usage: &CREATE_ITEM_USAGE,
1084                ..Default::default()
1085            }
1086        }
1087        Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1088            id,
1089            name: _,
1090            to_name: _,
1091        }) => RbacRequirements {
1092            ownership: vec![ObjectId::Cluster(*id)],
1093            ..Default::default()
1094        },
1095        Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1096            id_a,
1097            id_b,
1098            name_a: _,
1099            name_b: _,
1100            name_temp: _,
1101        }) => RbacRequirements {
1102            ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1103            ..Default::default()
1104        },
1105        Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1106            cluster_id,
1107            replica_id,
1108            name: _,
1109            to_name: _,
1110        }) => RbacRequirements {
1111            ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1112            ..Default::default()
1113        },
1114        Plan::AlterItemRename(plan::AlterItemRenamePlan {
1115            id,
1116            current_full_name: _,
1117            to_name: _,
1118            object_type: _,
1119        }) => RbacRequirements {
1120            ownership: vec![ObjectId::Item(*id)],
1121            ..Default::default()
1122        },
1123        Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1124            cur_schema_spec,
1125            new_schema_name: _,
1126        }) => {
1127            let privileges = match cur_schema_spec.0 {
1128                ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1129                    SystemObjectId::Object(ObjectId::Database(db_id)),
1130                    AclMode::CREATE,
1131                    role_id,
1132                )],
1133                ResolvedDatabaseSpecifier::Ambient => vec![],
1134            };
1135
1136            RbacRequirements {
1137                ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1138                privileges,
1139                ..Default::default()
1140            }
1141        }
1142        Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1143            schema_a_spec,
1144            schema_a_name: _,
1145            schema_b_spec,
1146            schema_b_name: _,
1147            name_temp: _,
1148        }) => {
1149            let mut privileges = vec![];
1150            if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1151                privileges.push((
1152                    SystemObjectId::Object(ObjectId::Database(id_a)),
1153                    AclMode::CREATE,
1154                    role_id,
1155                ));
1156            }
1157            if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1158                privileges.push((
1159                    SystemObjectId::Object(ObjectId::Database(id_b)),
1160                    AclMode::CREATE,
1161                    role_id,
1162                ));
1163            }
1164
1165            RbacRequirements {
1166                ownership: vec![
1167                    ObjectId::Schema(*schema_a_spec),
1168                    ObjectId::Schema(*schema_b_spec),
1169                ],
1170                privileges,
1171                ..Default::default()
1172            }
1173        }
1174        Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1175            ownership: vec![ObjectId::Item(*id)],
1176            item_usage: &CREATE_ITEM_USAGE,
1177            ..Default::default()
1178        },
1179        Plan::AlterRole(plan::AlterRolePlan {
1180            id,
1181            name: _,
1182            option,
1183        }) => match option {
1184            // Only superusers can alter the superuserness of a role.
1185            plan::PlannedAlterRoleOption::Attributes(attributes)
1186                if attributes.superuser.unwrap_or(false) =>
1187            {
1188                RbacRequirements {
1189                    superuser_action: Some("alter superuser role".to_string()),
1190                    ..Default::default()
1191                }
1192            }
1193            // Roles are allowed to change their own password.
1194            plan::PlannedAlterRoleOption::Attributes(attributes)
1195                if attributes.password.is_some() && role_id == *id =>
1196            {
1197                RbacRequirements::default()
1198            }
            // But no one else's...
1200            plan::PlannedAlterRoleOption::Attributes(attributes)
1201                if attributes.password.is_some() =>
1202            {
1203                RbacRequirements {
1204                    superuser_action: Some("alter password of role".to_string()),
1205                    ..Default::default()
1206                }
1207            }
1208            // Roles are allowed to change their own variables.
1209            plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1210                RbacRequirements::default()
1211            }
1212            // Otherwise to ALTER a role, you need to have the CREATE_ROLE privilege.
1213            _ => RbacRequirements {
1214                privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1215                item_usage: &CREATE_ITEM_USAGE,
1216                ..Default::default()
1217            },
1218        },
1219        Plan::AlterOwner(plan::AlterOwnerPlan {
1220            id,
1221            object_type: _,
1222            new_owner,
1223        }) => {
1224            let privileges = match id {
1225                ObjectId::ClusterReplica((cluster_id, _)) => {
1226                    vec![(
1227                        SystemObjectId::Object(cluster_id.into()),
1228                        AclMode::CREATE,
1229                        role_id,
1230                    )]
1231                }
1232                ObjectId::Schema((database_spec, _)) => match database_spec {
1233                    ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1234                    ResolvedDatabaseSpecifier::Id(database_id) => {
1235                        vec![(
1236                            SystemObjectId::Object(database_id.into()),
1237                            AclMode::CREATE,
1238                            role_id,
1239                        )]
1240                    }
1241                },
1242                ObjectId::Item(item_id) => {
1243                    let item = catalog.get_item(item_id);
1244                    vec![(
1245                        SystemObjectId::Object(item.name().qualifiers.clone().into()),
1246                        AclMode::CREATE,
1247                        role_id,
1248                    )]
1249                }
1250                ObjectId::Cluster(_)
1251                | ObjectId::Database(_)
1252                | ObjectId::Role(_)
1253                | ObjectId::NetworkPolicy(_) => Vec::new(),
1254            };
1255            RbacRequirements {
1256                role_membership: BTreeSet::from([*new_owner]),
1257                ownership: vec![id.clone()],
1258                privileges,
1259                ..Default::default()
1260            }
1261        }
1262        Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1263            ownership: vec![ObjectId::Item(*relation_id)],
1264            item_usage: &CREATE_ITEM_USAGE,
1265            ..Default::default()
1266        },
1267        Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1268            ownership: vec![ObjectId::NetworkPolicy(*id)],
1269            item_usage: &CREATE_ITEM_USAGE,
1270            ..Default::default()
1271        },
1272        Plan::ReadThenWrite(plan::ReadThenWritePlan {
1273            id,
1274            selection,
1275            finishing: _,
1276            assignments,
1277            kind,
1278            returning,
1279        }) => {
1280            let acl_mode = match kind {
1281                MutationKind::Insert => AclMode::INSERT,
1282                MutationKind::Update => AclMode::UPDATE,
1283                MutationKind::Delete => AclMode::DELETE,
1284            };
1285            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1286            let mut privileges = vec![
1287                (
1288                    SystemObjectId::Object(schema_id.clone()),
1289                    AclMode::USAGE,
1290                    role_id,
1291                ),
1292                (SystemObjectId::Object(id.into()), acl_mode, role_id),
1293            ];
1294            let mut seen = BTreeSet::from([(schema_id, role_id)]);
1295
            // We don't allow arbitrary sub-queries in `assignments` or `returning`. So each
            // expression either contains a column reference to the outer table or is constant.
1298            if assignments
1299                .values()
1300                .chain(returning.iter())
1301                .any(|assignment| assignment.contains_column())
1302            {
1303                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1304                seen.insert((id.into(), role_id));
1305            }
1306
1307            // TODO(jkosh44) It's fairly difficult to determine what part of `selection` is from a
1308            //  user specified read and what part is from the implementation of the read then write.
1309            //  instead we are overly protective and always require SELECT privileges even though
1310            //  PostgreSQL doesn't always do this.
1311            //  As a concrete example, we require SELECT and UPDATE privileges to execute
1312            //  `UPDATE t SET a = 42;`, while PostgreSQL only requires UPDATE privileges.
1313            let items = selection
1314                .depends_on()
1315                .into_iter()
1316                .map(|gid| catalog.resolve_item_id(&gid));
1317            privileges.extend_from_slice(&generate_read_privileges_inner(
1318                catalog, items, role_id, &mut seen,
1319            ));
1320
1321            if let Some(privilege) = generate_cluster_usage_privileges(
1322                selection.as_const().is_some(),
1323                target_cluster_id,
1324                role_id,
1325            ) {
1326                privileges.push(privilege);
1327            }
1328            RbacRequirements {
1329                privileges,
1330                ..Default::default()
1331            }
1332        }
1333        Plan::GrantRole(plan::GrantRolePlan {
1334            role_ids: _,
1335            member_ids: _,
1336            grantor_id: _,
1337        })
1338        | Plan::RevokeRole(plan::RevokeRolePlan {
1339            role_ids: _,
1340            member_ids: _,
1341            grantor_id: _,
1342        }) => RbacRequirements {
1343            privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1344            ..Default::default()
1345        },
1346        Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1347            update_privileges,
1348            grantees: _,
1349        })
1350        | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1351            update_privileges,
1352            revokees: _,
1353        }) => {
1354            let mut privileges = Vec::with_capacity(update_privileges.len());
1355            for UpdatePrivilege { target_id, .. } in update_privileges {
1356                match target_id {
1357                    SystemObjectId::Object(object_id) => match object_id {
1358                        ObjectId::ClusterReplica((cluster_id, _)) => {
1359                            privileges.push((
1360                                SystemObjectId::Object(cluster_id.into()),
1361                                AclMode::USAGE,
1362                                role_id,
1363                            ));
1364                        }
1365                        ObjectId::Schema((database_spec, _)) => match database_spec {
1366                            ResolvedDatabaseSpecifier::Ambient => {}
1367                            ResolvedDatabaseSpecifier::Id(database_id) => {
1368                                privileges.push((
1369                                    SystemObjectId::Object(database_id.into()),
1370                                    AclMode::USAGE,
1371                                    role_id,
1372                                ));
1373                            }
1374                        },
1375                        ObjectId::Item(item_id) => {
1376                            let item = catalog.get_item(item_id);
1377                            privileges.push((
1378                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
1379                                AclMode::USAGE,
1380                                role_id,
1381                            ))
1382                        }
1383                        ObjectId::Cluster(_)
1384                        | ObjectId::Database(_)
1385                        | ObjectId::Role(_)
1386                        | ObjectId::NetworkPolicy(_) => {}
1387                    },
1388                    SystemObjectId::System => {}
1389                }
1390            }
1391            RbacRequirements {
1392                ownership: update_privileges
1393                    .iter()
1394                    .filter_map(|update_privilege| update_privilege.target_id.object_id())
1395                    .cloned()
1396                    .collect(),
1397                privileges,
1398                // To grant/revoke a privilege on some object, generally the grantor/revoker must be the
1399                // owner of that object (or have a grant option on that object which isn't implemented in
1400                // Materialize yet). There is no owner of the entire system, so it's only reasonable to
1401                // restrict granting/revoking system privileges to superusers.
1402                superuser_action: if update_privileges
1403                    .iter()
1404                    .any(|update_privilege| update_privilege.target_id.is_system())
1405                {
1406                    Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1407                } else {
1408                    None
1409                },
1410                ..Default::default()
1411            }
1412        }
1413        Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1414            privilege_objects,
1415            privilege_acl_items: _,
1416            is_grant: _,
1417        }) => RbacRequirements {
1418            role_membership: privilege_objects
1419                .iter()
1420                .map(|privilege_object| privilege_object.role_id)
1421                .collect(),
1422            privileges: privilege_objects
1423                .into_iter()
1424                .filter_map(|privilege_object| {
1425                    if let (Some(database_id), Some(_)) =
1426                        (privilege_object.database_id, privilege_object.schema_id)
1427                    {
1428                        Some((
1429                            SystemObjectId::Object(database_id.into()),
1430                            AclMode::USAGE,
1431                            role_id,
1432                        ))
1433                    } else {
1434                        None
1435                    }
1436                })
1437                .collect(),
            // Altering the default privileges for the PUBLIC role (aka ALL ROLES) will affect all
            // roles that currently exist and roles that will exist in the future. It's impossible
            // for an existing role to be a member of a role that doesn't exist yet, so no current
            // role could possibly have the privileges required to alter default privileges for the
            // PUBLIC role. Therefore only superusers can alter default privileges for the PUBLIC
            // role.
1443            superuser_action: if privilege_objects
1444                .iter()
1445                .any(|privilege_object| privilege_object.role_id.is_public())
1446            {
1447                Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1448            } else {
1449                None
1450            },
1451            ..Default::default()
1452        },
1453        Plan::ReassignOwned(plan::ReassignOwnedPlan {
1454            old_roles,
1455            new_role,
1456            reassign_ids: _,
1457        }) => RbacRequirements {
1458            role_membership: old_roles
1459                .into_iter()
1460                .cloned()
1461                .chain(iter::once(*new_role))
1462                .collect(),
1463            ..Default::default()
1464        },
1465        Plan::SideEffectingFunc(func) => {
1466            let role_membership = match func {
1467                SideEffectingFunc::PgCancelBackend { connection_id } => {
1468                    active_conns(*connection_id)
1469                        .map(|x| [x].into())
1470                        .unwrap_or_default()
1471                }
1472            };
1473            RbacRequirements {
1474                role_membership,
1475                ..Default::default()
1476            }
1477        }
1478        Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1479            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1480            RbacRequirements {
1481                privileges: vec![
1482                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1483                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1484                ],
1485                ..Default::default()
1486            }
1487        }
1488        Plan::DiscardTemp
1489        | Plan::DiscardAll
1490        | Plan::EmptyQuery
1491        | Plan::ShowAllVariables
1492        | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1493        | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1494        | Plan::SetVariable(plan::SetVariablePlan {
1495            name: _,
1496            value: _,
1497            local: _,
1498        })
1499        | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1500        | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1501        | Plan::StartTransaction(plan::StartTransactionPlan {
1502            access: _,
1503            isolation_level: _,
1504        })
1505        | Plan::CommitTransaction(plan::CommitTransactionPlan {
1506            transaction_type: _,
1507        })
1508        | Plan::AbortTransaction(plan::AbortTransactionPlan {
1509            transaction_type: _,
1510        })
1511        | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1512        | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1513        | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1514        | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1515        | Plan::Declare(plan::DeclarePlan {
1516            name: _,
1517            stmt: _,
1518            sql: _,
1519            params: _,
1520        })
1521        | Plan::Fetch(plan::FetchPlan {
1522            name: _,
1523            count: _,
1524            timeout: _,
1525        })
1526        | Plan::Close(plan::ClosePlan { name: _ })
1527        | Plan::Prepare(plan::PreparePlan {
1528            name: _,
1529            stmt: _,
1530            desc: _,
1531            sql: _,
1532        })
1533        | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1534        | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1535        | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1536    }
1537}
1538
1539/// Reports whether any role has ownership over an object.
1540fn check_owner_roles(
1541    object_id: &ObjectId,
1542    role_ids: &BTreeSet<RoleId>,
1543    catalog: &impl SessionCatalog,
1544) -> bool {
1545    if let Some(owner_id) = catalog.get_owner_id(object_id) {
1546        role_ids.contains(&owner_id)
1547    } else {
1548        true
1549    }
1550}
1551
1552fn ownership_err(
1553    unheld_ownership: Vec<ObjectId>,
1554    catalog: &impl SessionCatalog,
1555) -> Result<(), UnauthorizedError> {
1556    if !unheld_ownership.is_empty() {
1557        let objects = unheld_ownership
1558            .into_iter()
1559            .map(|ownership| match ownership {
1560                ObjectId::Cluster(id) => (
1561                    ObjectType::Cluster,
1562                    catalog.get_cluster(id).name().to_string(),
1563                ),
1564                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1565                    let cluster = catalog.get_cluster(cluster_id);
1566                    let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1567                    // Note: using unchecked here is okay because the values are coming from an
1568                    // already existing name.
1569                    let name = QualifiedReplica {
1570                        cluster: Ident::new_unchecked(cluster.name()),
1571                        replica: Ident::new_unchecked(replica.name()),
1572                    };
1573                    (ObjectType::ClusterReplica, name.to_string())
1574                }
1575                ObjectId::Database(id) => (
1576                    ObjectType::Database,
1577                    catalog.get_database(&id).name().to_string(),
1578                ),
1579                ObjectId::Schema((database_spec, schema_spec)) => {
1580                    let schema = catalog.get_schema(&database_spec, &schema_spec);
1581                    let name = catalog.resolve_full_schema_name(schema.name());
1582                    (ObjectType::Schema, name.to_string())
1583                }
1584                ObjectId::Item(id) => {
1585                    let item = catalog.get_item(&id);
1586                    let name = catalog.resolve_full_name(item.name());
1587                    (item.item_type().into(), name.to_string())
1588                }
1589                ObjectId::NetworkPolicy(id) => (
1590                    ObjectType::NetworkPolicy,
1591                    catalog.get_network_policy(&id).name().to_string(),
1592                ),
1593                ObjectId::Role(_) => unreachable!("roles have no owner"),
1594            })
1595            .collect();
1596        Err(UnauthorizedError::Ownership { objects })
1597    } else {
1598        Ok(())
1599    }
1600}
1601
1602fn generate_required_source_privileges(
1603    name: &QualifiedItemName,
1604    data_source: &DataSourceDesc,
1605    in_cluster: Option<ClusterId>,
1606    role_id: RoleId,
1607) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1608    let mut privileges = vec![(
1609        SystemObjectId::Object(name.qualifiers.clone().into()),
1610        AclMode::CREATE,
1611        role_id,
1612    )];
1613    match (data_source, in_cluster) {
1614        (_, Some(id)) => {
1615            privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1616        }
1617        (DataSourceDesc::Ingestion(_), None) => {
1618            privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1619        }
1620        // Non-ingestion data-sources have meaningless cluster config's (for now...) and they need
1621        // to be ignored.
1622        // This feels very brittle, but there's not much we can do until the UNDEFINED cluster
1623        // config is removed.
1624        (_, None) => {}
1625    }
1626    privileges
1627}
1628
1629/// Generates all the privileges required to execute a read that includes the objects in `ids`.
1630///
1631/// Not only do we need to validate that `role_id` has read privileges on all relations in `ids`,
1632/// but if any object is a view or materialized view then we need to validate that the owner of
1633/// that view has all of the privileges required to execute the query within the view.
1634///
1635/// For more details see: <https://www.postgresql.org/docs/15/rules-privileges.html>
1636fn generate_read_privileges(
1637    catalog: &impl SessionCatalog,
1638    ids: impl Iterator<Item = CatalogItemId>,
1639    role_id: RoleId,
1640) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1641    generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1642}
1643
1644fn generate_read_privileges_inner(
1645    catalog: &impl SessionCatalog,
1646    ids: impl Iterator<Item = CatalogItemId>,
1647    role_id: RoleId,
1648    seen: &mut BTreeSet<(ObjectId, RoleId)>,
1649) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1650    let mut privileges = Vec::new();
1651    let mut views = Vec::new();
1652
1653    for id in ids {
1654        if seen.insert((id.into(), role_id)) {
1655            let item = catalog.get_item(&id);
1656            let schema_id: ObjectId = item.name().qualifiers.clone().into();
1657            if seen.insert((schema_id.clone(), role_id)) {
1658                privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1659            }
1660            match item.item_type() {
1661                CatalogItemType::View
1662                | CatalogItemType::MaterializedView
1663                | CatalogItemType::ContinualTask => {
1664                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1665                    views.push((item.references().items().copied(), item.owner_id()));
1666                }
1667                CatalogItemType::Table | CatalogItemType::Source => {
1668                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1669                }
1670                CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1671                    privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1672                }
1673                CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1674            }
1675        }
1676    }
1677
1678    for (view_ids, view_owner) in views {
1679        privileges.extend_from_slice(&generate_read_privileges_inner(
1680            catalog, view_ids, view_owner, seen,
1681        ));
1682    }
1683
1684    privileges
1685}
1686
1687fn generate_usage_privileges(
1688    catalog: &impl SessionCatalog,
1689    ids: &ResolvedIds,
1690    role_id: RoleId,
1691    item_types: &BTreeSet<CatalogItemType>,
1692) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1693    // Use a `BTreeSet` to remove duplicate privileges.
1694    ids.items()
1695        .filter_map(move |id| {
1696            let item = catalog.get_item(id);
1697            if item_types.contains(&item.item_type()) {
1698                let schema_id = item.name().qualifiers.clone().into();
1699                Some([
1700                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1701                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1702                ])
1703            } else {
1704                None
1705            }
1706        })
1707        .flatten()
1708        .collect()
1709}
1710
1711fn generate_cluster_usage_privileges(
1712    expr_is_const: bool,
1713    target_cluster_id: Option<ClusterId>,
1714    role_id: RoleId,
1715) -> Option<(SystemObjectId, AclMode, RoleId)> {
1716    // TODO(jkosh44) expr hasn't been fully optimized yet, so it might actually be a constant,
1717    //  but we mistakenly think that it's not. For now it's ok to be overly protective.
1718    if !expr_is_const {
1719        if let Some(cluster_id) = target_cluster_id {
1720            return Some((
1721                SystemObjectId::Object(cluster_id.into()),
1722                AclMode::USAGE,
1723                role_id,
1724            ));
1725        }
1726    }
1727
1728    None
1729}
1730
1731fn check_object_privileges(
1732    catalog: &impl SessionCatalog,
1733    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1734    role_membership: BTreeSet<RoleId>,
1735    current_role_id: RoleId,
1736) -> Result<(), UnauthorizedError> {
1737    let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1738    role_memberships.insert(current_role_id, role_membership);
1739    for (object_id, acl_mode, role_id) in privileges {
1740        let role_membership = role_memberships
1741            .entry(role_id)
1742            .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1743        let object_privileges = catalog
1744            .get_privileges(&object_id)
1745            .expect("only object types with privileges will generate required privileges");
1746        let role_privileges = role_membership
1747            .iter()
1748            .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1749            .map(|mz_acl_item| mz_acl_item.acl_mode)
1750            .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1751        if !role_privileges.contains(acl_mode) {
1752            let role_name = catalog.get_role(&role_id).name().to_string();
1753            let privileges = acl_mode.to_error_string();
1754            return Err(UnauthorizedError::Privilege {
1755                object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1756                role_name,
1757                privileges,
1758            });
1759        }
1760    }
1761
1762    Ok(())
1763}
1764
1765pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1766    const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1767        .union(AclMode::SELECT)
1768        .union(AclMode::UPDATE)
1769        .union(AclMode::DELETE);
1770    const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1771    const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1772        .union(AclMode::CREATE_DB)
1773        .union(AclMode::CREATE_CLUSTER)
1774        .union(AclMode::CREATE_NETWORK_POLICY);
1775
1776    const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1777    match object_type {
1778        SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1779        SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1780        SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1781        SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1782        SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1783        SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1784        SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1785        SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1786        SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1787        SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1788        SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1789        SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1790        SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1791        SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1792        SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1793        SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1794        SystemObjectType::Object(ObjectType::ContinualTask) => AclMode::SELECT,
1795        SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1796    }
1797}
1798
1799pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1800    MzAclItem {
1801        grantee: owner_id,
1802        grantor: owner_id,
1803        acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1804    }
1805}
1806
1807const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1808    match object_type {
1809        ObjectType::Table
1810        | ObjectType::View
1811        | ObjectType::MaterializedView
1812        | ObjectType::Source
1813        | ObjectType::ContinualTask => AclMode::SELECT,
1814        ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1815        ObjectType::Sink
1816        | ObjectType::Index
1817        | ObjectType::Role
1818        | ObjectType::Cluster
1819        | ObjectType::ClusterReplica
1820        | ObjectType::Secret
1821        | ObjectType::Connection
1822        | ObjectType::Database
1823        | ObjectType::Func
1824        | ObjectType::NetworkPolicy => AclMode::empty(),
1825    }
1826}
1827
1828pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1829    let acl_mode = default_builtin_object_acl_mode(object_type);
1830    MzAclItem {
1831        grantee: MZ_SUPPORT_ROLE_ID,
1832        grantor: MZ_SYSTEM_ROLE_ID,
1833        acl_mode,
1834    }
1835}
1836
1837pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1838    let acl_mode = default_builtin_object_acl_mode(object_type);
1839    MzAclItem {
1840        grantee: RoleId::Public,
1841        grantor: MZ_SYSTEM_ROLE_ID,
1842        acl_mode,
1843    }
1844}