mz_sql/
rbac.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26    CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29    CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30    SchemaSpecifier, SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34    DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40/// Common checks that need to be performed before we can start checking a role's privileges.
41fn rbac_check_preamble(
42    catalog: &impl SessionCatalog,
43    session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45    // PostgreSQL allows users that have their role dropped to perform some actions,
46    // such as `SET ROLE` and certain `SELECT` queries. We haven't implemented
47    // `SET ROLE` and feel it's safer to force to user to re-authenticate if their
48    // role is dropped.
49    if catalog
50        .try_get_role(&session_meta.role_metadata().current_role)
51        .is_none()
52    {
53        return Err(UnauthorizedError::ConcurrentRoleDrop(
54            session_meta.role_metadata().current_role.clone(),
55        ));
56    };
57    if catalog
58        .try_get_role(&session_meta.role_metadata().session_role)
59        .is_none()
60    {
61        return Err(UnauthorizedError::ConcurrentRoleDrop(
62            session_meta.role_metadata().session_role.clone(),
63        ));
64    };
65    if catalog
66        .try_get_role(&session_meta.role_metadata().authenticated_role)
67        .is_none()
68    {
69        return Err(UnauthorizedError::ConcurrentRoleDrop(
70            session_meta.role_metadata().authenticated_role.clone(),
71        ));
72    };
73
74    Ok(())
75}
76
77/// Filters `RbacRequirements` based on the session role metadata and RBAC related feature flags.
78fn filter_requirements(
79    catalog: &impl SessionCatalog,
80    session_meta: &dyn SessionMetadata,
81    rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83    // Skip RBAC non-mandatory checks if RBAC is disabled. However, we never skip RBAC checks for
84    // system roles. This allows us to limit access of system users even when RBAC is off.
85    let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86        && !session_meta.role_metadata().current_role.is_system()
87        && !session_meta.role_metadata().session_role.is_system();
88    // Skip RBAC checks on user items if the session is a superuser.
89    let is_superuser = session_meta.is_superuser();
90    if is_rbac_disabled || is_superuser {
91        return rbac_requirements.filter_to_mandatory_requirements();
92    }
93
94    rbac_requirements
95}
96
97// The default item types that most statements require USAGE privileges for.
98static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99    btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101// CREATE statements require USAGE privileges on the default item types and USAGE privileges on
102// Types.
103pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104    let mut items = DEFAULT_ITEM_USAGE.clone();
105    items.insert(CatalogItemType::Type);
106    items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
/// Errors that can occur due to an unauthorized action.
///
/// The `#[error]` string on each variant is the primary message. `UnauthorizedError::detail`
/// provides an optional, longer explanation for the same error.
#[derive(Debug, thiserror::Error)]
pub enum UnauthorizedError {
    /// The action can only be performed by a superuser.
    ///
    /// `action` is a description of the attempted action that is spliced into the message.
    #[error("permission denied to {action}")]
    Superuser { action: String },
    /// The action requires ownership of an object.
    ///
    /// `objects` holds (object type, object name) pairs; each pair is rendered as
    /// `{object_type} {object_name}` and the pairs are joined with `", "`.
    #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
    Ownership { objects: Vec<(ObjectType, String)> },
    /// Altering an owner requires membership of the new owner role.
    ///
    /// `role_names` are rendered quoted and joined with `", "`.
    #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
    RoleMembership { role_names: Vec<String> },
    /// The action requires one or more privileges.
    #[error("permission denied for {object_description}")]
    Privilege {
        /// Description of the object the privileges are missing on.
        object_description: ErrorMessageObjectDescription,
        /// Name of the role that needs the privileges; only used in the detail message.
        role_name: String,
        /// The privileges that are needed, as text; only used in the detail message.
        privileges: String,
    },
    // TODO(jkosh44) When we implement parameter privileges, this can be replaced with a regular
    //  privilege error.
    /// The action can only be performed by the mz_system role.
    ///
    /// Shares its primary message with `Superuser`; only the detail message differs.
    #[error("permission denied to {action}")]
    MzSystem { action: String },
    /// The action cannot be performed by the mz_support role.
    ///
    /// Shares its primary message with `Superuser`; only the detail message differs.
    #[error("permission denied to {action}")]
    MzSupport { action: String },
    /// The active role was dropped while a user was logged in.
    ///
    /// Carries the ID of the role that could no longer be found.
    #[error("role {0} was concurrently dropped")]
    ConcurrentRoleDrop(RoleId),
}
141
142impl UnauthorizedError {
143    pub fn detail(&self) -> Option<String> {
144        match &self {
145            UnauthorizedError::Superuser { action } => {
146                Some(format!("You must be a superuser to {}", action))
147            }
148            UnauthorizedError::Privilege {
149                object_description,
150                role_name,
151                privileges,
152            } => Some(format!(
153                "The '{role_name}' role needs {privileges} privileges on {object_description}"
154            )),
155            UnauthorizedError::MzSystem { .. } => {
156                Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157            }
158            UnauthorizedError::MzSupport { .. } => Some(format!(
159                "The '{}' role has very limited privileges",
160                SUPPORT_USER.name
161            )),
162            UnauthorizedError::ConcurrentRoleDrop(_) => {
163                Some("Please disconnect and re-connect with a valid role.".into())
164            }
165            UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166        }
167    }
168}
169
/// RBAC requirements for executing a given plan.
///
/// `Default::default()` yields requirements with no memberships, ownerships, or privileges and
/// with `item_usage` set to `DEFAULT_ITEM_USAGE`; `RbacRequirements::empty()` is the same except
/// that `item_usage` is `EMPTY_ITEM_USAGE`.
#[derive(Debug)]
struct RbacRequirements {
    /// The role memberships required.
    role_membership: BTreeSet<RoleId>,
    /// The object ownerships required.
    ownership: Vec<ObjectId>,
    /// The privileges required. The tuples are of the form:
    /// (What object the privilege is on, What privilege is required, Who must possess the privilege).
    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
    /// The types of catalog items that this plan requires USAGE privileges on.
    ///
    /// Most plans will require USAGE on secrets and connections but some plans, like SHOW CREATE,
    /// can reference an item without requiring any privileges on that item.
    item_usage: &'static BTreeSet<CatalogItemType>,
    /// `Some(action)` if only a superuser may perform the action, `None` otherwise.
    ///
    /// `validate` fails with `UnauthorizedError::Superuser` whenever this is still `Some`;
    /// `filter_to_mandatory_requirements` resets it to `None`.
    superuser_action: Option<String>,
}
188
189impl RbacRequirements {
190    fn empty() -> RbacRequirements {
191        RbacRequirements {
192            role_membership: BTreeSet::new(),
193            ownership: Vec::new(),
194            privileges: Vec::new(),
195            item_usage: &EMPTY_ITEM_USAGE,
196            superuser_action: None,
197        }
198    }
199
200    fn validate(
201        self,
202        catalog: &impl SessionCatalog,
203        session: &dyn SessionMetadata,
204        resolved_ids: &ResolvedIds,
205    ) -> Result<(), UnauthorizedError> {
206        // Obtain all roles that the current session is a member of.
207        let role_membership =
208            catalog.collect_role_membership(&session.role_metadata().current_role);
209
210        check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212        // Validate that the current session has the required role membership to execute the provided
213        // plan.
214        let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215        if !unheld_membership.is_empty() {
216            let role_names = unheld_membership
217                .into_iter()
218                .map(|role_id| {
219                    // Some role references may no longer exist due to concurrent drops.
220                    catalog
221                        .try_get_role(role_id)
222                        .map(|role| role.name().to_string())
223                        .unwrap_or_else(|| role_id.to_string())
224                })
225                .collect();
226            return Err(UnauthorizedError::RoleMembership { role_names });
227        }
228
229        // Validate that the current session has the required object ownership to execute the provided
230        // plan.
231        let unheld_ownership = self
232            .ownership
233            .into_iter()
234            .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235            .collect();
236        ownership_err(unheld_ownership, catalog)?;
237
238        check_object_privileges(
239            catalog,
240            self.privileges,
241            role_membership,
242            session.role_metadata().current_role,
243        )?;
244
245        if let Some(action) = self.superuser_action {
246            return Err(UnauthorizedError::Superuser { action });
247        }
248
249        Ok(())
250    }
251
252    fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253        let RbacRequirements {
254            role_membership,
255            ownership,
256            privileges,
257            item_usage,
258            superuser_action: _,
259        } = self;
260        let role_membership = role_membership
261            .into_iter()
262            .filter(|id| id.is_system())
263            .collect();
264        let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265        let privileges = privileges
266            .into_iter()
267            .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268            // We allow reading objects for superusers and when RBAC is off.
269            .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270            .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271            .collect();
272        let superuser_action = None;
273        RbacRequirements {
274            role_membership,
275            ownership,
276            privileges,
277            item_usage,
278            superuser_action,
279        }
280    }
281}
282
283impl Default for RbacRequirements {
284    fn default() -> Self {
285        RbacRequirements {
286            role_membership: BTreeSet::new(),
287            ownership: Vec::new(),
288            privileges: Vec::new(),
289            item_usage: &DEFAULT_ITEM_USAGE,
290            superuser_action: None,
291        }
292    }
293}
294
295/// Checks if a `session` is authorized to use `resolved_ids`. If not, an error is returned.
296pub fn check_usage(
297    catalog: &impl SessionCatalog,
298    session: &dyn SessionMetadata,
299    resolved_ids: &ResolvedIds,
300    item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302    rbac_check_preamble(catalog, session)?;
303
304    // Obtain all roles that the current session is a member of.
305    let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307    // Certain statements depend on objects that haven't been created yet, like sub-sources, so we
308    // need to filter those out.
309    let existing_resolved_ids =
310        resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312    let required_privileges = generate_usage_privileges(
313        catalog,
314        &existing_resolved_ids,
315        session.role_metadata().current_role,
316        item_types,
317    )
318    .into_iter()
319    .collect();
320
321    let mut rbac_requirements = RbacRequirements::empty();
322    rbac_requirements.privileges = required_privileges;
323    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324    let required_privileges = rbac_requirements.privileges;
325
326    check_object_privileges(
327        catalog,
328        required_privileges,
329        role_membership,
330        session.role_metadata().current_role,
331    )?;
332
333    Ok(())
334}
335
336/// Checks if a session is authorized to execute a plan. If not, an error is returned.
337pub fn check_plan(
338    catalog: &impl SessionCatalog,
339    // Function mapping a connection ID to an authenticated role. The roles may have been dropped concurrently.
340    // Only required for Plan::SideEffectingFunc; can be None for other plan types.
341    // TODO(peek-seq): Remove this when deleting the old peek sequencing. The logic here that uses
342    // `active_conns` is mirrored in `execute_side_effecting_func`, which is what the frontend peek
343    // sequencing uses.
344    active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
345    session: &dyn SessionMetadata,
346    plan: &Plan,
347    target_cluster_id: Option<ClusterId>,
348    resolved_ids: &ResolvedIds,
349) -> Result<(), UnauthorizedError> {
350    rbac_check_preamble(catalog, session)?;
351
352    let rbac_requirements = generate_rbac_requirements(
353        catalog,
354        plan,
355        active_conns,
356        target_cluster_id,
357        session.role_metadata().current_role,
358    );
359    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
360    debug!(
361        "rbac requirements {rbac_requirements:?} for plan {:?}",
362        PlanKind::from(plan)
363    );
364    rbac_requirements.validate(catalog, session, resolved_ids)
365}
366
367/// Returns true if RBAC is turned on for a session, false otherwise.
368pub fn is_rbac_enabled_for_session(
369    system_vars: &SystemVars,
370    session: &dyn SessionMetadata,
371) -> bool {
372    let server_enabled = system_vars.enable_rbac_checks();
373    let session_enabled = session.enable_session_rbac_checks();
374
375    // The session flag allows users to turn RBAC on for just their session while the server flag
376    // allows users to turn RBAC on for everyone.
377    server_enabled || session_enabled
378}
379
380/// Generates all requirements needed to execute a given plan.
381fn generate_rbac_requirements(
382    catalog: &impl SessionCatalog,
383    plan: &Plan,
384    active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
385    target_cluster_id: Option<ClusterId>,
386    role_id: RoleId,
387) -> RbacRequirements {
388    match plan {
389        Plan::CreateConnection(plan::CreateConnectionPlan {
390            name,
391            if_not_exists: _,
392            connection: _,
393            validate: _,
394        }) => RbacRequirements {
395            privileges: vec![(
396                SystemObjectId::Object(name.qualifiers.clone().into()),
397                AclMode::CREATE,
398                role_id,
399            )],
400            item_usage: &CREATE_ITEM_USAGE,
401            ..Default::default()
402        },
403        Plan::CreateDatabase(plan::CreateDatabasePlan {
404            name: _,
405            if_not_exists: _,
406        }) => RbacRequirements {
407            privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
408            item_usage: &CREATE_ITEM_USAGE,
409            ..Default::default()
410        },
411        Plan::CreateSchema(plan::CreateSchemaPlan {
412            database_spec,
413            schema_name: _,
414            if_not_exists: _,
415        }) => {
416            let privileges = match database_spec {
417                ResolvedDatabaseSpecifier::Ambient => Vec::new(),
418                ResolvedDatabaseSpecifier::Id(database_id) => {
419                    vec![(
420                        SystemObjectId::Object(database_id.into()),
421                        AclMode::CREATE,
422                        role_id,
423                    )]
424                }
425            };
426            RbacRequirements {
427                privileges,
428                item_usage: &CREATE_ITEM_USAGE,
429                ..Default::default()
430            }
431        }
432        Plan::CreateRole(plan::CreateRolePlan {
433            name: _,
434            attributes,
435        }) => {
436            if attributes.superuser.unwrap_or(false) {
437                RbacRequirements {
438                    superuser_action: Some("create superuser role".to_string()),
439                    ..Default::default()
440                }
441            } else {
442                RbacRequirements {
443                    privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
444                    item_usage: &CREATE_ITEM_USAGE,
445                    ..Default::default()
446                }
447            }
448        }
449        Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
450            privileges: vec![(
451                SystemObjectId::System,
452                AclMode::CREATE_NETWORK_POLICY,
453                role_id,
454            )],
455            item_usage: &CREATE_ITEM_USAGE,
456            ..Default::default()
457        },
458        Plan::CreateCluster(plan::CreateClusterPlan {
459            name: _,
460            variant: _,
461            workload_class: _,
462        }) => RbacRequirements {
463            privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
464            item_usage: &CREATE_ITEM_USAGE,
465            ..Default::default()
466        },
467        Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
468            cluster_id,
469            name: _,
470            config: _,
471        }) => RbacRequirements {
472            ownership: vec![ObjectId::Cluster(*cluster_id)],
473            item_usage: &CREATE_ITEM_USAGE,
474            ..Default::default()
475        },
476        Plan::CreateSource(plan::CreateSourcePlan {
477            name,
478            source,
479            if_not_exists: _,
480            timeline: _,
481            in_cluster,
482        }) => RbacRequirements {
483            privileges: generate_required_source_privileges(
484                name,
485                &source.data_source,
486                *in_cluster,
487                role_id,
488            ),
489            item_usage: &CREATE_ITEM_USAGE,
490            ..Default::default()
491        },
492        Plan::CreateSources(plans) => RbacRequirements {
493            privileges: plans
494                .iter()
495                .flat_map(
496                    |plan::CreateSourcePlanBundle {
497                         item_id: _,
498                         global_id: _,
499                         plan:
500                             plan::CreateSourcePlan {
501                                 name,
502                                 source,
503                                 if_not_exists: _,
504                                 timeline: _,
505                                 in_cluster,
506                             },
507                         resolved_ids: _,
508                         available_source_references: _,
509                     }| {
510                        generate_required_source_privileges(
511                            name,
512                            &source.data_source,
513                            *in_cluster,
514                            role_id,
515                        )
516                        .into_iter()
517                    },
518                )
519                .collect(),
520            item_usage: &CREATE_ITEM_USAGE,
521            ..Default::default()
522        },
523        Plan::CreateSecret(plan::CreateSecretPlan {
524            name,
525            secret: _,
526            if_not_exists: _,
527        }) => RbacRequirements {
528            privileges: vec![(
529                SystemObjectId::Object(name.qualifiers.clone().into()),
530                AclMode::CREATE,
531                role_id,
532            )],
533            item_usage: &CREATE_ITEM_USAGE,
534            ..Default::default()
535        },
536        Plan::CreateSink(plan::CreateSinkPlan {
537            name,
538            sink,
539            with_snapshot: _,
540            if_not_exists: _,
541            in_cluster,
542        }) => {
543            let mut privileges = vec![(
544                SystemObjectId::Object(name.qualifiers.clone().into()),
545                AclMode::CREATE,
546                role_id,
547            )];
548            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
549            privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
550            privileges.push((
551                SystemObjectId::Object(in_cluster.into()),
552                AclMode::CREATE,
553                role_id,
554            ));
555            RbacRequirements {
556                privileges,
557                item_usage: &CREATE_ITEM_USAGE,
558                ..Default::default()
559            }
560        }
561        Plan::CreateTable(plan::CreateTablePlan {
562            name,
563            table: _,
564            if_not_exists: _,
565        }) => RbacRequirements {
566            privileges: vec![(
567                SystemObjectId::Object(name.qualifiers.clone().into()),
568                AclMode::CREATE,
569                role_id,
570            )],
571            item_usage: &CREATE_ITEM_USAGE,
572            ..Default::default()
573        },
574        Plan::CreateView(plan::CreateViewPlan {
575            name,
576            view: _,
577            replace,
578            drop_ids: _,
579            if_not_exists: _,
580            ambiguous_columns: _,
581        }) => RbacRequirements {
582            ownership: replace
583                .map(|id| vec![ObjectId::Item(id)])
584                .unwrap_or_default(),
585            privileges: vec![(
586                SystemObjectId::Object(name.qualifiers.clone().into()),
587                AclMode::CREATE,
588                role_id,
589            )],
590            item_usage: &CREATE_ITEM_USAGE,
591            ..Default::default()
592        },
593        Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
594            name,
595            materialized_view,
596            replace,
597            drop_ids: _,
598            if_not_exists: _,
599            ambiguous_columns: _,
600        }) => RbacRequirements {
601            ownership: replace
602                .map(|id| vec![ObjectId::Item(id)])
603                .unwrap_or_default(),
604            privileges: vec![
605                (
606                    SystemObjectId::Object(name.qualifiers.clone().into()),
607                    AclMode::CREATE,
608                    role_id,
609                ),
610                (
611                    SystemObjectId::Object(materialized_view.cluster_id.into()),
612                    AclMode::CREATE,
613                    role_id,
614                ),
615            ],
616            item_usage: &CREATE_ITEM_USAGE,
617            ..Default::default()
618        },
619        Plan::CreateContinualTask(plan::CreateContinualTaskPlan {
620            name,
621            placeholder_id: _,
622            desc: _,
623            input_id: _,
624            with_snapshot: _,
625            continual_task,
626        }) => RbacRequirements {
627            privileges: vec![
628                (
629                    SystemObjectId::Object(name.qualifiers.clone().into()),
630                    AclMode::CREATE,
631                    role_id,
632                ),
633                (
634                    SystemObjectId::Object(continual_task.cluster_id.into()),
635                    AclMode::CREATE,
636                    role_id,
637                ),
638            ],
639            item_usage: &CREATE_ITEM_USAGE,
640            ..Default::default()
641        },
642        Plan::CreateIndex(plan::CreateIndexPlan {
643            name,
644            index,
645            if_not_exists: _,
646        }) => {
647            let index_on_item = catalog.resolve_item_id(&index.on);
648            RbacRequirements {
649                ownership: vec![ObjectId::Item(index_on_item)],
650                privileges: vec![
651                    (
652                        SystemObjectId::Object(name.qualifiers.clone().into()),
653                        AclMode::CREATE,
654                        role_id,
655                    ),
656                    (
657                        SystemObjectId::Object(index.cluster_id.into()),
658                        AclMode::CREATE,
659                        role_id,
660                    ),
661                ],
662                item_usage: &CREATE_ITEM_USAGE,
663                ..Default::default()
664            }
665        }
666        Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
667            privileges: vec![(
668                SystemObjectId::Object(name.qualifiers.clone().into()),
669                AclMode::CREATE,
670                role_id,
671            )],
672            item_usage: &CREATE_ITEM_USAGE,
673            ..Default::default()
674        },
675        Plan::Comment(plan::CommentPlan {
676            object_id,
677            sub_component: _,
678            comment: _,
679        }) => {
680            let (ownership, privileges) = match object_id {
681                // Roles don't have owners, instead we require the current session to have the
682                // `CREATEROLE` privilege.
683                CommentObjectId::Role(_) => (
684                    Vec::new(),
685                    vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
686                ),
687                _ => (vec![ObjectId::from(*object_id)], Vec::new()),
688            };
689            RbacRequirements {
690                ownership,
691                privileges,
692                ..Default::default()
693            }
694        }
695        Plan::DropObjects(plan::DropObjectsPlan {
696            referenced_ids,
697            drop_ids: _,
698            object_type,
699        }) => {
700            let privileges = if object_type == &ObjectType::Role {
701                vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
702            } else {
703                referenced_ids
704                    .iter()
705                    .filter_map(|id| match id {
706                        ObjectId::ClusterReplica((cluster_id, _)) => Some((
707                            SystemObjectId::Object(cluster_id.into()),
708                            AclMode::USAGE,
709                            role_id,
710                        )),
711                        ObjectId::Schema((database_spec, _)) => match database_spec {
712                            ResolvedDatabaseSpecifier::Ambient => None,
713                            ResolvedDatabaseSpecifier::Id(database_id) => Some((
714                                SystemObjectId::Object(database_id.into()),
715                                AclMode::USAGE,
716                                role_id,
717                            )),
718                        },
719                        ObjectId::Item(item_id) => {
720                            let item = catalog.get_item(item_id);
721                            Some((
722                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
723                                AclMode::USAGE,
724                                role_id,
725                            ))
726                        }
727                        ObjectId::Cluster(_)
728                        | ObjectId::Database(_)
729                        | ObjectId::Role(_)
730                        | ObjectId::NetworkPolicy(_) => None,
731                    })
732                    .collect()
733            };
734            RbacRequirements {
735                // Do not need ownership of descendant objects.
736                ownership: referenced_ids.clone(),
737                privileges,
738                ..Default::default()
739            }
740        }
741        Plan::DropOwned(plan::DropOwnedPlan {
742            role_ids,
743            drop_ids: _,
744            privilege_revokes: _,
745            default_privilege_revokes: _,
746        }) => RbacRequirements {
747            role_membership: role_ids.into_iter().cloned().collect(),
748            ..Default::default()
749        },
750        Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
751            let container_id = match id {
752                ObjectId::Item(id) => Some(SystemObjectId::Object(
753                    catalog.get_item(id).name().qualifiers.clone().into(),
754                )),
755                ObjectId::Schema((database_id, _schema_id)) => match database_id {
756                    ResolvedDatabaseSpecifier::Ambient => None,
757                    ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
758                },
759                ObjectId::Cluster(_)
760                | ObjectId::ClusterReplica(_)
761                | ObjectId::Database(_)
762                | ObjectId::Role(_)
763                | ObjectId::NetworkPolicy(_) => None,
764            };
765            let privileges = match container_id {
766                Some(id) => vec![(id, AclMode::USAGE, role_id)],
767                None => Vec::new(),
768            };
769            RbacRequirements {
770                privileges,
771                item_usage: &EMPTY_ITEM_USAGE,
772                ..Default::default()
773            }
774        }
775        Plan::ShowColumns(plan::ShowColumnsPlan {
776            id,
777            select_plan,
778            new_resolved_ids: _,
779        }) => {
780            let mut privileges = vec![(
781                SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
782                AclMode::USAGE,
783                role_id,
784            )];
785
786            for privilege in generate_rbac_requirements(
787                catalog,
788                &Plan::Select(select_plan.clone()),
789                active_conns,
790                target_cluster_id,
791                role_id,
792            )
793            .privileges
794            {
795                privileges.push(privilege);
796            }
797            RbacRequirements {
798                privileges,
799                ..Default::default()
800            }
801        }
802        Plan::Select(plan::SelectPlan {
803            source,
804            select: _,
805            when: _,
806            finishing: _,
807            copy_to: _,
808        }) => {
809            let items = source
810                .depends_on()
811                .into_iter()
812                .map(|gid| catalog.resolve_item_id(&gid));
813            let mut privileges = generate_read_privileges(catalog, items, role_id);
814            if let Some(privilege) = generate_cluster_usage_privileges(
815                source.as_const().is_some(),
816                target_cluster_id,
817                role_id,
818            ) {
819                privileges.push(privilege);
820            }
821            RbacRequirements {
822                privileges,
823                ..Default::default()
824            }
825        }
826        Plan::Subscribe(plan::SubscribePlan {
827            from,
828            with_snapshot: _,
829            when: _,
830            up_to: _,
831            copy_to: _,
832            emit_progress: _,
833            output: _,
834        }) => {
835            let items = from
836                .depends_on()
837                .into_iter()
838                .map(|gid| catalog.resolve_item_id(&gid));
839            let mut privileges = generate_read_privileges(catalog, items, role_id);
840            if let Some(cluster_id) = target_cluster_id {
841                privileges.push((
842                    SystemObjectId::Object(cluster_id.into()),
843                    AclMode::USAGE,
844                    role_id,
845                ));
846            }
847            RbacRequirements {
848                privileges,
849                ..Default::default()
850            }
851        }
852        Plan::CopyFrom(plan::CopyFromPlan {
853            target_name: _,
854            target_id,
855            source: _,
856            columns: _,
857            source_desc: _,
858            mfp: _,
859            params: _,
860            filter: _,
861        }) => RbacRequirements {
862            privileges: vec![
863                (
864                    SystemObjectId::Object(
865                        catalog.get_item(target_id).name().qualifiers.clone().into(),
866                    ),
867                    AclMode::USAGE,
868                    role_id,
869                ),
870                (
871                    SystemObjectId::Object(target_id.into()),
872                    AclMode::INSERT,
873                    role_id,
874                ),
875            ],
876            ..Default::default()
877        },
878        Plan::CopyTo(plan::CopyToPlan {
879            select_plan,
880            desc: _,
881            to: _,
882            connection: _,
883            connection_id: _,
884            format: _,
885            max_file_size: _,
886        }) => {
887            let items = select_plan
888                .source
889                .depends_on()
890                .into_iter()
891                .map(|gid| catalog.resolve_item_id(&gid));
892            let mut privileges = generate_read_privileges(catalog, items, role_id);
893            if let Some(cluster_id) = target_cluster_id {
894                privileges.push((
895                    SystemObjectId::Object(cluster_id.into()),
896                    AclMode::USAGE,
897                    role_id,
898                ));
899            }
900            RbacRequirements {
901                privileges,
902                ..Default::default()
903            }
904        }
905        Plan::ExplainPlan(plan::ExplainPlanPlan {
906            stage: _,
907            format: _,
908            config: _,
909            explainee,
910        })
911        | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
912            privileges: match explainee {
913                Explainee::View(id)
914                | Explainee::MaterializedView(id)
915                | Explainee::Index(id)
916                | Explainee::ReplanView(id)
917                | Explainee::ReplanMaterializedView(id)
918                | Explainee::ReplanIndex(id) => {
919                    let item = catalog.get_item(id);
920                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
921                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
922                }
923                Explainee::Statement(stmt) => stmt
924                    .depends_on()
925                    .into_iter()
926                    .map(|id| {
927                        let item = catalog.get_item_by_global_id(&id);
928                        let schema_id: ObjectId = item.name().qualifiers.clone().into();
929                        (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
930                    })
931                    .collect(),
932            },
933            item_usage: match explainee {
934                Explainee::View(..)
935                | Explainee::MaterializedView(..)
936                | Explainee::Index(..)
937                | Explainee::ReplanView(..)
938                | Explainee::ReplanMaterializedView(..)
939                | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
940                Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
941            },
942            ..Default::default()
943        },
944        Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
945            RbacRequirements {
946                privileges: {
947                    let item = catalog.get_item_by_global_id(sink_from);
948                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
949                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
950                },
951                item_usage: &EMPTY_ITEM_USAGE,
952                ..Default::default()
953            }
954        }
955        Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
956            format: _,
957            raw_plan,
958            when: _,
959        }) => RbacRequirements {
960            privileges: raw_plan
961                .depends_on()
962                .into_iter()
963                .map(|id| {
964                    let item = catalog.get_item_by_global_id(&id);
965                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
966                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
967                })
968                .collect(),
969            ..Default::default()
970        },
971        Plan::Insert(plan::InsertPlan {
972            id,
973            values,
974            returning,
975        }) => {
976            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
977            let mut privileges = vec![
978                (
979                    SystemObjectId::Object(schema_id.clone()),
980                    AclMode::USAGE,
981                    role_id,
982                ),
983                (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
984            ];
985            let mut seen = BTreeSet::from([(schema_id, role_id)]);
986
987            // We don't allow arbitrary sub-queries in `returning`. So either it
988            // contains a column reference to the outer table or it's constant.
989            if returning
990                .iter()
991                .any(|assignment| assignment.contains_column())
992            {
993                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
994                seen.insert((id.into(), role_id));
995            }
996
997            let items = values
998                .depends_on()
999                .into_iter()
1000                .map(|gid| catalog.resolve_item_id(&gid));
1001            privileges.extend_from_slice(&generate_read_privileges_inner(
1002                catalog, items, role_id, &mut seen,
1003            ));
1004
1005            if let Some(privilege) = generate_cluster_usage_privileges(
1006                values.as_const().is_some(),
1007                target_cluster_id,
1008                role_id,
1009            ) {
1010                privileges.push(privilege);
1011            } else if !returning.is_empty() {
1012                // TODO(jkosh44) returning may be a constant, but for now we are overly protective
1013                //  and require cluster privileges for all returning.
1014                if let Some(cluster_id) = target_cluster_id {
1015                    privileges.push((
1016                        SystemObjectId::Object(cluster_id.into()),
1017                        AclMode::USAGE,
1018                        role_id,
1019                    ));
1020                }
1021            }
1022            RbacRequirements {
1023                privileges,
1024                ..Default::default()
1025            }
1026        }
1027        Plan::AlterCluster(plan::AlterClusterPlan {
1028            id,
1029            name: _,
1030            options: _,
1031            strategy: _,
1032        }) => RbacRequirements {
1033            ownership: vec![ObjectId::Cluster(*id)],
1034            item_usage: &CREATE_ITEM_USAGE,
1035            ..Default::default()
1036        },
1037        Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1038            ownership: vec![ObjectId::Item(*id)],
1039            privileges: vec![(
1040                SystemObjectId::Object(set_cluster.into()),
1041                AclMode::CREATE,
1042                role_id,
1043            )],
1044            item_usage: &CREATE_ITEM_USAGE,
1045            ..Default::default()
1046        },
1047        Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1048            id,
1049            window: _,
1050            value: _,
1051            object_type: _,
1052        }) => RbacRequirements {
1053            ownership: vec![ObjectId::Item(*id)],
1054            item_usage: &CREATE_ITEM_USAGE,
1055            ..Default::default()
1056        },
1057        Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1058            ownership: vec![ObjectId::Item(*id)],
1059            ..Default::default()
1060        },
1061        Plan::AlterSource(plan::AlterSourcePlan {
1062            item_id,
1063            ingestion_id: _,
1064            action: _,
1065        }) => RbacRequirements {
1066            ownership: vec![ObjectId::Item(*item_id)],
1067            item_usage: &CREATE_ITEM_USAGE,
1068            ..Default::default()
1069        },
1070        Plan::AlterSink(plan::AlterSinkPlan {
1071            item_id,
1072            global_id: _,
1073            sink,
1074            with_snapshot: _,
1075            in_cluster,
1076        }) => {
1077            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1078            let mut privileges = generate_read_privileges(catalog, items, role_id);
1079            privileges.push((
1080                SystemObjectId::Object(in_cluster.into()),
1081                AclMode::CREATE,
1082                role_id,
1083            ));
1084            RbacRequirements {
1085                ownership: vec![ObjectId::Item(*item_id)],
1086                privileges,
1087                item_usage: &CREATE_ITEM_USAGE,
1088                ..Default::default()
1089            }
1090        }
1091        Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1092            id,
1093            name: _,
1094            to_name: _,
1095        }) => RbacRequirements {
1096            ownership: vec![ObjectId::Cluster(*id)],
1097            ..Default::default()
1098        },
1099        Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1100            id_a,
1101            id_b,
1102            name_a: _,
1103            name_b: _,
1104            name_temp: _,
1105        }) => RbacRequirements {
1106            ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1107            ..Default::default()
1108        },
1109        Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1110            cluster_id,
1111            replica_id,
1112            name: _,
1113            to_name: _,
1114        }) => RbacRequirements {
1115            ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1116            ..Default::default()
1117        },
1118        Plan::AlterItemRename(plan::AlterItemRenamePlan {
1119            id,
1120            current_full_name: _,
1121            to_name: _,
1122            object_type: _,
1123        }) => RbacRequirements {
1124            ownership: vec![ObjectId::Item(*id)],
1125            ..Default::default()
1126        },
1127        Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1128            cur_schema_spec,
1129            new_schema_name: _,
1130        }) => {
1131            let privileges = match cur_schema_spec.0 {
1132                ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1133                    SystemObjectId::Object(ObjectId::Database(db_id)),
1134                    AclMode::CREATE,
1135                    role_id,
1136                )],
1137                ResolvedDatabaseSpecifier::Ambient => vec![],
1138            };
1139
1140            RbacRequirements {
1141                ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1142                privileges,
1143                ..Default::default()
1144            }
1145        }
1146        Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1147            schema_a_spec,
1148            schema_a_name: _,
1149            schema_b_spec,
1150            schema_b_name: _,
1151            name_temp: _,
1152        }) => {
1153            let mut privileges = vec![];
1154            if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1155                privileges.push((
1156                    SystemObjectId::Object(ObjectId::Database(id_a)),
1157                    AclMode::CREATE,
1158                    role_id,
1159                ));
1160            }
1161            if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1162                privileges.push((
1163                    SystemObjectId::Object(ObjectId::Database(id_b)),
1164                    AclMode::CREATE,
1165                    role_id,
1166                ));
1167            }
1168
1169            RbacRequirements {
1170                ownership: vec![
1171                    ObjectId::Schema(*schema_a_spec),
1172                    ObjectId::Schema(*schema_b_spec),
1173                ],
1174                privileges,
1175                ..Default::default()
1176            }
1177        }
1178        Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1179            ownership: vec![ObjectId::Item(*id)],
1180            item_usage: &CREATE_ITEM_USAGE,
1181            ..Default::default()
1182        },
1183        Plan::AlterRole(plan::AlterRolePlan {
1184            id,
1185            name: _,
1186            option,
1187        }) => match option {
1188            // Only superusers can alter the superuserness of a role.
1189            plan::PlannedAlterRoleOption::Attributes(attributes)
1190                if attributes.superuser.unwrap_or(false) =>
1191            {
1192                RbacRequirements {
1193                    superuser_action: Some("alter superuser role".to_string()),
1194                    ..Default::default()
1195                }
1196            }
1197            // Roles are allowed to change their own password.
1198            plan::PlannedAlterRoleOption::Attributes(attributes)
1199                if attributes.password.is_some() && role_id == *id =>
1200            {
1201                RbacRequirements::default()
1202            }
            // But no one else's...
1204            plan::PlannedAlterRoleOption::Attributes(attributes)
1205                if attributes.password.is_some() =>
1206            {
1207                RbacRequirements {
1208                    superuser_action: Some("alter password of role".to_string()),
1209                    ..Default::default()
1210                }
1211            }
1212            // Roles are allowed to change their own variables.
1213            plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1214                RbacRequirements::default()
1215            }
1216            // Otherwise to ALTER a role, you need to have the CREATE_ROLE privilege.
1217            _ => RbacRequirements {
1218                privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1219                item_usage: &CREATE_ITEM_USAGE,
1220                ..Default::default()
1221            },
1222        },
1223        Plan::AlterOwner(plan::AlterOwnerPlan {
1224            id,
1225            object_type: _,
1226            new_owner,
1227        }) => {
1228            let privileges = match id {
1229                ObjectId::ClusterReplica((cluster_id, _)) => {
1230                    vec![(
1231                        SystemObjectId::Object(cluster_id.into()),
1232                        AclMode::CREATE,
1233                        role_id,
1234                    )]
1235                }
1236                ObjectId::Schema((database_spec, _)) => match database_spec {
1237                    ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1238                    ResolvedDatabaseSpecifier::Id(database_id) => {
1239                        vec![(
1240                            SystemObjectId::Object(database_id.into()),
1241                            AclMode::CREATE,
1242                            role_id,
1243                        )]
1244                    }
1245                },
1246                ObjectId::Item(item_id) => {
1247                    let item = catalog.get_item(item_id);
1248                    vec![(
1249                        SystemObjectId::Object(item.name().qualifiers.clone().into()),
1250                        AclMode::CREATE,
1251                        role_id,
1252                    )]
1253                }
1254                ObjectId::Cluster(_)
1255                | ObjectId::Database(_)
1256                | ObjectId::Role(_)
1257                | ObjectId::NetworkPolicy(_) => Vec::new(),
1258            };
1259            RbacRequirements {
1260                role_membership: BTreeSet::from([*new_owner]),
1261                ownership: vec![id.clone()],
1262                privileges,
1263                ..Default::default()
1264            }
1265        }
1266        Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1267            ownership: vec![ObjectId::Item(*relation_id)],
1268            item_usage: &CREATE_ITEM_USAGE,
1269            ..Default::default()
1270        },
1271        Plan::AlterMaterializedViewApplyReplacement(
1272            plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1273        ) => RbacRequirements {
1274            ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1275            item_usage: &CREATE_ITEM_USAGE,
1276            ..Default::default()
1277        },
1278        Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1279            ownership: vec![ObjectId::NetworkPolicy(*id)],
1280            item_usage: &CREATE_ITEM_USAGE,
1281            ..Default::default()
1282        },
1283        Plan::ReadThenWrite(plan::ReadThenWritePlan {
1284            id,
1285            selection,
1286            finishing: _,
1287            assignments,
1288            kind,
1289            returning,
1290        }) => {
1291            let acl_mode = match kind {
1292                MutationKind::Insert => AclMode::INSERT,
1293                MutationKind::Update => AclMode::UPDATE,
1294                MutationKind::Delete => AclMode::DELETE,
1295            };
1296            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1297            let mut privileges = vec![
1298                (
1299                    SystemObjectId::Object(schema_id.clone()),
1300                    AclMode::USAGE,
1301                    role_id,
1302                ),
1303                (SystemObjectId::Object(id.into()), acl_mode, role_id),
1304            ];
1305            let mut seen = BTreeSet::from([(schema_id, role_id)]);
1306
            // We don't allow arbitrary sub-queries in `assignments` or `returning`. So either they
            // contain a column reference to the outer table or they're constant.
1309            if assignments
1310                .values()
1311                .chain(returning.iter())
1312                .any(|assignment| assignment.contains_column())
1313            {
1314                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1315                seen.insert((id.into(), role_id));
1316            }
1317
1318            // TODO(jkosh44) It's fairly difficult to determine what part of `selection` is from a
1319            //  user specified read and what part is from the implementation of the read then write.
1320            //  instead we are overly protective and always require SELECT privileges even though
1321            //  PostgreSQL doesn't always do this.
1322            //  As a concrete example, we require SELECT and UPDATE privileges to execute
1323            //  `UPDATE t SET a = 42;`, while PostgreSQL only requires UPDATE privileges.
1324            let items = selection
1325                .depends_on()
1326                .into_iter()
1327                .map(|gid| catalog.resolve_item_id(&gid));
1328            privileges.extend_from_slice(&generate_read_privileges_inner(
1329                catalog, items, role_id, &mut seen,
1330            ));
1331
1332            if let Some(privilege) = generate_cluster_usage_privileges(
1333                selection.as_const().is_some(),
1334                target_cluster_id,
1335                role_id,
1336            ) {
1337                privileges.push(privilege);
1338            }
1339            RbacRequirements {
1340                privileges,
1341                ..Default::default()
1342            }
1343        }
1344        Plan::GrantRole(plan::GrantRolePlan {
1345            role_ids: _,
1346            member_ids: _,
1347            grantor_id: _,
1348        })
1349        | Plan::RevokeRole(plan::RevokeRolePlan {
1350            role_ids: _,
1351            member_ids: _,
1352            grantor_id: _,
1353        }) => RbacRequirements {
1354            privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1355            ..Default::default()
1356        },
1357        Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1358            update_privileges,
1359            grantees: _,
1360        })
1361        | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1362            update_privileges,
1363            revokees: _,
1364        }) => {
1365            let mut privileges = Vec::with_capacity(update_privileges.len());
1366            for UpdatePrivilege { target_id, .. } in update_privileges {
1367                match target_id {
1368                    SystemObjectId::Object(object_id) => match object_id {
1369                        ObjectId::ClusterReplica((cluster_id, _)) => {
1370                            privileges.push((
1371                                SystemObjectId::Object(cluster_id.into()),
1372                                AclMode::USAGE,
1373                                role_id,
1374                            ));
1375                        }
1376                        ObjectId::Schema((database_spec, _)) => match database_spec {
1377                            ResolvedDatabaseSpecifier::Ambient => {}
1378                            ResolvedDatabaseSpecifier::Id(database_id) => {
1379                                privileges.push((
1380                                    SystemObjectId::Object(database_id.into()),
1381                                    AclMode::USAGE,
1382                                    role_id,
1383                                ));
1384                            }
1385                        },
1386                        ObjectId::Item(item_id) => {
1387                            let item = catalog.get_item(item_id);
1388                            privileges.push((
1389                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
1390                                AclMode::USAGE,
1391                                role_id,
1392                            ))
1393                        }
1394                        ObjectId::Cluster(_)
1395                        | ObjectId::Database(_)
1396                        | ObjectId::Role(_)
1397                        | ObjectId::NetworkPolicy(_) => {}
1398                    },
1399                    SystemObjectId::System => {}
1400                }
1401            }
1402            RbacRequirements {
1403                ownership: update_privileges
1404                    .iter()
1405                    .filter_map(|update_privilege| update_privilege.target_id.object_id())
1406                    .cloned()
1407                    .collect(),
1408                privileges,
1409                // To grant/revoke a privilege on some object, generally the grantor/revoker must be the
1410                // owner of that object (or have a grant option on that object which isn't implemented in
1411                // Materialize yet). There is no owner of the entire system, so it's only reasonable to
1412                // restrict granting/revoking system privileges to superusers.
1413                superuser_action: if update_privileges
1414                    .iter()
1415                    .any(|update_privilege| update_privilege.target_id.is_system())
1416                {
1417                    Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1418                } else {
1419                    None
1420                },
1421                ..Default::default()
1422            }
1423        }
1424        Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1425            privilege_objects,
1426            privilege_acl_items: _,
1427            is_grant: _,
1428        }) => RbacRequirements {
1429            role_membership: privilege_objects
1430                .iter()
1431                .map(|privilege_object| privilege_object.role_id)
1432                .collect(),
1433            privileges: privilege_objects
1434                .into_iter()
1435                .filter_map(|privilege_object| {
1436                    if let (Some(database_id), Some(_)) =
1437                        (privilege_object.database_id, privilege_object.schema_id)
1438                    {
1439                        Some((
1440                            SystemObjectId::Object(database_id.into()),
1441                            AclMode::USAGE,
1442                            role_id,
1443                        ))
1444                    } else {
1445                        None
1446                    }
1447                })
1448                .collect(),
            // Altering the default privileges for the PUBLIC role (aka ALL ROLES) will affect all roles
            // that currently exist and roles that will exist in the future. It's impossible for an existing
            // role to be a member of a role that doesn't exist yet, so no current role could possibly have
            // the privileges required to alter default privileges for the PUBLIC role. Therefore
            // only superusers can alter default privileges for the PUBLIC role.
1454            superuser_action: if privilege_objects
1455                .iter()
1456                .any(|privilege_object| privilege_object.role_id.is_public())
1457            {
1458                Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1459            } else {
1460                None
1461            },
1462            ..Default::default()
1463        },
1464        Plan::ReassignOwned(plan::ReassignOwnedPlan {
1465            old_roles,
1466            new_role,
1467            reassign_ids: _,
1468        }) => RbacRequirements {
1469            role_membership: old_roles
1470                .into_iter()
1471                .cloned()
1472                .chain(iter::once(*new_role))
1473                .collect(),
1474            ..Default::default()
1475        },
1476        Plan::SideEffectingFunc(func) => {
1477            let role_membership = match func {
1478                SideEffectingFunc::PgCancelBackend { connection_id } => active_conns
1479                    .expect("active_conns is required for Plan::SideEffectingFunc")(
1480                    *connection_id
1481                )
1482                .map(|x| [x].into())
1483                .unwrap_or_default(),
1484            };
1485            RbacRequirements {
1486                role_membership,
1487                ..Default::default()
1488            }
1489        }
1490        Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1491            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1492            RbacRequirements {
1493                privileges: vec![
1494                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1495                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1496                ],
1497                ..Default::default()
1498            }
1499        }
1500        Plan::DiscardTemp
1501        | Plan::DiscardAll
1502        | Plan::EmptyQuery
1503        | Plan::ShowAllVariables
1504        | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1505        | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1506        | Plan::SetVariable(plan::SetVariablePlan {
1507            name: _,
1508            value: _,
1509            local: _,
1510        })
1511        | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1512        | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1513        | Plan::StartTransaction(plan::StartTransactionPlan {
1514            access: _,
1515            isolation_level: _,
1516        })
1517        | Plan::CommitTransaction(plan::CommitTransactionPlan {
1518            transaction_type: _,
1519        })
1520        | Plan::AbortTransaction(plan::AbortTransactionPlan {
1521            transaction_type: _,
1522        })
1523        | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1524        | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1525        | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1526        | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1527        | Plan::Declare(plan::DeclarePlan {
1528            name: _,
1529            stmt: _,
1530            sql: _,
1531            params: _,
1532        })
1533        | Plan::Fetch(plan::FetchPlan {
1534            name: _,
1535            count: _,
1536            timeout: _,
1537        })
1538        | Plan::Close(plan::ClosePlan { name: _ })
1539        | Plan::Prepare(plan::PreparePlan {
1540            name: _,
1541            stmt: _,
1542            desc: _,
1543            sql: _,
1544        })
1545        | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1546        | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1547        | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1548    }
1549}
1550
1551/// Reports whether any role has ownership over an object.
1552fn check_owner_roles(
1553    object_id: &ObjectId,
1554    role_ids: &BTreeSet<RoleId>,
1555    catalog: &impl SessionCatalog,
1556) -> bool {
1557    if let Some(owner_id) = catalog.get_owner_id(object_id) {
1558        role_ids.contains(&owner_id)
1559    } else {
1560        true
1561    }
1562}
1563
1564fn ownership_err(
1565    unheld_ownership: Vec<ObjectId>,
1566    catalog: &impl SessionCatalog,
1567) -> Result<(), UnauthorizedError> {
1568    if !unheld_ownership.is_empty() {
1569        let objects = unheld_ownership
1570            .into_iter()
1571            .map(|ownership| match ownership {
1572                ObjectId::Cluster(id) => (
1573                    ObjectType::Cluster,
1574                    catalog.get_cluster(id).name().to_string(),
1575                ),
1576                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1577                    let cluster = catalog.get_cluster(cluster_id);
1578                    let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1579                    // Note: using unchecked here is okay because the values are coming from an
1580                    // already existing name.
1581                    let name = QualifiedReplica {
1582                        cluster: Ident::new_unchecked(cluster.name()),
1583                        replica: Ident::new_unchecked(replica.name()),
1584                    };
1585                    (ObjectType::ClusterReplica, name.to_string())
1586                }
1587                ObjectId::Database(id) => (
1588                    ObjectType::Database,
1589                    catalog.get_database(&id).name().to_string(),
1590                ),
1591                ObjectId::Schema((database_spec, schema_spec)) => {
1592                    let schema = catalog.get_schema(&database_spec, &schema_spec);
1593                    let name = catalog.resolve_full_schema_name(schema.name());
1594                    (ObjectType::Schema, name.to_string())
1595                }
1596                ObjectId::Item(id) => {
1597                    let item = catalog.get_item(&id);
1598                    let name = catalog.resolve_full_name(item.name());
1599                    (item.item_type().into(), name.to_string())
1600                }
1601                ObjectId::NetworkPolicy(id) => (
1602                    ObjectType::NetworkPolicy,
1603                    catalog.get_network_policy(&id).name().to_string(),
1604                ),
1605                ObjectId::Role(_) => unreachable!("roles have no owner"),
1606            })
1607            .collect();
1608        Err(UnauthorizedError::Ownership { objects })
1609    } else {
1610        Ok(())
1611    }
1612}
1613
1614fn generate_required_source_privileges(
1615    name: &QualifiedItemName,
1616    data_source: &DataSourceDesc,
1617    in_cluster: Option<ClusterId>,
1618    role_id: RoleId,
1619) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1620    let mut privileges = vec![(
1621        SystemObjectId::Object(name.qualifiers.clone().into()),
1622        AclMode::CREATE,
1623        role_id,
1624    )];
1625    match (data_source, in_cluster) {
1626        (_, Some(id)) => {
1627            privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1628        }
1629        (DataSourceDesc::Ingestion(_), None) => {
1630            privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1631        }
1632        // Non-ingestion data-sources have meaningless cluster config's (for now...) and they need
1633        // to be ignored.
1634        // This feels very brittle, but there's not much we can do until the UNDEFINED cluster
1635        // config is removed.
1636        (_, None) => {}
1637    }
1638    privileges
1639}
1640
1641/// Generates all the privileges required to execute a read that includes the objects in `ids`.
1642///
1643/// Not only do we need to validate that `role_id` has read privileges on all relations in `ids`,
1644/// but if any object is a view or materialized view then we need to validate that the owner of
1645/// that view has all of the privileges required to execute the query within the view.
1646///
1647/// For more details see: <https://www.postgresql.org/docs/15/rules-privileges.html>
1648fn generate_read_privileges(
1649    catalog: &impl SessionCatalog,
1650    ids: impl Iterator<Item = CatalogItemId>,
1651    role_id: RoleId,
1652) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1653    generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1654}
1655
1656fn generate_read_privileges_inner(
1657    catalog: &impl SessionCatalog,
1658    ids: impl Iterator<Item = CatalogItemId>,
1659    role_id: RoleId,
1660    seen: &mut BTreeSet<(ObjectId, RoleId)>,
1661) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1662    let mut privileges = Vec::new();
1663    let mut views = Vec::new();
1664
1665    for id in ids {
1666        if seen.insert((id.into(), role_id)) {
1667            let item = catalog.get_item(&id);
1668            let schema_id: ObjectId = item.name().qualifiers.clone().into();
1669            if seen.insert((schema_id.clone(), role_id)) {
1670                privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1671            }
1672            match item.item_type() {
1673                CatalogItemType::View
1674                | CatalogItemType::MaterializedView
1675                | CatalogItemType::ContinualTask => {
1676                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1677                    views.push((item.references().items().copied(), item.owner_id()));
1678                }
1679                CatalogItemType::Table | CatalogItemType::Source => {
1680                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1681                }
1682                CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1683                    privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1684                }
1685                CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1686            }
1687        }
1688    }
1689
1690    for (view_ids, view_owner) in views {
1691        privileges.extend_from_slice(&generate_read_privileges_inner(
1692            catalog, view_ids, view_owner, seen,
1693        ));
1694    }
1695
1696    privileges
1697}
1698
1699fn generate_usage_privileges(
1700    catalog: &impl SessionCatalog,
1701    ids: &ResolvedIds,
1702    role_id: RoleId,
1703    item_types: &BTreeSet<CatalogItemType>,
1704) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1705    // Use a `BTreeSet` to remove duplicate privileges.
1706    ids.items()
1707        .filter_map(move |id| {
1708            let item = catalog.get_item(id);
1709            if item_types.contains(&item.item_type()) {
1710                let schema_id = item.name().qualifiers.clone().into();
1711                Some([
1712                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1713                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1714                ])
1715            } else {
1716                None
1717            }
1718        })
1719        .flatten()
1720        .collect()
1721}
1722
1723fn generate_cluster_usage_privileges(
1724    expr_is_const: bool,
1725    target_cluster_id: Option<ClusterId>,
1726    role_id: RoleId,
1727) -> Option<(SystemObjectId, AclMode, RoleId)> {
1728    // TODO(jkosh44) expr hasn't been fully optimized yet, so it might actually be a constant,
1729    //  but we mistakenly think that it's not. For now it's ok to be overly protective.
1730    if !expr_is_const {
1731        if let Some(cluster_id) = target_cluster_id {
1732            return Some((
1733                SystemObjectId::Object(cluster_id.into()),
1734                AclMode::USAGE,
1735                role_id,
1736            ));
1737        }
1738    }
1739
1740    None
1741}
1742
1743fn check_object_privileges(
1744    catalog: &impl SessionCatalog,
1745    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1746    role_membership: BTreeSet<RoleId>,
1747    current_role_id: RoleId,
1748) -> Result<(), UnauthorizedError> {
1749    let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1750    role_memberships.insert(current_role_id, role_membership);
1751    for (object_id, acl_mode, role_id) in privileges {
1752        // Temporary schemas are owned by the connection that created them,
1753        // so users implicitly have all privileges on their own temp schema.
1754        // The schema may not exist yet (lazy creation), so we skip the check.
1755        if matches!(
1756            &object_id,
1757            SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary)))
1758        ) {
1759            continue;
1760        }
1761
1762        let role_membership = role_memberships
1763            .entry(role_id)
1764            .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1765        let object_privileges = catalog
1766            .get_privileges(&object_id)
1767            .expect("only object types with privileges will generate required privileges");
1768        let role_privileges = role_membership
1769            .iter()
1770            .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1771            .map(|mz_acl_item| mz_acl_item.acl_mode)
1772            .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1773        if !role_privileges.contains(acl_mode) {
1774            let role_name = catalog.get_role(&role_id).name().to_string();
1775            let privileges = acl_mode.to_error_string();
1776            return Err(UnauthorizedError::Privilege {
1777                object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1778                role_name,
1779                privileges,
1780            });
1781        }
1782    }
1783
1784    Ok(())
1785}
1786
1787pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1788    const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1789        .union(AclMode::SELECT)
1790        .union(AclMode::UPDATE)
1791        .union(AclMode::DELETE);
1792    const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1793    const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1794        .union(AclMode::CREATE_DB)
1795        .union(AclMode::CREATE_CLUSTER)
1796        .union(AclMode::CREATE_NETWORK_POLICY);
1797
1798    const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1799    match object_type {
1800        SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1801        SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1802        SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1803        SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1804        SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1805        SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1806        SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1807        SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1808        SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1809        SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1810        SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1811        SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1812        SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1813        SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1814        SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1815        SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1816        SystemObjectType::Object(ObjectType::ContinualTask) => AclMode::SELECT,
1817        SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1818    }
1819}
1820
1821pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1822    MzAclItem {
1823        grantee: owner_id,
1824        grantor: owner_id,
1825        acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1826    }
1827}
1828
1829const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1830    match object_type {
1831        ObjectType::Table
1832        | ObjectType::View
1833        | ObjectType::MaterializedView
1834        | ObjectType::Source
1835        | ObjectType::ContinualTask => AclMode::SELECT,
1836        ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1837        ObjectType::Sink
1838        | ObjectType::Index
1839        | ObjectType::Role
1840        | ObjectType::Cluster
1841        | ObjectType::ClusterReplica
1842        | ObjectType::Secret
1843        | ObjectType::Connection
1844        | ObjectType::Database
1845        | ObjectType::Func
1846        | ObjectType::NetworkPolicy => AclMode::empty(),
1847    }
1848}
1849
1850pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1851    let acl_mode = default_builtin_object_acl_mode(object_type);
1852    MzAclItem {
1853        grantee: MZ_SUPPORT_ROLE_ID,
1854        grantor: MZ_SYSTEM_ROLE_ID,
1855        acl_mode,
1856    }
1857}
1858
1859pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1860    let acl_mode = default_builtin_object_acl_mode(object_type);
1861    MzAclItem {
1862        grantee: RoleId::Public,
1863        grantor: MZ_SYSTEM_ROLE_ID,
1864        acl_mode,
1865    }
1866}