mz_sql/
rbac.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26    CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29    CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30    SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34    DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40/// Common checks that need to be performed before we can start checking a role's privileges.
41fn rbac_check_preamble(
42    catalog: &impl SessionCatalog,
43    session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45    // PostgreSQL allows users that have their role dropped to perform some actions,
46    // such as `SET ROLE` and certain `SELECT` queries. We haven't implemented
47    // `SET ROLE` and feel it's safer to force to user to re-authenticate if their
48    // role is dropped.
49    if catalog
50        .try_get_role(&session_meta.role_metadata().current_role)
51        .is_none()
52    {
53        return Err(UnauthorizedError::ConcurrentRoleDrop(
54            session_meta.role_metadata().current_role.clone(),
55        ));
56    };
57    if catalog
58        .try_get_role(&session_meta.role_metadata().session_role)
59        .is_none()
60    {
61        return Err(UnauthorizedError::ConcurrentRoleDrop(
62            session_meta.role_metadata().session_role.clone(),
63        ));
64    };
65    if catalog
66        .try_get_role(&session_meta.role_metadata().authenticated_role)
67        .is_none()
68    {
69        return Err(UnauthorizedError::ConcurrentRoleDrop(
70            session_meta.role_metadata().authenticated_role.clone(),
71        ));
72    };
73
74    Ok(())
75}
76
77/// Filters `RbacRequirements` based on the session role metadata and RBAC related feature flags.
78fn filter_requirements(
79    catalog: &impl SessionCatalog,
80    session_meta: &dyn SessionMetadata,
81    rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83    // Skip RBAC non-mandatory checks if RBAC is disabled. However, we never skip RBAC checks for
84    // system roles. This allows us to limit access of system users even when RBAC is off.
85    let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86        && !session_meta.role_metadata().current_role.is_system()
87        && !session_meta.role_metadata().session_role.is_system();
88    // Skip RBAC checks on user items if the session is a superuser.
89    let is_superuser = session_meta.is_superuser();
90    if is_rbac_disabled || is_superuser {
91        return rbac_requirements.filter_to_mandatory_requirements();
92    }
93
94    rbac_requirements
95}
96
97// The default item types that most statements require USAGE privileges for.
98static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99    btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101// CREATE statements require USAGE privileges on the default item types and USAGE privileges on
102// Types.
103pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104    let mut items = DEFAULT_ITEM_USAGE.clone();
105    items.insert(CatalogItemType::Type);
106    items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
110/// Errors that can occur due to an unauthorized action.
111#[derive(Debug, thiserror::Error)]
112pub enum UnauthorizedError {
113    /// The action can only be performed by a superuser.
114    #[error("permission denied to {action}")]
115    Superuser { action: String },
116    /// The action requires ownership of an object.
117    #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
118    Ownership { objects: Vec<(ObjectType, String)> },
119    /// Altering an owner requires membership of the new owner role.
120    #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
121    RoleMembership { role_names: Vec<String> },
122    /// The action requires one or more privileges.
123    #[error("permission denied for {object_description}")]
124    Privilege {
125        object_description: ErrorMessageObjectDescription,
126        role_name: String,
127        privileges: String,
128    },
129    // TODO(jkosh44) When we implement parameter privileges, this can be replaced with a regular
130    //  privilege error.
131    /// The action can only be performed by the mz_system role.
132    #[error("permission denied to {action}")]
133    MzSystem { action: String },
134    /// The action cannot be performed by the mz_support role.
135    #[error("permission denied to {action}")]
136    MzSupport { action: String },
137    /// The active role was dropped while a user was logged in.
138    #[error("role {0} was concurrently dropped")]
139    ConcurrentRoleDrop(RoleId),
140}
141
142impl UnauthorizedError {
143    pub fn detail(&self) -> Option<String> {
144        match &self {
145            UnauthorizedError::Superuser { action } => {
146                Some(format!("You must be a superuser to {}", action))
147            }
148            UnauthorizedError::Privilege {
149                object_description,
150                role_name,
151                privileges,
152            } => Some(format!(
153                "The '{role_name}' role needs {privileges} privileges on {object_description}"
154            )),
155            UnauthorizedError::MzSystem { .. } => {
156                Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157            }
158            UnauthorizedError::MzSupport { .. } => Some(format!(
159                "The '{}' role has very limited privileges",
160                SUPPORT_USER.name
161            )),
162            UnauthorizedError::ConcurrentRoleDrop(_) => {
163                Some("Please disconnect and re-connect with a valid role.".into())
164            }
165            UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166        }
167    }
168}
169
170/// RBAC requirements for executing a given plan.
171#[derive(Debug)]
172struct RbacRequirements {
173    /// The role memberships required.
174    role_membership: BTreeSet<RoleId>,
175    /// The object ownerships required.
176    ownership: Vec<ObjectId>,
177    /// The privileges required. The tuples are of the form:
178    /// (What object the privilege is on, What privilege is required, Who must possess the privilege).
179    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
180    /// The types of catalog items that this plan requires USAGE privileges on.
181    ///
182    /// Most plans will require USAGE on secrets and connections but some plans, like SHOW CREATE,
183    /// can reference an item without requiring any privileges on that item.
184    item_usage: &'static BTreeSet<CatalogItemType>,
185    /// Some action if superuser is required to perform that action, None otherwise.
186    superuser_action: Option<String>,
187}
188
189impl RbacRequirements {
190    fn empty() -> RbacRequirements {
191        RbacRequirements {
192            role_membership: BTreeSet::new(),
193            ownership: Vec::new(),
194            privileges: Vec::new(),
195            item_usage: &EMPTY_ITEM_USAGE,
196            superuser_action: None,
197        }
198    }
199
200    fn validate(
201        self,
202        catalog: &impl SessionCatalog,
203        session: &dyn SessionMetadata,
204        resolved_ids: &ResolvedIds,
205    ) -> Result<(), UnauthorizedError> {
206        // Obtain all roles that the current session is a member of.
207        let role_membership =
208            catalog.collect_role_membership(&session.role_metadata().current_role);
209
210        check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212        // Validate that the current session has the required role membership to execute the provided
213        // plan.
214        let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215        if !unheld_membership.is_empty() {
216            let role_names = unheld_membership
217                .into_iter()
218                .map(|role_id| {
219                    // Some role references may no longer exist due to concurrent drops.
220                    catalog
221                        .try_get_role(role_id)
222                        .map(|role| role.name().to_string())
223                        .unwrap_or_else(|| role_id.to_string())
224                })
225                .collect();
226            return Err(UnauthorizedError::RoleMembership { role_names });
227        }
228
229        // Validate that the current session has the required object ownership to execute the provided
230        // plan.
231        let unheld_ownership = self
232            .ownership
233            .into_iter()
234            .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235            .collect();
236        ownership_err(unheld_ownership, catalog)?;
237
238        check_object_privileges(
239            catalog,
240            self.privileges,
241            role_membership,
242            session.role_metadata().current_role,
243        )?;
244
245        if let Some(action) = self.superuser_action {
246            return Err(UnauthorizedError::Superuser { action });
247        }
248
249        Ok(())
250    }
251
252    fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253        let RbacRequirements {
254            role_membership,
255            ownership,
256            privileges,
257            item_usage,
258            superuser_action: _,
259        } = self;
260        let role_membership = role_membership
261            .into_iter()
262            .filter(|id| id.is_system())
263            .collect();
264        let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265        let privileges = privileges
266            .into_iter()
267            .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268            // We allow reading objects for superusers and when RBAC is off.
269            .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270            .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271            .collect();
272        let superuser_action = None;
273        RbacRequirements {
274            role_membership,
275            ownership,
276            privileges,
277            item_usage,
278            superuser_action,
279        }
280    }
281}
282
283impl Default for RbacRequirements {
284    fn default() -> Self {
285        RbacRequirements {
286            role_membership: BTreeSet::new(),
287            ownership: Vec::new(),
288            privileges: Vec::new(),
289            item_usage: &DEFAULT_ITEM_USAGE,
290            superuser_action: None,
291        }
292    }
293}
294
295/// Checks if a `session` is authorized to use `resolved_ids`. If not, an error is returned.
296pub fn check_usage(
297    catalog: &impl SessionCatalog,
298    session: &dyn SessionMetadata,
299    resolved_ids: &ResolvedIds,
300    item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302    rbac_check_preamble(catalog, session)?;
303
304    // Obtain all roles that the current session is a member of.
305    let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307    // Certain statements depend on objects that haven't been created yet, like sub-sources, so we
308    // need to filter those out.
309    let existing_resolved_ids =
310        resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312    let required_privileges = generate_usage_privileges(
313        catalog,
314        &existing_resolved_ids,
315        session.role_metadata().current_role,
316        item_types,
317    )
318    .into_iter()
319    .collect();
320
321    let mut rbac_requirements = RbacRequirements::empty();
322    rbac_requirements.privileges = required_privileges;
323    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324    let required_privileges = rbac_requirements.privileges;
325
326    check_object_privileges(
327        catalog,
328        required_privileges,
329        role_membership,
330        session.role_metadata().current_role,
331    )?;
332
333    Ok(())
334}
335
336/// Checks if a session is authorized to execute a plan. If not, an error is returned.
337pub fn check_plan(
338    catalog: &impl SessionCatalog,
339    // Function mapping a connection ID to an authenticated role. The roles may have been dropped concurrently.
340    // Only required for Plan::SideEffectingFunc; can be None for other plan types.
341    active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
342    session: &dyn SessionMetadata,
343    plan: &Plan,
344    target_cluster_id: Option<ClusterId>,
345    resolved_ids: &ResolvedIds,
346) -> Result<(), UnauthorizedError> {
347    rbac_check_preamble(catalog, session)?;
348
349    let rbac_requirements = generate_rbac_requirements(
350        catalog,
351        plan,
352        active_conns,
353        target_cluster_id,
354        session.role_metadata().current_role,
355    );
356    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
357    debug!(
358        "rbac requirements {rbac_requirements:?} for plan {:?}",
359        PlanKind::from(plan)
360    );
361    rbac_requirements.validate(catalog, session, resolved_ids)
362}
363
364/// Returns true if RBAC is turned on for a session, false otherwise.
365pub fn is_rbac_enabled_for_session(
366    system_vars: &SystemVars,
367    session: &dyn SessionMetadata,
368) -> bool {
369    let server_enabled = system_vars.enable_rbac_checks();
370    let session_enabled = session.enable_session_rbac_checks();
371
372    // The session flag allows users to turn RBAC on for just their session while the server flag
373    // allows users to turn RBAC on for everyone.
374    server_enabled || session_enabled
375}
376
377/// Generates all requirements needed to execute a given plan.
378fn generate_rbac_requirements(
379    catalog: &impl SessionCatalog,
380    plan: &Plan,
381    active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
382    target_cluster_id: Option<ClusterId>,
383    role_id: RoleId,
384) -> RbacRequirements {
385    match plan {
386        Plan::CreateConnection(plan::CreateConnectionPlan {
387            name,
388            if_not_exists: _,
389            connection: _,
390            validate: _,
391        }) => RbacRequirements {
392            privileges: vec![(
393                SystemObjectId::Object(name.qualifiers.clone().into()),
394                AclMode::CREATE,
395                role_id,
396            )],
397            item_usage: &CREATE_ITEM_USAGE,
398            ..Default::default()
399        },
400        Plan::CreateDatabase(plan::CreateDatabasePlan {
401            name: _,
402            if_not_exists: _,
403        }) => RbacRequirements {
404            privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
405            item_usage: &CREATE_ITEM_USAGE,
406            ..Default::default()
407        },
408        Plan::CreateSchema(plan::CreateSchemaPlan {
409            database_spec,
410            schema_name: _,
411            if_not_exists: _,
412        }) => {
413            let privileges = match database_spec {
414                ResolvedDatabaseSpecifier::Ambient => Vec::new(),
415                ResolvedDatabaseSpecifier::Id(database_id) => {
416                    vec![(
417                        SystemObjectId::Object(database_id.into()),
418                        AclMode::CREATE,
419                        role_id,
420                    )]
421                }
422            };
423            RbacRequirements {
424                privileges,
425                item_usage: &CREATE_ITEM_USAGE,
426                ..Default::default()
427            }
428        }
429        Plan::CreateRole(plan::CreateRolePlan {
430            name: _,
431            attributes,
432        }) => {
433            if attributes.superuser.unwrap_or(false) {
434                RbacRequirements {
435                    superuser_action: Some("create superuser role".to_string()),
436                    ..Default::default()
437                }
438            } else {
439                RbacRequirements {
440                    privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
441                    item_usage: &CREATE_ITEM_USAGE,
442                    ..Default::default()
443                }
444            }
445        }
446        Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
447            privileges: vec![(
448                SystemObjectId::System,
449                AclMode::CREATE_NETWORK_POLICY,
450                role_id,
451            )],
452            item_usage: &CREATE_ITEM_USAGE,
453            ..Default::default()
454        },
455        Plan::CreateCluster(plan::CreateClusterPlan {
456            name: _,
457            variant: _,
458            workload_class: _,
459        }) => RbacRequirements {
460            privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
461            item_usage: &CREATE_ITEM_USAGE,
462            ..Default::default()
463        },
464        Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
465            cluster_id,
466            name: _,
467            config: _,
468        }) => RbacRequirements {
469            ownership: vec![ObjectId::Cluster(*cluster_id)],
470            item_usage: &CREATE_ITEM_USAGE,
471            ..Default::default()
472        },
473        Plan::CreateSource(plan::CreateSourcePlan {
474            name,
475            source,
476            if_not_exists: _,
477            timeline: _,
478            in_cluster,
479        }) => RbacRequirements {
480            privileges: generate_required_source_privileges(
481                name,
482                &source.data_source,
483                *in_cluster,
484                role_id,
485            ),
486            item_usage: &CREATE_ITEM_USAGE,
487            ..Default::default()
488        },
489        Plan::CreateSources(plans) => RbacRequirements {
490            privileges: plans
491                .iter()
492                .flat_map(
493                    |plan::CreateSourcePlanBundle {
494                         item_id: _,
495                         global_id: _,
496                         plan:
497                             plan::CreateSourcePlan {
498                                 name,
499                                 source,
500                                 if_not_exists: _,
501                                 timeline: _,
502                                 in_cluster,
503                             },
504                         resolved_ids: _,
505                         available_source_references: _,
506                     }| {
507                        generate_required_source_privileges(
508                            name,
509                            &source.data_source,
510                            *in_cluster,
511                            role_id,
512                        )
513                        .into_iter()
514                    },
515                )
516                .collect(),
517            item_usage: &CREATE_ITEM_USAGE,
518            ..Default::default()
519        },
520        Plan::CreateSecret(plan::CreateSecretPlan {
521            name,
522            secret: _,
523            if_not_exists: _,
524        }) => RbacRequirements {
525            privileges: vec![(
526                SystemObjectId::Object(name.qualifiers.clone().into()),
527                AclMode::CREATE,
528                role_id,
529            )],
530            item_usage: &CREATE_ITEM_USAGE,
531            ..Default::default()
532        },
533        Plan::CreateSink(plan::CreateSinkPlan {
534            name,
535            sink,
536            with_snapshot: _,
537            if_not_exists: _,
538            in_cluster,
539        }) => {
540            let mut privileges = vec![(
541                SystemObjectId::Object(name.qualifiers.clone().into()),
542                AclMode::CREATE,
543                role_id,
544            )];
545            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
546            privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
547            privileges.push((
548                SystemObjectId::Object(in_cluster.into()),
549                AclMode::CREATE,
550                role_id,
551            ));
552            RbacRequirements {
553                privileges,
554                item_usage: &CREATE_ITEM_USAGE,
555                ..Default::default()
556            }
557        }
558        Plan::CreateTable(plan::CreateTablePlan {
559            name,
560            table: _,
561            if_not_exists: _,
562        }) => RbacRequirements {
563            privileges: vec![(
564                SystemObjectId::Object(name.qualifiers.clone().into()),
565                AclMode::CREATE,
566                role_id,
567            )],
568            item_usage: &CREATE_ITEM_USAGE,
569            ..Default::default()
570        },
571        Plan::CreateView(plan::CreateViewPlan {
572            name,
573            view: _,
574            replace,
575            drop_ids: _,
576            if_not_exists: _,
577            ambiguous_columns: _,
578        }) => RbacRequirements {
579            ownership: replace
580                .map(|id| vec![ObjectId::Item(id)])
581                .unwrap_or_default(),
582            privileges: vec![(
583                SystemObjectId::Object(name.qualifiers.clone().into()),
584                AclMode::CREATE,
585                role_id,
586            )],
587            item_usage: &CREATE_ITEM_USAGE,
588            ..Default::default()
589        },
590        Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
591            name,
592            materialized_view,
593            replace,
594            drop_ids: _,
595            if_not_exists: _,
596            ambiguous_columns: _,
597        }) => RbacRequirements {
598            ownership: replace
599                .map(|id| vec![ObjectId::Item(id)])
600                .unwrap_or_default(),
601            privileges: vec![
602                (
603                    SystemObjectId::Object(name.qualifiers.clone().into()),
604                    AclMode::CREATE,
605                    role_id,
606                ),
607                (
608                    SystemObjectId::Object(materialized_view.cluster_id.into()),
609                    AclMode::CREATE,
610                    role_id,
611                ),
612            ],
613            item_usage: &CREATE_ITEM_USAGE,
614            ..Default::default()
615        },
616        Plan::CreateContinualTask(plan::CreateContinualTaskPlan {
617            name,
618            placeholder_id: _,
619            desc: _,
620            input_id: _,
621            with_snapshot: _,
622            continual_task,
623        }) => RbacRequirements {
624            privileges: vec![
625                (
626                    SystemObjectId::Object(name.qualifiers.clone().into()),
627                    AclMode::CREATE,
628                    role_id,
629                ),
630                (
631                    SystemObjectId::Object(continual_task.cluster_id.into()),
632                    AclMode::CREATE,
633                    role_id,
634                ),
635            ],
636            item_usage: &CREATE_ITEM_USAGE,
637            ..Default::default()
638        },
639        Plan::CreateIndex(plan::CreateIndexPlan {
640            name,
641            index,
642            if_not_exists: _,
643        }) => {
644            let index_on_item = catalog.resolve_item_id(&index.on);
645            RbacRequirements {
646                ownership: vec![ObjectId::Item(index_on_item)],
647                privileges: vec![
648                    (
649                        SystemObjectId::Object(name.qualifiers.clone().into()),
650                        AclMode::CREATE,
651                        role_id,
652                    ),
653                    (
654                        SystemObjectId::Object(index.cluster_id.into()),
655                        AclMode::CREATE,
656                        role_id,
657                    ),
658                ],
659                item_usage: &CREATE_ITEM_USAGE,
660                ..Default::default()
661            }
662        }
663        Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
664            privileges: vec![(
665                SystemObjectId::Object(name.qualifiers.clone().into()),
666                AclMode::CREATE,
667                role_id,
668            )],
669            item_usage: &CREATE_ITEM_USAGE,
670            ..Default::default()
671        },
672        Plan::Comment(plan::CommentPlan {
673            object_id,
674            sub_component: _,
675            comment: _,
676        }) => {
677            let (ownership, privileges) = match object_id {
678                // Roles don't have owners, instead we require the current session to have the
679                // `CREATEROLE` privilege.
680                CommentObjectId::Role(_) => (
681                    Vec::new(),
682                    vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
683                ),
684                _ => (vec![ObjectId::from(*object_id)], Vec::new()),
685            };
686            RbacRequirements {
687                ownership,
688                privileges,
689                ..Default::default()
690            }
691        }
692        Plan::DropObjects(plan::DropObjectsPlan {
693            referenced_ids,
694            drop_ids: _,
695            object_type,
696        }) => {
697            let privileges = if object_type == &ObjectType::Role {
698                vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
699            } else {
700                referenced_ids
701                    .iter()
702                    .filter_map(|id| match id {
703                        ObjectId::ClusterReplica((cluster_id, _)) => Some((
704                            SystemObjectId::Object(cluster_id.into()),
705                            AclMode::USAGE,
706                            role_id,
707                        )),
708                        ObjectId::Schema((database_spec, _)) => match database_spec {
709                            ResolvedDatabaseSpecifier::Ambient => None,
710                            ResolvedDatabaseSpecifier::Id(database_id) => Some((
711                                SystemObjectId::Object(database_id.into()),
712                                AclMode::USAGE,
713                                role_id,
714                            )),
715                        },
716                        ObjectId::Item(item_id) => {
717                            let item = catalog.get_item(item_id);
718                            Some((
719                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
720                                AclMode::USAGE,
721                                role_id,
722                            ))
723                        }
724                        ObjectId::Cluster(_)
725                        | ObjectId::Database(_)
726                        | ObjectId::Role(_)
727                        | ObjectId::NetworkPolicy(_) => None,
728                    })
729                    .collect()
730            };
731            RbacRequirements {
732                // Do not need ownership of descendant objects.
733                ownership: referenced_ids.clone(),
734                privileges,
735                ..Default::default()
736            }
737        }
738        Plan::DropOwned(plan::DropOwnedPlan {
739            role_ids,
740            drop_ids: _,
741            privilege_revokes: _,
742            default_privilege_revokes: _,
743        }) => RbacRequirements {
744            role_membership: role_ids.into_iter().cloned().collect(),
745            ..Default::default()
746        },
747        Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
748            let container_id = match id {
749                ObjectId::Item(id) => Some(SystemObjectId::Object(
750                    catalog.get_item(id).name().qualifiers.clone().into(),
751                )),
752                ObjectId::Schema((database_id, _schema_id)) => match database_id {
753                    ResolvedDatabaseSpecifier::Ambient => None,
754                    ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
755                },
756                ObjectId::Cluster(_)
757                | ObjectId::ClusterReplica(_)
758                | ObjectId::Database(_)
759                | ObjectId::Role(_)
760                | ObjectId::NetworkPolicy(_) => None,
761            };
762            let privileges = match container_id {
763                Some(id) => vec![(id, AclMode::USAGE, role_id)],
764                None => Vec::new(),
765            };
766            RbacRequirements {
767                privileges,
768                item_usage: &EMPTY_ITEM_USAGE,
769                ..Default::default()
770            }
771        }
772        Plan::ShowColumns(plan::ShowColumnsPlan {
773            id,
774            select_plan,
775            new_resolved_ids: _,
776        }) => {
777            let mut privileges = vec![(
778                SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
779                AclMode::USAGE,
780                role_id,
781            )];
782
783            for privilege in generate_rbac_requirements(
784                catalog,
785                &Plan::Select(select_plan.clone()),
786                active_conns,
787                target_cluster_id,
788                role_id,
789            )
790            .privileges
791            {
792                privileges.push(privilege);
793            }
794            RbacRequirements {
795                privileges,
796                ..Default::default()
797            }
798        }
799        Plan::Select(plan::SelectPlan {
800            source,
801            select: _,
802            when: _,
803            finishing: _,
804            copy_to: _,
805        }) => {
806            let items = source
807                .depends_on()
808                .into_iter()
809                .map(|gid| catalog.resolve_item_id(&gid));
810            let mut privileges = generate_read_privileges(catalog, items, role_id);
811            if let Some(privilege) = generate_cluster_usage_privileges(
812                source.as_const().is_some(),
813                target_cluster_id,
814                role_id,
815            ) {
816                privileges.push(privilege);
817            }
818            RbacRequirements {
819                privileges,
820                ..Default::default()
821            }
822        }
823        Plan::Subscribe(plan::SubscribePlan {
824            from,
825            with_snapshot: _,
826            when: _,
827            up_to: _,
828            copy_to: _,
829            emit_progress: _,
830            output: _,
831        }) => {
832            let items = from
833                .depends_on()
834                .into_iter()
835                .map(|gid| catalog.resolve_item_id(&gid));
836            let mut privileges = generate_read_privileges(catalog, items, role_id);
837            if let Some(cluster_id) = target_cluster_id {
838                privileges.push((
839                    SystemObjectId::Object(cluster_id.into()),
840                    AclMode::USAGE,
841                    role_id,
842                ));
843            }
844            RbacRequirements {
845                privileges,
846                ..Default::default()
847            }
848        }
849        Plan::CopyFrom(plan::CopyFromPlan {
850            target_name: _,
851            target_id,
852            source: _,
853            columns: _,
854            source_desc: _,
855            mfp: _,
856            params: _,
857            filter: _,
858        }) => RbacRequirements {
859            privileges: vec![
860                (
861                    SystemObjectId::Object(
862                        catalog.get_item(target_id).name().qualifiers.clone().into(),
863                    ),
864                    AclMode::USAGE,
865                    role_id,
866                ),
867                (
868                    SystemObjectId::Object(target_id.into()),
869                    AclMode::INSERT,
870                    role_id,
871                ),
872            ],
873            ..Default::default()
874        },
875        Plan::CopyTo(plan::CopyToPlan {
876            select_plan,
877            desc: _,
878            to: _,
879            connection: _,
880            connection_id: _,
881            format: _,
882            max_file_size: _,
883        }) => {
884            let items = select_plan
885                .source
886                .depends_on()
887                .into_iter()
888                .map(|gid| catalog.resolve_item_id(&gid));
889            let mut privileges = generate_read_privileges(catalog, items, role_id);
890            if let Some(cluster_id) = target_cluster_id {
891                privileges.push((
892                    SystemObjectId::Object(cluster_id.into()),
893                    AclMode::USAGE,
894                    role_id,
895                ));
896            }
897            RbacRequirements {
898                privileges,
899                ..Default::default()
900            }
901        }
902        Plan::ExplainPlan(plan::ExplainPlanPlan {
903            stage: _,
904            format: _,
905            config: _,
906            explainee,
907        })
908        | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
909            privileges: match explainee {
910                Explainee::View(id)
911                | Explainee::MaterializedView(id)
912                | Explainee::Index(id)
913                | Explainee::ReplanView(id)
914                | Explainee::ReplanMaterializedView(id)
915                | Explainee::ReplanIndex(id) => {
916                    let item = catalog.get_item(id);
917                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
918                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
919                }
920                Explainee::Statement(stmt) => stmt
921                    .depends_on()
922                    .into_iter()
923                    .map(|id| {
924                        let item = catalog.get_item_by_global_id(&id);
925                        let schema_id: ObjectId = item.name().qualifiers.clone().into();
926                        (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
927                    })
928                    .collect(),
929            },
930            item_usage: match explainee {
931                Explainee::View(..)
932                | Explainee::MaterializedView(..)
933                | Explainee::Index(..)
934                | Explainee::ReplanView(..)
935                | Explainee::ReplanMaterializedView(..)
936                | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
937                Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
938            },
939            ..Default::default()
940        },
941        Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
942            RbacRequirements {
943                privileges: {
944                    let item = catalog.get_item_by_global_id(sink_from);
945                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
946                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
947                },
948                item_usage: &EMPTY_ITEM_USAGE,
949                ..Default::default()
950            }
951        }
952        Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
953            format: _,
954            raw_plan,
955            when: _,
956        }) => RbacRequirements {
957            privileges: raw_plan
958                .depends_on()
959                .into_iter()
960                .map(|id| {
961                    let item = catalog.get_item_by_global_id(&id);
962                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
963                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
964                })
965                .collect(),
966            ..Default::default()
967        },
968        Plan::Insert(plan::InsertPlan {
969            id,
970            values,
971            returning,
972        }) => {
973            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
974            let mut privileges = vec![
975                (
976                    SystemObjectId::Object(schema_id.clone()),
977                    AclMode::USAGE,
978                    role_id,
979                ),
980                (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
981            ];
982            let mut seen = BTreeSet::from([(schema_id, role_id)]);
983
984            // We don't allow arbitrary sub-queries in `returning`. So either it
985            // contains a column reference to the outer table or it's constant.
986            if returning
987                .iter()
988                .any(|assignment| assignment.contains_column())
989            {
990                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
991                seen.insert((id.into(), role_id));
992            }
993
994            let items = values
995                .depends_on()
996                .into_iter()
997                .map(|gid| catalog.resolve_item_id(&gid));
998            privileges.extend_from_slice(&generate_read_privileges_inner(
999                catalog, items, role_id, &mut seen,
1000            ));
1001
1002            if let Some(privilege) = generate_cluster_usage_privileges(
1003                values.as_const().is_some(),
1004                target_cluster_id,
1005                role_id,
1006            ) {
1007                privileges.push(privilege);
1008            } else if !returning.is_empty() {
1009                // TODO(jkosh44) returning may be a constant, but for now we are overly protective
1010                //  and require cluster privileges for all returning.
1011                if let Some(cluster_id) = target_cluster_id {
1012                    privileges.push((
1013                        SystemObjectId::Object(cluster_id.into()),
1014                        AclMode::USAGE,
1015                        role_id,
1016                    ));
1017                }
1018            }
1019            RbacRequirements {
1020                privileges,
1021                ..Default::default()
1022            }
1023        }
1024        Plan::AlterCluster(plan::AlterClusterPlan {
1025            id,
1026            name: _,
1027            options: _,
1028            strategy: _,
1029        }) => RbacRequirements {
1030            ownership: vec![ObjectId::Cluster(*id)],
1031            item_usage: &CREATE_ITEM_USAGE,
1032            ..Default::default()
1033        },
1034        Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1035            ownership: vec![ObjectId::Item(*id)],
1036            privileges: vec![(
1037                SystemObjectId::Object(set_cluster.into()),
1038                AclMode::CREATE,
1039                role_id,
1040            )],
1041            item_usage: &CREATE_ITEM_USAGE,
1042            ..Default::default()
1043        },
1044        Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1045            id,
1046            window: _,
1047            value: _,
1048            object_type: _,
1049        }) => RbacRequirements {
1050            ownership: vec![ObjectId::Item(*id)],
1051            item_usage: &CREATE_ITEM_USAGE,
1052            ..Default::default()
1053        },
1054        Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1055            ownership: vec![ObjectId::Item(*id)],
1056            ..Default::default()
1057        },
1058        Plan::AlterSource(plan::AlterSourcePlan {
1059            item_id,
1060            ingestion_id: _,
1061            action: _,
1062        }) => RbacRequirements {
1063            ownership: vec![ObjectId::Item(*item_id)],
1064            item_usage: &CREATE_ITEM_USAGE,
1065            ..Default::default()
1066        },
1067        Plan::AlterSink(plan::AlterSinkPlan {
1068            item_id,
1069            global_id: _,
1070            sink,
1071            with_snapshot: _,
1072            in_cluster,
1073        }) => {
1074            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1075            let mut privileges = generate_read_privileges(catalog, items, role_id);
1076            privileges.push((
1077                SystemObjectId::Object(in_cluster.into()),
1078                AclMode::CREATE,
1079                role_id,
1080            ));
1081            RbacRequirements {
1082                ownership: vec![ObjectId::Item(*item_id)],
1083                privileges,
1084                item_usage: &CREATE_ITEM_USAGE,
1085                ..Default::default()
1086            }
1087        }
1088        Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1089            id,
1090            name: _,
1091            to_name: _,
1092        }) => RbacRequirements {
1093            ownership: vec![ObjectId::Cluster(*id)],
1094            ..Default::default()
1095        },
1096        Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1097            id_a,
1098            id_b,
1099            name_a: _,
1100            name_b: _,
1101            name_temp: _,
1102        }) => RbacRequirements {
1103            ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1104            ..Default::default()
1105        },
1106        Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1107            cluster_id,
1108            replica_id,
1109            name: _,
1110            to_name: _,
1111        }) => RbacRequirements {
1112            ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1113            ..Default::default()
1114        },
1115        Plan::AlterItemRename(plan::AlterItemRenamePlan {
1116            id,
1117            current_full_name: _,
1118            to_name: _,
1119            object_type: _,
1120        }) => RbacRequirements {
1121            ownership: vec![ObjectId::Item(*id)],
1122            ..Default::default()
1123        },
1124        Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1125            cur_schema_spec,
1126            new_schema_name: _,
1127        }) => {
1128            let privileges = match cur_schema_spec.0 {
1129                ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1130                    SystemObjectId::Object(ObjectId::Database(db_id)),
1131                    AclMode::CREATE,
1132                    role_id,
1133                )],
1134                ResolvedDatabaseSpecifier::Ambient => vec![],
1135            };
1136
1137            RbacRequirements {
1138                ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1139                privileges,
1140                ..Default::default()
1141            }
1142        }
1143        Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1144            schema_a_spec,
1145            schema_a_name: _,
1146            schema_b_spec,
1147            schema_b_name: _,
1148            name_temp: _,
1149        }) => {
1150            let mut privileges = vec![];
1151            if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1152                privileges.push((
1153                    SystemObjectId::Object(ObjectId::Database(id_a)),
1154                    AclMode::CREATE,
1155                    role_id,
1156                ));
1157            }
1158            if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1159                privileges.push((
1160                    SystemObjectId::Object(ObjectId::Database(id_b)),
1161                    AclMode::CREATE,
1162                    role_id,
1163                ));
1164            }
1165
1166            RbacRequirements {
1167                ownership: vec![
1168                    ObjectId::Schema(*schema_a_spec),
1169                    ObjectId::Schema(*schema_b_spec),
1170                ],
1171                privileges,
1172                ..Default::default()
1173            }
1174        }
1175        Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1176            ownership: vec![ObjectId::Item(*id)],
1177            item_usage: &CREATE_ITEM_USAGE,
1178            ..Default::default()
1179        },
1180        Plan::AlterRole(plan::AlterRolePlan {
1181            id,
1182            name: _,
1183            option,
1184        }) => match option {
1185            // Only superusers can alter the superuserness of a role.
1186            plan::PlannedAlterRoleOption::Attributes(attributes)
1187                if attributes.superuser.unwrap_or(false) =>
1188            {
1189                RbacRequirements {
1190                    superuser_action: Some("alter superuser role".to_string()),
1191                    ..Default::default()
1192                }
1193            }
1194            // Roles are allowed to change their own password.
1195            plan::PlannedAlterRoleOption::Attributes(attributes)
1196                if attributes.password.is_some() && role_id == *id =>
1197            {
1198                RbacRequirements::default()
1199            }
1200            // But no one elses...
1201            plan::PlannedAlterRoleOption::Attributes(attributes)
1202                if attributes.password.is_some() =>
1203            {
1204                RbacRequirements {
1205                    superuser_action: Some("alter password of role".to_string()),
1206                    ..Default::default()
1207                }
1208            }
1209            // Roles are allowed to change their own variables.
1210            plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1211                RbacRequirements::default()
1212            }
1213            // Otherwise to ALTER a role, you need to have the CREATE_ROLE privilege.
1214            _ => RbacRequirements {
1215                privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1216                item_usage: &CREATE_ITEM_USAGE,
1217                ..Default::default()
1218            },
1219        },
1220        Plan::AlterOwner(plan::AlterOwnerPlan {
1221            id,
1222            object_type: _,
1223            new_owner,
1224        }) => {
1225            let privileges = match id {
1226                ObjectId::ClusterReplica((cluster_id, _)) => {
1227                    vec![(
1228                        SystemObjectId::Object(cluster_id.into()),
1229                        AclMode::CREATE,
1230                        role_id,
1231                    )]
1232                }
1233                ObjectId::Schema((database_spec, _)) => match database_spec {
1234                    ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1235                    ResolvedDatabaseSpecifier::Id(database_id) => {
1236                        vec![(
1237                            SystemObjectId::Object(database_id.into()),
1238                            AclMode::CREATE,
1239                            role_id,
1240                        )]
1241                    }
1242                },
1243                ObjectId::Item(item_id) => {
1244                    let item = catalog.get_item(item_id);
1245                    vec![(
1246                        SystemObjectId::Object(item.name().qualifiers.clone().into()),
1247                        AclMode::CREATE,
1248                        role_id,
1249                    )]
1250                }
1251                ObjectId::Cluster(_)
1252                | ObjectId::Database(_)
1253                | ObjectId::Role(_)
1254                | ObjectId::NetworkPolicy(_) => Vec::new(),
1255            };
1256            RbacRequirements {
1257                role_membership: BTreeSet::from([*new_owner]),
1258                ownership: vec![id.clone()],
1259                privileges,
1260                ..Default::default()
1261            }
1262        }
1263        Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1264            ownership: vec![ObjectId::Item(*relation_id)],
1265            item_usage: &CREATE_ITEM_USAGE,
1266            ..Default::default()
1267        },
1268        Plan::AlterMaterializedViewApplyReplacement(
1269            plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1270        ) => RbacRequirements {
1271            ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1272            item_usage: &CREATE_ITEM_USAGE,
1273            ..Default::default()
1274        },
1275        Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1276            ownership: vec![ObjectId::NetworkPolicy(*id)],
1277            item_usage: &CREATE_ITEM_USAGE,
1278            ..Default::default()
1279        },
1280        Plan::ReadThenWrite(plan::ReadThenWritePlan {
1281            id,
1282            selection,
1283            finishing: _,
1284            assignments,
1285            kind,
1286            returning,
1287        }) => {
1288            let acl_mode = match kind {
1289                MutationKind::Insert => AclMode::INSERT,
1290                MutationKind::Update => AclMode::UPDATE,
1291                MutationKind::Delete => AclMode::DELETE,
1292            };
1293            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1294            let mut privileges = vec![
1295                (
1296                    SystemObjectId::Object(schema_id.clone()),
1297                    AclMode::USAGE,
1298                    role_id,
1299                ),
1300                (SystemObjectId::Object(id.into()), acl_mode, role_id),
1301            ];
1302            let mut seen = BTreeSet::from([(schema_id, role_id)]);
1303
1304            // We don't allow arbitrary sub-queries in `assignments` or `returning`. So either they
1305            // contains a column reference to the outer table or it's constant.
1306            if assignments
1307                .values()
1308                .chain(returning.iter())
1309                .any(|assignment| assignment.contains_column())
1310            {
1311                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1312                seen.insert((id.into(), role_id));
1313            }
1314
1315            // TODO(jkosh44) It's fairly difficult to determine what part of `selection` is from a
1316            //  user specified read and what part is from the implementation of the read then write.
1317            //  instead we are overly protective and always require SELECT privileges even though
1318            //  PostgreSQL doesn't always do this.
1319            //  As a concrete example, we require SELECT and UPDATE privileges to execute
1320            //  `UPDATE t SET a = 42;`, while PostgreSQL only requires UPDATE privileges.
1321            let items = selection
1322                .depends_on()
1323                .into_iter()
1324                .map(|gid| catalog.resolve_item_id(&gid));
1325            privileges.extend_from_slice(&generate_read_privileges_inner(
1326                catalog, items, role_id, &mut seen,
1327            ));
1328
1329            if let Some(privilege) = generate_cluster_usage_privileges(
1330                selection.as_const().is_some(),
1331                target_cluster_id,
1332                role_id,
1333            ) {
1334                privileges.push(privilege);
1335            }
1336            RbacRequirements {
1337                privileges,
1338                ..Default::default()
1339            }
1340        }
1341        Plan::GrantRole(plan::GrantRolePlan {
1342            role_ids: _,
1343            member_ids: _,
1344            grantor_id: _,
1345        })
1346        | Plan::RevokeRole(plan::RevokeRolePlan {
1347            role_ids: _,
1348            member_ids: _,
1349            grantor_id: _,
1350        }) => RbacRequirements {
1351            privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1352            ..Default::default()
1353        },
1354        Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1355            update_privileges,
1356            grantees: _,
1357        })
1358        | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1359            update_privileges,
1360            revokees: _,
1361        }) => {
1362            let mut privileges = Vec::with_capacity(update_privileges.len());
1363            for UpdatePrivilege { target_id, .. } in update_privileges {
1364                match target_id {
1365                    SystemObjectId::Object(object_id) => match object_id {
1366                        ObjectId::ClusterReplica((cluster_id, _)) => {
1367                            privileges.push((
1368                                SystemObjectId::Object(cluster_id.into()),
1369                                AclMode::USAGE,
1370                                role_id,
1371                            ));
1372                        }
1373                        ObjectId::Schema((database_spec, _)) => match database_spec {
1374                            ResolvedDatabaseSpecifier::Ambient => {}
1375                            ResolvedDatabaseSpecifier::Id(database_id) => {
1376                                privileges.push((
1377                                    SystemObjectId::Object(database_id.into()),
1378                                    AclMode::USAGE,
1379                                    role_id,
1380                                ));
1381                            }
1382                        },
1383                        ObjectId::Item(item_id) => {
1384                            let item = catalog.get_item(item_id);
1385                            privileges.push((
1386                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
1387                                AclMode::USAGE,
1388                                role_id,
1389                            ))
1390                        }
1391                        ObjectId::Cluster(_)
1392                        | ObjectId::Database(_)
1393                        | ObjectId::Role(_)
1394                        | ObjectId::NetworkPolicy(_) => {}
1395                    },
1396                    SystemObjectId::System => {}
1397                }
1398            }
1399            RbacRequirements {
1400                ownership: update_privileges
1401                    .iter()
1402                    .filter_map(|update_privilege| update_privilege.target_id.object_id())
1403                    .cloned()
1404                    .collect(),
1405                privileges,
1406                // To grant/revoke a privilege on some object, generally the grantor/revoker must be the
1407                // owner of that object (or have a grant option on that object which isn't implemented in
1408                // Materialize yet). There is no owner of the entire system, so it's only reasonable to
1409                // restrict granting/revoking system privileges to superusers.
1410                superuser_action: if update_privileges
1411                    .iter()
1412                    .any(|update_privilege| update_privilege.target_id.is_system())
1413                {
1414                    Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1415                } else {
1416                    None
1417                },
1418                ..Default::default()
1419            }
1420        }
1421        Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1422            privilege_objects,
1423            privilege_acl_items: _,
1424            is_grant: _,
1425        }) => RbacRequirements {
1426            role_membership: privilege_objects
1427                .iter()
1428                .map(|privilege_object| privilege_object.role_id)
1429                .collect(),
1430            privileges: privilege_objects
1431                .into_iter()
1432                .filter_map(|privilege_object| {
1433                    if let (Some(database_id), Some(_)) =
1434                        (privilege_object.database_id, privilege_object.schema_id)
1435                    {
1436                        Some((
1437                            SystemObjectId::Object(database_id.into()),
1438                            AclMode::USAGE,
1439                            role_id,
1440                        ))
1441                    } else {
1442                        None
1443                    }
1444                })
1445                .collect(),
1446            // Altering the default privileges for the PUBLIC role (aka ALL ROLES) will affect all roles
1447            // that currently exist and roles that will exist in the future. It's impossible for an exising
1448            // role to be a member of a role that doesn't exist yet, so no current role could possibly have
1449            // the privileges required to alter default privileges for the PUBLIC role. Therefore we
1450            // only superusers can alter default privileges for the PUBLIC role.
1451            superuser_action: if privilege_objects
1452                .iter()
1453                .any(|privilege_object| privilege_object.role_id.is_public())
1454            {
1455                Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1456            } else {
1457                None
1458            },
1459            ..Default::default()
1460        },
1461        Plan::ReassignOwned(plan::ReassignOwnedPlan {
1462            old_roles,
1463            new_role,
1464            reassign_ids: _,
1465        }) => RbacRequirements {
1466            role_membership: old_roles
1467                .into_iter()
1468                .cloned()
1469                .chain(iter::once(*new_role))
1470                .collect(),
1471            ..Default::default()
1472        },
1473        Plan::SideEffectingFunc(func) => {
1474            let role_membership = match func {
1475                SideEffectingFunc::PgCancelBackend { connection_id } => active_conns
1476                    .expect("active_conns is required for Plan::SideEffectingFunc")(
1477                    *connection_id
1478                )
1479                .map(|x| [x].into())
1480                .unwrap_or_default(),
1481            };
1482            RbacRequirements {
1483                role_membership,
1484                ..Default::default()
1485            }
1486        }
1487        Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1488            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1489            RbacRequirements {
1490                privileges: vec![
1491                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1492                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1493                ],
1494                ..Default::default()
1495            }
1496        }
1497        Plan::DiscardTemp
1498        | Plan::DiscardAll
1499        | Plan::EmptyQuery
1500        | Plan::ShowAllVariables
1501        | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1502        | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1503        | Plan::SetVariable(plan::SetVariablePlan {
1504            name: _,
1505            value: _,
1506            local: _,
1507        })
1508        | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1509        | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1510        | Plan::StartTransaction(plan::StartTransactionPlan {
1511            access: _,
1512            isolation_level: _,
1513        })
1514        | Plan::CommitTransaction(plan::CommitTransactionPlan {
1515            transaction_type: _,
1516        })
1517        | Plan::AbortTransaction(plan::AbortTransactionPlan {
1518            transaction_type: _,
1519        })
1520        | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1521        | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1522        | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1523        | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1524        | Plan::Declare(plan::DeclarePlan {
1525            name: _,
1526            stmt: _,
1527            sql: _,
1528            params: _,
1529        })
1530        | Plan::Fetch(plan::FetchPlan {
1531            name: _,
1532            count: _,
1533            timeout: _,
1534        })
1535        | Plan::Close(plan::ClosePlan { name: _ })
1536        | Plan::Prepare(plan::PreparePlan {
1537            name: _,
1538            stmt: _,
1539            desc: _,
1540            sql: _,
1541        })
1542        | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1543        | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1544        | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1545    }
1546}
1547
1548/// Reports whether any role has ownership over an object.
1549fn check_owner_roles(
1550    object_id: &ObjectId,
1551    role_ids: &BTreeSet<RoleId>,
1552    catalog: &impl SessionCatalog,
1553) -> bool {
1554    if let Some(owner_id) = catalog.get_owner_id(object_id) {
1555        role_ids.contains(&owner_id)
1556    } else {
1557        true
1558    }
1559}
1560
1561fn ownership_err(
1562    unheld_ownership: Vec<ObjectId>,
1563    catalog: &impl SessionCatalog,
1564) -> Result<(), UnauthorizedError> {
1565    if !unheld_ownership.is_empty() {
1566        let objects = unheld_ownership
1567            .into_iter()
1568            .map(|ownership| match ownership {
1569                ObjectId::Cluster(id) => (
1570                    ObjectType::Cluster,
1571                    catalog.get_cluster(id).name().to_string(),
1572                ),
1573                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1574                    let cluster = catalog.get_cluster(cluster_id);
1575                    let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1576                    // Note: using unchecked here is okay because the values are coming from an
1577                    // already existing name.
1578                    let name = QualifiedReplica {
1579                        cluster: Ident::new_unchecked(cluster.name()),
1580                        replica: Ident::new_unchecked(replica.name()),
1581                    };
1582                    (ObjectType::ClusterReplica, name.to_string())
1583                }
1584                ObjectId::Database(id) => (
1585                    ObjectType::Database,
1586                    catalog.get_database(&id).name().to_string(),
1587                ),
1588                ObjectId::Schema((database_spec, schema_spec)) => {
1589                    let schema = catalog.get_schema(&database_spec, &schema_spec);
1590                    let name = catalog.resolve_full_schema_name(schema.name());
1591                    (ObjectType::Schema, name.to_string())
1592                }
1593                ObjectId::Item(id) => {
1594                    let item = catalog.get_item(&id);
1595                    let name = catalog.resolve_full_name(item.name());
1596                    (item.item_type().into(), name.to_string())
1597                }
1598                ObjectId::NetworkPolicy(id) => (
1599                    ObjectType::NetworkPolicy,
1600                    catalog.get_network_policy(&id).name().to_string(),
1601                ),
1602                ObjectId::Role(_) => unreachable!("roles have no owner"),
1603            })
1604            .collect();
1605        Err(UnauthorizedError::Ownership { objects })
1606    } else {
1607        Ok(())
1608    }
1609}
1610
1611fn generate_required_source_privileges(
1612    name: &QualifiedItemName,
1613    data_source: &DataSourceDesc,
1614    in_cluster: Option<ClusterId>,
1615    role_id: RoleId,
1616) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1617    let mut privileges = vec![(
1618        SystemObjectId::Object(name.qualifiers.clone().into()),
1619        AclMode::CREATE,
1620        role_id,
1621    )];
1622    match (data_source, in_cluster) {
1623        (_, Some(id)) => {
1624            privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1625        }
1626        (DataSourceDesc::Ingestion(_), None) => {
1627            privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1628        }
1629        // Non-ingestion data-sources have meaningless cluster config's (for now...) and they need
1630        // to be ignored.
1631        // This feels very brittle, but there's not much we can do until the UNDEFINED cluster
1632        // config is removed.
1633        (_, None) => {}
1634    }
1635    privileges
1636}
1637
1638/// Generates all the privileges required to execute a read that includes the objects in `ids`.
1639///
1640/// Not only do we need to validate that `role_id` has read privileges on all relations in `ids`,
1641/// but if any object is a view or materialized view then we need to validate that the owner of
1642/// that view has all of the privileges required to execute the query within the view.
1643///
1644/// For more details see: <https://www.postgresql.org/docs/15/rules-privileges.html>
1645fn generate_read_privileges(
1646    catalog: &impl SessionCatalog,
1647    ids: impl Iterator<Item = CatalogItemId>,
1648    role_id: RoleId,
1649) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1650    generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1651}
1652
1653fn generate_read_privileges_inner(
1654    catalog: &impl SessionCatalog,
1655    ids: impl Iterator<Item = CatalogItemId>,
1656    role_id: RoleId,
1657    seen: &mut BTreeSet<(ObjectId, RoleId)>,
1658) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1659    let mut privileges = Vec::new();
1660    let mut views = Vec::new();
1661
1662    for id in ids {
1663        if seen.insert((id.into(), role_id)) {
1664            let item = catalog.get_item(&id);
1665            let schema_id: ObjectId = item.name().qualifiers.clone().into();
1666            if seen.insert((schema_id.clone(), role_id)) {
1667                privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1668            }
1669            match item.item_type() {
1670                CatalogItemType::View
1671                | CatalogItemType::MaterializedView
1672                | CatalogItemType::ContinualTask => {
1673                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1674                    views.push((item.references().items().copied(), item.owner_id()));
1675                }
1676                CatalogItemType::Table | CatalogItemType::Source => {
1677                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1678                }
1679                CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1680                    privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1681                }
1682                CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1683            }
1684        }
1685    }
1686
1687    for (view_ids, view_owner) in views {
1688        privileges.extend_from_slice(&generate_read_privileges_inner(
1689            catalog, view_ids, view_owner, seen,
1690        ));
1691    }
1692
1693    privileges
1694}
1695
1696fn generate_usage_privileges(
1697    catalog: &impl SessionCatalog,
1698    ids: &ResolvedIds,
1699    role_id: RoleId,
1700    item_types: &BTreeSet<CatalogItemType>,
1701) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1702    // Use a `BTreeSet` to remove duplicate privileges.
1703    ids.items()
1704        .filter_map(move |id| {
1705            let item = catalog.get_item(id);
1706            if item_types.contains(&item.item_type()) {
1707                let schema_id = item.name().qualifiers.clone().into();
1708                Some([
1709                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1710                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1711                ])
1712            } else {
1713                None
1714            }
1715        })
1716        .flatten()
1717        .collect()
1718}
1719
1720fn generate_cluster_usage_privileges(
1721    expr_is_const: bool,
1722    target_cluster_id: Option<ClusterId>,
1723    role_id: RoleId,
1724) -> Option<(SystemObjectId, AclMode, RoleId)> {
1725    // TODO(jkosh44) expr hasn't been fully optimized yet, so it might actually be a constant,
1726    //  but we mistakenly think that it's not. For now it's ok to be overly protective.
1727    if !expr_is_const {
1728        if let Some(cluster_id) = target_cluster_id {
1729            return Some((
1730                SystemObjectId::Object(cluster_id.into()),
1731                AclMode::USAGE,
1732                role_id,
1733            ));
1734        }
1735    }
1736
1737    None
1738}
1739
1740fn check_object_privileges(
1741    catalog: &impl SessionCatalog,
1742    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1743    role_membership: BTreeSet<RoleId>,
1744    current_role_id: RoleId,
1745) -> Result<(), UnauthorizedError> {
1746    let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1747    role_memberships.insert(current_role_id, role_membership);
1748    for (object_id, acl_mode, role_id) in privileges {
1749        let role_membership = role_memberships
1750            .entry(role_id)
1751            .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1752        let object_privileges = catalog
1753            .get_privileges(&object_id)
1754            .expect("only object types with privileges will generate required privileges");
1755        let role_privileges = role_membership
1756            .iter()
1757            .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1758            .map(|mz_acl_item| mz_acl_item.acl_mode)
1759            .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1760        if !role_privileges.contains(acl_mode) {
1761            let role_name = catalog.get_role(&role_id).name().to_string();
1762            let privileges = acl_mode.to_error_string();
1763            return Err(UnauthorizedError::Privilege {
1764                object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1765                role_name,
1766                privileges,
1767            });
1768        }
1769    }
1770
1771    Ok(())
1772}
1773
1774pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1775    const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1776        .union(AclMode::SELECT)
1777        .union(AclMode::UPDATE)
1778        .union(AclMode::DELETE);
1779    const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1780    const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1781        .union(AclMode::CREATE_DB)
1782        .union(AclMode::CREATE_CLUSTER)
1783        .union(AclMode::CREATE_NETWORK_POLICY);
1784
1785    const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1786    match object_type {
1787        SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1788        SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1789        SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1790        SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1791        SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1792        SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1793        SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1794        SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1795        SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1796        SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1797        SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1798        SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1799        SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1800        SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1801        SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1802        SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1803        SystemObjectType::Object(ObjectType::ContinualTask) => AclMode::SELECT,
1804        SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1805    }
1806}
1807
1808pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1809    MzAclItem {
1810        grantee: owner_id,
1811        grantor: owner_id,
1812        acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1813    }
1814}
1815
1816const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1817    match object_type {
1818        ObjectType::Table
1819        | ObjectType::View
1820        | ObjectType::MaterializedView
1821        | ObjectType::Source
1822        | ObjectType::ContinualTask => AclMode::SELECT,
1823        ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1824        ObjectType::Sink
1825        | ObjectType::Index
1826        | ObjectType::Role
1827        | ObjectType::Cluster
1828        | ObjectType::ClusterReplica
1829        | ObjectType::Secret
1830        | ObjectType::Connection
1831        | ObjectType::Database
1832        | ObjectType::Func
1833        | ObjectType::NetworkPolicy => AclMode::empty(),
1834    }
1835}
1836
1837pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1838    let acl_mode = default_builtin_object_acl_mode(object_type);
1839    MzAclItem {
1840        grantee: MZ_SUPPORT_ROLE_ID,
1841        grantor: MZ_SYSTEM_ROLE_ID,
1842        acl_mode,
1843    }
1844}
1845
1846pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1847    let acl_mode = default_builtin_object_acl_mode(object_type);
1848    MzAclItem {
1849        grantee: RoleId::Public,
1850        grantor: MZ_SYSTEM_ROLE_ID,
1851        acl_mode,
1852    }
1853}