Skip to main content

mz_sql/
rbac.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26    CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29    CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30    SchemaSpecifier, SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34    DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40/// Common checks that need to be performed before we can start checking a role's privileges.
41fn rbac_check_preamble(
42    catalog: &impl SessionCatalog,
43    session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45    // PostgreSQL allows users that have their role dropped to perform some actions,
46    // such as `SET ROLE` and certain `SELECT` queries. We haven't implemented
47    // `SET ROLE` and feel it's safer to force to user to re-authenticate if their
48    // role is dropped.
49    if catalog
50        .try_get_role(&session_meta.role_metadata().current_role)
51        .is_none()
52    {
53        return Err(UnauthorizedError::ConcurrentRoleDrop(
54            session_meta.role_metadata().current_role.clone(),
55        ));
56    };
57    if catalog
58        .try_get_role(&session_meta.role_metadata().session_role)
59        .is_none()
60    {
61        return Err(UnauthorizedError::ConcurrentRoleDrop(
62            session_meta.role_metadata().session_role.clone(),
63        ));
64    };
65    if catalog
66        .try_get_role(&session_meta.role_metadata().authenticated_role)
67        .is_none()
68    {
69        return Err(UnauthorizedError::ConcurrentRoleDrop(
70            session_meta.role_metadata().authenticated_role.clone(),
71        ));
72    };
73
74    Ok(())
75}
76
77/// Filters `RbacRequirements` based on the session role metadata and RBAC related feature flags.
78fn filter_requirements(
79    catalog: &impl SessionCatalog,
80    session_meta: &dyn SessionMetadata,
81    rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83    // Skip RBAC non-mandatory checks if RBAC is disabled. However, we never skip RBAC checks for
84    // system roles. This allows us to limit access of system users even when RBAC is off.
85    let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86        && !session_meta.role_metadata().current_role.is_system()
87        && !session_meta.role_metadata().session_role.is_system();
88    // Skip RBAC checks on user items if the session is a superuser.
89    let is_superuser = session_meta.is_superuser();
90    if is_rbac_disabled || is_superuser {
91        return rbac_requirements.filter_to_mandatory_requirements();
92    }
93
94    rbac_requirements
95}
96
97// The default item types that most statements require USAGE privileges for.
98static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99    btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101// CREATE statements require USAGE privileges on the default item types and USAGE privileges on
102// Types.
103pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104    let mut items = DEFAULT_ITEM_USAGE.clone();
105    items.insert(CatalogItemType::Type);
106    items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
/// Errors that can occur due to an unauthorized action.
///
/// The `#[error]` attribute on each variant defines its primary message;
/// [`UnauthorizedError::detail`] supplies an optional follow-up sentence for most variants.
#[derive(Debug, thiserror::Error)]
pub enum UnauthorizedError {
    /// The action can only be performed by a superuser.
    #[error("permission denied to {action}")]
    Superuser { action: String },
    /// The action requires ownership of an object.
    ///
    /// `objects` holds one `(object type, object name)` pair per object; the pairs are rendered
    /// as a comma-separated list in the message.
    #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
    Ownership { objects: Vec<(ObjectType, String)> },
    /// Altering an owner requires membership of the new owner role.
    ///
    /// `role_names` lists the roles to report; each is quoted in the message.
    #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
    RoleMembership { role_names: Vec<String> },
    /// The action requires one or more privileges.
    #[error("permission denied for {object_description}")]
    Privilege {
        /// Description of the object the privileges are required on.
        object_description: ErrorMessageObjectDescription,
        /// Name of the role reported as needing the privileges (shown in the detail text).
        role_name: String,
        /// The needed privileges, already rendered as text (shown in the detail text).
        privileges: String,
    },
    // TODO(jkosh44) When we implement parameter privileges, this can be replaced with a regular
    //  privilege error.
    /// The action can only be performed by the mz_system role.
    #[error("permission denied to {action}")]
    MzSystem { action: String },
    /// The action cannot be performed by the mz_support role.
    #[error("permission denied to {action}")]
    MzSupport { action: String },
    /// The active role was dropped while a user was logged in.
    #[error("role {0} was concurrently dropped")]
    ConcurrentRoleDrop(RoleId),
}
141
142impl UnauthorizedError {
143    pub fn detail(&self) -> Option<String> {
144        match &self {
145            UnauthorizedError::Superuser { action } => {
146                Some(format!("You must be a superuser to {}", action))
147            }
148            UnauthorizedError::Privilege {
149                object_description,
150                role_name,
151                privileges,
152            } => Some(format!(
153                "The '{role_name}' role needs {privileges} privileges on {object_description}"
154            )),
155            UnauthorizedError::MzSystem { .. } => {
156                Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157            }
158            UnauthorizedError::MzSupport { .. } => Some(format!(
159                "The '{}' role has very limited privileges",
160                SUPPORT_USER.name
161            )),
162            UnauthorizedError::ConcurrentRoleDrop(_) => {
163                Some("Please disconnect and re-connect with a valid role.".into())
164            }
165            UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166        }
167    }
168}
169
/// RBAC requirements for executing a given plan.
///
/// A value of this type is checked against a session by `RbacRequirements::validate`, which
/// fails with an `UnauthorizedError` for the first category of requirement that is not met.
#[derive(Debug)]
struct RbacRequirements {
    /// The role memberships required.
    role_membership: BTreeSet<RoleId>,
    /// The object ownerships required.
    ownership: Vec<ObjectId>,
    /// The privileges required. The tuples are of the form:
    /// (What object the privilege is on, What privilege is required, Who must possess the privilege).
    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
    /// The types of catalog items that this plan requires USAGE privileges on.
    ///
    /// Most plans will require USAGE on secrets and connections but some plans, like SHOW CREATE,
    /// can reference an item without requiring any privileges on that item.
    item_usage: &'static BTreeSet<CatalogItemType>,
    /// `Some(action)` if only a superuser may perform the action, where `action` is the
    /// description reported in the resulting `UnauthorizedError::Superuser`; `None` otherwise.
    superuser_action: Option<String>,
}
188
189impl RbacRequirements {
190    fn empty() -> RbacRequirements {
191        RbacRequirements {
192            role_membership: BTreeSet::new(),
193            ownership: Vec::new(),
194            privileges: Vec::new(),
195            item_usage: &EMPTY_ITEM_USAGE,
196            superuser_action: None,
197        }
198    }
199
200    fn validate(
201        self,
202        catalog: &impl SessionCatalog,
203        session: &dyn SessionMetadata,
204        resolved_ids: &ResolvedIds,
205    ) -> Result<(), UnauthorizedError> {
206        // Obtain all roles that the current session is a member of.
207        let role_membership =
208            catalog.collect_role_membership(&session.role_metadata().current_role);
209
210        check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212        // Validate that the current session has the required role membership to execute the provided
213        // plan.
214        let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215        if !unheld_membership.is_empty() {
216            let role_names = unheld_membership
217                .into_iter()
218                .map(|role_id| {
219                    // Some role references may no longer exist due to concurrent drops.
220                    catalog
221                        .try_get_role(role_id)
222                        .map(|role| role.name().to_string())
223                        .unwrap_or_else(|| role_id.to_string())
224                })
225                .collect();
226            return Err(UnauthorizedError::RoleMembership { role_names });
227        }
228
229        // Validate that the current session has the required object ownership to execute the provided
230        // plan.
231        let unheld_ownership = self
232            .ownership
233            .into_iter()
234            .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235            .collect();
236        ownership_err(unheld_ownership, catalog)?;
237
238        check_object_privileges(
239            catalog,
240            self.privileges,
241            role_membership,
242            session.role_metadata().current_role,
243        )?;
244
245        if let Some(action) = self.superuser_action {
246            return Err(UnauthorizedError::Superuser { action });
247        }
248
249        Ok(())
250    }
251
252    fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253        let RbacRequirements {
254            role_membership,
255            ownership,
256            privileges,
257            item_usage,
258            superuser_action: _,
259        } = self;
260        let role_membership = role_membership
261            .into_iter()
262            .filter(|id| id.is_system())
263            .collect();
264        let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265        let privileges = privileges
266            .into_iter()
267            .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268            // We allow reading objects for superusers and when RBAC is off.
269            .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270            .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271            .collect();
272        let superuser_action = None;
273        RbacRequirements {
274            role_membership,
275            ownership,
276            privileges,
277            item_usage,
278            superuser_action,
279        }
280    }
281}
282
283impl Default for RbacRequirements {
284    fn default() -> Self {
285        RbacRequirements {
286            role_membership: BTreeSet::new(),
287            ownership: Vec::new(),
288            privileges: Vec::new(),
289            item_usage: &DEFAULT_ITEM_USAGE,
290            superuser_action: None,
291        }
292    }
293}
294
295/// Checks if a `session` is authorized to use `resolved_ids`. If not, an error is returned.
296pub fn check_usage(
297    catalog: &impl SessionCatalog,
298    session: &dyn SessionMetadata,
299    resolved_ids: &ResolvedIds,
300    item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302    rbac_check_preamble(catalog, session)?;
303
304    // Obtain all roles that the current session is a member of.
305    let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307    // Certain statements depend on objects that haven't been created yet, like sub-sources, so we
308    // need to filter those out.
309    let existing_resolved_ids =
310        resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312    let required_privileges = generate_usage_privileges(
313        catalog,
314        &existing_resolved_ids,
315        session.role_metadata().current_role,
316        item_types,
317    )
318    .into_iter()
319    .collect();
320
321    let mut rbac_requirements = RbacRequirements::empty();
322    rbac_requirements.privileges = required_privileges;
323    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324    let required_privileges = rbac_requirements.privileges;
325
326    check_object_privileges(
327        catalog,
328        required_privileges,
329        role_membership,
330        session.role_metadata().current_role,
331    )?;
332
333    Ok(())
334}
335
336/// Checks if a session is authorized to execute a plan. If not, an error is returned.
337pub fn check_plan(
338    catalog: &impl SessionCatalog,
339    // Function mapping a connection ID to an authenticated role. The roles may have been dropped concurrently.
340    // Only required for Plan::SideEffectingFunc; can be None for other plan types.
341    // TODO(peek-seq): Remove this when deleting the old peek sequencing. The logic here that uses
342    // `active_conns` is mirrored in `execute_side_effecting_func`, which is what the frontend peek
343    // sequencing uses.
344    active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
345    session: &dyn SessionMetadata,
346    plan: &Plan,
347    target_cluster_id: Option<ClusterId>,
348    resolved_ids: &ResolvedIds,
349) -> Result<(), UnauthorizedError> {
350    rbac_check_preamble(catalog, session)?;
351
352    let rbac_requirements = generate_rbac_requirements(
353        catalog,
354        plan,
355        active_conns,
356        target_cluster_id,
357        session.role_metadata().current_role,
358    );
359    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
360    debug!(
361        "rbac requirements {rbac_requirements:?} for plan {:?}",
362        PlanKind::from(plan)
363    );
364    rbac_requirements.validate(catalog, session, resolved_ids)
365}
366
367/// Returns true if RBAC is turned on for a session, false otherwise.
368pub fn is_rbac_enabled_for_session(
369    system_vars: &SystemVars,
370    session: &dyn SessionMetadata,
371) -> bool {
372    let server_enabled = system_vars.enable_rbac_checks();
373    let session_enabled = session.enable_session_rbac_checks();
374
375    // The session flag allows users to turn RBAC on for just their session while the server flag
376    // allows users to turn RBAC on for everyone.
377    server_enabled || session_enabled
378}
379
380/// Generates all requirements needed to execute a given plan.
381fn generate_rbac_requirements(
382    catalog: &impl SessionCatalog,
383    plan: &Plan,
384    active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
385    target_cluster_id: Option<ClusterId>,
386    role_id: RoleId,
387) -> RbacRequirements {
388    match plan {
389        Plan::CreateConnection(plan::CreateConnectionPlan {
390            name,
391            if_not_exists: _,
392            connection: _,
393            validate: _,
394        }) => RbacRequirements {
395            privileges: vec![(
396                SystemObjectId::Object(name.qualifiers.clone().into()),
397                AclMode::CREATE,
398                role_id,
399            )],
400            item_usage: &CREATE_ITEM_USAGE,
401            ..Default::default()
402        },
403        Plan::CreateDatabase(plan::CreateDatabasePlan {
404            name: _,
405            if_not_exists: _,
406        }) => RbacRequirements {
407            privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
408            item_usage: &CREATE_ITEM_USAGE,
409            ..Default::default()
410        },
411        Plan::CreateSchema(plan::CreateSchemaPlan {
412            database_spec,
413            schema_name: _,
414            if_not_exists: _,
415        }) => {
416            let privileges = match database_spec {
417                ResolvedDatabaseSpecifier::Ambient => Vec::new(),
418                ResolvedDatabaseSpecifier::Id(database_id) => {
419                    vec![(
420                        SystemObjectId::Object(database_id.into()),
421                        AclMode::CREATE,
422                        role_id,
423                    )]
424                }
425            };
426            RbacRequirements {
427                privileges,
428                item_usage: &CREATE_ITEM_USAGE,
429                ..Default::default()
430            }
431        }
432        Plan::CreateRole(plan::CreateRolePlan {
433            name: _,
434            attributes,
435        }) => {
436            if attributes.superuser.unwrap_or(false) {
437                RbacRequirements {
438                    superuser_action: Some("create superuser role".to_string()),
439                    ..Default::default()
440                }
441            } else {
442                RbacRequirements {
443                    privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
444                    item_usage: &CREATE_ITEM_USAGE,
445                    ..Default::default()
446                }
447            }
448        }
449        Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
450            privileges: vec![(
451                SystemObjectId::System,
452                AclMode::CREATE_NETWORK_POLICY,
453                role_id,
454            )],
455            item_usage: &CREATE_ITEM_USAGE,
456            ..Default::default()
457        },
458        Plan::CreateCluster(plan::CreateClusterPlan {
459            name: _,
460            variant: _,
461            workload_class: _,
462        }) => RbacRequirements {
463            privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
464            item_usage: &CREATE_ITEM_USAGE,
465            ..Default::default()
466        },
467        Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
468            cluster_id,
469            name: _,
470            config: _,
471        }) => RbacRequirements {
472            ownership: vec![ObjectId::Cluster(*cluster_id)],
473            item_usage: &CREATE_ITEM_USAGE,
474            ..Default::default()
475        },
476        Plan::CreateSource(plan::CreateSourcePlan {
477            name,
478            source,
479            if_not_exists: _,
480            timeline: _,
481            in_cluster,
482        }) => RbacRequirements {
483            privileges: generate_required_source_privileges(
484                name,
485                &source.data_source,
486                *in_cluster,
487                role_id,
488            ),
489            item_usage: &CREATE_ITEM_USAGE,
490            ..Default::default()
491        },
492        Plan::CreateSources(plans) => RbacRequirements {
493            privileges: plans
494                .iter()
495                .flat_map(
496                    |plan::CreateSourcePlanBundle {
497                         item_id: _,
498                         global_id: _,
499                         plan:
500                             plan::CreateSourcePlan {
501                                 name,
502                                 source,
503                                 if_not_exists: _,
504                                 timeline: _,
505                                 in_cluster,
506                             },
507                         resolved_ids: _,
508                         available_source_references: _,
509                     }| {
510                        generate_required_source_privileges(
511                            name,
512                            &source.data_source,
513                            *in_cluster,
514                            role_id,
515                        )
516                        .into_iter()
517                    },
518                )
519                .collect(),
520            item_usage: &CREATE_ITEM_USAGE,
521            ..Default::default()
522        },
523        Plan::CreateSecret(plan::CreateSecretPlan {
524            name,
525            secret: _,
526            if_not_exists: _,
527        }) => RbacRequirements {
528            privileges: vec![(
529                SystemObjectId::Object(name.qualifiers.clone().into()),
530                AclMode::CREATE,
531                role_id,
532            )],
533            item_usage: &CREATE_ITEM_USAGE,
534            ..Default::default()
535        },
536        Plan::CreateSink(plan::CreateSinkPlan {
537            name,
538            sink,
539            with_snapshot: _,
540            if_not_exists: _,
541            in_cluster,
542        }) => {
543            let mut privileges = vec![(
544                SystemObjectId::Object(name.qualifiers.clone().into()),
545                AclMode::CREATE,
546                role_id,
547            )];
548            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
549            privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
550            privileges.push((
551                SystemObjectId::Object(in_cluster.into()),
552                AclMode::CREATE,
553                role_id,
554            ));
555            RbacRequirements {
556                privileges,
557                item_usage: &CREATE_ITEM_USAGE,
558                ..Default::default()
559            }
560        }
561        Plan::CreateTable(plan::CreateTablePlan {
562            name,
563            table: _,
564            if_not_exists: _,
565        }) => RbacRequirements {
566            privileges: vec![(
567                SystemObjectId::Object(name.qualifiers.clone().into()),
568                AclMode::CREATE,
569                role_id,
570            )],
571            item_usage: &CREATE_ITEM_USAGE,
572            ..Default::default()
573        },
574        Plan::CreateView(plan::CreateViewPlan {
575            name,
576            view: _,
577            replace,
578            drop_ids: _,
579            if_not_exists: _,
580            ambiguous_columns: _,
581        }) => RbacRequirements {
582            ownership: replace
583                .map(|id| vec![ObjectId::Item(id)])
584                .unwrap_or_default(),
585            privileges: vec![(
586                SystemObjectId::Object(name.qualifiers.clone().into()),
587                AclMode::CREATE,
588                role_id,
589            )],
590            item_usage: &CREATE_ITEM_USAGE,
591            ..Default::default()
592        },
593        Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
594            name,
595            materialized_view,
596            replace,
597            drop_ids: _,
598            if_not_exists: _,
599            ambiguous_columns: _,
600        }) => RbacRequirements {
601            ownership: replace
602                .map(|id| vec![ObjectId::Item(id)])
603                .unwrap_or_default(),
604            privileges: vec![
605                (
606                    SystemObjectId::Object(name.qualifiers.clone().into()),
607                    AclMode::CREATE,
608                    role_id,
609                ),
610                (
611                    SystemObjectId::Object(materialized_view.cluster_id.into()),
612                    AclMode::CREATE,
613                    role_id,
614                ),
615            ],
616            item_usage: &CREATE_ITEM_USAGE,
617            ..Default::default()
618        },
619        Plan::CreateContinualTask(plan::CreateContinualTaskPlan {
620            name,
621            placeholder_id: _,
622            desc: _,
623            input_id: _,
624            with_snapshot: _,
625            continual_task,
626        }) => RbacRequirements {
627            privileges: vec![
628                (
629                    SystemObjectId::Object(name.qualifiers.clone().into()),
630                    AclMode::CREATE,
631                    role_id,
632                ),
633                (
634                    SystemObjectId::Object(continual_task.cluster_id.into()),
635                    AclMode::CREATE,
636                    role_id,
637                ),
638            ],
639            item_usage: &CREATE_ITEM_USAGE,
640            ..Default::default()
641        },
642        Plan::CreateIndex(plan::CreateIndexPlan {
643            name,
644            index,
645            if_not_exists: _,
646        }) => {
647            let index_on_item = catalog.resolve_item_id(&index.on);
648            RbacRequirements {
649                ownership: vec![ObjectId::Item(index_on_item)],
650                privileges: vec![
651                    (
652                        SystemObjectId::Object(name.qualifiers.clone().into()),
653                        AclMode::CREATE,
654                        role_id,
655                    ),
656                    (
657                        SystemObjectId::Object(index.cluster_id.into()),
658                        AclMode::CREATE,
659                        role_id,
660                    ),
661                ],
662                item_usage: &CREATE_ITEM_USAGE,
663                ..Default::default()
664            }
665        }
666        Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
667            privileges: vec![(
668                SystemObjectId::Object(name.qualifiers.clone().into()),
669                AclMode::CREATE,
670                role_id,
671            )],
672            item_usage: &CREATE_ITEM_USAGE,
673            ..Default::default()
674        },
675        Plan::Comment(plan::CommentPlan {
676            object_id,
677            sub_component: _,
678            comment: _,
679        }) => {
680            let (ownership, privileges) = match object_id {
681                // Roles don't have owners, instead we require the current session to have the
682                // `CREATEROLE` privilege.
683                CommentObjectId::Role(_) => (
684                    Vec::new(),
685                    vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
686                ),
687                _ => (vec![ObjectId::from(*object_id)], Vec::new()),
688            };
689            RbacRequirements {
690                ownership,
691                privileges,
692                ..Default::default()
693            }
694        }
695        Plan::DropObjects(plan::DropObjectsPlan {
696            referenced_ids,
697            drop_ids: _,
698            object_type,
699        }) => {
700            let privileges = if object_type == &ObjectType::Role {
701                vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
702            } else {
703                referenced_ids
704                    .iter()
705                    .filter_map(|id| match id {
706                        ObjectId::ClusterReplica((cluster_id, _)) => Some((
707                            SystemObjectId::Object(cluster_id.into()),
708                            AclMode::USAGE,
709                            role_id,
710                        )),
711                        ObjectId::Schema((database_spec, _)) => match database_spec {
712                            ResolvedDatabaseSpecifier::Ambient => None,
713                            ResolvedDatabaseSpecifier::Id(database_id) => Some((
714                                SystemObjectId::Object(database_id.into()),
715                                AclMode::USAGE,
716                                role_id,
717                            )),
718                        },
719                        ObjectId::Item(item_id) => {
720                            let item = catalog.get_item(item_id);
721                            Some((
722                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
723                                AclMode::USAGE,
724                                role_id,
725                            ))
726                        }
727                        ObjectId::Cluster(_)
728                        | ObjectId::Database(_)
729                        | ObjectId::Role(_)
730                        | ObjectId::NetworkPolicy(_) => None,
731                    })
732                    .collect()
733            };
734            RbacRequirements {
735                // Do not need ownership of descendant objects.
736                ownership: referenced_ids.clone(),
737                privileges,
738                ..Default::default()
739            }
740        }
741        Plan::DropOwned(plan::DropOwnedPlan {
742            role_ids,
743            drop_ids: _,
744            privilege_revokes: _,
745            default_privilege_revokes: _,
746        }) => RbacRequirements {
747            role_membership: role_ids.into_iter().cloned().collect(),
748            ..Default::default()
749        },
750        Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
751            let container_id = match id {
752                ObjectId::Item(id) => Some(SystemObjectId::Object(
753                    catalog.get_item(id).name().qualifiers.clone().into(),
754                )),
755                ObjectId::Schema((database_id, _schema_id)) => match database_id {
756                    ResolvedDatabaseSpecifier::Ambient => None,
757                    ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
758                },
759                ObjectId::Cluster(_)
760                | ObjectId::ClusterReplica(_)
761                | ObjectId::Database(_)
762                | ObjectId::Role(_)
763                | ObjectId::NetworkPolicy(_) => None,
764            };
765            let privileges = match container_id {
766                Some(id) => vec![(id, AclMode::USAGE, role_id)],
767                None => Vec::new(),
768            };
769            RbacRequirements {
770                privileges,
771                item_usage: &EMPTY_ITEM_USAGE,
772                ..Default::default()
773            }
774        }
775        Plan::ShowColumns(plan::ShowColumnsPlan {
776            id,
777            select_plan,
778            new_resolved_ids: _,
779        }) => {
780            let mut privileges = vec![(
781                SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
782                AclMode::USAGE,
783                role_id,
784            )];
785
786            for privilege in generate_rbac_requirements(
787                catalog,
788                &Plan::Select(select_plan.clone()),
789                active_conns,
790                target_cluster_id,
791                role_id,
792            )
793            .privileges
794            {
795                privileges.push(privilege);
796            }
797            RbacRequirements {
798                privileges,
799                ..Default::default()
800            }
801        }
802        Plan::Select(plan::SelectPlan {
803            source,
804            select: _,
805            when: _,
806            finishing: _,
807            copy_to: _,
808        }) => {
809            let items = source
810                .depends_on()
811                .into_iter()
812                .map(|gid| catalog.resolve_item_id(&gid));
813            let mut privileges = generate_read_privileges(catalog, items, role_id);
814            if let Some(privilege) = generate_cluster_usage_privileges(
815                source.as_const().is_some(),
816                target_cluster_id,
817                role_id,
818            ) {
819                privileges.push(privilege);
820            }
821            RbacRequirements {
822                privileges,
823                ..Default::default()
824            }
825        }
826        Plan::Subscribe(plan::SubscribePlan {
827            from,
828            with_snapshot: _,
829            when: _,
830            up_to: _,
831            copy_to: _,
832            emit_progress: _,
833            output: _,
834        }) => {
835            let items = from
836                .depends_on()
837                .into_iter()
838                .map(|gid| catalog.resolve_item_id(&gid));
839            let mut privileges = generate_read_privileges(catalog, items, role_id);
840            if let Some(cluster_id) = target_cluster_id {
841                privileges.push((
842                    SystemObjectId::Object(cluster_id.into()),
843                    AclMode::USAGE,
844                    role_id,
845                ));
846            }
847            RbacRequirements {
848                privileges,
849                ..Default::default()
850            }
851        }
852        Plan::CopyFrom(plan::CopyFromPlan {
853            target_name: _,
854            target_id,
855            source: _,
856            columns: _,
857            source_desc: _,
858            mfp: _,
859            params: _,
860            filter: _,
861        }) => RbacRequirements {
862            privileges: vec![
863                (
864                    SystemObjectId::Object(
865                        catalog.get_item(target_id).name().qualifiers.clone().into(),
866                    ),
867                    AclMode::USAGE,
868                    role_id,
869                ),
870                (
871                    SystemObjectId::Object(target_id.into()),
872                    AclMode::INSERT,
873                    role_id,
874                ),
875            ],
876            ..Default::default()
877        },
878        Plan::CopyTo(plan::CopyToPlan {
879            select_plan,
880            desc: _,
881            to: _,
882            connection: _,
883            connection_id: _,
884            format: _,
885            max_file_size: _,
886        }) => {
887            let items = select_plan
888                .source
889                .depends_on()
890                .into_iter()
891                .map(|gid| catalog.resolve_item_id(&gid));
892            let mut privileges = generate_read_privileges(catalog, items, role_id);
893            if let Some(cluster_id) = target_cluster_id {
894                privileges.push((
895                    SystemObjectId::Object(cluster_id.into()),
896                    AclMode::USAGE,
897                    role_id,
898                ));
899            }
900            RbacRequirements {
901                privileges,
902                ..Default::default()
903            }
904        }
905        Plan::ExplainPlan(plan::ExplainPlanPlan {
906            stage: _,
907            format: _,
908            config: _,
909            explainee,
910        })
911        | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
912            privileges: match explainee {
913                Explainee::View(id)
914                | Explainee::MaterializedView(id)
915                | Explainee::Index(id)
916                | Explainee::ReplanView(id)
917                | Explainee::ReplanMaterializedView(id)
918                | Explainee::ReplanIndex(id) => {
919                    let item = catalog.get_item(id);
920                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
921                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
922                }
923                Explainee::Statement(stmt) => stmt
924                    .depends_on()
925                    .into_iter()
926                    .map(|id| {
927                        let item = catalog.get_item_by_global_id(&id);
928                        let schema_id: ObjectId = item.name().qualifiers.clone().into();
929                        (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
930                    })
931                    .collect(),
932            },
933            item_usage: match explainee {
934                Explainee::View(..)
935                | Explainee::MaterializedView(..)
936                | Explainee::Index(..)
937                | Explainee::ReplanView(..)
938                | Explainee::ReplanMaterializedView(..)
939                | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
940                Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
941            },
942            ..Default::default()
943        },
944        Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
945            RbacRequirements {
946                privileges: {
947                    let item = catalog.get_item_by_global_id(sink_from);
948                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
949                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
950                },
951                item_usage: &EMPTY_ITEM_USAGE,
952                ..Default::default()
953            }
954        }
955        Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
956            format: _,
957            raw_plan,
958            when: _,
959        }) => RbacRequirements {
960            privileges: raw_plan
961                .depends_on()
962                .into_iter()
963                .map(|id| {
964                    let item = catalog.get_item_by_global_id(&id);
965                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
966                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
967                })
968                .collect(),
969            ..Default::default()
970        },
971        Plan::Insert(plan::InsertPlan {
972            id,
973            values,
974            returning,
975        }) => {
976            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
977            let mut privileges = vec![
978                (
979                    SystemObjectId::Object(schema_id.clone()),
980                    AclMode::USAGE,
981                    role_id,
982                ),
983                (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
984            ];
985            let mut seen = BTreeSet::from([(schema_id, role_id)]);
986
987            // We don't allow arbitrary sub-queries in `returning`. So either it
988            // contains a column reference to the outer table or it's constant.
989            if returning
990                .iter()
991                .any(|assignment| assignment.contains_column())
992            {
993                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
994                seen.insert((id.into(), role_id));
995            }
996
997            let items = values
998                .depends_on()
999                .into_iter()
1000                .map(|gid| catalog.resolve_item_id(&gid));
1001            privileges.extend_from_slice(&generate_read_privileges_inner(
1002                catalog, items, role_id, &mut seen,
1003            ));
1004
1005            if let Some(privilege) = generate_cluster_usage_privileges(
1006                values.as_const().is_some(),
1007                target_cluster_id,
1008                role_id,
1009            ) {
1010                privileges.push(privilege);
1011            } else if !returning.is_empty() {
1012                // TODO(jkosh44) returning may be a constant, but for now we are overly protective
1013                //  and require cluster privileges for all returning.
1014                if let Some(cluster_id) = target_cluster_id {
1015                    privileges.push((
1016                        SystemObjectId::Object(cluster_id.into()),
1017                        AclMode::USAGE,
1018                        role_id,
1019                    ));
1020                }
1021            }
1022            RbacRequirements {
1023                privileges,
1024                ..Default::default()
1025            }
1026        }
1027        Plan::AlterCluster(plan::AlterClusterPlan {
1028            id,
1029            name: _,
1030            options: _,
1031            strategy: _,
1032        }) => RbacRequirements {
1033            ownership: vec![ObjectId::Cluster(*id)],
1034            item_usage: &CREATE_ITEM_USAGE,
1035            ..Default::default()
1036        },
1037        Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1038            ownership: vec![ObjectId::Item(*id)],
1039            privileges: vec![(
1040                SystemObjectId::Object(set_cluster.into()),
1041                AclMode::CREATE,
1042                role_id,
1043            )],
1044            item_usage: &CREATE_ITEM_USAGE,
1045            ..Default::default()
1046        },
1047        Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1048            id,
1049            window: _,
1050            value: _,
1051            object_type: _,
1052        }) => RbacRequirements {
1053            ownership: vec![ObjectId::Item(*id)],
1054            item_usage: &CREATE_ITEM_USAGE,
1055            ..Default::default()
1056        },
1057        Plan::AlterSourceTimestampInterval(plan::AlterSourceTimestampIntervalPlan {
1058            id,
1059            value: _,
1060            interval: _,
1061        }) => RbacRequirements {
1062            ownership: vec![ObjectId::Item(*id)],
1063            item_usage: &CREATE_ITEM_USAGE,
1064            ..Default::default()
1065        },
1066        Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1067            ownership: vec![ObjectId::Item(*id)],
1068            ..Default::default()
1069        },
1070        Plan::AlterSource(plan::AlterSourcePlan {
1071            item_id,
1072            ingestion_id: _,
1073            action: _,
1074        }) => RbacRequirements {
1075            ownership: vec![ObjectId::Item(*item_id)],
1076            item_usage: &CREATE_ITEM_USAGE,
1077            ..Default::default()
1078        },
1079        Plan::AlterSink(plan::AlterSinkPlan {
1080            item_id,
1081            global_id: _,
1082            sink,
1083            with_snapshot: _,
1084            in_cluster,
1085        }) => {
1086            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1087            let mut privileges = generate_read_privileges(catalog, items, role_id);
1088            privileges.push((
1089                SystemObjectId::Object(in_cluster.into()),
1090                AclMode::CREATE,
1091                role_id,
1092            ));
1093            RbacRequirements {
1094                ownership: vec![ObjectId::Item(*item_id)],
1095                privileges,
1096                item_usage: &CREATE_ITEM_USAGE,
1097                ..Default::default()
1098            }
1099        }
1100        Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1101            id,
1102            name: _,
1103            to_name: _,
1104        }) => RbacRequirements {
1105            ownership: vec![ObjectId::Cluster(*id)],
1106            ..Default::default()
1107        },
1108        Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1109            id_a,
1110            id_b,
1111            name_a: _,
1112            name_b: _,
1113            name_temp: _,
1114        }) => RbacRequirements {
1115            ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1116            ..Default::default()
1117        },
1118        Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1119            cluster_id,
1120            replica_id,
1121            name: _,
1122            to_name: _,
1123        }) => RbacRequirements {
1124            ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1125            ..Default::default()
1126        },
1127        Plan::AlterItemRename(plan::AlterItemRenamePlan {
1128            id,
1129            current_full_name: _,
1130            to_name: _,
1131            object_type: _,
1132        }) => RbacRequirements {
1133            ownership: vec![ObjectId::Item(*id)],
1134            ..Default::default()
1135        },
1136        Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1137            cur_schema_spec,
1138            new_schema_name: _,
1139        }) => {
1140            let privileges = match cur_schema_spec.0 {
1141                ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1142                    SystemObjectId::Object(ObjectId::Database(db_id)),
1143                    AclMode::CREATE,
1144                    role_id,
1145                )],
1146                ResolvedDatabaseSpecifier::Ambient => vec![],
1147            };
1148
1149            RbacRequirements {
1150                ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1151                privileges,
1152                ..Default::default()
1153            }
1154        }
1155        Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1156            schema_a_spec,
1157            schema_a_name: _,
1158            schema_b_spec,
1159            schema_b_name: _,
1160            name_temp: _,
1161        }) => {
1162            let mut privileges = vec![];
1163            if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1164                privileges.push((
1165                    SystemObjectId::Object(ObjectId::Database(id_a)),
1166                    AclMode::CREATE,
1167                    role_id,
1168                ));
1169            }
1170            if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1171                privileges.push((
1172                    SystemObjectId::Object(ObjectId::Database(id_b)),
1173                    AclMode::CREATE,
1174                    role_id,
1175                ));
1176            }
1177
1178            RbacRequirements {
1179                ownership: vec![
1180                    ObjectId::Schema(*schema_a_spec),
1181                    ObjectId::Schema(*schema_b_spec),
1182                ],
1183                privileges,
1184                ..Default::default()
1185            }
1186        }
1187        Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1188            ownership: vec![ObjectId::Item(*id)],
1189            item_usage: &CREATE_ITEM_USAGE,
1190            ..Default::default()
1191        },
1192        Plan::AlterRole(plan::AlterRolePlan {
1193            id,
1194            name: _,
1195            option,
1196        }) => match option {
1197            // Only superusers can alter the superuserness of a role.
1198            plan::PlannedAlterRoleOption::Attributes(attributes)
1199                if attributes.superuser.is_some() =>
1200            {
1201                RbacRequirements {
1202                    superuser_action: Some("alter superuser role".to_string()),
1203                    ..Default::default()
1204                }
1205            }
1206            // Roles are allowed to change their own password.
1207            plan::PlannedAlterRoleOption::Attributes(attributes)
1208                if attributes.password.is_some() && role_id == *id =>
1209            {
1210                RbacRequirements::default()
1211            }
1212            // But no one else's...
1213            plan::PlannedAlterRoleOption::Attributes(attributes)
1214                if attributes.password.is_some() =>
1215            {
1216                RbacRequirements {
1217                    superuser_action: Some("alter password of role".to_string()),
1218                    ..Default::default()
1219                }
1220            }
1221            // Roles are allowed to change their own variables.
1222            plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1223                RbacRequirements::default()
1224            }
1225            // Otherwise to ALTER a role, you need to have the CREATE_ROLE privilege.
1226            _ => RbacRequirements {
1227                privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1228                item_usage: &CREATE_ITEM_USAGE,
1229                ..Default::default()
1230            },
1231        },
1232        Plan::AlterOwner(plan::AlterOwnerPlan {
1233            id,
1234            object_type: _,
1235            new_owner,
1236        }) => {
1237            let privileges = match id {
1238                ObjectId::ClusterReplica((cluster_id, _)) => {
1239                    vec![(
1240                        SystemObjectId::Object(cluster_id.into()),
1241                        AclMode::CREATE,
1242                        role_id,
1243                    )]
1244                }
1245                ObjectId::Schema((database_spec, _)) => match database_spec {
1246                    ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1247                    ResolvedDatabaseSpecifier::Id(database_id) => {
1248                        vec![(
1249                            SystemObjectId::Object(database_id.into()),
1250                            AclMode::CREATE,
1251                            role_id,
1252                        )]
1253                    }
1254                },
1255                ObjectId::Item(item_id) => {
1256                    let item = catalog.get_item(item_id);
1257                    vec![(
1258                        SystemObjectId::Object(item.name().qualifiers.clone().into()),
1259                        AclMode::CREATE,
1260                        role_id,
1261                    )]
1262                }
1263                ObjectId::Cluster(_)
1264                | ObjectId::Database(_)
1265                | ObjectId::Role(_)
1266                | ObjectId::NetworkPolicy(_) => Vec::new(),
1267            };
1268            RbacRequirements {
1269                role_membership: BTreeSet::from([*new_owner]),
1270                ownership: vec![id.clone()],
1271                privileges,
1272                ..Default::default()
1273            }
1274        }
1275        Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1276            ownership: vec![ObjectId::Item(*relation_id)],
1277            item_usage: &CREATE_ITEM_USAGE,
1278            ..Default::default()
1279        },
1280        Plan::AlterMaterializedViewApplyReplacement(
1281            plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1282        ) => RbacRequirements {
1283            ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1284            item_usage: &CREATE_ITEM_USAGE,
1285            ..Default::default()
1286        },
1287        Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1288            ownership: vec![ObjectId::NetworkPolicy(*id)],
1289            item_usage: &CREATE_ITEM_USAGE,
1290            ..Default::default()
1291        },
1292        Plan::ReadThenWrite(plan::ReadThenWritePlan {
1293            id,
1294            selection,
1295            finishing: _,
1296            assignments,
1297            kind,
1298            returning,
1299        }) => {
1300            let acl_mode = match kind {
1301                MutationKind::Insert => AclMode::INSERT,
1302                MutationKind::Update => AclMode::UPDATE,
1303                MutationKind::Delete => AclMode::DELETE,
1304            };
1305            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1306            let mut privileges = vec![
1307                (
1308                    SystemObjectId::Object(schema_id.clone()),
1309                    AclMode::USAGE,
1310                    role_id,
1311                ),
1312                (SystemObjectId::Object(id.into()), acl_mode, role_id),
1313            ];
1314            let mut seen = BTreeSet::from([(schema_id, role_id)]);
1315
1316            // We don't allow arbitrary sub-queries in `assignments` or `returning`. So each one either
1317            // contains a column reference to the outer table or is constant.
1318            if assignments
1319                .values()
1320                .chain(returning.iter())
1321                .any(|assignment| assignment.contains_column())
1322            {
1323                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1324                seen.insert((id.into(), role_id));
1325            }
1326
1327            // TODO(jkosh44) It's fairly difficult to determine what part of `selection` is from a
1328            //  user specified read and what part is from the implementation of the read then write.
1329            //  instead we are overly protective and always require SELECT privileges even though
1330            //  PostgreSQL doesn't always do this.
1331            //  As a concrete example, we require SELECT and UPDATE privileges to execute
1332            //  `UPDATE t SET a = 42;`, while PostgreSQL only requires UPDATE privileges.
1333            let items = selection
1334                .depends_on()
1335                .into_iter()
1336                .map(|gid| catalog.resolve_item_id(&gid));
1337            privileges.extend_from_slice(&generate_read_privileges_inner(
1338                catalog, items, role_id, &mut seen,
1339            ));
1340
1341            if let Some(privilege) = generate_cluster_usage_privileges(
1342                selection.as_const().is_some(),
1343                target_cluster_id,
1344                role_id,
1345            ) {
1346                privileges.push(privilege);
1347            }
1348            RbacRequirements {
1349                privileges,
1350                ..Default::default()
1351            }
1352        }
1353        Plan::GrantRole(plan::GrantRolePlan {
1354            role_ids: _,
1355            member_ids: _,
1356            grantor_id: _,
1357        })
1358        | Plan::RevokeRole(plan::RevokeRolePlan {
1359            role_ids: _,
1360            member_ids: _,
1361            grantor_id: _,
1362        }) => RbacRequirements {
1363            privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1364            ..Default::default()
1365        },
1366        Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1367            update_privileges,
1368            grantees: _,
1369        })
1370        | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1371            update_privileges,
1372            revokees: _,
1373        }) => {
1374            let mut privileges = Vec::with_capacity(update_privileges.len());
1375            for UpdatePrivilege { target_id, .. } in update_privileges {
1376                match target_id {
1377                    SystemObjectId::Object(object_id) => match object_id {
1378                        ObjectId::ClusterReplica((cluster_id, _)) => {
1379                            privileges.push((
1380                                SystemObjectId::Object(cluster_id.into()),
1381                                AclMode::USAGE,
1382                                role_id,
1383                            ));
1384                        }
1385                        ObjectId::Schema((database_spec, _)) => match database_spec {
1386                            ResolvedDatabaseSpecifier::Ambient => {}
1387                            ResolvedDatabaseSpecifier::Id(database_id) => {
1388                                privileges.push((
1389                                    SystemObjectId::Object(database_id.into()),
1390                                    AclMode::USAGE,
1391                                    role_id,
1392                                ));
1393                            }
1394                        },
1395                        ObjectId::Item(item_id) => {
1396                            let item = catalog.get_item(item_id);
1397                            privileges.push((
1398                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
1399                                AclMode::USAGE,
1400                                role_id,
1401                            ))
1402                        }
1403                        ObjectId::Cluster(_)
1404                        | ObjectId::Database(_)
1405                        | ObjectId::Role(_)
1406                        | ObjectId::NetworkPolicy(_) => {}
1407                    },
1408                    SystemObjectId::System => {}
1409                }
1410            }
1411            RbacRequirements {
1412                ownership: update_privileges
1413                    .iter()
1414                    .filter_map(|update_privilege| update_privilege.target_id.object_id())
1415                    .cloned()
1416                    .collect(),
1417                privileges,
1418                // To grant/revoke a privilege on some object, generally the grantor/revoker must be the
1419                // owner of that object (or have a grant option on that object which isn't implemented in
1420                // Materialize yet). There is no owner of the entire system, so it's only reasonable to
1421                // restrict granting/revoking system privileges to superusers.
1422                superuser_action: if update_privileges
1423                    .iter()
1424                    .any(|update_privilege| update_privilege.target_id.is_system())
1425                {
1426                    Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1427                } else {
1428                    None
1429                },
1430                ..Default::default()
1431            }
1432        }
1433        Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1434            privilege_objects,
1435            privilege_acl_items: _,
1436            is_grant: _,
1437        }) => RbacRequirements {
1438            role_membership: privilege_objects
1439                .iter()
1440                .map(|privilege_object| privilege_object.role_id)
1441                .collect(),
1442            privileges: privilege_objects
1443                .into_iter()
1444                .filter_map(|privilege_object| {
1445                    if let (Some(database_id), Some(_)) =
1446                        (privilege_object.database_id, privilege_object.schema_id)
1447                    {
1448                        Some((
1449                            SystemObjectId::Object(database_id.into()),
1450                            AclMode::USAGE,
1451                            role_id,
1452                        ))
1453                    } else {
1454                        None
1455                    }
1456                })
1457                .collect(),
1458            // Altering the default privileges for the PUBLIC role (aka ALL ROLES) will affect all roles
1459            // that currently exist and roles that will exist in the future. It's impossible for an existing
1460            // role to be a member of a role that doesn't exist yet, so no current role could possibly have
1461            // the privileges required to alter default privileges for the PUBLIC role. Therefore,
1462            // only superusers can alter default privileges for the PUBLIC role.
1463            superuser_action: if privilege_objects
1464                .iter()
1465                .any(|privilege_object| privilege_object.role_id.is_public())
1466            {
1467                Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1468            } else {
1469                None
1470            },
1471            ..Default::default()
1472        },
1473        Plan::ReassignOwned(plan::ReassignOwnedPlan {
1474            old_roles,
1475            new_role,
1476            reassign_ids: _,
1477        }) => RbacRequirements {
1478            role_membership: old_roles
1479                .into_iter()
1480                .cloned()
1481                .chain(iter::once(*new_role))
1482                .collect(),
1483            ..Default::default()
1484        },
1485        Plan::SideEffectingFunc(func) => {
1486            let role_membership = match func {
1487                SideEffectingFunc::PgCancelBackend { connection_id } => active_conns
1488                    .expect("active_conns is required for Plan::SideEffectingFunc")(
1489                    *connection_id
1490                )
1491                .map(|x| [x].into())
1492                .unwrap_or_default(),
1493            };
1494            RbacRequirements {
1495                role_membership,
1496                ..Default::default()
1497            }
1498        }
1499        Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1500            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1501            RbacRequirements {
1502                privileges: vec![
1503                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1504                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1505                ],
1506                ..Default::default()
1507            }
1508        }
1509        Plan::DiscardTemp
1510        | Plan::DiscardAll
1511        | Plan::EmptyQuery
1512        | Plan::ShowAllVariables
1513        | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1514        | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1515        | Plan::SetVariable(plan::SetVariablePlan {
1516            name: _,
1517            value: _,
1518            local: _,
1519        })
1520        | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1521        | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1522        | Plan::StartTransaction(plan::StartTransactionPlan {
1523            access: _,
1524            isolation_level: _,
1525        })
1526        | Plan::CommitTransaction(plan::CommitTransactionPlan {
1527            transaction_type: _,
1528        })
1529        | Plan::AbortTransaction(plan::AbortTransactionPlan {
1530            transaction_type: _,
1531        })
1532        | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1533        | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1534        | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1535        | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1536        | Plan::Declare(plan::DeclarePlan {
1537            name: _,
1538            stmt: _,
1539            sql: _,
1540            params: _,
1541        })
1542        | Plan::Fetch(plan::FetchPlan {
1543            name: _,
1544            count: _,
1545            timeout: _,
1546        })
1547        | Plan::Close(plan::ClosePlan { name: _ })
1548        | Plan::Prepare(plan::PreparePlan {
1549            name: _,
1550            stmt: _,
1551            desc: _,
1552            sql: _,
1553        })
1554        | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1555        | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1556        | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1557    }
1558}
1559
1560/// Reports whether any role has ownership over an object.
1561fn check_owner_roles(
1562    object_id: &ObjectId,
1563    role_ids: &BTreeSet<RoleId>,
1564    catalog: &impl SessionCatalog,
1565) -> bool {
1566    if let Some(owner_id) = catalog.get_owner_id(object_id) {
1567        role_ids.contains(&owner_id)
1568    } else {
1569        true
1570    }
1571}
1572
1573fn ownership_err(
1574    unheld_ownership: Vec<ObjectId>,
1575    catalog: &impl SessionCatalog,
1576) -> Result<(), UnauthorizedError> {
1577    if !unheld_ownership.is_empty() {
1578        let objects = unheld_ownership
1579            .into_iter()
1580            .map(|ownership| match ownership {
1581                ObjectId::Cluster(id) => (
1582                    ObjectType::Cluster,
1583                    catalog.get_cluster(id).name().to_string(),
1584                ),
1585                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1586                    let cluster = catalog.get_cluster(cluster_id);
1587                    let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1588                    // Note: using unchecked here is okay because the values are coming from an
1589                    // already existing name.
1590                    let name = QualifiedReplica {
1591                        cluster: Ident::new_unchecked(cluster.name()),
1592                        replica: Ident::new_unchecked(replica.name()),
1593                    };
1594                    (ObjectType::ClusterReplica, name.to_string())
1595                }
1596                ObjectId::Database(id) => (
1597                    ObjectType::Database,
1598                    catalog.get_database(&id).name().to_string(),
1599                ),
1600                ObjectId::Schema((database_spec, schema_spec)) => {
1601                    let schema = catalog.get_schema(&database_spec, &schema_spec);
1602                    let name = catalog.resolve_full_schema_name(schema.name());
1603                    (ObjectType::Schema, name.to_string())
1604                }
1605                ObjectId::Item(id) => {
1606                    let item = catalog.get_item(&id);
1607                    let name = catalog.resolve_full_name(item.name());
1608                    (item.item_type().into(), name.to_string())
1609                }
1610                ObjectId::NetworkPolicy(id) => (
1611                    ObjectType::NetworkPolicy,
1612                    catalog.get_network_policy(&id).name().to_string(),
1613                ),
1614                ObjectId::Role(_) => unreachable!("roles have no owner"),
1615            })
1616            .collect();
1617        Err(UnauthorizedError::Ownership { objects })
1618    } else {
1619        Ok(())
1620    }
1621}
1622
1623fn generate_required_source_privileges(
1624    name: &QualifiedItemName,
1625    data_source: &DataSourceDesc,
1626    in_cluster: Option<ClusterId>,
1627    role_id: RoleId,
1628) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1629    let mut privileges = vec![(
1630        SystemObjectId::Object(name.qualifiers.clone().into()),
1631        AclMode::CREATE,
1632        role_id,
1633    )];
1634    match (data_source, in_cluster) {
1635        (_, Some(id)) => {
1636            privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1637        }
1638        (DataSourceDesc::Ingestion(_), None) => {
1639            privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1640        }
1641        // Non-ingestion data-sources have meaningless cluster config's (for now...) and they need
1642        // to be ignored.
1643        // This feels very brittle, but there's not much we can do until the UNDEFINED cluster
1644        // config is removed.
1645        (_, None) => {}
1646    }
1647    privileges
1648}
1649
1650/// Generates all the privileges required to execute a read that includes the objects in `ids`.
1651///
1652/// Not only do we need to validate that `role_id` has read privileges on all relations in `ids`,
1653/// but if any object is a view or materialized view then we need to validate that the owner of
1654/// that view has all of the privileges required to execute the query within the view.
1655///
1656/// For more details see: <https://www.postgresql.org/docs/15/rules-privileges.html>
1657fn generate_read_privileges(
1658    catalog: &impl SessionCatalog,
1659    ids: impl Iterator<Item = CatalogItemId>,
1660    role_id: RoleId,
1661) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1662    generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1663}
1664
1665fn generate_read_privileges_inner(
1666    catalog: &impl SessionCatalog,
1667    ids: impl Iterator<Item = CatalogItemId>,
1668    role_id: RoleId,
1669    seen: &mut BTreeSet<(ObjectId, RoleId)>,
1670) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1671    let mut privileges = Vec::new();
1672    let mut views = Vec::new();
1673
1674    for id in ids {
1675        if seen.insert((id.into(), role_id)) {
1676            let item = catalog.get_item(&id);
1677            let schema_id: ObjectId = item.name().qualifiers.clone().into();
1678            if seen.insert((schema_id.clone(), role_id)) {
1679                privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1680            }
1681            match item.item_type() {
1682                CatalogItemType::View
1683                | CatalogItemType::MaterializedView
1684                | CatalogItemType::ContinualTask => {
1685                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1686                    views.push((item.references().items().copied(), item.owner_id()));
1687                }
1688                CatalogItemType::Table | CatalogItemType::Source => {
1689                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1690                }
1691                CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1692                    privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1693                }
1694                CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1695            }
1696        }
1697    }
1698
1699    for (view_ids, view_owner) in views {
1700        privileges.extend_from_slice(&generate_read_privileges_inner(
1701            catalog, view_ids, view_owner, seen,
1702        ));
1703    }
1704
1705    privileges
1706}
1707
1708fn generate_usage_privileges(
1709    catalog: &impl SessionCatalog,
1710    ids: &ResolvedIds,
1711    role_id: RoleId,
1712    item_types: &BTreeSet<CatalogItemType>,
1713) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1714    // Use a `BTreeSet` to remove duplicate privileges.
1715    ids.items()
1716        .filter_map(move |id| {
1717            let item = catalog.get_item(id);
1718            if item_types.contains(&item.item_type()) {
1719                let schema_id = item.name().qualifiers.clone().into();
1720                Some([
1721                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1722                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1723                ])
1724            } else {
1725                None
1726            }
1727        })
1728        .flatten()
1729        .collect()
1730}
1731
1732fn generate_cluster_usage_privileges(
1733    expr_is_const: bool,
1734    target_cluster_id: Option<ClusterId>,
1735    role_id: RoleId,
1736) -> Option<(SystemObjectId, AclMode, RoleId)> {
1737    // TODO(jkosh44) expr hasn't been fully optimized yet, so it might actually be a constant,
1738    //  but we mistakenly think that it's not. For now it's ok to be overly protective.
1739    if !expr_is_const {
1740        if let Some(cluster_id) = target_cluster_id {
1741            return Some((
1742                SystemObjectId::Object(cluster_id.into()),
1743                AclMode::USAGE,
1744                role_id,
1745            ));
1746        }
1747    }
1748
1749    None
1750}
1751
1752fn check_object_privileges(
1753    catalog: &impl SessionCatalog,
1754    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1755    role_membership: BTreeSet<RoleId>,
1756    current_role_id: RoleId,
1757) -> Result<(), UnauthorizedError> {
1758    let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1759    role_memberships.insert(current_role_id, role_membership);
1760    for (object_id, acl_mode, role_id) in privileges {
1761        // Temporary schemas are owned by the connection that created them,
1762        // so users implicitly have all privileges on their own temp schema.
1763        // The schema may not exist yet (lazy creation), so we skip the check.
1764        if matches!(
1765            &object_id,
1766            SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary)))
1767        ) {
1768            continue;
1769        }
1770
1771        let role_membership = role_memberships
1772            .entry(role_id)
1773            .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1774        let object_privileges = catalog
1775            .get_privileges(&object_id)
1776            .expect("only object types with privileges will generate required privileges");
1777        let role_privileges = role_membership
1778            .iter()
1779            .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1780            .map(|mz_acl_item| mz_acl_item.acl_mode)
1781            .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1782        if !role_privileges.contains(acl_mode) {
1783            let role_name = catalog.get_role(&role_id).name().to_string();
1784            let privileges = acl_mode.to_error_string();
1785            return Err(UnauthorizedError::Privilege {
1786                object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1787                role_name,
1788                privileges,
1789            });
1790        }
1791    }
1792
1793    Ok(())
1794}
1795
1796pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1797    const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1798        .union(AclMode::SELECT)
1799        .union(AclMode::UPDATE)
1800        .union(AclMode::DELETE);
1801    const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1802    const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1803        .union(AclMode::CREATE_DB)
1804        .union(AclMode::CREATE_CLUSTER)
1805        .union(AclMode::CREATE_NETWORK_POLICY);
1806
1807    const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1808    match object_type {
1809        SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1810        SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1811        SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1812        SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1813        SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1814        SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1815        SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1816        SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1817        SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1818        SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1819        SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1820        SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1821        SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1822        SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1823        SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1824        SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1825        SystemObjectType::Object(ObjectType::ContinualTask) => AclMode::SELECT,
1826        SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1827    }
1828}
1829
1830pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1831    MzAclItem {
1832        grantee: owner_id,
1833        grantor: owner_id,
1834        acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1835    }
1836}
1837
1838const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1839    match object_type {
1840        ObjectType::Table
1841        | ObjectType::View
1842        | ObjectType::MaterializedView
1843        | ObjectType::Source
1844        | ObjectType::ContinualTask => AclMode::SELECT,
1845        ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1846        ObjectType::Sink
1847        | ObjectType::Index
1848        | ObjectType::Role
1849        | ObjectType::Cluster
1850        | ObjectType::ClusterReplica
1851        | ObjectType::Secret
1852        | ObjectType::Connection
1853        | ObjectType::Database
1854        | ObjectType::Func
1855        | ObjectType::NetworkPolicy => AclMode::empty(),
1856    }
1857}
1858
1859pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1860    let acl_mode = default_builtin_object_acl_mode(object_type);
1861    MzAclItem {
1862        grantee: MZ_SUPPORT_ROLE_ID,
1863        grantor: MZ_SYSTEM_ROLE_ID,
1864        acl_mode,
1865    }
1866}
1867
1868pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1869    let acl_mode = default_builtin_object_acl_mode(object_type);
1870    MzAclItem {
1871        grantee: RoleId::Public,
1872        grantor: MZ_SYSTEM_ROLE_ID,
1873        acl_mode,
1874    }
1875}