Skip to main content

mz_sql/
rbac.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26    CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29    CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30    SchemaSpecifier, SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34    DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40/// Common checks that need to be performed before we can start checking a role's privileges.
41fn rbac_check_preamble(
42    catalog: &impl SessionCatalog,
43    session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45    // PostgreSQL allows users that have their role dropped to perform some actions,
46    // such as `SET ROLE` and certain `SELECT` queries. We haven't implemented
47    // `SET ROLE` and feel it's safer to force to user to re-authenticate if their
48    // role is dropped.
49    if catalog
50        .try_get_role(&session_meta.role_metadata().current_role)
51        .is_none()
52    {
53        return Err(UnauthorizedError::ConcurrentRoleDrop(
54            session_meta.role_metadata().current_role.clone(),
55        ));
56    };
57    if catalog
58        .try_get_role(&session_meta.role_metadata().session_role)
59        .is_none()
60    {
61        return Err(UnauthorizedError::ConcurrentRoleDrop(
62            session_meta.role_metadata().session_role.clone(),
63        ));
64    };
65    if catalog
66        .try_get_role(&session_meta.role_metadata().authenticated_role)
67        .is_none()
68    {
69        return Err(UnauthorizedError::ConcurrentRoleDrop(
70            session_meta.role_metadata().authenticated_role.clone(),
71        ));
72    };
73
74    Ok(())
75}
76
77/// Filters `RbacRequirements` based on the session role metadata and RBAC related feature flags.
78fn filter_requirements(
79    catalog: &impl SessionCatalog,
80    session_meta: &dyn SessionMetadata,
81    rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83    // Skip RBAC non-mandatory checks if RBAC is disabled. However, we never skip RBAC checks for
84    // system roles. This allows us to limit access of system users even when RBAC is off.
85    let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86        && !session_meta.role_metadata().current_role.is_system()
87        && !session_meta.role_metadata().session_role.is_system();
88    // Skip RBAC checks on user items if the session is a superuser.
89    let is_superuser = session_meta.is_superuser();
90    if is_rbac_disabled || is_superuser {
91        return rbac_requirements.filter_to_mandatory_requirements();
92    }
93
94    rbac_requirements
95}
96
97// The default item types that most statements require USAGE privileges for.
98static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99    btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101// CREATE statements require USAGE privileges on the default item types and USAGE privileges on
102// Types.
103pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104    let mut items = DEFAULT_ITEM_USAGE.clone();
105    items.insert(CatalogItemType::Type);
106    items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
/// Errors that can occur due to an unauthorized action.
///
/// The `#[error(...)]` attribute on each variant supplies its one-line message. Additional,
/// longer guidance for some variants is produced by `UnauthorizedError::detail`.
#[derive(Debug, thiserror::Error)]
pub enum UnauthorizedError {
    /// The action can only be performed by a superuser.
    ///
    /// `action` is a description of the attempted action; it is interpolated directly into the
    /// message.
    #[error("permission denied to {action}")]
    Superuser { action: String },
    /// The action requires ownership of an object.
    ///
    /// `objects` holds one `(object type, object name)` pair per object that is not owned; the
    /// message lists them comma-separated.
    #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
    Ownership { objects: Vec<(ObjectType, String)> },
    /// Altering an owner requires membership of the new owner role.
    ///
    /// `role_names` holds the names of the roles whose membership is missing; each is rendered
    /// through `quoted()` and the results are comma-separated in the message.
    #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
    RoleMembership { role_names: Vec<String> },
    /// The action requires one or more privileges.
    ///
    /// Only `object_description` appears in the message itself; `role_name` and `privileges`
    /// are used when building the detail text.
    #[error("permission denied for {object_description}")]
    Privilege {
        /// Description of the object the privileges are missing on.
        object_description: ErrorMessageObjectDescription,
        /// Name of the role that lacks the privileges.
        role_name: String,
        /// The missing privileges, already rendered as text.
        privileges: String,
    },
    // TODO(jkosh44) When we implement parameter privileges, this can be replaced with a regular
    //  privilege error.
    /// The action can only be performed by the mz_system role.
    #[error("permission denied to {action}")]
    MzSystem { action: String },
    /// The action cannot be performed by the mz_support role.
    #[error("permission denied to {action}")]
    MzSupport { action: String },
    /// The active role was dropped while a user was logged in.
    ///
    /// Carries the ID of the role that could no longer be found.
    #[error("role {0} was concurrently dropped")]
    ConcurrentRoleDrop(RoleId),
}
141
142impl UnauthorizedError {
143    pub fn detail(&self) -> Option<String> {
144        match &self {
145            UnauthorizedError::Superuser { action } => {
146                Some(format!("You must be a superuser to {}", action))
147            }
148            UnauthorizedError::Privilege {
149                object_description,
150                role_name,
151                privileges,
152            } => Some(format!(
153                "The '{role_name}' role needs {privileges} privileges on {object_description}"
154            )),
155            UnauthorizedError::MzSystem { .. } => {
156                Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157            }
158            UnauthorizedError::MzSupport { .. } => Some(format!(
159                "The '{}' role has very limited privileges",
160                SUPPORT_USER.name
161            )),
162            UnauthorizedError::ConcurrentRoleDrop(_) => {
163                Some("Please disconnect and re-connect with a valid role.".into())
164            }
165            UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166        }
167    }
168}
169
/// RBAC requirements for executing a given plan.
///
/// A value of this type is a checklist: `validate` walks every field and fails with an
/// `UnauthorizedError` on the first requirement the session does not satisfy.
#[derive(Debug)]
struct RbacRequirements {
    /// The role memberships required.
    ///
    /// Validation compares this set against the roles the session's current role is a member of.
    role_membership: BTreeSet<RoleId>,
    /// The object ownerships required.
    ownership: Vec<ObjectId>,
    /// The privileges required. The tuples are of the form:
    /// (What object the privilege is on, What privilege is required, Who must possess the privilege).
    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
    /// The types of catalog items that this plan requires USAGE privileges on.
    ///
    /// Most plans will require USAGE on secrets and connections but some plans, like SHOW CREATE,
    /// can reference an item without requiring any privileges on that item.
    item_usage: &'static BTreeSet<CatalogItemType>,
    /// Some action if superuser is required to perform that action, None otherwise.
    ///
    /// If this is still `Some` when validation reaches it, validation fails with
    /// `UnauthorizedError::Superuser`; `filter_to_mandatory_requirements` resets it to `None`.
    superuser_action: Option<String>,
}
188
189impl RbacRequirements {
190    fn empty() -> RbacRequirements {
191        RbacRequirements {
192            role_membership: BTreeSet::new(),
193            ownership: Vec::new(),
194            privileges: Vec::new(),
195            item_usage: &EMPTY_ITEM_USAGE,
196            superuser_action: None,
197        }
198    }
199
200    fn validate(
201        self,
202        catalog: &impl SessionCatalog,
203        session: &dyn SessionMetadata,
204        resolved_ids: &ResolvedIds,
205    ) -> Result<(), UnauthorizedError> {
206        // Obtain all roles that the current session is a member of.
207        let role_membership =
208            catalog.collect_role_membership(&session.role_metadata().current_role);
209
210        check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212        // Validate that the current session has the required role membership to execute the provided
213        // plan.
214        let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215        if !unheld_membership.is_empty() {
216            let role_names = unheld_membership
217                .into_iter()
218                .map(|role_id| {
219                    // Some role references may no longer exist due to concurrent drops.
220                    catalog
221                        .try_get_role(role_id)
222                        .map(|role| role.name().to_string())
223                        .unwrap_or_else(|| role_id.to_string())
224                })
225                .collect();
226            return Err(UnauthorizedError::RoleMembership { role_names });
227        }
228
229        // Validate that the current session has the required object ownership to execute the provided
230        // plan.
231        let unheld_ownership = self
232            .ownership
233            .into_iter()
234            .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235            .collect();
236        ownership_err(unheld_ownership, catalog)?;
237
238        check_object_privileges(
239            catalog,
240            self.privileges,
241            role_membership,
242            session.role_metadata().current_role,
243        )?;
244
245        if let Some(action) = self.superuser_action {
246            return Err(UnauthorizedError::Superuser { action });
247        }
248
249        Ok(())
250    }
251
252    fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253        let RbacRequirements {
254            role_membership,
255            ownership,
256            privileges,
257            item_usage,
258            superuser_action: _,
259        } = self;
260        let role_membership = role_membership
261            .into_iter()
262            .filter(|id| id.is_system())
263            .collect();
264        let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265        let privileges = privileges
266            .into_iter()
267            .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268            // We allow reading objects for superusers and when RBAC is off.
269            .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270            .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271            .collect();
272        let superuser_action = None;
273        RbacRequirements {
274            role_membership,
275            ownership,
276            privileges,
277            item_usage,
278            superuser_action,
279        }
280    }
281}
282
283impl Default for RbacRequirements {
284    fn default() -> Self {
285        RbacRequirements {
286            role_membership: BTreeSet::new(),
287            ownership: Vec::new(),
288            privileges: Vec::new(),
289            item_usage: &DEFAULT_ITEM_USAGE,
290            superuser_action: None,
291        }
292    }
293}
294
295/// Checks if a `session` is authorized to use `resolved_ids`. If not, an error is returned.
296pub fn check_usage(
297    catalog: &impl SessionCatalog,
298    session: &dyn SessionMetadata,
299    resolved_ids: &ResolvedIds,
300    item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302    rbac_check_preamble(catalog, session)?;
303
304    // Obtain all roles that the current session is a member of.
305    let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307    // Certain statements depend on objects that haven't been created yet, like sub-sources, so we
308    // need to filter those out.
309    let existing_resolved_ids =
310        resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312    let required_privileges = generate_usage_privileges(
313        catalog,
314        &existing_resolved_ids,
315        session.role_metadata().current_role,
316        item_types,
317    )
318    .into_iter()
319    .collect();
320
321    let mut rbac_requirements = RbacRequirements::empty();
322    rbac_requirements.privileges = required_privileges;
323    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324    let required_privileges = rbac_requirements.privileges;
325
326    check_object_privileges(
327        catalog,
328        required_privileges,
329        role_membership,
330        session.role_metadata().current_role,
331    )?;
332
333    Ok(())
334}
335
336/// Checks if a session is authorized to execute a plan. If not, an error is returned.
337pub fn check_plan(
338    catalog: &impl SessionCatalog,
339    // Function mapping a connection ID to an authenticated role. The roles may have been dropped concurrently.
340    // Only required for Plan::SideEffectingFunc; can be None for other plan types.
341    // TODO(peek-seq): Remove this when deleting the old peek sequencing. The logic here that uses
342    // `active_conns` is mirrored in `execute_side_effecting_func`, which is what the frontend peek
343    // sequencing uses.
344    active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
345    session: &dyn SessionMetadata,
346    plan: &Plan,
347    target_cluster_id: Option<ClusterId>,
348    resolved_ids: &ResolvedIds,
349) -> Result<(), UnauthorizedError> {
350    rbac_check_preamble(catalog, session)?;
351
352    let rbac_requirements = generate_rbac_requirements(
353        catalog,
354        plan,
355        active_conns,
356        target_cluster_id,
357        session.role_metadata().current_role,
358    );
359    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
360    debug!(
361        "rbac requirements {rbac_requirements:?} for plan {:?}",
362        PlanKind::from(plan)
363    );
364    rbac_requirements.validate(catalog, session, resolved_ids)
365}
366
367/// Returns true if RBAC is turned on for a session, false otherwise.
368pub fn is_rbac_enabled_for_session(
369    system_vars: &SystemVars,
370    session: &dyn SessionMetadata,
371) -> bool {
372    let server_enabled = system_vars.enable_rbac_checks();
373    let session_enabled = session.enable_session_rbac_checks();
374
375    // The session flag allows users to turn RBAC on for just their session while the server flag
376    // allows users to turn RBAC on for everyone.
377    server_enabled || session_enabled
378}
379
380/// Generates all requirements needed to execute a given plan.
381fn generate_rbac_requirements(
382    catalog: &impl SessionCatalog,
383    plan: &Plan,
384    active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
385    target_cluster_id: Option<ClusterId>,
386    role_id: RoleId,
387) -> RbacRequirements {
388    match plan {
389        Plan::CreateConnection(plan::CreateConnectionPlan {
390            name,
391            if_not_exists: _,
392            connection: _,
393            validate: _,
394        }) => RbacRequirements {
395            privileges: vec![(
396                SystemObjectId::Object(name.qualifiers.clone().into()),
397                AclMode::CREATE,
398                role_id,
399            )],
400            item_usage: &CREATE_ITEM_USAGE,
401            ..Default::default()
402        },
403        Plan::CreateDatabase(plan::CreateDatabasePlan {
404            name: _,
405            if_not_exists: _,
406        }) => RbacRequirements {
407            privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
408            item_usage: &CREATE_ITEM_USAGE,
409            ..Default::default()
410        },
411        Plan::CreateSchema(plan::CreateSchemaPlan {
412            database_spec,
413            schema_name: _,
414            if_not_exists: _,
415        }) => {
416            let privileges = match database_spec {
417                ResolvedDatabaseSpecifier::Ambient => Vec::new(),
418                ResolvedDatabaseSpecifier::Id(database_id) => {
419                    vec![(
420                        SystemObjectId::Object(database_id.into()),
421                        AclMode::CREATE,
422                        role_id,
423                    )]
424                }
425            };
426            RbacRequirements {
427                privileges,
428                item_usage: &CREATE_ITEM_USAGE,
429                ..Default::default()
430            }
431        }
432        Plan::CreateRole(plan::CreateRolePlan {
433            name: _,
434            attributes,
435        }) => {
436            if attributes.superuser.unwrap_or(false) {
437                RbacRequirements {
438                    superuser_action: Some("create superuser role".to_string()),
439                    ..Default::default()
440                }
441            } else {
442                RbacRequirements {
443                    privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
444                    item_usage: &CREATE_ITEM_USAGE,
445                    ..Default::default()
446                }
447            }
448        }
449        Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
450            privileges: vec![(
451                SystemObjectId::System,
452                AclMode::CREATE_NETWORK_POLICY,
453                role_id,
454            )],
455            item_usage: &CREATE_ITEM_USAGE,
456            ..Default::default()
457        },
458        Plan::CreateCluster(plan::CreateClusterPlan {
459            name: _,
460            variant: _,
461            workload_class: _,
462        }) => RbacRequirements {
463            privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
464            item_usage: &CREATE_ITEM_USAGE,
465            ..Default::default()
466        },
467        Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
468            cluster_id,
469            name: _,
470            config: _,
471        }) => RbacRequirements {
472            ownership: vec![ObjectId::Cluster(*cluster_id)],
473            item_usage: &CREATE_ITEM_USAGE,
474            ..Default::default()
475        },
476        Plan::CreateSource(plan::CreateSourcePlan {
477            name,
478            source,
479            if_not_exists: _,
480            timeline: _,
481            in_cluster,
482        }) => RbacRequirements {
483            privileges: generate_required_source_privileges(
484                name,
485                &source.data_source,
486                *in_cluster,
487                role_id,
488            ),
489            item_usage: &CREATE_ITEM_USAGE,
490            ..Default::default()
491        },
492        Plan::CreateSources(plans) => RbacRequirements {
493            privileges: plans
494                .iter()
495                .flat_map(
496                    |plan::CreateSourcePlanBundle {
497                         item_id: _,
498                         global_id: _,
499                         plan:
500                             plan::CreateSourcePlan {
501                                 name,
502                                 source,
503                                 if_not_exists: _,
504                                 timeline: _,
505                                 in_cluster,
506                             },
507                         resolved_ids: _,
508                         available_source_references: _,
509                     }| {
510                        generate_required_source_privileges(
511                            name,
512                            &source.data_source,
513                            *in_cluster,
514                            role_id,
515                        )
516                        .into_iter()
517                    },
518                )
519                .collect(),
520            item_usage: &CREATE_ITEM_USAGE,
521            ..Default::default()
522        },
523        Plan::CreateSecret(plan::CreateSecretPlan {
524            name,
525            secret: _,
526            if_not_exists: _,
527        }) => RbacRequirements {
528            privileges: vec![(
529                SystemObjectId::Object(name.qualifiers.clone().into()),
530                AclMode::CREATE,
531                role_id,
532            )],
533            item_usage: &CREATE_ITEM_USAGE,
534            ..Default::default()
535        },
536        Plan::CreateSink(plan::CreateSinkPlan {
537            name,
538            sink,
539            with_snapshot: _,
540            if_not_exists: _,
541            in_cluster,
542        }) => {
543            let mut privileges = vec![(
544                SystemObjectId::Object(name.qualifiers.clone().into()),
545                AclMode::CREATE,
546                role_id,
547            )];
548            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
549            privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
550            privileges.push((
551                SystemObjectId::Object(in_cluster.into()),
552                AclMode::CREATE,
553                role_id,
554            ));
555            RbacRequirements {
556                privileges,
557                item_usage: &CREATE_ITEM_USAGE,
558                ..Default::default()
559            }
560        }
561        Plan::CreateTable(plan::CreateTablePlan {
562            name,
563            table: _,
564            if_not_exists: _,
565        }) => RbacRequirements {
566            privileges: vec![(
567                SystemObjectId::Object(name.qualifiers.clone().into()),
568                AclMode::CREATE,
569                role_id,
570            )],
571            item_usage: &CREATE_ITEM_USAGE,
572            ..Default::default()
573        },
574        Plan::CreateView(plan::CreateViewPlan {
575            name,
576            view: _,
577            replace,
578            drop_ids: _,
579            if_not_exists: _,
580            ambiguous_columns: _,
581        }) => RbacRequirements {
582            ownership: replace
583                .map(|id| vec![ObjectId::Item(id)])
584                .unwrap_or_default(),
585            privileges: vec![(
586                SystemObjectId::Object(name.qualifiers.clone().into()),
587                AclMode::CREATE,
588                role_id,
589            )],
590            item_usage: &CREATE_ITEM_USAGE,
591            ..Default::default()
592        },
593        Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
594            name,
595            materialized_view,
596            replace,
597            drop_ids: _,
598            if_not_exists: _,
599            ambiguous_columns: _,
600        }) => RbacRequirements {
601            ownership: replace
602                .map(|id| vec![ObjectId::Item(id)])
603                .unwrap_or_default(),
604            privileges: vec![
605                (
606                    SystemObjectId::Object(name.qualifiers.clone().into()),
607                    AclMode::CREATE,
608                    role_id,
609                ),
610                (
611                    SystemObjectId::Object(materialized_view.cluster_id.into()),
612                    AclMode::CREATE,
613                    role_id,
614                ),
615            ],
616            item_usage: &CREATE_ITEM_USAGE,
617            ..Default::default()
618        },
619        Plan::CreateIndex(plan::CreateIndexPlan {
620            name,
621            index,
622            if_not_exists: _,
623        }) => {
624            let index_on_item = catalog.resolve_item_id(&index.on);
625            RbacRequirements {
626                ownership: vec![ObjectId::Item(index_on_item)],
627                privileges: vec![
628                    (
629                        SystemObjectId::Object(name.qualifiers.clone().into()),
630                        AclMode::CREATE,
631                        role_id,
632                    ),
633                    (
634                        SystemObjectId::Object(index.cluster_id.into()),
635                        AclMode::CREATE,
636                        role_id,
637                    ),
638                ],
639                item_usage: &CREATE_ITEM_USAGE,
640                ..Default::default()
641            }
642        }
643        Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
644            privileges: vec![(
645                SystemObjectId::Object(name.qualifiers.clone().into()),
646                AclMode::CREATE,
647                role_id,
648            )],
649            item_usage: &CREATE_ITEM_USAGE,
650            ..Default::default()
651        },
652        Plan::Comment(plan::CommentPlan {
653            object_id,
654            sub_component: _,
655            comment: _,
656        }) => {
657            let (ownership, privileges) = match object_id {
658                // Roles don't have owners, instead we require the current session to have the
659                // `CREATEROLE` privilege.
660                CommentObjectId::Role(_) => (
661                    Vec::new(),
662                    vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
663                ),
664                _ => (vec![ObjectId::from(*object_id)], Vec::new()),
665            };
666            RbacRequirements {
667                ownership,
668                privileges,
669                ..Default::default()
670            }
671        }
672        Plan::DropObjects(plan::DropObjectsPlan {
673            referenced_ids,
674            drop_ids: _,
675            object_type,
676        }) => {
677            let privileges = if object_type == &ObjectType::Role {
678                vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
679            } else {
680                referenced_ids
681                    .iter()
682                    .filter_map(|id| match id {
683                        ObjectId::ClusterReplica((cluster_id, _)) => Some((
684                            SystemObjectId::Object(cluster_id.into()),
685                            AclMode::USAGE,
686                            role_id,
687                        )),
688                        ObjectId::Schema((database_spec, _)) => match database_spec {
689                            ResolvedDatabaseSpecifier::Ambient => None,
690                            ResolvedDatabaseSpecifier::Id(database_id) => Some((
691                                SystemObjectId::Object(database_id.into()),
692                                AclMode::USAGE,
693                                role_id,
694                            )),
695                        },
696                        ObjectId::Item(item_id) => {
697                            let item = catalog.get_item(item_id);
698                            Some((
699                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
700                                AclMode::USAGE,
701                                role_id,
702                            ))
703                        }
704                        ObjectId::Cluster(_)
705                        | ObjectId::Database(_)
706                        | ObjectId::Role(_)
707                        | ObjectId::NetworkPolicy(_) => None,
708                    })
709                    .collect()
710            };
711            RbacRequirements {
712                // Do not need ownership of descendant objects.
713                ownership: referenced_ids.clone(),
714                privileges,
715                ..Default::default()
716            }
717        }
718        Plan::DropOwned(plan::DropOwnedPlan {
719            role_ids,
720            drop_ids: _,
721            privilege_revokes: _,
722            default_privilege_revokes: _,
723        }) => RbacRequirements {
724            role_membership: role_ids.into_iter().cloned().collect(),
725            ..Default::default()
726        },
727        Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
728            let container_id = match id {
729                ObjectId::Item(id) => Some(SystemObjectId::Object(
730                    catalog.get_item(id).name().qualifiers.clone().into(),
731                )),
732                ObjectId::Schema((database_id, _schema_id)) => match database_id {
733                    ResolvedDatabaseSpecifier::Ambient => None,
734                    ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
735                },
736                ObjectId::Cluster(_)
737                | ObjectId::ClusterReplica(_)
738                | ObjectId::Database(_)
739                | ObjectId::Role(_)
740                | ObjectId::NetworkPolicy(_) => None,
741            };
742            let privileges = match container_id {
743                Some(id) => vec![(id, AclMode::USAGE, role_id)],
744                None => Vec::new(),
745            };
746            RbacRequirements {
747                privileges,
748                item_usage: &EMPTY_ITEM_USAGE,
749                ..Default::default()
750            }
751        }
752        Plan::ShowColumns(plan::ShowColumnsPlan {
753            id,
754            select_plan,
755            new_resolved_ids: _,
756        }) => {
757            let mut privileges = vec![(
758                SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
759                AclMode::USAGE,
760                role_id,
761            )];
762
763            for privilege in generate_rbac_requirements(
764                catalog,
765                &Plan::Select(select_plan.clone()),
766                active_conns,
767                target_cluster_id,
768                role_id,
769            )
770            .privileges
771            {
772                privileges.push(privilege);
773            }
774            RbacRequirements {
775                privileges,
776                ..Default::default()
777            }
778        }
779        Plan::Select(plan::SelectPlan {
780            source,
781            select: _,
782            when: _,
783            finishing: _,
784            copy_to: _,
785        }) => {
786            let items = source
787                .depends_on()
788                .into_iter()
789                .map(|gid| catalog.resolve_item_id(&gid));
790            let mut privileges = generate_read_privileges(catalog, items, role_id);
791            if let Some(privilege) = generate_cluster_usage_privileges(
792                source.as_const().is_some(),
793                target_cluster_id,
794                role_id,
795            ) {
796                privileges.push(privilege);
797            }
798            RbacRequirements {
799                privileges,
800                ..Default::default()
801            }
802        }
803        Plan::Subscribe(plan::SubscribePlan {
804            from,
805            with_snapshot: _,
806            when: _,
807            up_to: _,
808            copy_to: _,
809            emit_progress: _,
810            output: _,
811        }) => {
812            let items = from
813                .depends_on()
814                .into_iter()
815                .map(|gid| catalog.resolve_item_id(&gid));
816            let mut privileges = generate_read_privileges(catalog, items, role_id);
817            if let Some(cluster_id) = target_cluster_id {
818                privileges.push((
819                    SystemObjectId::Object(cluster_id.into()),
820                    AclMode::USAGE,
821                    role_id,
822                ));
823            }
824            RbacRequirements {
825                privileges,
826                ..Default::default()
827            }
828        }
829        Plan::CopyFrom(plan::CopyFromPlan {
830            target_name: _,
831            target_id,
832            source: _,
833            columns: _,
834            source_desc: _,
835            mfp: _,
836            params: _,
837            filter: _,
838        }) => RbacRequirements {
839            privileges: vec![
840                (
841                    SystemObjectId::Object(
842                        catalog.get_item(target_id).name().qualifiers.clone().into(),
843                    ),
844                    AclMode::USAGE,
845                    role_id,
846                ),
847                (
848                    SystemObjectId::Object(target_id.into()),
849                    AclMode::INSERT,
850                    role_id,
851                ),
852            ],
853            ..Default::default()
854        },
855        Plan::CopyTo(plan::CopyToPlan {
856            select_plan,
857            desc: _,
858            to: _,
859            connection: _,
860            connection_id: _,
861            format: _,
862            max_file_size: _,
863        }) => {
864            let items = select_plan
865                .source
866                .depends_on()
867                .into_iter()
868                .map(|gid| catalog.resolve_item_id(&gid));
869            let mut privileges = generate_read_privileges(catalog, items, role_id);
870            if let Some(cluster_id) = target_cluster_id {
871                privileges.push((
872                    SystemObjectId::Object(cluster_id.into()),
873                    AclMode::USAGE,
874                    role_id,
875                ));
876            }
877            RbacRequirements {
878                privileges,
879                ..Default::default()
880            }
881        }
882        Plan::ExplainPlan(plan::ExplainPlanPlan {
883            stage: _,
884            format: _,
885            config: _,
886            explainee,
887        })
888        | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
889            privileges: match explainee {
890                Explainee::View(id)
891                | Explainee::MaterializedView(id)
892                | Explainee::Index(id)
893                | Explainee::ReplanView(id)
894                | Explainee::ReplanMaterializedView(id)
895                | Explainee::ReplanIndex(id) => {
896                    let item = catalog.get_item(id);
897                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
898                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
899                }
900                Explainee::Statement(stmt) => stmt
901                    .depends_on()
902                    .into_iter()
903                    .map(|id| {
904                        let item = catalog.get_item_by_global_id(&id);
905                        let schema_id: ObjectId = item.name().qualifiers.clone().into();
906                        (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
907                    })
908                    .collect(),
909            },
910            item_usage: match explainee {
911                Explainee::View(..)
912                | Explainee::MaterializedView(..)
913                | Explainee::Index(..)
914                | Explainee::ReplanView(..)
915                | Explainee::ReplanMaterializedView(..)
916                | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
917                Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
918            },
919            ..Default::default()
920        },
921        Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
922            RbacRequirements {
923                privileges: {
924                    let item = catalog.get_item_by_global_id(sink_from);
925                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
926                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
927                },
928                item_usage: &EMPTY_ITEM_USAGE,
929                ..Default::default()
930            }
931        }
932        Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
933            format: _,
934            raw_plan,
935            when: _,
936        }) => RbacRequirements {
937            privileges: raw_plan
938                .depends_on()
939                .into_iter()
940                .map(|id| {
941                    let item = catalog.get_item_by_global_id(&id);
942                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
943                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
944                })
945                .collect(),
946            ..Default::default()
947        },
948        Plan::Insert(plan::InsertPlan {
949            id,
950            values,
951            returning,
952        }) => {
953            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
954            let mut privileges = vec![
955                (
956                    SystemObjectId::Object(schema_id.clone()),
957                    AclMode::USAGE,
958                    role_id,
959                ),
960                (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
961            ];
962            let mut seen = BTreeSet::from([(schema_id, role_id)]);
963
964            // We don't allow arbitrary sub-queries in `returning`. So either it
965            // contains a column reference to the outer table or it's constant.
966            if returning
967                .iter()
968                .any(|assignment| assignment.contains_column())
969            {
970                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
971                seen.insert((id.into(), role_id));
972            }
973
974            let items = values
975                .depends_on()
976                .into_iter()
977                .map(|gid| catalog.resolve_item_id(&gid));
978            privileges.extend_from_slice(&generate_read_privileges_inner(
979                catalog, items, role_id, &mut seen,
980            ));
981
982            if let Some(privilege) = generate_cluster_usage_privileges(
983                values.as_const().is_some(),
984                target_cluster_id,
985                role_id,
986            ) {
987                privileges.push(privilege);
988            } else if !returning.is_empty() {
989                // TODO(jkosh44) returning may be a constant, but for now we are overly protective
990                //  and require cluster privileges for all returning.
991                if let Some(cluster_id) = target_cluster_id {
992                    privileges.push((
993                        SystemObjectId::Object(cluster_id.into()),
994                        AclMode::USAGE,
995                        role_id,
996                    ));
997                }
998            }
999            RbacRequirements {
1000                privileges,
1001                ..Default::default()
1002            }
1003        }
1004        Plan::AlterCluster(plan::AlterClusterPlan {
1005            id,
1006            name: _,
1007            options: _,
1008            strategy: _,
1009        }) => RbacRequirements {
1010            ownership: vec![ObjectId::Cluster(*id)],
1011            item_usage: &CREATE_ITEM_USAGE,
1012            ..Default::default()
1013        },
1014        Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1015            ownership: vec![ObjectId::Item(*id)],
1016            privileges: vec![(
1017                SystemObjectId::Object(set_cluster.into()),
1018                AclMode::CREATE,
1019                role_id,
1020            )],
1021            item_usage: &CREATE_ITEM_USAGE,
1022            ..Default::default()
1023        },
1024        Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1025            id,
1026            window: _,
1027            value: _,
1028            object_type: _,
1029        }) => RbacRequirements {
1030            ownership: vec![ObjectId::Item(*id)],
1031            item_usage: &CREATE_ITEM_USAGE,
1032            ..Default::default()
1033        },
1034        Plan::AlterSourceTimestampInterval(plan::AlterSourceTimestampIntervalPlan {
1035            id,
1036            value: _,
1037            interval: _,
1038        }) => RbacRequirements {
1039            ownership: vec![ObjectId::Item(*id)],
1040            item_usage: &CREATE_ITEM_USAGE,
1041            ..Default::default()
1042        },
1043        Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1044            ownership: vec![ObjectId::Item(*id)],
1045            ..Default::default()
1046        },
1047        Plan::AlterSource(plan::AlterSourcePlan {
1048            item_id,
1049            ingestion_id: _,
1050            action: _,
1051        }) => RbacRequirements {
1052            ownership: vec![ObjectId::Item(*item_id)],
1053            item_usage: &CREATE_ITEM_USAGE,
1054            ..Default::default()
1055        },
1056        Plan::AlterSink(plan::AlterSinkPlan {
1057            item_id,
1058            global_id: _,
1059            sink,
1060            with_snapshot: _,
1061            in_cluster,
1062        }) => {
1063            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1064            let mut privileges = generate_read_privileges(catalog, items, role_id);
1065            privileges.push((
1066                SystemObjectId::Object(in_cluster.into()),
1067                AclMode::CREATE,
1068                role_id,
1069            ));
1070            RbacRequirements {
1071                ownership: vec![ObjectId::Item(*item_id)],
1072                privileges,
1073                item_usage: &CREATE_ITEM_USAGE,
1074                ..Default::default()
1075            }
1076        }
1077        Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1078            id,
1079            name: _,
1080            to_name: _,
1081        }) => RbacRequirements {
1082            ownership: vec![ObjectId::Cluster(*id)],
1083            ..Default::default()
1084        },
1085        Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1086            id_a,
1087            id_b,
1088            name_a: _,
1089            name_b: _,
1090            name_temp: _,
1091        }) => RbacRequirements {
1092            ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1093            ..Default::default()
1094        },
1095        Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1096            cluster_id,
1097            replica_id,
1098            name: _,
1099            to_name: _,
1100        }) => RbacRequirements {
1101            ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1102            ..Default::default()
1103        },
1104        Plan::AlterItemRename(plan::AlterItemRenamePlan {
1105            id,
1106            current_full_name: _,
1107            to_name: _,
1108            object_type: _,
1109        }) => RbacRequirements {
1110            ownership: vec![ObjectId::Item(*id)],
1111            ..Default::default()
1112        },
1113        Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1114            cur_schema_spec,
1115            new_schema_name: _,
1116        }) => {
1117            let privileges = match cur_schema_spec.0 {
1118                ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1119                    SystemObjectId::Object(ObjectId::Database(db_id)),
1120                    AclMode::CREATE,
1121                    role_id,
1122                )],
1123                ResolvedDatabaseSpecifier::Ambient => vec![],
1124            };
1125
1126            RbacRequirements {
1127                ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1128                privileges,
1129                ..Default::default()
1130            }
1131        }
1132        Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1133            schema_a_spec,
1134            schema_a_name: _,
1135            schema_b_spec,
1136            schema_b_name: _,
1137            name_temp: _,
1138        }) => {
1139            let mut privileges = vec![];
1140            if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1141                privileges.push((
1142                    SystemObjectId::Object(ObjectId::Database(id_a)),
1143                    AclMode::CREATE,
1144                    role_id,
1145                ));
1146            }
1147            if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1148                privileges.push((
1149                    SystemObjectId::Object(ObjectId::Database(id_b)),
1150                    AclMode::CREATE,
1151                    role_id,
1152                ));
1153            }
1154
1155            RbacRequirements {
1156                ownership: vec![
1157                    ObjectId::Schema(*schema_a_spec),
1158                    ObjectId::Schema(*schema_b_spec),
1159                ],
1160                privileges,
1161                ..Default::default()
1162            }
1163        }
1164        Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1165            ownership: vec![ObjectId::Item(*id)],
1166            item_usage: &CREATE_ITEM_USAGE,
1167            ..Default::default()
1168        },
1169        Plan::AlterRole(plan::AlterRolePlan {
1170            id,
1171            name: _,
1172            option,
1173        }) => match option {
1174            // Only superusers can alter the superuserness of a role.
1175            plan::PlannedAlterRoleOption::Attributes(attributes)
1176                if attributes.superuser.is_some() =>
1177            {
1178                RbacRequirements {
1179                    superuser_action: Some("alter superuser role".to_string()),
1180                    ..Default::default()
1181                }
1182            }
1183            // Roles are allowed to change their own password, but only if
1184            // password is the sole attribute being changed.
1185            plan::PlannedAlterRoleOption::Attributes(plan::PlannedRoleAttributes {
1186                password,
1187                // scram_iterations and nopassword are password-related, so
1188                // they're fine to change alongside the password.
1189                scram_iterations: _,
1190                nopassword: _,
1191                // superuser is already handled by the match arm above, so it
1192                // will always be None here.
1193                superuser: None,
1194                inherit: None,
1195                login: None,
1196            }) if password.is_some() && role_id == *id => RbacRequirements::default(),
1197            // But no one else's...
1198            plan::PlannedAlterRoleOption::Attributes(attributes)
1199                if attributes.password.is_some() && role_id != *id =>
1200            {
1201                RbacRequirements {
1202                    superuser_action: Some("alter password of role".to_string()),
1203                    ..Default::default()
1204                }
1205            }
1206            // Roles are allowed to change their own variables.
1207            plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1208                RbacRequirements::default()
1209            }
1210            // Otherwise to ALTER a role, you need to have the CREATE_ROLE privilege.
1211            _ => RbacRequirements {
1212                privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1213                item_usage: &CREATE_ITEM_USAGE,
1214                ..Default::default()
1215            },
1216        },
1217        Plan::AlterOwner(plan::AlterOwnerPlan {
1218            id,
1219            object_type: _,
1220            new_owner,
1221        }) => {
1222            let privileges = match id {
1223                ObjectId::ClusterReplica((cluster_id, _)) => {
1224                    vec![(
1225                        SystemObjectId::Object(cluster_id.into()),
1226                        AclMode::CREATE,
1227                        role_id,
1228                    )]
1229                }
1230                ObjectId::Schema((database_spec, _)) => match database_spec {
1231                    ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1232                    ResolvedDatabaseSpecifier::Id(database_id) => {
1233                        vec![(
1234                            SystemObjectId::Object(database_id.into()),
1235                            AclMode::CREATE,
1236                            role_id,
1237                        )]
1238                    }
1239                },
1240                ObjectId::Item(item_id) => {
1241                    let item = catalog.get_item(item_id);
1242                    vec![(
1243                        SystemObjectId::Object(item.name().qualifiers.clone().into()),
1244                        AclMode::CREATE,
1245                        role_id,
1246                    )]
1247                }
1248                ObjectId::Cluster(_)
1249                | ObjectId::Database(_)
1250                | ObjectId::Role(_)
1251                | ObjectId::NetworkPolicy(_) => Vec::new(),
1252            };
1253            RbacRequirements {
1254                role_membership: BTreeSet::from([*new_owner]),
1255                ownership: vec![id.clone()],
1256                privileges,
1257                ..Default::default()
1258            }
1259        }
1260        Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1261            ownership: vec![ObjectId::Item(*relation_id)],
1262            item_usage: &CREATE_ITEM_USAGE,
1263            ..Default::default()
1264        },
1265        Plan::AlterMaterializedViewApplyReplacement(
1266            plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1267        ) => RbacRequirements {
1268            ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1269            item_usage: &CREATE_ITEM_USAGE,
1270            ..Default::default()
1271        },
1272        Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1273            ownership: vec![ObjectId::NetworkPolicy(*id)],
1274            item_usage: &CREATE_ITEM_USAGE,
1275            ..Default::default()
1276        },
1277        Plan::ReadThenWrite(plan::ReadThenWritePlan {
1278            id,
1279            selection,
1280            finishing: _,
1281            assignments,
1282            kind,
1283            returning,
1284        }) => {
1285            let acl_mode = match kind {
1286                MutationKind::Insert => AclMode::INSERT,
1287                MutationKind::Update => AclMode::UPDATE,
1288                MutationKind::Delete => AclMode::DELETE,
1289            };
1290            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1291            let mut privileges = vec![
1292                (
1293                    SystemObjectId::Object(schema_id.clone()),
1294                    AclMode::USAGE,
1295                    role_id,
1296                ),
1297                (SystemObjectId::Object(id.into()), acl_mode, role_id),
1298            ];
1299            let mut seen = BTreeSet::from([(schema_id, role_id)]);
1300
1301            // We don't allow arbitrary sub-queries in `assignments` or `returning`. So each of them
1302            // either contains a column reference to the outer table or is constant.
1303            if assignments
1304                .values()
1305                .chain(returning.iter())
1306                .any(|assignment| assignment.contains_column())
1307            {
1308                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1309                seen.insert((id.into(), role_id));
1310            }
1311
1312            // TODO(jkosh44) It's fairly difficult to determine what part of `selection` is from a
1313            //  user specified read and what part is from the implementation of the read then write.
1314            //  instead we are overly protective and always require SELECT privileges even though
1315            //  PostgreSQL doesn't always do this.
1316            //  As a concrete example, we require SELECT and UPDATE privileges to execute
1317            //  `UPDATE t SET a = 42;`, while PostgreSQL only requires UPDATE privileges.
1318            let items = selection
1319                .depends_on()
1320                .into_iter()
1321                .map(|gid| catalog.resolve_item_id(&gid));
1322            privileges.extend_from_slice(&generate_read_privileges_inner(
1323                catalog, items, role_id, &mut seen,
1324            ));
1325
1326            if let Some(privilege) = generate_cluster_usage_privileges(
1327                selection.as_const().is_some(),
1328                target_cluster_id,
1329                role_id,
1330            ) {
1331                privileges.push(privilege);
1332            }
1333            RbacRequirements {
1334                privileges,
1335                ..Default::default()
1336            }
1337        }
1338        Plan::GrantRole(plan::GrantRolePlan {
1339            role_ids: _,
1340            member_ids: _,
1341            grantor_id: _,
1342        })
1343        | Plan::RevokeRole(plan::RevokeRolePlan {
1344            role_ids: _,
1345            member_ids: _,
1346            grantor_id: _,
1347        }) => RbacRequirements {
1348            privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1349            ..Default::default()
1350        },
1351        Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1352            update_privileges,
1353            grantees: _,
1354        })
1355        | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1356            update_privileges,
1357            revokees: _,
1358        }) => {
1359            let mut privileges = Vec::with_capacity(update_privileges.len());
1360            for UpdatePrivilege { target_id, .. } in update_privileges {
1361                match target_id {
1362                    SystemObjectId::Object(object_id) => match object_id {
1363                        ObjectId::ClusterReplica((cluster_id, _)) => {
1364                            privileges.push((
1365                                SystemObjectId::Object(cluster_id.into()),
1366                                AclMode::USAGE,
1367                                role_id,
1368                            ));
1369                        }
1370                        ObjectId::Schema((database_spec, _)) => match database_spec {
1371                            ResolvedDatabaseSpecifier::Ambient => {}
1372                            ResolvedDatabaseSpecifier::Id(database_id) => {
1373                                privileges.push((
1374                                    SystemObjectId::Object(database_id.into()),
1375                                    AclMode::USAGE,
1376                                    role_id,
1377                                ));
1378                            }
1379                        },
1380                        ObjectId::Item(item_id) => {
1381                            let item = catalog.get_item(item_id);
1382                            privileges.push((
1383                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
1384                                AclMode::USAGE,
1385                                role_id,
1386                            ))
1387                        }
1388                        ObjectId::Cluster(_)
1389                        | ObjectId::Database(_)
1390                        | ObjectId::Role(_)
1391                        | ObjectId::NetworkPolicy(_) => {}
1392                    },
1393                    SystemObjectId::System => {}
1394                }
1395            }
1396            RbacRequirements {
1397                ownership: update_privileges
1398                    .iter()
1399                    .filter_map(|update_privilege| update_privilege.target_id.object_id())
1400                    .cloned()
1401                    .collect(),
1402                privileges,
1403                // To grant/revoke a privilege on some object, generally the grantor/revoker must be the
1404                // owner of that object (or have a grant option on that object which isn't implemented in
1405                // Materialize yet). There is no owner of the entire system, so it's only reasonable to
1406                // restrict granting/revoking system privileges to superusers.
1407                superuser_action: if update_privileges
1408                    .iter()
1409                    .any(|update_privilege| update_privilege.target_id.is_system())
1410                {
1411                    Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1412                } else {
1413                    None
1414                },
1415                ..Default::default()
1416            }
1417        }
1418        Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1419            privilege_objects,
1420            privilege_acl_items: _,
1421            is_grant: _,
1422        }) => RbacRequirements {
1423            role_membership: privilege_objects
1424                .iter()
1425                .map(|privilege_object| privilege_object.role_id)
1426                .collect(),
1427            privileges: privilege_objects
1428                .into_iter()
1429                .filter_map(|privilege_object| {
1430                    if let (Some(database_id), Some(_)) =
1431                        (privilege_object.database_id, privilege_object.schema_id)
1432                    {
1433                        Some((
1434                            SystemObjectId::Object(database_id.into()),
1435                            AclMode::USAGE,
1436                            role_id,
1437                        ))
1438                    } else {
1439                        None
1440                    }
1441                })
1442                .collect(),
1443            // Altering the default privileges for the PUBLIC role (aka ALL ROLES) will affect all roles
1444            // that currently exist and roles that will exist in the future. It's impossible for an existing
1445            // role to be a member of a role that doesn't exist yet, so no current role could possibly have
1446            // the privileges required to alter default privileges for the PUBLIC role. Therefore
1447            // only superusers can alter default privileges for the PUBLIC role.
1448            superuser_action: if privilege_objects
1449                .iter()
1450                .any(|privilege_object| privilege_object.role_id.is_public())
1451            {
1452                Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1453            } else {
1454                None
1455            },
1456            ..Default::default()
1457        },
1458        Plan::ReassignOwned(plan::ReassignOwnedPlan {
1459            old_roles,
1460            new_role,
1461            reassign_ids: _,
1462        }) => RbacRequirements {
1463            role_membership: old_roles
1464                .into_iter()
1465                .cloned()
1466                .chain(iter::once(*new_role))
1467                .collect(),
1468            ..Default::default()
1469        },
1470        Plan::SideEffectingFunc(func) => {
1471            let role_membership = match func {
1472                SideEffectingFunc::PgCancelBackend { connection_id } => active_conns
1473                    .expect("active_conns is required for Plan::SideEffectingFunc")(
1474                    *connection_id
1475                )
1476                .map(|x| [x].into())
1477                .unwrap_or_default(),
1478            };
1479            RbacRequirements {
1480                role_membership,
1481                ..Default::default()
1482            }
1483        }
1484        Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1485            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1486            RbacRequirements {
1487                privileges: vec![
1488                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1489                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1490                ],
1491                ..Default::default()
1492            }
1493        }
1494        Plan::DiscardTemp
1495        | Plan::DiscardAll
1496        | Plan::EmptyQuery
1497        | Plan::ShowAllVariables
1498        | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1499        | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1500        | Plan::SetVariable(plan::SetVariablePlan {
1501            name: _,
1502            value: _,
1503            local: _,
1504        })
1505        | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1506        | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1507        | Plan::StartTransaction(plan::StartTransactionPlan {
1508            access: _,
1509            isolation_level: _,
1510        })
1511        | Plan::CommitTransaction(plan::CommitTransactionPlan {
1512            transaction_type: _,
1513        })
1514        | Plan::AbortTransaction(plan::AbortTransactionPlan {
1515            transaction_type: _,
1516        })
1517        | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1518        | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1519        | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1520        | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1521        | Plan::Declare(plan::DeclarePlan {
1522            name: _,
1523            stmt: _,
1524            sql: _,
1525            params: _,
1526        })
1527        | Plan::Fetch(plan::FetchPlan {
1528            name: _,
1529            count: _,
1530            timeout: _,
1531        })
1532        | Plan::Close(plan::ClosePlan { name: _ })
1533        | Plan::Prepare(plan::PreparePlan {
1534            name: _,
1535            stmt: _,
1536            desc: _,
1537            sql: _,
1538        })
1539        | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1540        | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1541        | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1542    }
1543}
1544
1545/// Reports whether any role has ownership over an object.
1546fn check_owner_roles(
1547    object_id: &ObjectId,
1548    role_ids: &BTreeSet<RoleId>,
1549    catalog: &impl SessionCatalog,
1550) -> bool {
1551    if let Some(owner_id) = catalog.get_owner_id(object_id) {
1552        role_ids.contains(&owner_id)
1553    } else {
1554        true
1555    }
1556}
1557
1558fn ownership_err(
1559    unheld_ownership: Vec<ObjectId>,
1560    catalog: &impl SessionCatalog,
1561) -> Result<(), UnauthorizedError> {
1562    if !unheld_ownership.is_empty() {
1563        let objects = unheld_ownership
1564            .into_iter()
1565            .map(|ownership| match ownership {
1566                ObjectId::Cluster(id) => (
1567                    ObjectType::Cluster,
1568                    catalog.get_cluster(id).name().to_string(),
1569                ),
1570                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1571                    let cluster = catalog.get_cluster(cluster_id);
1572                    let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1573                    // Note: using unchecked here is okay because the values are coming from an
1574                    // already existing name.
1575                    let name = QualifiedReplica {
1576                        cluster: Ident::new_unchecked(cluster.name()),
1577                        replica: Ident::new_unchecked(replica.name()),
1578                    };
1579                    (ObjectType::ClusterReplica, name.to_string())
1580                }
1581                ObjectId::Database(id) => (
1582                    ObjectType::Database,
1583                    catalog.get_database(&id).name().to_string(),
1584                ),
1585                ObjectId::Schema((database_spec, schema_spec)) => {
1586                    let schema = catalog.get_schema(&database_spec, &schema_spec);
1587                    let name = catalog.resolve_full_schema_name(schema.name());
1588                    (ObjectType::Schema, name.to_string())
1589                }
1590                ObjectId::Item(id) => {
1591                    let item = catalog.get_item(&id);
1592                    let name = catalog.resolve_full_name(item.name());
1593                    (item.item_type().into(), name.to_string())
1594                }
1595                ObjectId::NetworkPolicy(id) => (
1596                    ObjectType::NetworkPolicy,
1597                    catalog.get_network_policy(&id).name().to_string(),
1598                ),
1599                ObjectId::Role(_) => unreachable!("roles have no owner"),
1600            })
1601            .collect();
1602        Err(UnauthorizedError::Ownership { objects })
1603    } else {
1604        Ok(())
1605    }
1606}
1607
1608fn generate_required_source_privileges(
1609    name: &QualifiedItemName,
1610    data_source: &DataSourceDesc,
1611    in_cluster: Option<ClusterId>,
1612    role_id: RoleId,
1613) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1614    let mut privileges = vec![(
1615        SystemObjectId::Object(name.qualifiers.clone().into()),
1616        AclMode::CREATE,
1617        role_id,
1618    )];
1619    match (data_source, in_cluster) {
1620        (_, Some(id)) => {
1621            privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1622        }
1623        (DataSourceDesc::Ingestion(_), None) => {
1624            privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1625        }
1626        // Non-ingestion data-sources have meaningless cluster config's (for now...) and they need
1627        // to be ignored.
1628        // This feels very brittle, but there's not much we can do until the UNDEFINED cluster
1629        // config is removed.
1630        (_, None) => {}
1631    }
1632    privileges
1633}
1634
1635/// Generates all the privileges required to execute a read that includes the objects in `ids`.
1636///
1637/// Not only do we need to validate that `role_id` has read privileges on all relations in `ids`,
1638/// but if any object is a view or materialized view then we need to validate that the owner of
1639/// that view has all of the privileges required to execute the query within the view.
1640///
1641/// For more details see: <https://www.postgresql.org/docs/15/rules-privileges.html>
1642fn generate_read_privileges(
1643    catalog: &impl SessionCatalog,
1644    ids: impl Iterator<Item = CatalogItemId>,
1645    role_id: RoleId,
1646) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1647    generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1648}
1649
1650fn generate_read_privileges_inner(
1651    catalog: &impl SessionCatalog,
1652    ids: impl Iterator<Item = CatalogItemId>,
1653    role_id: RoleId,
1654    seen: &mut BTreeSet<(ObjectId, RoleId)>,
1655) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1656    let mut privileges = Vec::new();
1657    let mut views = Vec::new();
1658
1659    for id in ids {
1660        if seen.insert((id.into(), role_id)) {
1661            let item = catalog.get_item(&id);
1662            let schema_id: ObjectId = item.name().qualifiers.clone().into();
1663            if seen.insert((schema_id.clone(), role_id)) {
1664                privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1665            }
1666            match item.item_type() {
1667                CatalogItemType::View | CatalogItemType::MaterializedView => {
1668                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1669                    views.push((item.references().items().copied(), item.owner_id()));
1670                }
1671                CatalogItemType::Table | CatalogItemType::Source => {
1672                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1673                }
1674                CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1675                    privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1676                }
1677                CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1678            }
1679        }
1680    }
1681
1682    for (view_ids, view_owner) in views {
1683        privileges.extend_from_slice(&generate_read_privileges_inner(
1684            catalog, view_ids, view_owner, seen,
1685        ));
1686    }
1687
1688    privileges
1689}
1690
1691fn generate_usage_privileges(
1692    catalog: &impl SessionCatalog,
1693    ids: &ResolvedIds,
1694    role_id: RoleId,
1695    item_types: &BTreeSet<CatalogItemType>,
1696) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1697    // Use a `BTreeSet` to remove duplicate privileges.
1698    ids.items()
1699        .filter_map(move |id| {
1700            let item = catalog.get_item(id);
1701            if item_types.contains(&item.item_type()) {
1702                let schema_id = item.name().qualifiers.clone().into();
1703                Some([
1704                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1705                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1706                ])
1707            } else {
1708                None
1709            }
1710        })
1711        .flatten()
1712        .collect()
1713}
1714
1715fn generate_cluster_usage_privileges(
1716    expr_is_const: bool,
1717    target_cluster_id: Option<ClusterId>,
1718    role_id: RoleId,
1719) -> Option<(SystemObjectId, AclMode, RoleId)> {
1720    // TODO(jkosh44) expr hasn't been fully optimized yet, so it might actually be a constant,
1721    //  but we mistakenly think that it's not. For now it's ok to be overly protective.
1722    if !expr_is_const {
1723        if let Some(cluster_id) = target_cluster_id {
1724            return Some((
1725                SystemObjectId::Object(cluster_id.into()),
1726                AclMode::USAGE,
1727                role_id,
1728            ));
1729        }
1730    }
1731
1732    None
1733}
1734
1735fn check_object_privileges(
1736    catalog: &impl SessionCatalog,
1737    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1738    role_membership: BTreeSet<RoleId>,
1739    current_role_id: RoleId,
1740) -> Result<(), UnauthorizedError> {
1741    let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1742    role_memberships.insert(current_role_id, role_membership);
1743    for (object_id, acl_mode, role_id) in privileges {
1744        // Temporary schemas are owned by the connection that created them,
1745        // so users implicitly have all privileges on their own temp schema.
1746        // The schema may not exist yet (lazy creation), so we skip the check.
1747        if matches!(
1748            &object_id,
1749            SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary)))
1750        ) {
1751            continue;
1752        }
1753
1754        let role_membership = role_memberships
1755            .entry(role_id)
1756            .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1757        let object_privileges = catalog
1758            .get_privileges(&object_id)
1759            .expect("only object types with privileges will generate required privileges");
1760        let role_privileges = role_membership
1761            .iter()
1762            .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1763            .map(|mz_acl_item| mz_acl_item.acl_mode)
1764            .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1765        if !role_privileges.contains(acl_mode) {
1766            let role_name = catalog.get_role(&role_id).name().to_string();
1767            let privileges = acl_mode.to_error_string();
1768            return Err(UnauthorizedError::Privilege {
1769                object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1770                role_name,
1771                privileges,
1772            });
1773        }
1774    }
1775
1776    Ok(())
1777}
1778
1779pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1780    const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1781        .union(AclMode::SELECT)
1782        .union(AclMode::UPDATE)
1783        .union(AclMode::DELETE);
1784    const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1785    const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1786        .union(AclMode::CREATE_DB)
1787        .union(AclMode::CREATE_CLUSTER)
1788        .union(AclMode::CREATE_NETWORK_POLICY);
1789
1790    const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1791    match object_type {
1792        SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1793        SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1794        SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1795        SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1796        SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1797        SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1798        SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1799        SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1800        SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1801        SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1802        SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1803        SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1804        SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1805        SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1806        SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1807        SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1808        SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1809    }
1810}
1811
1812pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1813    MzAclItem {
1814        grantee: owner_id,
1815        grantor: owner_id,
1816        acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1817    }
1818}
1819
1820const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1821    match object_type {
1822        ObjectType::Table
1823        | ObjectType::View
1824        | ObjectType::MaterializedView
1825        | ObjectType::Source => AclMode::SELECT,
1826        ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1827        ObjectType::Sink
1828        | ObjectType::Index
1829        | ObjectType::Role
1830        | ObjectType::Cluster
1831        | ObjectType::ClusterReplica
1832        | ObjectType::Secret
1833        | ObjectType::Connection
1834        | ObjectType::Database
1835        | ObjectType::Func
1836        | ObjectType::NetworkPolicy => AclMode::empty(),
1837    }
1838}
1839
1840pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1841    let acl_mode = default_builtin_object_acl_mode(object_type);
1842    MzAclItem {
1843        grantee: MZ_SUPPORT_ROLE_ID,
1844        grantor: MZ_SYSTEM_ROLE_ID,
1845        acl_mode,
1846    }
1847}
1848
1849pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1850    let acl_mode = default_builtin_object_acl_mode(object_type);
1851    MzAclItem {
1852        grantee: RoleId::Public,
1853        grantor: MZ_SYSTEM_ROLE_ID,
1854        acl_mode,
1855    }
1856}