Skip to main content

mz_sql/
rbac.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26    CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29    CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30    SchemaSpecifier, SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34    DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40/// Common checks that need to be performed before we can start checking a role's privileges.
41fn rbac_check_preamble(
42    catalog: &impl SessionCatalog,
43    session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45    // PostgreSQL allows users that have their role dropped to perform some actions,
46    // such as `SET ROLE` and certain `SELECT` queries. We haven't implemented
47    // `SET ROLE` and feel it's safer to force to user to re-authenticate if their
48    // role is dropped.
49    if catalog
50        .try_get_role(&session_meta.role_metadata().current_role)
51        .is_none()
52    {
53        return Err(UnauthorizedError::ConcurrentRoleDrop(
54            session_meta.role_metadata().current_role.clone(),
55        ));
56    };
57    if catalog
58        .try_get_role(&session_meta.role_metadata().session_role)
59        .is_none()
60    {
61        return Err(UnauthorizedError::ConcurrentRoleDrop(
62            session_meta.role_metadata().session_role.clone(),
63        ));
64    };
65    if catalog
66        .try_get_role(&session_meta.role_metadata().authenticated_role)
67        .is_none()
68    {
69        return Err(UnauthorizedError::ConcurrentRoleDrop(
70            session_meta.role_metadata().authenticated_role.clone(),
71        ));
72    };
73
74    Ok(())
75}
76
77/// Filters `RbacRequirements` based on the session role metadata and RBAC related feature flags.
78fn filter_requirements(
79    catalog: &impl SessionCatalog,
80    session_meta: &dyn SessionMetadata,
81    rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83    // Skip RBAC non-mandatory checks if RBAC is disabled. However, we never skip RBAC checks for
84    // system roles. This allows us to limit access of system users even when RBAC is off.
85    let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86        && !session_meta.role_metadata().current_role.is_system()
87        && !session_meta.role_metadata().session_role.is_system();
88    // Skip RBAC checks on user items if the session is a superuser.
89    let is_superuser = session_meta.is_superuser();
90    if is_rbac_disabled || is_superuser {
91        return rbac_requirements.filter_to_mandatory_requirements();
92    }
93
94    rbac_requirements
95}
96
97// The default item types that most statements require USAGE privileges for.
98static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99    btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101// CREATE statements require USAGE privileges on the default item types and USAGE privileges on
102// Types.
103pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104    let mut items = DEFAULT_ITEM_USAGE.clone();
105    items.insert(CatalogItemType::Type);
106    items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
110/// System catalog objects exempted from `check_restrict_to_user_objects`.
111///
112/// These views are the mechanism by which the MCP agent endpoint discovers
113/// data products the user has access to. Blocking them defeats the isolation
114/// model, so they are explicitly allowed even in restricted sessions.
115static RESTRICT_TO_USER_OBJECTS_ALLOWED_OIDS: LazyLock<BTreeSet<u32>> = LazyLock::new(|| {
116    use mz_pgrepr::oid;
117    btreeset! {
118        oid::VIEW_MZ_MCP_DATA_PRODUCTS_OID,
119        oid::VIEW_MZ_MCP_DATA_PRODUCT_DETAILS_OID,
120    }
121});
122
123/// Errors that can occur due to an unauthorized action.
124#[derive(Debug, thiserror::Error)]
125pub enum UnauthorizedError {
126    /// The action can only be performed by a superuser.
127    #[error("permission denied to {action}")]
128    Superuser { action: String },
129    /// The action requires ownership of an object.
130    #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
131    Ownership { objects: Vec<(ObjectType, String)> },
132    /// Altering an owner requires membership of the new owner role.
133    #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
134    RoleMembership { role_names: Vec<String> },
135    /// The action requires one or more privileges.
136    #[error("permission denied for {object_description}")]
137    Privilege {
138        object_description: ErrorMessageObjectDescription,
139        role_name: String,
140        privileges: String,
141    },
142    // TODO(jkosh44) When we implement parameter privileges, this can be replaced with a regular
143    //  privilege error.
144    /// The action can only be performed by the mz_system role.
145    #[error("permission denied to {action}")]
146    MzSystem { action: String },
147    /// The action cannot be performed by the mz_support role.
148    #[error("permission denied to {action}")]
149    MzSupport { action: String },
150    /// The active role was dropped while a user was logged in.
151    #[error("role {0} was concurrently dropped")]
152    ConcurrentRoleDrop(RoleId),
153    /// Access to system objects is restricted by the restrict_to_user_objects session variable.
154    #[error("access to system object {object_name} is restricted")]
155    RestrictedSystemObject { object_name: String },
156}
157
158impl UnauthorizedError {
159    pub fn detail(&self) -> Option<String> {
160        match &self {
161            UnauthorizedError::Superuser { action } => {
162                Some(format!("You must be a superuser to {}", action))
163            }
164            UnauthorizedError::Privilege {
165                object_description,
166                role_name,
167                privileges,
168            } => Some(format!(
169                "The '{role_name}' role needs {privileges} privileges on {object_description}"
170            )),
171            UnauthorizedError::MzSystem { .. } => {
172                Some(format!("You must be the '{}' role", SYSTEM_USER.name))
173            }
174            UnauthorizedError::MzSupport { .. } => Some(format!(
175                "The '{}' role has very limited privileges",
176                SUPPORT_USER.name
177            )),
178            UnauthorizedError::ConcurrentRoleDrop(_) => {
179                Some("Please disconnect and re-connect with a valid role.".into())
180            }
181            UnauthorizedError::RestrictedSystemObject { .. } => Some(
182                "Access to system catalog objects is restricted for this role. \
183                Contact your administrator if you need access."
184                    .into(),
185            ),
186            UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
187        }
188    }
189}
190
191/// RBAC requirements for executing a given plan.
192#[derive(Debug)]
193struct RbacRequirements {
194    /// The role memberships required.
195    role_membership: BTreeSet<RoleId>,
196    /// The object ownerships required.
197    ownership: Vec<ObjectId>,
198    /// The privileges required. The tuples are of the form:
199    /// (What object the privilege is on, What privilege is required, Who must possess the privilege).
200    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
201    /// The types of catalog items that this plan requires USAGE privileges on.
202    ///
203    /// Most plans will require USAGE on secrets and connections but some plans, like SHOW CREATE,
204    /// can reference an item without requiring any privileges on that item.
205    item_usage: &'static BTreeSet<CatalogItemType>,
206    /// Some action if superuser is required to perform that action, None otherwise.
207    superuser_action: Option<String>,
208}
209
210impl RbacRequirements {
211    fn empty() -> RbacRequirements {
212        RbacRequirements {
213            role_membership: BTreeSet::new(),
214            ownership: Vec::new(),
215            privileges: Vec::new(),
216            item_usage: &EMPTY_ITEM_USAGE,
217            superuser_action: None,
218        }
219    }
220
221    fn validate(
222        self,
223        catalog: &impl SessionCatalog,
224        session: &dyn SessionMetadata,
225        resolved_ids: &ResolvedIds,
226    ) -> Result<(), UnauthorizedError> {
227        // Obtain all roles that the current session is a member of.
228        let role_membership =
229            catalog.collect_role_membership(&session.role_metadata().current_role);
230
231        check_usage(catalog, session, resolved_ids, self.item_usage)?;
232
233        // Validate that the current session has the required role membership to execute the provided
234        // plan.
235        let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
236        if !unheld_membership.is_empty() {
237            let role_names = unheld_membership
238                .into_iter()
239                .map(|role_id| {
240                    // Some role references may no longer exist due to concurrent drops.
241                    catalog
242                        .try_get_role(role_id)
243                        .map(|role| role.name().to_string())
244                        .unwrap_or_else(|| role_id.to_string())
245                })
246                .collect();
247            return Err(UnauthorizedError::RoleMembership { role_names });
248        }
249
250        // Validate that the current session has the required object ownership to execute the provided
251        // plan.
252        let unheld_ownership = self
253            .ownership
254            .into_iter()
255            .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
256            .collect();
257        ownership_err(unheld_ownership, catalog)?;
258
259        check_object_privileges(
260            catalog,
261            self.privileges,
262            role_membership,
263            session.role_metadata().current_role,
264        )?;
265
266        if let Some(action) = self.superuser_action {
267            return Err(UnauthorizedError::Superuser { action });
268        }
269
270        Ok(())
271    }
272
273    fn filter_to_mandatory_requirements(self) -> RbacRequirements {
274        let RbacRequirements {
275            role_membership,
276            ownership,
277            privileges,
278            item_usage,
279            superuser_action: _,
280        } = self;
281        let role_membership = role_membership
282            .into_iter()
283            .filter(|id| id.is_system())
284            .collect();
285        let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
286        let privileges = privileges
287            .into_iter()
288            .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
289            // We allow reading objects for superusers and when RBAC is off.
290            .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
291            .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
292            .collect();
293        let superuser_action = None;
294        RbacRequirements {
295            role_membership,
296            ownership,
297            privileges,
298            item_usage,
299            superuser_action,
300        }
301    }
302}
303
304impl Default for RbacRequirements {
305    fn default() -> Self {
306        RbacRequirements {
307            role_membership: BTreeSet::new(),
308            ownership: Vec::new(),
309            privileges: Vec::new(),
310            item_usage: &DEFAULT_ITEM_USAGE,
311            superuser_action: None,
312        }
313    }
314}
315
316/// When `restrict_to_user_objects` is active, rejects access to system catalog objects.
317///
318/// Functions and types are allowed through because they are needed for query execution.
319/// All other system items (tables, views, sources, sinks, etc.) are blocked. This is an
320/// allow-list — new catalog item types are blocked by default.
321///
322/// See: doc/developer/design/20260508_restrict_to_user_objects.md
323fn check_restrict_to_user_objects(
324    catalog: &impl SessionCatalog,
325    session: &dyn SessionMetadata,
326    resolved_ids: &ResolvedIds,
327) -> Result<(), UnauthorizedError> {
328    if !session.restrict_to_user_objects() {
329        return Ok(());
330    }
331    for item_id in resolved_ids.items() {
332        if item_id.is_system() {
333            if let Some(item) = catalog.try_get_item(item_id) {
334                match item.item_type() {
335                    CatalogItemType::Func | CatalogItemType::Type => {}
336                    _ => {
337                        if RESTRICT_TO_USER_OBJECTS_ALLOWED_OIDS.contains(&item.oid()) {
338                            continue;
339                        }
340                        return Err(UnauthorizedError::RestrictedSystemObject {
341                            object_name: item.name().item.clone(),
342                        });
343                    }
344                }
345            }
346        }
347    }
348    Ok(())
349}
350
351/// Checks if a `session` is authorized to use `resolved_ids`. If not, an error is returned.
352pub fn check_usage(
353    catalog: &impl SessionCatalog,
354    session: &dyn SessionMetadata,
355    resolved_ids: &ResolvedIds,
356    item_types: &BTreeSet<CatalogItemType>,
357) -> Result<(), UnauthorizedError> {
358    rbac_check_preamble(catalog, session)?;
359
360    // See: doc/developer/design/20260508_restrict_to_user_objects.md
361    check_restrict_to_user_objects(catalog, session, resolved_ids)?;
362
363    // Obtain all roles that the current session is a member of.
364    let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
365
366    // Certain statements depend on objects that haven't been created yet, like sub-sources, so we
367    // need to filter those out.
368    let existing_resolved_ids =
369        resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
370
371    let required_privileges = generate_usage_privileges(
372        catalog,
373        &existing_resolved_ids,
374        session.role_metadata().current_role,
375        item_types,
376    )
377    .into_iter()
378    .collect();
379
380    let mut rbac_requirements = RbacRequirements::empty();
381    rbac_requirements.privileges = required_privileges;
382    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
383    let required_privileges = rbac_requirements.privileges;
384
385    check_object_privileges(
386        catalog,
387        required_privileges,
388        role_membership,
389        session.role_metadata().current_role,
390    )?;
391
392    Ok(())
393}
394
395/// Checks if a session is authorized to execute a plan. If not, an error is returned.
396///
397/// `sql_impl_resolved_ids` contains resolved IDs discovered inside SQL-implemented function
398/// bodies during planning. These are kept separate from `resolved_ids` because they are
399/// implementation details of the functions, not dependencies of the statement. They are
400/// only checked by the `restrict_to_user_objects` restriction.
401pub fn check_plan(
402    catalog: &impl SessionCatalog,
403    // Function mapping a connection ID to an authenticated role. The roles may have been dropped concurrently.
404    // Only required for Plan::SideEffectingFunc; can be None for other plan types.
405    // TODO(peek-seq): Remove this when deleting the old peek sequencing. The logic here that uses
406    // `active_conns` is mirrored in `execute_side_effecting_func`, which is what the frontend peek
407    // sequencing uses.
408    active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
409    session: &dyn SessionMetadata,
410    plan: &Plan,
411    target_cluster_id: Option<ClusterId>,
412    resolved_ids: &ResolvedIds,
413    sql_impl_resolved_ids: &ResolvedIds,
414) -> Result<(), UnauthorizedError> {
415    rbac_check_preamble(catalog, session)?;
416
417    // Check sql_impl function body dependencies against restrict_to_user_objects.
418    // These are checked separately from the main resolved_ids because they are
419    // implementation details that should not affect dependency tracking.
420    check_restrict_to_user_objects(catalog, session, sql_impl_resolved_ids)?;
421
422    let rbac_requirements = generate_rbac_requirements(
423        catalog,
424        plan,
425        active_conns,
426        target_cluster_id,
427        session.role_metadata().current_role,
428    );
429    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
430    debug!(
431        "rbac requirements {rbac_requirements:?} for plan {:?}",
432        PlanKind::from(plan)
433    );
434    rbac_requirements.validate(catalog, session, resolved_ids)
435}
436
437/// Returns true if RBAC is turned on for a session, false otherwise.
438pub fn is_rbac_enabled_for_session(
439    system_vars: &SystemVars,
440    session: &dyn SessionMetadata,
441) -> bool {
442    let server_enabled = system_vars.enable_rbac_checks();
443    let session_enabled = session.enable_session_rbac_checks();
444
445    // The session flag allows users to turn RBAC on for just their session while the server flag
446    // allows users to turn RBAC on for everyone.
447    server_enabled || session_enabled
448}
449
450/// Generates all requirements needed to execute a given plan.
451fn generate_rbac_requirements(
452    catalog: &impl SessionCatalog,
453    plan: &Plan,
454    active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
455    target_cluster_id: Option<ClusterId>,
456    role_id: RoleId,
457) -> RbacRequirements {
458    match plan {
459        Plan::CreateConnection(plan::CreateConnectionPlan {
460            name,
461            if_not_exists: _,
462            connection: _,
463            validate: _,
464        }) => RbacRequirements {
465            privileges: vec![(
466                SystemObjectId::Object(name.qualifiers.clone().into()),
467                AclMode::CREATE,
468                role_id,
469            )],
470            item_usage: &CREATE_ITEM_USAGE,
471            ..Default::default()
472        },
473        Plan::CreateDatabase(plan::CreateDatabasePlan {
474            name: _,
475            if_not_exists: _,
476        }) => RbacRequirements {
477            privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
478            item_usage: &CREATE_ITEM_USAGE,
479            ..Default::default()
480        },
481        Plan::CreateSchema(plan::CreateSchemaPlan {
482            database_spec,
483            schema_name: _,
484            if_not_exists: _,
485        }) => {
486            let privileges = match database_spec {
487                ResolvedDatabaseSpecifier::Ambient => Vec::new(),
488                ResolvedDatabaseSpecifier::Id(database_id) => {
489                    vec![(
490                        SystemObjectId::Object(database_id.into()),
491                        AclMode::CREATE,
492                        role_id,
493                    )]
494                }
495            };
496            RbacRequirements {
497                privileges,
498                item_usage: &CREATE_ITEM_USAGE,
499                ..Default::default()
500            }
501        }
502        Plan::CreateRole(plan::CreateRolePlan {
503            name: _,
504            attributes,
505        }) => {
506            if attributes.superuser.unwrap_or(false) {
507                RbacRequirements {
508                    superuser_action: Some("create superuser role".to_string()),
509                    ..Default::default()
510                }
511            } else {
512                RbacRequirements {
513                    privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
514                    item_usage: &CREATE_ITEM_USAGE,
515                    ..Default::default()
516                }
517            }
518        }
519        Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
520            privileges: vec![(
521                SystemObjectId::System,
522                AclMode::CREATE_NETWORK_POLICY,
523                role_id,
524            )],
525            item_usage: &CREATE_ITEM_USAGE,
526            ..Default::default()
527        },
528        Plan::CreateCluster(plan::CreateClusterPlan {
529            name: _,
530            variant: _,
531            workload_class: _,
532        }) => RbacRequirements {
533            privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
534            item_usage: &CREATE_ITEM_USAGE,
535            ..Default::default()
536        },
537        Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
538            cluster_id,
539            name: _,
540            config: _,
541        }) => RbacRequirements {
542            ownership: vec![ObjectId::Cluster(*cluster_id)],
543            item_usage: &CREATE_ITEM_USAGE,
544            ..Default::default()
545        },
546        Plan::CreateSource(plan::CreateSourcePlan {
547            name,
548            source,
549            if_not_exists: _,
550            timeline: _,
551            in_cluster,
552        }) => RbacRequirements {
553            privileges: generate_required_source_privileges(
554                name,
555                &source.data_source,
556                *in_cluster,
557                role_id,
558            ),
559            item_usage: &CREATE_ITEM_USAGE,
560            ..Default::default()
561        },
562        Plan::CreateSources(plans) => RbacRequirements {
563            privileges: plans
564                .iter()
565                .flat_map(
566                    |plan::CreateSourcePlanBundle {
567                         item_id: _,
568                         global_id: _,
569                         plan:
570                             plan::CreateSourcePlan {
571                                 name,
572                                 source,
573                                 if_not_exists: _,
574                                 timeline: _,
575                                 in_cluster,
576                             },
577                         resolved_ids: _,
578                         available_source_references: _,
579                     }| {
580                        generate_required_source_privileges(
581                            name,
582                            &source.data_source,
583                            *in_cluster,
584                            role_id,
585                        )
586                        .into_iter()
587                    },
588                )
589                .collect(),
590            item_usage: &CREATE_ITEM_USAGE,
591            ..Default::default()
592        },
593        Plan::CreateSecret(plan::CreateSecretPlan {
594            name,
595            secret: _,
596            if_not_exists: _,
597        }) => RbacRequirements {
598            privileges: vec![(
599                SystemObjectId::Object(name.qualifiers.clone().into()),
600                AclMode::CREATE,
601                role_id,
602            )],
603            item_usage: &CREATE_ITEM_USAGE,
604            ..Default::default()
605        },
606        Plan::CreateSink(plan::CreateSinkPlan {
607            name,
608            sink,
609            with_snapshot: _,
610            if_not_exists: _,
611            in_cluster,
612        }) => {
613            let mut privileges = vec![(
614                SystemObjectId::Object(name.qualifiers.clone().into()),
615                AclMode::CREATE,
616                role_id,
617            )];
618            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
619            privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
620            privileges.push((
621                SystemObjectId::Object(in_cluster.into()),
622                AclMode::CREATE,
623                role_id,
624            ));
625            RbacRequirements {
626                privileges,
627                item_usage: &CREATE_ITEM_USAGE,
628                ..Default::default()
629            }
630        }
631        Plan::CreateTable(plan::CreateTablePlan {
632            name,
633            table: _,
634            if_not_exists: _,
635        }) => RbacRequirements {
636            privileges: vec![(
637                SystemObjectId::Object(name.qualifiers.clone().into()),
638                AclMode::CREATE,
639                role_id,
640            )],
641            item_usage: &CREATE_ITEM_USAGE,
642            ..Default::default()
643        },
644        Plan::CreateView(plan::CreateViewPlan {
645            name,
646            view: _,
647            replace,
648            drop_ids: _,
649            if_not_exists: _,
650            ambiguous_columns: _,
651        }) => RbacRequirements {
652            ownership: replace
653                .map(|id| vec![ObjectId::Item(id)])
654                .unwrap_or_default(),
655            privileges: vec![(
656                SystemObjectId::Object(name.qualifiers.clone().into()),
657                AclMode::CREATE,
658                role_id,
659            )],
660            item_usage: &CREATE_ITEM_USAGE,
661            ..Default::default()
662        },
663        Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
664            name,
665            materialized_view,
666            replace,
667            drop_ids: _,
668            if_not_exists: _,
669            ambiguous_columns: _,
670        }) => RbacRequirements {
671            ownership: replace
672                .map(|id| vec![ObjectId::Item(id)])
673                .unwrap_or_default(),
674            privileges: vec![
675                (
676                    SystemObjectId::Object(name.qualifiers.clone().into()),
677                    AclMode::CREATE,
678                    role_id,
679                ),
680                (
681                    SystemObjectId::Object(materialized_view.cluster_id.into()),
682                    AclMode::CREATE,
683                    role_id,
684                ),
685            ],
686            item_usage: &CREATE_ITEM_USAGE,
687            ..Default::default()
688        },
689        Plan::CreateIndex(plan::CreateIndexPlan {
690            name,
691            index,
692            if_not_exists: _,
693        }) => {
694            let index_on_item = catalog.resolve_item_id(&index.on);
695            RbacRequirements {
696                ownership: vec![ObjectId::Item(index_on_item)],
697                privileges: vec![
698                    (
699                        SystemObjectId::Object(name.qualifiers.clone().into()),
700                        AclMode::CREATE,
701                        role_id,
702                    ),
703                    (
704                        SystemObjectId::Object(index.cluster_id.into()),
705                        AclMode::CREATE,
706                        role_id,
707                    ),
708                ],
709                item_usage: &CREATE_ITEM_USAGE,
710                ..Default::default()
711            }
712        }
713        Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
714            privileges: vec![(
715                SystemObjectId::Object(name.qualifiers.clone().into()),
716                AclMode::CREATE,
717                role_id,
718            )],
719            item_usage: &CREATE_ITEM_USAGE,
720            ..Default::default()
721        },
722        Plan::Comment(plan::CommentPlan {
723            object_id,
724            sub_component: _,
725            comment: _,
726        }) => {
727            let (ownership, privileges) = match object_id {
728                // Roles don't have owners, instead we require the current session to have the
729                // `CREATEROLE` privilege.
730                CommentObjectId::Role(_) => (
731                    Vec::new(),
732                    vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
733                ),
734                _ => (vec![ObjectId::from(*object_id)], Vec::new()),
735            };
736            RbacRequirements {
737                ownership,
738                privileges,
739                ..Default::default()
740            }
741        }
742        Plan::DropObjects(plan::DropObjectsPlan {
743            referenced_ids,
744            drop_ids: _,
745            object_type,
746        }) => {
747            let privileges = if object_type == &ObjectType::Role {
748                vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
749            } else {
750                referenced_ids
751                    .iter()
752                    .filter_map(|id| match id {
753                        ObjectId::ClusterReplica((cluster_id, _)) => Some((
754                            SystemObjectId::Object(cluster_id.into()),
755                            AclMode::USAGE,
756                            role_id,
757                        )),
758                        ObjectId::Schema((database_spec, _)) => match database_spec {
759                            ResolvedDatabaseSpecifier::Ambient => None,
760                            ResolvedDatabaseSpecifier::Id(database_id) => Some((
761                                SystemObjectId::Object(database_id.into()),
762                                AclMode::USAGE,
763                                role_id,
764                            )),
765                        },
766                        ObjectId::Item(item_id) => {
767                            let item = catalog.get_item(item_id);
768                            Some((
769                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
770                                AclMode::USAGE,
771                                role_id,
772                            ))
773                        }
774                        ObjectId::Cluster(_)
775                        | ObjectId::Database(_)
776                        | ObjectId::Role(_)
777                        | ObjectId::NetworkPolicy(_) => None,
778                    })
779                    .collect()
780            };
781            RbacRequirements {
782                // Do not need ownership of descendant objects.
783                ownership: referenced_ids.clone(),
784                privileges,
785                ..Default::default()
786            }
787        }
788        Plan::DropOwned(plan::DropOwnedPlan {
789            role_ids,
790            drop_ids: _,
791            privilege_revokes: _,
792            default_privilege_revokes: _,
793        }) => RbacRequirements {
794            role_membership: role_ids.into_iter().cloned().collect(),
795            ..Default::default()
796        },
797        Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
798            let container_id = match id {
799                ObjectId::Item(id) => Some(SystemObjectId::Object(
800                    catalog.get_item(id).name().qualifiers.clone().into(),
801                )),
802                ObjectId::Schema((database_id, _schema_id)) => match database_id {
803                    ResolvedDatabaseSpecifier::Ambient => None,
804                    ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
805                },
806                ObjectId::Cluster(_)
807                | ObjectId::ClusterReplica(_)
808                | ObjectId::Database(_)
809                | ObjectId::Role(_)
810                | ObjectId::NetworkPolicy(_) => None,
811            };
812            let privileges = match container_id {
813                Some(id) => vec![(id, AclMode::USAGE, role_id)],
814                None => Vec::new(),
815            };
816            RbacRequirements {
817                privileges,
818                item_usage: &EMPTY_ITEM_USAGE,
819                ..Default::default()
820            }
821        }
822        Plan::ShowColumns(plan::ShowColumnsPlan {
823            id,
824            select_plan,
825            new_resolved_ids: _,
826        }) => {
827            let mut privileges = vec![(
828                SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
829                AclMode::USAGE,
830                role_id,
831            )];
832
833            for privilege in generate_rbac_requirements(
834                catalog,
835                &Plan::Select(select_plan.clone()),
836                active_conns,
837                target_cluster_id,
838                role_id,
839            )
840            .privileges
841            {
842                privileges.push(privilege);
843            }
844            RbacRequirements {
845                privileges,
846                ..Default::default()
847            }
848        }
849        Plan::Select(plan::SelectPlan {
850            source,
851            select: _,
852            when: _,
853            finishing: _,
854            copy_to: _,
855        }) => {
856            let items = source
857                .depends_on()
858                .into_iter()
859                .map(|gid| catalog.resolve_item_id(&gid));
860            let mut privileges = generate_read_privileges(catalog, items, role_id);
861            if let Some(privilege) = generate_cluster_usage_privileges(
862                source.as_const().is_some(),
863                target_cluster_id,
864                role_id,
865            ) {
866                privileges.push(privilege);
867            }
868            RbacRequirements {
869                privileges,
870                ..Default::default()
871            }
872        }
873        Plan::Subscribe(plan::SubscribePlan {
874            from,
875            with_snapshot: _,
876            when: _,
877            up_to: _,
878            copy_to: _,
879            emit_progress: _,
880            output: _,
881        }) => {
882            let items = from
883                .depends_on()
884                .into_iter()
885                .map(|gid| catalog.resolve_item_id(&gid));
886            let mut privileges = generate_read_privileges(catalog, items, role_id);
887            if let Some(cluster_id) = target_cluster_id {
888                privileges.push((
889                    SystemObjectId::Object(cluster_id.into()),
890                    AclMode::USAGE,
891                    role_id,
892                ));
893            }
894            RbacRequirements {
895                privileges,
896                ..Default::default()
897            }
898        }
899        Plan::CopyFrom(plan::CopyFromPlan {
900            target_name: _,
901            target_id,
902            source: _,
903            columns: _,
904            source_desc: _,
905            mfp: _,
906            params: _,
907            filter: _,
908        }) => RbacRequirements {
909            privileges: vec![
910                (
911                    SystemObjectId::Object(
912                        catalog.get_item(target_id).name().qualifiers.clone().into(),
913                    ),
914                    AclMode::USAGE,
915                    role_id,
916                ),
917                (
918                    SystemObjectId::Object(target_id.into()),
919                    AclMode::INSERT,
920                    role_id,
921                ),
922            ],
923            ..Default::default()
924        },
925        Plan::CopyTo(plan::CopyToPlan {
926            select_plan,
927            desc: _,
928            to: _,
929            connection: _,
930            connection_id: _,
931            format: _,
932            max_file_size: _,
933        }) => {
934            let items = select_plan
935                .source
936                .depends_on()
937                .into_iter()
938                .map(|gid| catalog.resolve_item_id(&gid));
939            let mut privileges = generate_read_privileges(catalog, items, role_id);
940            if let Some(cluster_id) = target_cluster_id {
941                privileges.push((
942                    SystemObjectId::Object(cluster_id.into()),
943                    AclMode::USAGE,
944                    role_id,
945                ));
946            }
947            RbacRequirements {
948                privileges,
949                ..Default::default()
950            }
951        }
952        Plan::ExplainPlan(plan::ExplainPlanPlan {
953            stage: _,
954            format: _,
955            config: _,
956            explainee,
957        })
958        | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
959            privileges: match explainee {
960                Explainee::View(id)
961                | Explainee::MaterializedView(id)
962                | Explainee::Index(id)
963                | Explainee::ReplanView(id)
964                | Explainee::ReplanMaterializedView(id)
965                | Explainee::ReplanIndex(id) => {
966                    let item = catalog.get_item(id);
967                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
968                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
969                }
970                Explainee::Statement(stmt) => stmt
971                    .depends_on()
972                    .into_iter()
973                    .map(|id| {
974                        let item = catalog.get_item_by_global_id(&id);
975                        let schema_id: ObjectId = item.name().qualifiers.clone().into();
976                        (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
977                    })
978                    .collect(),
979            },
980            item_usage: match explainee {
981                Explainee::View(..)
982                | Explainee::MaterializedView(..)
983                | Explainee::Index(..)
984                | Explainee::ReplanView(..)
985                | Explainee::ReplanMaterializedView(..)
986                | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
987                Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
988            },
989            ..Default::default()
990        },
991        Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
992            RbacRequirements {
993                privileges: {
994                    let item = catalog.get_item_by_global_id(sink_from);
995                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
996                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
997                },
998                item_usage: &EMPTY_ITEM_USAGE,
999                ..Default::default()
1000            }
1001        }
1002        Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
1003            format: _,
1004            raw_plan,
1005            when: _,
1006        }) => RbacRequirements {
1007            privileges: raw_plan
1008                .depends_on()
1009                .into_iter()
1010                .map(|id| {
1011                    let item = catalog.get_item_by_global_id(&id);
1012                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
1013                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
1014                })
1015                .collect(),
1016            ..Default::default()
1017        },
1018        Plan::Insert(plan::InsertPlan {
1019            id,
1020            values,
1021            returning,
1022        }) => {
1023            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1024            let mut privileges = vec![
1025                (
1026                    SystemObjectId::Object(schema_id.clone()),
1027                    AclMode::USAGE,
1028                    role_id,
1029                ),
1030                (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
1031            ];
1032            let mut seen = BTreeSet::from([(schema_id, role_id)]);
1033
1034            // We don't allow arbitrary sub-queries in `returning`. So either it
1035            // contains a column reference to the outer table or it's constant.
1036            if returning
1037                .iter()
1038                .any(|assignment| assignment.contains_column())
1039            {
1040                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1041                seen.insert((id.into(), role_id));
1042            }
1043
1044            let items = values
1045                .depends_on()
1046                .into_iter()
1047                .map(|gid| catalog.resolve_item_id(&gid));
1048            privileges.extend_from_slice(&generate_read_privileges_inner(
1049                catalog, items, role_id, &mut seen,
1050            ));
1051
1052            if let Some(privilege) = generate_cluster_usage_privileges(
1053                values.as_const().is_some(),
1054                target_cluster_id,
1055                role_id,
1056            ) {
1057                privileges.push(privilege);
1058            } else if !returning.is_empty() {
1059                // TODO(jkosh44) returning may be a constant, but for now we are overly protective
1060                //  and require cluster privileges for all returning.
1061                if let Some(cluster_id) = target_cluster_id {
1062                    privileges.push((
1063                        SystemObjectId::Object(cluster_id.into()),
1064                        AclMode::USAGE,
1065                        role_id,
1066                    ));
1067                }
1068            }
1069            RbacRequirements {
1070                privileges,
1071                ..Default::default()
1072            }
1073        }
1074        Plan::AlterCluster(plan::AlterClusterPlan {
1075            id,
1076            name: _,
1077            options: _,
1078            strategy: _,
1079        }) => RbacRequirements {
1080            ownership: vec![ObjectId::Cluster(*id)],
1081            item_usage: &CREATE_ITEM_USAGE,
1082            ..Default::default()
1083        },
1084        Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1085            ownership: vec![ObjectId::Item(*id)],
1086            privileges: vec![(
1087                SystemObjectId::Object(set_cluster.into()),
1088                AclMode::CREATE,
1089                role_id,
1090            )],
1091            item_usage: &CREATE_ITEM_USAGE,
1092            ..Default::default()
1093        },
1094        Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1095            id,
1096            window: _,
1097            value: _,
1098            object_type: _,
1099        }) => RbacRequirements {
1100            ownership: vec![ObjectId::Item(*id)],
1101            item_usage: &CREATE_ITEM_USAGE,
1102            ..Default::default()
1103        },
1104        Plan::AlterSourceTimestampInterval(plan::AlterSourceTimestampIntervalPlan {
1105            id,
1106            value: _,
1107            interval: _,
1108        }) => RbacRequirements {
1109            ownership: vec![ObjectId::Item(*id)],
1110            item_usage: &CREATE_ITEM_USAGE,
1111            ..Default::default()
1112        },
1113        Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1114            ownership: vec![ObjectId::Item(*id)],
1115            ..Default::default()
1116        },
1117        Plan::AlterSource(plan::AlterSourcePlan {
1118            item_id,
1119            ingestion_id: _,
1120            action: _,
1121        }) => RbacRequirements {
1122            ownership: vec![ObjectId::Item(*item_id)],
1123            item_usage: &CREATE_ITEM_USAGE,
1124            ..Default::default()
1125        },
1126        Plan::AlterSink(plan::AlterSinkPlan {
1127            item_id,
1128            global_id: _,
1129            sink,
1130            with_snapshot: _,
1131            in_cluster,
1132        }) => {
1133            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1134            let mut privileges = generate_read_privileges(catalog, items, role_id);
1135            privileges.push((
1136                SystemObjectId::Object(in_cluster.into()),
1137                AclMode::CREATE,
1138                role_id,
1139            ));
1140            RbacRequirements {
1141                ownership: vec![ObjectId::Item(*item_id)],
1142                privileges,
1143                item_usage: &CREATE_ITEM_USAGE,
1144                ..Default::default()
1145            }
1146        }
1147        Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1148            id,
1149            name: _,
1150            to_name: _,
1151        }) => RbacRequirements {
1152            ownership: vec![ObjectId::Cluster(*id)],
1153            ..Default::default()
1154        },
1155        Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1156            id_a,
1157            id_b,
1158            name_a: _,
1159            name_b: _,
1160            name_temp: _,
1161        }) => RbacRequirements {
1162            ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1163            ..Default::default()
1164        },
1165        Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1166            cluster_id,
1167            replica_id,
1168            name: _,
1169            to_name: _,
1170        }) => RbacRequirements {
1171            ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1172            ..Default::default()
1173        },
1174        Plan::AlterItemRename(plan::AlterItemRenamePlan {
1175            id,
1176            current_full_name: _,
1177            to_name: _,
1178            object_type: _,
1179        }) => RbacRequirements {
1180            ownership: vec![ObjectId::Item(*id)],
1181            ..Default::default()
1182        },
1183        Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1184            cur_schema_spec,
1185            new_schema_name: _,
1186        }) => {
1187            let privileges = match cur_schema_spec.0 {
1188                ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1189                    SystemObjectId::Object(ObjectId::Database(db_id)),
1190                    AclMode::CREATE,
1191                    role_id,
1192                )],
1193                ResolvedDatabaseSpecifier::Ambient => vec![],
1194            };
1195
1196            RbacRequirements {
1197                ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1198                privileges,
1199                ..Default::default()
1200            }
1201        }
1202        Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1203            schema_a_spec,
1204            schema_a_name: _,
1205            schema_b_spec,
1206            schema_b_name: _,
1207            name_temp: _,
1208        }) => {
1209            let mut privileges = vec![];
1210            if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1211                privileges.push((
1212                    SystemObjectId::Object(ObjectId::Database(id_a)),
1213                    AclMode::CREATE,
1214                    role_id,
1215                ));
1216            }
1217            if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1218                privileges.push((
1219                    SystemObjectId::Object(ObjectId::Database(id_b)),
1220                    AclMode::CREATE,
1221                    role_id,
1222                ));
1223            }
1224
1225            RbacRequirements {
1226                ownership: vec![
1227                    ObjectId::Schema(*schema_a_spec),
1228                    ObjectId::Schema(*schema_b_spec),
1229                ],
1230                privileges,
1231                ..Default::default()
1232            }
1233        }
1234        Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1235            ownership: vec![ObjectId::Item(*id)],
1236            item_usage: &CREATE_ITEM_USAGE,
1237            ..Default::default()
1238        },
1239        Plan::AlterRole(plan::AlterRolePlan {
1240            id,
1241            name: _,
1242            option,
1243        }) => match option {
1244            // Only superusers can alter the superuserness of a role.
1245            plan::PlannedAlterRoleOption::Attributes(attributes)
1246                if attributes.superuser.is_some() =>
1247            {
1248                RbacRequirements {
1249                    superuser_action: Some("alter superuser role".to_string()),
1250                    ..Default::default()
1251                }
1252            }
1253            // Roles are allowed to change their own password, but only if
1254            // password is the sole attribute being changed.
1255            plan::PlannedAlterRoleOption::Attributes(plan::PlannedRoleAttributes {
1256                password,
1257                // scram_iterations and nopassword are password-related, so
1258                // they're fine to change alongside the password.
1259                scram_iterations: _,
1260                nopassword: _,
1261                // superuser is already handled by the match arm above, so it
1262                // will always be None here.
1263                superuser: None,
1264                inherit: None,
1265                login: None,
1266            }) if password.is_some() && role_id == *id => RbacRequirements::default(),
1267            // But no one elses...
1268            plan::PlannedAlterRoleOption::Attributes(attributes)
1269                if attributes.password.is_some() && role_id != *id =>
1270            {
1271                RbacRequirements {
1272                    superuser_action: Some("alter password of role".to_string()),
1273                    ..Default::default()
1274                }
1275            }
1276            // restrict_to_user_objects can only be set by superuser.
1277            // SECURITY: This must use case-insensitive comparison because
1278            // var.name() comes from Ident::to_string() which preserves the
1279            // original casing for quoted identifiers.
1280            plan::PlannedAlterRoleOption::Variable(var)
1281                if var.name().eq_ignore_ascii_case("restrict_to_user_objects") =>
1282            {
1283                RbacRequirements {
1284                    superuser_action: Some("set restrict_to_user_objects".to_string()),
1285                    ..Default::default()
1286                }
1287            }
1288            // Roles are allowed to change their own other variables.
1289            plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1290                RbacRequirements::default()
1291            }
1292            // Otherwise to ALTER a role, you need to have the CREATE_ROLE privilege.
1293            _ => RbacRequirements {
1294                privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1295                item_usage: &CREATE_ITEM_USAGE,
1296                ..Default::default()
1297            },
1298        },
1299        Plan::AlterOwner(plan::AlterOwnerPlan {
1300            id,
1301            object_type: _,
1302            new_owner,
1303        }) => {
1304            let privileges = match id {
1305                ObjectId::ClusterReplica((cluster_id, _)) => {
1306                    vec![(
1307                        SystemObjectId::Object(cluster_id.into()),
1308                        AclMode::CREATE,
1309                        role_id,
1310                    )]
1311                }
1312                ObjectId::Schema((database_spec, _)) => match database_spec {
1313                    ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1314                    ResolvedDatabaseSpecifier::Id(database_id) => {
1315                        vec![(
1316                            SystemObjectId::Object(database_id.into()),
1317                            AclMode::CREATE,
1318                            role_id,
1319                        )]
1320                    }
1321                },
1322                ObjectId::Item(item_id) => {
1323                    let item = catalog.get_item(item_id);
1324                    vec![(
1325                        SystemObjectId::Object(item.name().qualifiers.clone().into()),
1326                        AclMode::CREATE,
1327                        role_id,
1328                    )]
1329                }
1330                ObjectId::Cluster(_)
1331                | ObjectId::Database(_)
1332                | ObjectId::Role(_)
1333                | ObjectId::NetworkPolicy(_) => Vec::new(),
1334            };
1335            RbacRequirements {
1336                role_membership: BTreeSet::from([*new_owner]),
1337                ownership: vec![id.clone()],
1338                privileges,
1339                ..Default::default()
1340            }
1341        }
1342        Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1343            ownership: vec![ObjectId::Item(*relation_id)],
1344            item_usage: &CREATE_ITEM_USAGE,
1345            ..Default::default()
1346        },
1347        Plan::AlterMaterializedViewApplyReplacement(
1348            plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1349        ) => RbacRequirements {
1350            ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1351            item_usage: &CREATE_ITEM_USAGE,
1352            ..Default::default()
1353        },
1354        Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1355            ownership: vec![ObjectId::NetworkPolicy(*id)],
1356            item_usage: &CREATE_ITEM_USAGE,
1357            ..Default::default()
1358        },
1359        Plan::ReadThenWrite(plan::ReadThenWritePlan {
1360            id,
1361            selection,
1362            finishing: _,
1363            assignments,
1364            kind,
1365            returning,
1366        }) => {
1367            let acl_mode = match kind {
1368                MutationKind::Insert => AclMode::INSERT,
1369                MutationKind::Update => AclMode::UPDATE,
1370                MutationKind::Delete => AclMode::DELETE,
1371            };
1372            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1373            let mut privileges = vec![
1374                (
1375                    SystemObjectId::Object(schema_id.clone()),
1376                    AclMode::USAGE,
1377                    role_id,
1378                ),
1379                (SystemObjectId::Object(id.into()), acl_mode, role_id),
1380            ];
1381            let mut seen = BTreeSet::from([(schema_id, role_id)]);
1382
1383            // We don't allow arbitrary sub-queries in `assignments` or `returning`. So either they
1384            // contains a column reference to the outer table or it's constant.
1385            if assignments
1386                .values()
1387                .chain(returning.iter())
1388                .any(|assignment| assignment.contains_column())
1389            {
1390                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1391                seen.insert((id.into(), role_id));
1392            }
1393
1394            // TODO(jkosh44) It's fairly difficult to determine what part of `selection` is from a
1395            //  user specified read and what part is from the implementation of the read then write.
1396            //  instead we are overly protective and always require SELECT privileges even though
1397            //  PostgreSQL doesn't always do this.
1398            //  As a concrete example, we require SELECT and UPDATE privileges to execute
1399            //  `UPDATE t SET a = 42;`, while PostgreSQL only requires UPDATE privileges.
1400            let items = selection
1401                .depends_on()
1402                .into_iter()
1403                .map(|gid| catalog.resolve_item_id(&gid));
1404            privileges.extend_from_slice(&generate_read_privileges_inner(
1405                catalog, items, role_id, &mut seen,
1406            ));
1407
1408            if let Some(privilege) = generate_cluster_usage_privileges(
1409                selection.as_const().is_some(),
1410                target_cluster_id,
1411                role_id,
1412            ) {
1413                privileges.push(privilege);
1414            }
1415            RbacRequirements {
1416                privileges,
1417                ..Default::default()
1418            }
1419        }
1420        Plan::GrantRole(plan::GrantRolePlan {
1421            role_ids: _,
1422            member_ids: _,
1423            grantor_id: _,
1424        })
1425        | Plan::RevokeRole(plan::RevokeRolePlan {
1426            role_ids: _,
1427            member_ids: _,
1428            grantor_id: _,
1429        }) => RbacRequirements {
1430            privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1431            ..Default::default()
1432        },
1433        Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1434            update_privileges,
1435            grantees: _,
1436        })
1437        | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1438            update_privileges,
1439            revokees: _,
1440        }) => {
1441            let mut privileges = Vec::with_capacity(update_privileges.len());
1442            for UpdatePrivilege { target_id, .. } in update_privileges {
1443                match target_id {
1444                    SystemObjectId::Object(object_id) => match object_id {
1445                        ObjectId::ClusterReplica((cluster_id, _)) => {
1446                            privileges.push((
1447                                SystemObjectId::Object(cluster_id.into()),
1448                                AclMode::USAGE,
1449                                role_id,
1450                            ));
1451                        }
1452                        ObjectId::Schema((database_spec, _)) => match database_spec {
1453                            ResolvedDatabaseSpecifier::Ambient => {}
1454                            ResolvedDatabaseSpecifier::Id(database_id) => {
1455                                privileges.push((
1456                                    SystemObjectId::Object(database_id.into()),
1457                                    AclMode::USAGE,
1458                                    role_id,
1459                                ));
1460                            }
1461                        },
1462                        ObjectId::Item(item_id) => {
1463                            let item = catalog.get_item(item_id);
1464                            privileges.push((
1465                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
1466                                AclMode::USAGE,
1467                                role_id,
1468                            ))
1469                        }
1470                        ObjectId::Cluster(_)
1471                        | ObjectId::Database(_)
1472                        | ObjectId::Role(_)
1473                        | ObjectId::NetworkPolicy(_) => {}
1474                    },
1475                    SystemObjectId::System => {}
1476                }
1477            }
1478            RbacRequirements {
1479                ownership: update_privileges
1480                    .iter()
1481                    .filter_map(|update_privilege| update_privilege.target_id.object_id())
1482                    .cloned()
1483                    .collect(),
1484                privileges,
1485                // To grant/revoke a privilege on some object, generally the grantor/revoker must be the
1486                // owner of that object (or have a grant option on that object which isn't implemented in
1487                // Materialize yet). There is no owner of the entire system, so it's only reasonable to
1488                // restrict granting/revoking system privileges to superusers.
1489                superuser_action: if update_privileges
1490                    .iter()
1491                    .any(|update_privilege| update_privilege.target_id.is_system())
1492                {
1493                    Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1494                } else {
1495                    None
1496                },
1497                ..Default::default()
1498            }
1499        }
1500        Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1501            privilege_objects,
1502            privilege_acl_items: _,
1503            is_grant: _,
1504        }) => RbacRequirements {
1505            role_membership: privilege_objects
1506                .iter()
1507                .map(|privilege_object| privilege_object.role_id)
1508                .collect(),
1509            privileges: privilege_objects
1510                .into_iter()
1511                .filter_map(|privilege_object| {
1512                    if let (Some(database_id), Some(_)) =
1513                        (privilege_object.database_id, privilege_object.schema_id)
1514                    {
1515                        Some((
1516                            SystemObjectId::Object(database_id.into()),
1517                            AclMode::USAGE,
1518                            role_id,
1519                        ))
1520                    } else {
1521                        None
1522                    }
1523                })
1524                .collect(),
1525            // Altering the default privileges for the PUBLIC role (aka ALL ROLES) will affect all roles
1526            // that currently exist and roles that will exist in the future. It's impossible for an exising
1527            // role to be a member of a role that doesn't exist yet, so no current role could possibly have
1528            // the privileges required to alter default privileges for the PUBLIC role. Therefore we
1529            // only superusers can alter default privileges for the PUBLIC role.
1530            superuser_action: if privilege_objects
1531                .iter()
1532                .any(|privilege_object| privilege_object.role_id.is_public())
1533            {
1534                Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1535            } else {
1536                None
1537            },
1538            ..Default::default()
1539        },
1540        Plan::ReassignOwned(plan::ReassignOwnedPlan {
1541            old_roles,
1542            new_role,
1543            reassign_ids: _,
1544        }) => RbacRequirements {
1545            role_membership: old_roles
1546                .into_iter()
1547                .cloned()
1548                .chain(iter::once(*new_role))
1549                .collect(),
1550            ..Default::default()
1551        },
1552        Plan::SideEffectingFunc(func) => {
1553            let role_membership = match func {
1554                SideEffectingFunc::PgCancelBackend { connection_id } => active_conns
1555                    .expect("active_conns is required for Plan::SideEffectingFunc")(
1556                    *connection_id
1557                )
1558                .map(|x| [x].into())
1559                .unwrap_or_default(),
1560            };
1561            RbacRequirements {
1562                role_membership,
1563                ..Default::default()
1564            }
1565        }
1566        Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1567            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1568            RbacRequirements {
1569                privileges: vec![
1570                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1571                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1572                ],
1573                ..Default::default()
1574            }
1575        }
1576        Plan::DiscardTemp
1577        | Plan::DiscardAll
1578        | Plan::EmptyQuery
1579        | Plan::ShowAllVariables
1580        | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1581        | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1582        | Plan::SetVariable(plan::SetVariablePlan {
1583            name: _,
1584            value: _,
1585            local: _,
1586        })
1587        | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1588        | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1589        | Plan::StartTransaction(plan::StartTransactionPlan {
1590            access: _,
1591            isolation_level: _,
1592        })
1593        | Plan::CommitTransaction(plan::CommitTransactionPlan {
1594            transaction_type: _,
1595        })
1596        | Plan::AbortTransaction(plan::AbortTransactionPlan {
1597            transaction_type: _,
1598        })
1599        | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1600        | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1601        | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1602        | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1603        | Plan::Declare(plan::DeclarePlan {
1604            name: _,
1605            stmt: _,
1606            sql: _,
1607            params: _,
1608        })
1609        | Plan::Fetch(plan::FetchPlan {
1610            name: _,
1611            count: _,
1612            timeout: _,
1613        })
1614        | Plan::Close(plan::ClosePlan { name: _ })
1615        | Plan::Prepare(plan::PreparePlan {
1616            name: _,
1617            stmt: _,
1618            desc: _,
1619            sql: _,
1620        })
1621        | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1622        | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1623        | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1624    }
1625}
1626
1627/// Reports whether any role has ownership over an object.
1628fn check_owner_roles(
1629    object_id: &ObjectId,
1630    role_ids: &BTreeSet<RoleId>,
1631    catalog: &impl SessionCatalog,
1632) -> bool {
1633    if let Some(owner_id) = catalog.get_owner_id(object_id) {
1634        role_ids.contains(&owner_id)
1635    } else {
1636        true
1637    }
1638}
1639
1640fn ownership_err(
1641    unheld_ownership: Vec<ObjectId>,
1642    catalog: &impl SessionCatalog,
1643) -> Result<(), UnauthorizedError> {
1644    if !unheld_ownership.is_empty() {
1645        let objects = unheld_ownership
1646            .into_iter()
1647            .map(|ownership| match ownership {
1648                ObjectId::Cluster(id) => (
1649                    ObjectType::Cluster,
1650                    catalog.get_cluster(id).name().to_string(),
1651                ),
1652                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1653                    let cluster = catalog.get_cluster(cluster_id);
1654                    let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1655                    // Note: using unchecked here is okay because the values are coming from an
1656                    // already existing name.
1657                    let name = QualifiedReplica {
1658                        cluster: Ident::new_unchecked(cluster.name()),
1659                        replica: Ident::new_unchecked(replica.name()),
1660                    };
1661                    (ObjectType::ClusterReplica, name.to_string())
1662                }
1663                ObjectId::Database(id) => (
1664                    ObjectType::Database,
1665                    catalog.get_database(&id).name().to_string(),
1666                ),
1667                ObjectId::Schema((database_spec, schema_spec)) => {
1668                    let schema = catalog.get_schema(&database_spec, &schema_spec);
1669                    let name = catalog.resolve_full_schema_name(schema.name());
1670                    (ObjectType::Schema, name.to_string())
1671                }
1672                ObjectId::Item(id) => {
1673                    let item = catalog.get_item(&id);
1674                    let name = catalog.resolve_full_name(item.name());
1675                    (item.item_type().into(), name.to_string())
1676                }
1677                ObjectId::NetworkPolicy(id) => (
1678                    ObjectType::NetworkPolicy,
1679                    catalog.get_network_policy(&id).name().to_string(),
1680                ),
1681                ObjectId::Role(_) => unreachable!("roles have no owner"),
1682            })
1683            .collect();
1684        Err(UnauthorizedError::Ownership { objects })
1685    } else {
1686        Ok(())
1687    }
1688}
1689
1690fn generate_required_source_privileges(
1691    name: &QualifiedItemName,
1692    data_source: &DataSourceDesc,
1693    in_cluster: Option<ClusterId>,
1694    role_id: RoleId,
1695) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1696    let mut privileges = vec![(
1697        SystemObjectId::Object(name.qualifiers.clone().into()),
1698        AclMode::CREATE,
1699        role_id,
1700    )];
1701    match (data_source, in_cluster) {
1702        (_, Some(id)) => {
1703            privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1704        }
1705        (DataSourceDesc::Ingestion(_), None) => {
1706            privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1707        }
1708        // Non-ingestion data-sources have meaningless cluster config's (for now...) and they need
1709        // to be ignored.
1710        // This feels very brittle, but there's not much we can do until the UNDEFINED cluster
1711        // config is removed.
1712        (_, None) => {}
1713    }
1714    privileges
1715}
1716
1717/// Generates all the privileges required to execute a read that includes the objects in `ids`.
1718///
1719/// Not only do we need to validate that `role_id` has read privileges on all relations in `ids`,
1720/// but if any object is a view or materialized view then we need to validate that the owner of
1721/// that view has all of the privileges required to execute the query within the view.
1722///
1723/// For more details see: <https://www.postgresql.org/docs/15/rules-privileges.html>
1724fn generate_read_privileges(
1725    catalog: &impl SessionCatalog,
1726    ids: impl Iterator<Item = CatalogItemId>,
1727    role_id: RoleId,
1728) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1729    generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1730}
1731
1732fn generate_read_privileges_inner(
1733    catalog: &impl SessionCatalog,
1734    ids: impl Iterator<Item = CatalogItemId>,
1735    role_id: RoleId,
1736    seen: &mut BTreeSet<(ObjectId, RoleId)>,
1737) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1738    let mut privileges = Vec::new();
1739    let mut views = Vec::new();
1740
1741    for id in ids {
1742        if seen.insert((id.into(), role_id)) {
1743            let item = catalog.get_item(&id);
1744            let schema_id: ObjectId = item.name().qualifiers.clone().into();
1745            if seen.insert((schema_id.clone(), role_id)) {
1746                privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1747            }
1748            match item.item_type() {
1749                CatalogItemType::View | CatalogItemType::MaterializedView => {
1750                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1751                    views.push((item.references().items().copied(), item.owner_id()));
1752                }
1753                CatalogItemType::Table | CatalogItemType::Source => {
1754                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1755                }
1756                CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1757                    privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1758                }
1759                CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1760            }
1761        }
1762    }
1763
1764    for (view_ids, view_owner) in views {
1765        privileges.extend_from_slice(&generate_read_privileges_inner(
1766            catalog, view_ids, view_owner, seen,
1767        ));
1768    }
1769
1770    privileges
1771}
1772
1773fn generate_usage_privileges(
1774    catalog: &impl SessionCatalog,
1775    ids: &ResolvedIds,
1776    role_id: RoleId,
1777    item_types: &BTreeSet<CatalogItemType>,
1778) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1779    // Use a `BTreeSet` to remove duplicate privileges.
1780    ids.items()
1781        .filter_map(move |id| {
1782            let item = catalog.get_item(id);
1783            if item_types.contains(&item.item_type()) {
1784                let schema_id = item.name().qualifiers.clone().into();
1785                Some([
1786                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1787                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1788                ])
1789            } else {
1790                None
1791            }
1792        })
1793        .flatten()
1794        .collect()
1795}
1796
1797fn generate_cluster_usage_privileges(
1798    expr_is_const: bool,
1799    target_cluster_id: Option<ClusterId>,
1800    role_id: RoleId,
1801) -> Option<(SystemObjectId, AclMode, RoleId)> {
1802    // TODO(jkosh44) expr hasn't been fully optimized yet, so it might actually be a constant,
1803    //  but we mistakenly think that it's not. For now it's ok to be overly protective.
1804    if !expr_is_const {
1805        if let Some(cluster_id) = target_cluster_id {
1806            return Some((
1807                SystemObjectId::Object(cluster_id.into()),
1808                AclMode::USAGE,
1809                role_id,
1810            ));
1811        }
1812    }
1813
1814    None
1815}
1816
1817fn check_object_privileges(
1818    catalog: &impl SessionCatalog,
1819    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1820    role_membership: BTreeSet<RoleId>,
1821    current_role_id: RoleId,
1822) -> Result<(), UnauthorizedError> {
1823    let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1824    role_memberships.insert(current_role_id, role_membership);
1825    for (object_id, acl_mode, role_id) in privileges {
1826        // Temporary schemas are owned by the connection that created them,
1827        // so users implicitly have all privileges on their own temp schema.
1828        // The schema may not exist yet (lazy creation), so we skip the check.
1829        if matches!(
1830            &object_id,
1831            SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary)))
1832        ) {
1833            continue;
1834        }
1835
1836        let role_membership = role_memberships
1837            .entry(role_id)
1838            .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1839        let object_privileges = catalog
1840            .get_privileges(&object_id)
1841            .expect("only object types with privileges will generate required privileges");
1842        let role_privileges = role_membership
1843            .iter()
1844            .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1845            .map(|mz_acl_item| mz_acl_item.acl_mode)
1846            .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1847        if !role_privileges.contains(acl_mode) {
1848            let role_name = catalog.get_role(&role_id).name().to_string();
1849            let privileges = acl_mode.to_error_string();
1850            return Err(UnauthorizedError::Privilege {
1851                object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1852                role_name,
1853                privileges,
1854            });
1855        }
1856    }
1857
1858    Ok(())
1859}
1860
1861pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1862    const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1863        .union(AclMode::SELECT)
1864        .union(AclMode::UPDATE)
1865        .union(AclMode::DELETE);
1866    const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1867    const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1868        .union(AclMode::CREATE_DB)
1869        .union(AclMode::CREATE_CLUSTER)
1870        .union(AclMode::CREATE_NETWORK_POLICY);
1871
1872    const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1873    match object_type {
1874        SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1875        SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1876        SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1877        SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1878        SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1879        SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1880        SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1881        SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1882        SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1883        SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1884        SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1885        SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1886        SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1887        SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1888        SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1889        SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1890        SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1891    }
1892}
1893
1894pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1895    MzAclItem {
1896        grantee: owner_id,
1897        grantor: owner_id,
1898        acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1899    }
1900}
1901
1902const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1903    match object_type {
1904        ObjectType::Table
1905        | ObjectType::View
1906        | ObjectType::MaterializedView
1907        | ObjectType::Source => AclMode::SELECT,
1908        ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1909        ObjectType::Sink
1910        | ObjectType::Index
1911        | ObjectType::Role
1912        | ObjectType::Cluster
1913        | ObjectType::ClusterReplica
1914        | ObjectType::Secret
1915        | ObjectType::Connection
1916        | ObjectType::Database
1917        | ObjectType::Func
1918        | ObjectType::NetworkPolicy => AclMode::empty(),
1919    }
1920}
1921
1922pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1923    let acl_mode = default_builtin_object_acl_mode(object_type);
1924    MzAclItem {
1925        grantee: MZ_SUPPORT_ROLE_ID,
1926        grantor: MZ_SYSTEM_ROLE_ID,
1927        acl_mode,
1928    }
1929}
1930
1931pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1932    let acl_mode = default_builtin_object_acl_mode(object_type);
1933    MzAclItem {
1934        grantee: RoleId::Public,
1935        grantor: MZ_SYSTEM_ROLE_ID,
1936        acl_mode,
1937    }
1938}