mz_sql/
rbac.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26    CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29    CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30    SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34    DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40/// Common checks that need to be performed before we can start checking a role's privileges.
41fn rbac_check_preamble(
42    catalog: &impl SessionCatalog,
43    session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45    // PostgreSQL allows users that have their role dropped to perform some actions,
46    // such as `SET ROLE` and certain `SELECT` queries. We haven't implemented
47    // `SET ROLE` and feel it's safer to force to user to re-authenticate if their
48    // role is dropped.
49    if catalog
50        .try_get_role(&session_meta.role_metadata().current_role)
51        .is_none()
52    {
53        return Err(UnauthorizedError::ConcurrentRoleDrop(
54            session_meta.role_metadata().current_role.clone(),
55        ));
56    };
57    if catalog
58        .try_get_role(&session_meta.role_metadata().session_role)
59        .is_none()
60    {
61        return Err(UnauthorizedError::ConcurrentRoleDrop(
62            session_meta.role_metadata().session_role.clone(),
63        ));
64    };
65    if catalog
66        .try_get_role(&session_meta.role_metadata().authenticated_role)
67        .is_none()
68    {
69        return Err(UnauthorizedError::ConcurrentRoleDrop(
70            session_meta.role_metadata().authenticated_role.clone(),
71        ));
72    };
73
74    Ok(())
75}
76
77/// Filters `RbacRequirements` based on the session role metadata and RBAC related feature flags.
78fn filter_requirements(
79    catalog: &impl SessionCatalog,
80    session_meta: &dyn SessionMetadata,
81    rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83    // Skip RBAC non-mandatory checks if RBAC is disabled. However, we never skip RBAC checks for
84    // system roles. This allows us to limit access of system users even when RBAC is off.
85    let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86        && !session_meta.role_metadata().current_role.is_system()
87        && !session_meta.role_metadata().session_role.is_system();
88    // Skip RBAC checks on user items if the session is a superuser.
89    let is_superuser = session_meta.is_superuser();
90    if is_rbac_disabled || is_superuser {
91        return rbac_requirements.filter_to_mandatory_requirements();
92    }
93
94    rbac_requirements
95}
96
97// The default item types that most statements require USAGE privileges for.
98static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99    btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101// CREATE statements require USAGE privileges on the default item types and USAGE privileges on
102// Types.
103pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104    let mut items = DEFAULT_ITEM_USAGE.clone();
105    items.insert(CatalogItemType::Type);
106    items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
110/// Errors that can occur due to an unauthorized action.
111#[derive(Debug, thiserror::Error)]
112pub enum UnauthorizedError {
113    /// The action can only be performed by a superuser.
114    #[error("permission denied to {action}")]
115    Superuser { action: String },
116    /// The action requires ownership of an object.
117    #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
118    Ownership { objects: Vec<(ObjectType, String)> },
119    /// Altering an owner requires membership of the new owner role.
120    #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
121    RoleMembership { role_names: Vec<String> },
122    /// The action requires one or more privileges.
123    #[error("permission denied for {object_description}")]
124    Privilege {
125        object_description: ErrorMessageObjectDescription,
126        role_name: String,
127        privileges: String,
128    },
129    // TODO(jkosh44) When we implement parameter privileges, this can be replaced with a regular
130    //  privilege error.
131    /// The action can only be performed by the mz_system role.
132    #[error("permission denied to {action}")]
133    MzSystem { action: String },
134    /// The action cannot be performed by the mz_support role.
135    #[error("permission denied to {action}")]
136    MzSupport { action: String },
137    /// The active role was dropped while a user was logged in.
138    #[error("role {0} was concurrently dropped")]
139    ConcurrentRoleDrop(RoleId),
140}
141
142impl UnauthorizedError {
143    pub fn detail(&self) -> Option<String> {
144        match &self {
145            UnauthorizedError::Superuser { action } => {
146                Some(format!("You must be a superuser to {}", action))
147            }
148            UnauthorizedError::Privilege {
149                object_description,
150                role_name,
151                privileges,
152            } => Some(format!(
153                "The '{role_name}' role needs {privileges} privileges on {object_description}"
154            )),
155            UnauthorizedError::MzSystem { .. } => {
156                Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157            }
158            UnauthorizedError::MzSupport { .. } => Some(format!(
159                "The '{}' role has very limited privileges",
160                SUPPORT_USER.name
161            )),
162            UnauthorizedError::ConcurrentRoleDrop(_) => {
163                Some("Please disconnect and re-connect with a valid role.".into())
164            }
165            UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166        }
167    }
168}
169
170/// RBAC requirements for executing a given plan.
171#[derive(Debug)]
172struct RbacRequirements {
173    /// The role memberships required.
174    role_membership: BTreeSet<RoleId>,
175    /// The object ownerships required.
176    ownership: Vec<ObjectId>,
177    /// The privileges required. The tuples are of the form:
178    /// (What object the privilege is on, What privilege is required, Who must possess the privilege).
179    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
180    /// The types of catalog items that this plan requires USAGE privileges on.
181    ///
182    /// Most plans will require USAGE on secrets and connections but some plans, like SHOW CREATE,
183    /// can reference an item without requiring any privileges on that item.
184    item_usage: &'static BTreeSet<CatalogItemType>,
185    /// Some action if superuser is required to perform that action, None otherwise.
186    superuser_action: Option<String>,
187}
188
189impl RbacRequirements {
190    fn empty() -> RbacRequirements {
191        RbacRequirements {
192            role_membership: BTreeSet::new(),
193            ownership: Vec::new(),
194            privileges: Vec::new(),
195            item_usage: &EMPTY_ITEM_USAGE,
196            superuser_action: None,
197        }
198    }
199
200    fn validate(
201        self,
202        catalog: &impl SessionCatalog,
203        session: &dyn SessionMetadata,
204        resolved_ids: &ResolvedIds,
205    ) -> Result<(), UnauthorizedError> {
206        // Obtain all roles that the current session is a member of.
207        let role_membership =
208            catalog.collect_role_membership(&session.role_metadata().current_role);
209
210        check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212        // Validate that the current session has the required role membership to execute the provided
213        // plan.
214        let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215        if !unheld_membership.is_empty() {
216            let role_names = unheld_membership
217                .into_iter()
218                .map(|role_id| {
219                    // Some role references may no longer exist due to concurrent drops.
220                    catalog
221                        .try_get_role(role_id)
222                        .map(|role| role.name().to_string())
223                        .unwrap_or(role_id.to_string())
224                })
225                .collect();
226            return Err(UnauthorizedError::RoleMembership { role_names });
227        }
228
229        // Validate that the current session has the required object ownership to execute the provided
230        // plan.
231        let unheld_ownership = self
232            .ownership
233            .into_iter()
234            .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235            .collect();
236        ownership_err(unheld_ownership, catalog)?;
237
238        check_object_privileges(
239            catalog,
240            self.privileges,
241            role_membership,
242            session.role_metadata().current_role,
243        )?;
244
245        if let Some(action) = self.superuser_action {
246            return Err(UnauthorizedError::Superuser { action });
247        }
248
249        Ok(())
250    }
251
252    fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253        let RbacRequirements {
254            role_membership,
255            ownership,
256            privileges,
257            item_usage,
258            superuser_action: _,
259        } = self;
260        let role_membership = role_membership
261            .into_iter()
262            .filter(|id| id.is_system())
263            .collect();
264        let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265        let privileges = privileges
266            .into_iter()
267            .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268            // We allow reading objects for superusers and when RBAC is off.
269            .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270            .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271            .collect();
272        let superuser_action = None;
273        RbacRequirements {
274            role_membership,
275            ownership,
276            privileges,
277            item_usage,
278            superuser_action,
279        }
280    }
281}
282
283impl Default for RbacRequirements {
284    fn default() -> Self {
285        RbacRequirements {
286            role_membership: BTreeSet::new(),
287            ownership: Vec::new(),
288            privileges: Vec::new(),
289            item_usage: &DEFAULT_ITEM_USAGE,
290            superuser_action: None,
291        }
292    }
293}
294
295/// Checks if a `session` is authorized to use `resolved_ids`. If not, an error is returned.
296pub fn check_usage(
297    catalog: &impl SessionCatalog,
298    session: &dyn SessionMetadata,
299    resolved_ids: &ResolvedIds,
300    item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302    rbac_check_preamble(catalog, session)?;
303
304    // Obtain all roles that the current session is a member of.
305    let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307    // Certain statements depend on objects that haven't been created yet, like sub-sources, so we
308    // need to filter those out.
309    let existing_resolved_ids =
310        resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312    let required_privileges = generate_usage_privileges(
313        catalog,
314        &existing_resolved_ids,
315        session.role_metadata().current_role,
316        item_types,
317    )
318    .into_iter()
319    .collect();
320
321    let mut rbac_requirements = RbacRequirements::empty();
322    rbac_requirements.privileges = required_privileges;
323    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324    let required_privileges = rbac_requirements.privileges;
325
326    check_object_privileges(
327        catalog,
328        required_privileges,
329        role_membership,
330        session.role_metadata().current_role,
331    )?;
332
333    Ok(())
334}
335
336/// Checks if a session is authorized to execute a plan. If not, an error is returned.
337pub fn check_plan(
338    catalog: &impl SessionCatalog,
339    // Function mapping a connection ID to an authenticated role. The roles may have been dropped concurrently.
340    active_conns: impl FnOnce(u32) -> Option<RoleId>,
341    session: &dyn SessionMetadata,
342    plan: &Plan,
343    target_cluster_id: Option<ClusterId>,
344    resolved_ids: &ResolvedIds,
345) -> Result<(), UnauthorizedError> {
346    rbac_check_preamble(catalog, session)?;
347
348    let rbac_requirements = generate_rbac_requirements(
349        catalog,
350        plan,
351        active_conns,
352        target_cluster_id,
353        session.role_metadata().current_role,
354    );
355    let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
356    debug!(
357        "rbac requirements {rbac_requirements:?} for plan {:?}",
358        PlanKind::from(plan)
359    );
360    rbac_requirements.validate(catalog, session, resolved_ids)
361}
362
363/// Returns true if RBAC is turned on for a session, false otherwise.
364pub fn is_rbac_enabled_for_session(
365    system_vars: &SystemVars,
366    session: &dyn SessionMetadata,
367) -> bool {
368    let server_enabled = system_vars.enable_rbac_checks();
369    let session_enabled = session.enable_session_rbac_checks();
370
371    // The session flag allows users to turn RBAC on for just their session while the server flag
372    // allows users to turn RBAC on for everyone.
373    server_enabled || session_enabled
374}
375
376/// Generates all requirements needed to execute a given plan.
377fn generate_rbac_requirements(
378    catalog: &impl SessionCatalog,
379    plan: &Plan,
380    active_conns: impl FnOnce(u32) -> Option<RoleId>,
381    target_cluster_id: Option<ClusterId>,
382    role_id: RoleId,
383) -> RbacRequirements {
384    match plan {
385        Plan::CreateConnection(plan::CreateConnectionPlan {
386            name,
387            if_not_exists: _,
388            connection: _,
389            validate: _,
390        }) => RbacRequirements {
391            privileges: vec![(
392                SystemObjectId::Object(name.qualifiers.clone().into()),
393                AclMode::CREATE,
394                role_id,
395            )],
396            item_usage: &CREATE_ITEM_USAGE,
397            ..Default::default()
398        },
399        Plan::CreateDatabase(plan::CreateDatabasePlan {
400            name: _,
401            if_not_exists: _,
402        }) => RbacRequirements {
403            privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
404            item_usage: &CREATE_ITEM_USAGE,
405            ..Default::default()
406        },
407        Plan::CreateSchema(plan::CreateSchemaPlan {
408            database_spec,
409            schema_name: _,
410            if_not_exists: _,
411        }) => {
412            let privileges = match database_spec {
413                ResolvedDatabaseSpecifier::Ambient => Vec::new(),
414                ResolvedDatabaseSpecifier::Id(database_id) => {
415                    vec![(
416                        SystemObjectId::Object(database_id.into()),
417                        AclMode::CREATE,
418                        role_id,
419                    )]
420                }
421            };
422            RbacRequirements {
423                privileges,
424                item_usage: &CREATE_ITEM_USAGE,
425                ..Default::default()
426            }
427        }
428        Plan::CreateRole(plan::CreateRolePlan {
429            name: _,
430            attributes,
431        }) => {
432            if attributes.superuser.unwrap_or(false) {
433                RbacRequirements {
434                    superuser_action: Some("create superuser role".to_string()),
435                    ..Default::default()
436                }
437            } else {
438                RbacRequirements {
439                    privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
440                    item_usage: &CREATE_ITEM_USAGE,
441                    ..Default::default()
442                }
443            }
444        }
445        Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
446            privileges: vec![(
447                SystemObjectId::System,
448                AclMode::CREATE_NETWORK_POLICY,
449                role_id,
450            )],
451            item_usage: &CREATE_ITEM_USAGE,
452            ..Default::default()
453        },
454        Plan::CreateCluster(plan::CreateClusterPlan {
455            name: _,
456            variant: _,
457            workload_class: _,
458        }) => RbacRequirements {
459            privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
460            item_usage: &CREATE_ITEM_USAGE,
461            ..Default::default()
462        },
463        Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
464            cluster_id,
465            name: _,
466            config: _,
467        }) => RbacRequirements {
468            ownership: vec![ObjectId::Cluster(*cluster_id)],
469            item_usage: &CREATE_ITEM_USAGE,
470            ..Default::default()
471        },
472        Plan::CreateSource(plan::CreateSourcePlan {
473            name,
474            source,
475            if_not_exists: _,
476            timeline: _,
477            in_cluster,
478        }) => RbacRequirements {
479            privileges: generate_required_source_privileges(
480                name,
481                &source.data_source,
482                *in_cluster,
483                role_id,
484            ),
485            item_usage: &CREATE_ITEM_USAGE,
486            ..Default::default()
487        },
488        Plan::CreateSources(plans) => RbacRequirements {
489            privileges: plans
490                .iter()
491                .flat_map(
492                    |plan::CreateSourcePlanBundle {
493                         item_id: _,
494                         global_id: _,
495                         plan:
496                             plan::CreateSourcePlan {
497                                 name,
498                                 source,
499                                 if_not_exists: _,
500                                 timeline: _,
501                                 in_cluster,
502                             },
503                         resolved_ids: _,
504                         available_source_references: _,
505                     }| {
506                        generate_required_source_privileges(
507                            name,
508                            &source.data_source,
509                            *in_cluster,
510                            role_id,
511                        )
512                        .into_iter()
513                    },
514                )
515                .collect(),
516            item_usage: &CREATE_ITEM_USAGE,
517            ..Default::default()
518        },
519        Plan::CreateSecret(plan::CreateSecretPlan {
520            name,
521            secret: _,
522            if_not_exists: _,
523        }) => RbacRequirements {
524            privileges: vec![(
525                SystemObjectId::Object(name.qualifiers.clone().into()),
526                AclMode::CREATE,
527                role_id,
528            )],
529            item_usage: &CREATE_ITEM_USAGE,
530            ..Default::default()
531        },
532        Plan::CreateSink(plan::CreateSinkPlan {
533            name,
534            sink,
535            with_snapshot: _,
536            if_not_exists: _,
537            in_cluster,
538        }) => {
539            let mut privileges = vec![(
540                SystemObjectId::Object(name.qualifiers.clone().into()),
541                AclMode::CREATE,
542                role_id,
543            )];
544            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
545            privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
546            privileges.push((
547                SystemObjectId::Object(in_cluster.into()),
548                AclMode::CREATE,
549                role_id,
550            ));
551            RbacRequirements {
552                privileges,
553                item_usage: &CREATE_ITEM_USAGE,
554                ..Default::default()
555            }
556        }
557        Plan::CreateTable(plan::CreateTablePlan {
558            name,
559            table: _,
560            if_not_exists: _,
561        }) => RbacRequirements {
562            privileges: vec![(
563                SystemObjectId::Object(name.qualifiers.clone().into()),
564                AclMode::CREATE,
565                role_id,
566            )],
567            item_usage: &CREATE_ITEM_USAGE,
568            ..Default::default()
569        },
570        Plan::CreateView(plan::CreateViewPlan {
571            name,
572            view: _,
573            replace,
574            drop_ids: _,
575            if_not_exists: _,
576            ambiguous_columns: _,
577        }) => RbacRequirements {
578            ownership: replace
579                .map(|id| vec![ObjectId::Item(id)])
580                .unwrap_or_default(),
581            privileges: vec![(
582                SystemObjectId::Object(name.qualifiers.clone().into()),
583                AclMode::CREATE,
584                role_id,
585            )],
586            item_usage: &CREATE_ITEM_USAGE,
587            ..Default::default()
588        },
589        Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
590            name,
591            materialized_view,
592            replace,
593            drop_ids: _,
594            if_not_exists: _,
595            ambiguous_columns: _,
596        }) => RbacRequirements {
597            ownership: replace
598                .map(|id| vec![ObjectId::Item(id)])
599                .unwrap_or_default(),
600            privileges: vec![
601                (
602                    SystemObjectId::Object(name.qualifiers.clone().into()),
603                    AclMode::CREATE,
604                    role_id,
605                ),
606                (
607                    SystemObjectId::Object(materialized_view.cluster_id.into()),
608                    AclMode::CREATE,
609                    role_id,
610                ),
611            ],
612            item_usage: &CREATE_ITEM_USAGE,
613            ..Default::default()
614        },
615        Plan::CreateContinualTask(plan::CreateContinualTaskPlan {
616            name,
617            placeholder_id: _,
618            desc: _,
619            input_id: _,
620            with_snapshot: _,
621            continual_task,
622        }) => RbacRequirements {
623            privileges: vec![
624                (
625                    SystemObjectId::Object(name.qualifiers.clone().into()),
626                    AclMode::CREATE,
627                    role_id,
628                ),
629                (
630                    SystemObjectId::Object(continual_task.cluster_id.into()),
631                    AclMode::CREATE,
632                    role_id,
633                ),
634            ],
635            item_usage: &CREATE_ITEM_USAGE,
636            ..Default::default()
637        },
638        Plan::CreateIndex(plan::CreateIndexPlan {
639            name,
640            index,
641            if_not_exists: _,
642        }) => {
643            let index_on_item = catalog.resolve_item_id(&index.on);
644            RbacRequirements {
645                ownership: vec![ObjectId::Item(index_on_item)],
646                privileges: vec![
647                    (
648                        SystemObjectId::Object(name.qualifiers.clone().into()),
649                        AclMode::CREATE,
650                        role_id,
651                    ),
652                    (
653                        SystemObjectId::Object(index.cluster_id.into()),
654                        AclMode::CREATE,
655                        role_id,
656                    ),
657                ],
658                item_usage: &CREATE_ITEM_USAGE,
659                ..Default::default()
660            }
661        }
662        Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
663            privileges: vec![(
664                SystemObjectId::Object(name.qualifiers.clone().into()),
665                AclMode::CREATE,
666                role_id,
667            )],
668            item_usage: &CREATE_ITEM_USAGE,
669            ..Default::default()
670        },
671        Plan::Comment(plan::CommentPlan {
672            object_id,
673            sub_component: _,
674            comment: _,
675        }) => {
676            let (ownership, privileges) = match object_id {
677                // Roles don't have owners, instead we require the current session to have the
678                // `CREATEROLE` privilege.
679                CommentObjectId::Role(_) => (
680                    Vec::new(),
681                    vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
682                ),
683                _ => (vec![ObjectId::from(*object_id)], Vec::new()),
684            };
685            RbacRequirements {
686                ownership,
687                privileges,
688                ..Default::default()
689            }
690        }
691        Plan::DropObjects(plan::DropObjectsPlan {
692            referenced_ids,
693            drop_ids: _,
694            object_type,
695        }) => {
696            let privileges = if object_type == &ObjectType::Role {
697                vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
698            } else {
699                referenced_ids
700                    .iter()
701                    .filter_map(|id| match id {
702                        ObjectId::ClusterReplica((cluster_id, _)) => Some((
703                            SystemObjectId::Object(cluster_id.into()),
704                            AclMode::USAGE,
705                            role_id,
706                        )),
707                        ObjectId::Schema((database_spec, _)) => match database_spec {
708                            ResolvedDatabaseSpecifier::Ambient => None,
709                            ResolvedDatabaseSpecifier::Id(database_id) => Some((
710                                SystemObjectId::Object(database_id.into()),
711                                AclMode::USAGE,
712                                role_id,
713                            )),
714                        },
715                        ObjectId::Item(item_id) => {
716                            let item = catalog.get_item(item_id);
717                            Some((
718                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
719                                AclMode::USAGE,
720                                role_id,
721                            ))
722                        }
723                        ObjectId::Cluster(_)
724                        | ObjectId::Database(_)
725                        | ObjectId::Role(_)
726                        | ObjectId::NetworkPolicy(_) => None,
727                    })
728                    .collect()
729            };
730            RbacRequirements {
731                // Do not need ownership of descendant objects.
732                ownership: referenced_ids.clone(),
733                privileges,
734                ..Default::default()
735            }
736        }
737        Plan::DropOwned(plan::DropOwnedPlan {
738            role_ids,
739            drop_ids: _,
740            privilege_revokes: _,
741            default_privilege_revokes: _,
742        }) => RbacRequirements {
743            role_membership: role_ids.into_iter().cloned().collect(),
744            ..Default::default()
745        },
746        Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
747            let container_id = match id {
748                ObjectId::Item(id) => Some(SystemObjectId::Object(
749                    catalog.get_item(id).name().qualifiers.clone().into(),
750                )),
751                ObjectId::Schema((database_id, _schema_id)) => match database_id {
752                    ResolvedDatabaseSpecifier::Ambient => None,
753                    ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
754                },
755                ObjectId::Cluster(_)
756                | ObjectId::ClusterReplica(_)
757                | ObjectId::Database(_)
758                | ObjectId::Role(_)
759                | ObjectId::NetworkPolicy(_) => None,
760            };
761            let privileges = match container_id {
762                Some(id) => vec![(id, AclMode::USAGE, role_id)],
763                None => Vec::new(),
764            };
765            RbacRequirements {
766                privileges,
767                item_usage: &EMPTY_ITEM_USAGE,
768                ..Default::default()
769            }
770        }
771        Plan::ShowColumns(plan::ShowColumnsPlan {
772            id,
773            select_plan,
774            new_resolved_ids: _,
775        }) => {
776            let mut privileges = vec![(
777                SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
778                AclMode::USAGE,
779                role_id,
780            )];
781
782            for privilege in generate_rbac_requirements(
783                catalog,
784                &Plan::Select(select_plan.clone()),
785                active_conns,
786                target_cluster_id,
787                role_id,
788            )
789            .privileges
790            {
791                privileges.push(privilege);
792            }
793            RbacRequirements {
794                privileges,
795                ..Default::default()
796            }
797        }
798        Plan::Select(plan::SelectPlan {
799            source,
800            select: _,
801            when: _,
802            finishing: _,
803            copy_to: _,
804        }) => {
805            let items = source
806                .depends_on()
807                .into_iter()
808                .map(|gid| catalog.resolve_item_id(&gid));
809            let mut privileges = generate_read_privileges(catalog, items, role_id);
810            if let Some(privilege) = generate_cluster_usage_privileges(
811                source.as_const().is_some(),
812                target_cluster_id,
813                role_id,
814            ) {
815                privileges.push(privilege);
816            }
817            RbacRequirements {
818                privileges,
819                ..Default::default()
820            }
821        }
822        Plan::Subscribe(plan::SubscribePlan {
823            from,
824            with_snapshot: _,
825            when: _,
826            up_to: _,
827            copy_to: _,
828            emit_progress: _,
829            output: _,
830        }) => {
831            let items = from
832                .depends_on()
833                .into_iter()
834                .map(|gid| catalog.resolve_item_id(&gid));
835            let mut privileges = generate_read_privileges(catalog, items, role_id);
836            if let Some(cluster_id) = target_cluster_id {
837                privileges.push((
838                    SystemObjectId::Object(cluster_id.into()),
839                    AclMode::USAGE,
840                    role_id,
841                ));
842            }
843            RbacRequirements {
844                privileges,
845                ..Default::default()
846            }
847        }
848        Plan::CopyFrom(plan::CopyFromPlan {
849            id,
850            source: _,
851            columns: _,
852            source_desc: _,
853            mfp: _,
854            params: _,
855            filter: _,
856        }) => RbacRequirements {
857            privileges: vec![
858                (
859                    SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
860                    AclMode::USAGE,
861                    role_id,
862                ),
863                (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
864            ],
865            ..Default::default()
866        },
867        Plan::CopyTo(plan::CopyToPlan {
868            select_plan,
869            desc: _,
870            to: _,
871            connection: _,
872            connection_id: _,
873            format: _,
874            max_file_size: _,
875        }) => {
876            let items = select_plan
877                .source
878                .depends_on()
879                .into_iter()
880                .map(|gid| catalog.resolve_item_id(&gid));
881            let mut privileges = generate_read_privileges(catalog, items, role_id);
882            if let Some(cluster_id) = target_cluster_id {
883                privileges.push((
884                    SystemObjectId::Object(cluster_id.into()),
885                    AclMode::USAGE,
886                    role_id,
887                ));
888            }
889            RbacRequirements {
890                privileges,
891                ..Default::default()
892            }
893        }
894        Plan::ExplainPlan(plan::ExplainPlanPlan {
895            stage: _,
896            format: _,
897            config: _,
898            explainee,
899        })
900        | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
901            privileges: match explainee {
902                Explainee::View(id)
903                | Explainee::MaterializedView(id)
904                | Explainee::Index(id)
905                | Explainee::ReplanView(id)
906                | Explainee::ReplanMaterializedView(id)
907                | Explainee::ReplanIndex(id) => {
908                    let item = catalog.get_item(id);
909                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
910                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
911                }
912                Explainee::Statement(stmt) => stmt
913                    .depends_on()
914                    .into_iter()
915                    .map(|id| {
916                        let item = catalog.get_item_by_global_id(&id);
917                        let schema_id: ObjectId = item.name().qualifiers.clone().into();
918                        (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
919                    })
920                    .collect(),
921            },
922            item_usage: match explainee {
923                Explainee::View(..)
924                | Explainee::MaterializedView(..)
925                | Explainee::Index(..)
926                | Explainee::ReplanView(..)
927                | Explainee::ReplanMaterializedView(..)
928                | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
929                Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
930            },
931            ..Default::default()
932        },
933        Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
934            RbacRequirements {
935                privileges: {
936                    let item = catalog.get_item_by_global_id(sink_from);
937                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
938                    vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
939                },
940                item_usage: &EMPTY_ITEM_USAGE,
941                ..Default::default()
942            }
943        }
944        Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
945            format: _,
946            raw_plan,
947            when: _,
948        }) => RbacRequirements {
949            privileges: raw_plan
950                .depends_on()
951                .into_iter()
952                .map(|id| {
953                    let item = catalog.get_item_by_global_id(&id);
954                    let schema_id: ObjectId = item.name().qualifiers.clone().into();
955                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
956                })
957                .collect(),
958            ..Default::default()
959        },
960        Plan::Insert(plan::InsertPlan {
961            id,
962            values,
963            returning,
964        }) => {
965            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
966            let mut privileges = vec![
967                (
968                    SystemObjectId::Object(schema_id.clone()),
969                    AclMode::USAGE,
970                    role_id,
971                ),
972                (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
973            ];
974            let mut seen = BTreeSet::from([(schema_id, role_id)]);
975
976            // We don't allow arbitrary sub-queries in `returning`. So either it
977            // contains a column reference to the outer table or it's constant.
978            if returning
979                .iter()
980                .any(|assignment| assignment.contains_column())
981            {
982                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
983                seen.insert((id.into(), role_id));
984            }
985
986            let items = values
987                .depends_on()
988                .into_iter()
989                .map(|gid| catalog.resolve_item_id(&gid));
990            privileges.extend_from_slice(&generate_read_privileges_inner(
991                catalog, items, role_id, &mut seen,
992            ));
993
994            if let Some(privilege) = generate_cluster_usage_privileges(
995                values.as_const().is_some(),
996                target_cluster_id,
997                role_id,
998            ) {
999                privileges.push(privilege);
1000            } else if !returning.is_empty() {
1001                // TODO(jkosh44) returning may be a constant, but for now we are overly protective
1002                //  and require cluster privileges for all returning.
1003                if let Some(cluster_id) = target_cluster_id {
1004                    privileges.push((
1005                        SystemObjectId::Object(cluster_id.into()),
1006                        AclMode::USAGE,
1007                        role_id,
1008                    ));
1009                }
1010            }
1011            RbacRequirements {
1012                privileges,
1013                ..Default::default()
1014            }
1015        }
1016        Plan::AlterCluster(plan::AlterClusterPlan {
1017            id,
1018            name: _,
1019            options: _,
1020            strategy: _,
1021        }) => RbacRequirements {
1022            ownership: vec![ObjectId::Cluster(*id)],
1023            item_usage: &CREATE_ITEM_USAGE,
1024            ..Default::default()
1025        },
1026        Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1027            ownership: vec![ObjectId::Item(*id)],
1028            privileges: vec![(
1029                SystemObjectId::Object(set_cluster.into()),
1030                AclMode::CREATE,
1031                role_id,
1032            )],
1033            item_usage: &CREATE_ITEM_USAGE,
1034            ..Default::default()
1035        },
1036        Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1037            id,
1038            window: _,
1039            value: _,
1040            object_type: _,
1041        }) => RbacRequirements {
1042            ownership: vec![ObjectId::Item(*id)],
1043            item_usage: &CREATE_ITEM_USAGE,
1044            ..Default::default()
1045        },
1046        Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1047            ownership: vec![ObjectId::Item(*id)],
1048            ..Default::default()
1049        },
1050        Plan::AlterSource(plan::AlterSourcePlan {
1051            item_id,
1052            ingestion_id: _,
1053            action: _,
1054        }) => RbacRequirements {
1055            ownership: vec![ObjectId::Item(*item_id)],
1056            item_usage: &CREATE_ITEM_USAGE,
1057            ..Default::default()
1058        },
1059        Plan::AlterSink(plan::AlterSinkPlan {
1060            item_id,
1061            global_id: _,
1062            sink,
1063            with_snapshot: _,
1064            in_cluster,
1065        }) => {
1066            let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1067            let mut privileges = generate_read_privileges(catalog, items, role_id);
1068            privileges.push((
1069                SystemObjectId::Object(in_cluster.into()),
1070                AclMode::CREATE,
1071                role_id,
1072            ));
1073            RbacRequirements {
1074                ownership: vec![ObjectId::Item(*item_id)],
1075                privileges,
1076                item_usage: &CREATE_ITEM_USAGE,
1077                ..Default::default()
1078            }
1079        }
1080        Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1081            id,
1082            name: _,
1083            to_name: _,
1084        }) => RbacRequirements {
1085            ownership: vec![ObjectId::Cluster(*id)],
1086            ..Default::default()
1087        },
1088        Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1089            id_a,
1090            id_b,
1091            name_a: _,
1092            name_b: _,
1093            name_temp: _,
1094        }) => RbacRequirements {
1095            ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1096            ..Default::default()
1097        },
1098        Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1099            cluster_id,
1100            replica_id,
1101            name: _,
1102            to_name: _,
1103        }) => RbacRequirements {
1104            ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1105            ..Default::default()
1106        },
1107        Plan::AlterItemRename(plan::AlterItemRenamePlan {
1108            id,
1109            current_full_name: _,
1110            to_name: _,
1111            object_type: _,
1112        }) => RbacRequirements {
1113            ownership: vec![ObjectId::Item(*id)],
1114            ..Default::default()
1115        },
1116        Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1117            cur_schema_spec,
1118            new_schema_name: _,
1119        }) => {
1120            let privileges = match cur_schema_spec.0 {
1121                ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1122                    SystemObjectId::Object(ObjectId::Database(db_id)),
1123                    AclMode::CREATE,
1124                    role_id,
1125                )],
1126                ResolvedDatabaseSpecifier::Ambient => vec![],
1127            };
1128
1129            RbacRequirements {
1130                ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1131                privileges,
1132                ..Default::default()
1133            }
1134        }
1135        Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1136            schema_a_spec,
1137            schema_a_name: _,
1138            schema_b_spec,
1139            schema_b_name: _,
1140            name_temp: _,
1141        }) => {
1142            let mut privileges = vec![];
1143            if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1144                privileges.push((
1145                    SystemObjectId::Object(ObjectId::Database(id_a)),
1146                    AclMode::CREATE,
1147                    role_id,
1148                ));
1149            }
1150            if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1151                privileges.push((
1152                    SystemObjectId::Object(ObjectId::Database(id_b)),
1153                    AclMode::CREATE,
1154                    role_id,
1155                ));
1156            }
1157
1158            RbacRequirements {
1159                ownership: vec![
1160                    ObjectId::Schema(*schema_a_spec),
1161                    ObjectId::Schema(*schema_b_spec),
1162                ],
1163                privileges,
1164                ..Default::default()
1165            }
1166        }
1167        Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1168            ownership: vec![ObjectId::Item(*id)],
1169            item_usage: &CREATE_ITEM_USAGE,
1170            ..Default::default()
1171        },
1172        Plan::AlterRole(plan::AlterRolePlan {
1173            id,
1174            name: _,
1175            option,
1176        }) => match option {
1177            // Only superusers can alter the superuserness of a role.
1178            plan::PlannedAlterRoleOption::Attributes(attributes)
1179                if attributes.superuser.unwrap_or(false) =>
1180            {
1181                RbacRequirements {
1182                    superuser_action: Some("alter superuser role".to_string()),
1183                    ..Default::default()
1184                }
1185            }
1186            // Roles are allowed to change their own password.
1187            plan::PlannedAlterRoleOption::Attributes(attributes)
1188                if attributes.password.is_some() && role_id == *id =>
1189            {
1190                RbacRequirements::default()
1191            }
1192            // But no one elses...
1193            plan::PlannedAlterRoleOption::Attributes(attributes)
1194                if attributes.password.is_some() =>
1195            {
1196                RbacRequirements {
1197                    superuser_action: Some("alter password of role".to_string()),
1198                    ..Default::default()
1199                }
1200            }
1201            // Roles are allowed to change their own variables.
1202            plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1203                RbacRequirements::default()
1204            }
1205            // Otherwise to ALTER a role, you need to have the CREATE_ROLE privilege.
1206            _ => RbacRequirements {
1207                privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1208                item_usage: &CREATE_ITEM_USAGE,
1209                ..Default::default()
1210            },
1211        },
1212        Plan::AlterOwner(plan::AlterOwnerPlan {
1213            id,
1214            object_type: _,
1215            new_owner,
1216        }) => {
1217            let privileges = match id {
1218                ObjectId::ClusterReplica((cluster_id, _)) => {
1219                    vec![(
1220                        SystemObjectId::Object(cluster_id.into()),
1221                        AclMode::CREATE,
1222                        role_id,
1223                    )]
1224                }
1225                ObjectId::Schema((database_spec, _)) => match database_spec {
1226                    ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1227                    ResolvedDatabaseSpecifier::Id(database_id) => {
1228                        vec![(
1229                            SystemObjectId::Object(database_id.into()),
1230                            AclMode::CREATE,
1231                            role_id,
1232                        )]
1233                    }
1234                },
1235                ObjectId::Item(item_id) => {
1236                    let item = catalog.get_item(item_id);
1237                    vec![(
1238                        SystemObjectId::Object(item.name().qualifiers.clone().into()),
1239                        AclMode::CREATE,
1240                        role_id,
1241                    )]
1242                }
1243                ObjectId::Cluster(_)
1244                | ObjectId::Database(_)
1245                | ObjectId::Role(_)
1246                | ObjectId::NetworkPolicy(_) => Vec::new(),
1247            };
1248            RbacRequirements {
1249                role_membership: BTreeSet::from([*new_owner]),
1250                ownership: vec![id.clone()],
1251                privileges,
1252                ..Default::default()
1253            }
1254        }
1255        Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1256            ownership: vec![ObjectId::Item(*relation_id)],
1257            item_usage: &CREATE_ITEM_USAGE,
1258            ..Default::default()
1259        },
1260        Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1261            ownership: vec![ObjectId::NetworkPolicy(*id)],
1262            item_usage: &CREATE_ITEM_USAGE,
1263            ..Default::default()
1264        },
1265        Plan::ReadThenWrite(plan::ReadThenWritePlan {
1266            id,
1267            selection,
1268            finishing: _,
1269            assignments,
1270            kind,
1271            returning,
1272        }) => {
1273            let acl_mode = match kind {
1274                MutationKind::Insert => AclMode::INSERT,
1275                MutationKind::Update => AclMode::UPDATE,
1276                MutationKind::Delete => AclMode::DELETE,
1277            };
1278            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1279            let mut privileges = vec![
1280                (
1281                    SystemObjectId::Object(schema_id.clone()),
1282                    AclMode::USAGE,
1283                    role_id,
1284                ),
1285                (SystemObjectId::Object(id.into()), acl_mode, role_id),
1286            ];
1287            let mut seen = BTreeSet::from([(schema_id, role_id)]);
1288
1289            // We don't allow arbitrary sub-queries in `assignments` or `returning`. So either they
1290            // contains a column reference to the outer table or it's constant.
1291            if assignments
1292                .values()
1293                .chain(returning.iter())
1294                .any(|assignment| assignment.contains_column())
1295            {
1296                privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1297                seen.insert((id.into(), role_id));
1298            }
1299
1300            // TODO(jkosh44) It's fairly difficult to determine what part of `selection` is from a
1301            //  user specified read and what part is from the implementation of the read then write.
1302            //  instead we are overly protective and always require SELECT privileges even though
1303            //  PostgreSQL doesn't always do this.
1304            //  As a concrete example, we require SELECT and UPDATE privileges to execute
1305            //  `UPDATE t SET a = 42;`, while PostgreSQL only requires UPDATE privileges.
1306            let items = selection
1307                .depends_on()
1308                .into_iter()
1309                .map(|gid| catalog.resolve_item_id(&gid));
1310            privileges.extend_from_slice(&generate_read_privileges_inner(
1311                catalog, items, role_id, &mut seen,
1312            ));
1313
1314            if let Some(privilege) = generate_cluster_usage_privileges(
1315                selection.as_const().is_some(),
1316                target_cluster_id,
1317                role_id,
1318            ) {
1319                privileges.push(privilege);
1320            }
1321            RbacRequirements {
1322                privileges,
1323                ..Default::default()
1324            }
1325        }
1326        Plan::GrantRole(plan::GrantRolePlan {
1327            role_ids: _,
1328            member_ids: _,
1329            grantor_id: _,
1330        })
1331        | Plan::RevokeRole(plan::RevokeRolePlan {
1332            role_ids: _,
1333            member_ids: _,
1334            grantor_id: _,
1335        }) => RbacRequirements {
1336            privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1337            ..Default::default()
1338        },
1339        Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1340            update_privileges,
1341            grantees: _,
1342        })
1343        | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1344            update_privileges,
1345            revokees: _,
1346        }) => {
1347            let mut privileges = Vec::with_capacity(update_privileges.len());
1348            for UpdatePrivilege { target_id, .. } in update_privileges {
1349                match target_id {
1350                    SystemObjectId::Object(object_id) => match object_id {
1351                        ObjectId::ClusterReplica((cluster_id, _)) => {
1352                            privileges.push((
1353                                SystemObjectId::Object(cluster_id.into()),
1354                                AclMode::USAGE,
1355                                role_id,
1356                            ));
1357                        }
1358                        ObjectId::Schema((database_spec, _)) => match database_spec {
1359                            ResolvedDatabaseSpecifier::Ambient => {}
1360                            ResolvedDatabaseSpecifier::Id(database_id) => {
1361                                privileges.push((
1362                                    SystemObjectId::Object(database_id.into()),
1363                                    AclMode::USAGE,
1364                                    role_id,
1365                                ));
1366                            }
1367                        },
1368                        ObjectId::Item(item_id) => {
1369                            let item = catalog.get_item(item_id);
1370                            privileges.push((
1371                                SystemObjectId::Object(item.name().qualifiers.clone().into()),
1372                                AclMode::USAGE,
1373                                role_id,
1374                            ))
1375                        }
1376                        ObjectId::Cluster(_)
1377                        | ObjectId::Database(_)
1378                        | ObjectId::Role(_)
1379                        | ObjectId::NetworkPolicy(_) => {}
1380                    },
1381                    SystemObjectId::System => {}
1382                }
1383            }
1384            RbacRequirements {
1385                ownership: update_privileges
1386                    .iter()
1387                    .filter_map(|update_privilege| update_privilege.target_id.object_id())
1388                    .cloned()
1389                    .collect(),
1390                privileges,
1391                // To grant/revoke a privilege on some object, generally the grantor/revoker must be the
1392                // owner of that object (or have a grant option on that object which isn't implemented in
1393                // Materialize yet). There is no owner of the entire system, so it's only reasonable to
1394                // restrict granting/revoking system privileges to superusers.
1395                superuser_action: if update_privileges
1396                    .iter()
1397                    .any(|update_privilege| update_privilege.target_id.is_system())
1398                {
1399                    Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1400                } else {
1401                    None
1402                },
1403                ..Default::default()
1404            }
1405        }
1406        Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1407            privilege_objects,
1408            privilege_acl_items: _,
1409            is_grant: _,
1410        }) => RbacRequirements {
1411            role_membership: privilege_objects
1412                .iter()
1413                .map(|privilege_object| privilege_object.role_id)
1414                .collect(),
1415            privileges: privilege_objects
1416                .into_iter()
1417                .filter_map(|privilege_object| {
1418                    if let (Some(database_id), Some(_)) =
1419                        (privilege_object.database_id, privilege_object.schema_id)
1420                    {
1421                        Some((
1422                            SystemObjectId::Object(database_id.into()),
1423                            AclMode::USAGE,
1424                            role_id,
1425                        ))
1426                    } else {
1427                        None
1428                    }
1429                })
1430                .collect(),
1431            // Altering the default privileges for the PUBLIC role (aka ALL ROLES) will affect all roles
1432            // that currently exist and roles that will exist in the future. It's impossible for an exising
1433            // role to be a member of a role that doesn't exist yet, so no current role could possibly have
1434            // the privileges required to alter default privileges for the PUBLIC role. Therefore we
1435            // only superusers can alter default privileges for the PUBLIC role.
1436            superuser_action: if privilege_objects
1437                .iter()
1438                .any(|privilege_object| privilege_object.role_id.is_public())
1439            {
1440                Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1441            } else {
1442                None
1443            },
1444            ..Default::default()
1445        },
1446        Plan::ReassignOwned(plan::ReassignOwnedPlan {
1447            old_roles,
1448            new_role,
1449            reassign_ids: _,
1450        }) => RbacRequirements {
1451            role_membership: old_roles
1452                .into_iter()
1453                .cloned()
1454                .chain(iter::once(*new_role))
1455                .collect(),
1456            ..Default::default()
1457        },
1458        Plan::SideEffectingFunc(func) => {
1459            let role_membership = match func {
1460                SideEffectingFunc::PgCancelBackend { connection_id } => {
1461                    active_conns(*connection_id)
1462                        .map(|x| [x].into())
1463                        .unwrap_or_default()
1464                }
1465            };
1466            RbacRequirements {
1467                role_membership,
1468                ..Default::default()
1469            }
1470        }
1471        Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1472            let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1473            RbacRequirements {
1474                privileges: vec![
1475                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1476                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1477                ],
1478                ..Default::default()
1479            }
1480        }
1481        Plan::DiscardTemp
1482        | Plan::DiscardAll
1483        | Plan::EmptyQuery
1484        | Plan::ShowAllVariables
1485        | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1486        | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1487        | Plan::SetVariable(plan::SetVariablePlan {
1488            name: _,
1489            value: _,
1490            local: _,
1491        })
1492        | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1493        | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1494        | Plan::StartTransaction(plan::StartTransactionPlan {
1495            access: _,
1496            isolation_level: _,
1497        })
1498        | Plan::CommitTransaction(plan::CommitTransactionPlan {
1499            transaction_type: _,
1500        })
1501        | Plan::AbortTransaction(plan::AbortTransactionPlan {
1502            transaction_type: _,
1503        })
1504        | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1505        | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1506        | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1507        | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1508        | Plan::Declare(plan::DeclarePlan {
1509            name: _,
1510            stmt: _,
1511            sql: _,
1512            params: _,
1513        })
1514        | Plan::Fetch(plan::FetchPlan {
1515            name: _,
1516            count: _,
1517            timeout: _,
1518        })
1519        | Plan::Close(plan::ClosePlan { name: _ })
1520        | Plan::Prepare(plan::PreparePlan {
1521            name: _,
1522            stmt: _,
1523            desc: _,
1524            sql: _,
1525        })
1526        | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1527        | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1528        | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1529    }
1530}
1531
1532/// Reports whether any role has ownership over an object.
1533fn check_owner_roles(
1534    object_id: &ObjectId,
1535    role_ids: &BTreeSet<RoleId>,
1536    catalog: &impl SessionCatalog,
1537) -> bool {
1538    if let Some(owner_id) = catalog.get_owner_id(object_id) {
1539        role_ids.contains(&owner_id)
1540    } else {
1541        true
1542    }
1543}
1544
1545fn ownership_err(
1546    unheld_ownership: Vec<ObjectId>,
1547    catalog: &impl SessionCatalog,
1548) -> Result<(), UnauthorizedError> {
1549    if !unheld_ownership.is_empty() {
1550        let objects = unheld_ownership
1551            .into_iter()
1552            .map(|ownership| match ownership {
1553                ObjectId::Cluster(id) => (
1554                    ObjectType::Cluster,
1555                    catalog.get_cluster(id).name().to_string(),
1556                ),
1557                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1558                    let cluster = catalog.get_cluster(cluster_id);
1559                    let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1560                    // Note: using unchecked here is okay because the values are coming from an
1561                    // already existing name.
1562                    let name = QualifiedReplica {
1563                        cluster: Ident::new_unchecked(cluster.name()),
1564                        replica: Ident::new_unchecked(replica.name()),
1565                    };
1566                    (ObjectType::ClusterReplica, name.to_string())
1567                }
1568                ObjectId::Database(id) => (
1569                    ObjectType::Database,
1570                    catalog.get_database(&id).name().to_string(),
1571                ),
1572                ObjectId::Schema((database_spec, schema_spec)) => {
1573                    let schema = catalog.get_schema(&database_spec, &schema_spec);
1574                    let name = catalog.resolve_full_schema_name(schema.name());
1575                    (ObjectType::Schema, name.to_string())
1576                }
1577                ObjectId::Item(id) => {
1578                    let item = catalog.get_item(&id);
1579                    let name = catalog.resolve_full_name(item.name());
1580                    (item.item_type().into(), name.to_string())
1581                }
1582                ObjectId::NetworkPolicy(id) => (
1583                    ObjectType::NetworkPolicy,
1584                    catalog.get_network_policy(&id).name().to_string(),
1585                ),
1586                ObjectId::Role(_) => unreachable!("roles have no owner"),
1587            })
1588            .collect();
1589        Err(UnauthorizedError::Ownership { objects })
1590    } else {
1591        Ok(())
1592    }
1593}
1594
1595fn generate_required_source_privileges(
1596    name: &QualifiedItemName,
1597    data_source: &DataSourceDesc,
1598    in_cluster: Option<ClusterId>,
1599    role_id: RoleId,
1600) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1601    let mut privileges = vec![(
1602        SystemObjectId::Object(name.qualifiers.clone().into()),
1603        AclMode::CREATE,
1604        role_id,
1605    )];
1606    match (data_source, in_cluster) {
1607        (_, Some(id)) => {
1608            privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1609        }
1610        (DataSourceDesc::Ingestion(_), None) => {
1611            privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1612        }
1613        // Non-ingestion data-sources have meaningless cluster config's (for now...) and they need
1614        // to be ignored.
1615        // This feels very brittle, but there's not much we can do until the UNDEFINED cluster
1616        // config is removed.
1617        (_, None) => {}
1618    }
1619    privileges
1620}
1621
1622/// Generates all the privileges required to execute a read that includes the objects in `ids`.
1623///
1624/// Not only do we need to validate that `role_id` has read privileges on all relations in `ids`,
1625/// but if any object is a view or materialized view then we need to validate that the owner of
1626/// that view has all of the privileges required to execute the query within the view.
1627///
1628/// For more details see: <https://www.postgresql.org/docs/15/rules-privileges.html>
1629fn generate_read_privileges(
1630    catalog: &impl SessionCatalog,
1631    ids: impl Iterator<Item = CatalogItemId>,
1632    role_id: RoleId,
1633) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1634    generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1635}
1636
1637fn generate_read_privileges_inner(
1638    catalog: &impl SessionCatalog,
1639    ids: impl Iterator<Item = CatalogItemId>,
1640    role_id: RoleId,
1641    seen: &mut BTreeSet<(ObjectId, RoleId)>,
1642) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1643    let mut privileges = Vec::new();
1644    let mut views = Vec::new();
1645
1646    for id in ids {
1647        if seen.insert((id.into(), role_id)) {
1648            let item = catalog.get_item(&id);
1649            let schema_id: ObjectId = item.name().qualifiers.clone().into();
1650            if seen.insert((schema_id.clone(), role_id)) {
1651                privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1652            }
1653            match item.item_type() {
1654                CatalogItemType::View
1655                | CatalogItemType::MaterializedView
1656                | CatalogItemType::ContinualTask => {
1657                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1658                    views.push((item.references().items().copied(), item.owner_id()));
1659                }
1660                CatalogItemType::Table | CatalogItemType::Source => {
1661                    privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1662                }
1663                CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1664                    privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1665                }
1666                CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1667            }
1668        }
1669    }
1670
1671    for (view_ids, view_owner) in views {
1672        privileges.extend_from_slice(&generate_read_privileges_inner(
1673            catalog, view_ids, view_owner, seen,
1674        ));
1675    }
1676
1677    privileges
1678}
1679
1680fn generate_usage_privileges(
1681    catalog: &impl SessionCatalog,
1682    ids: &ResolvedIds,
1683    role_id: RoleId,
1684    item_types: &BTreeSet<CatalogItemType>,
1685) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1686    // Use a `BTreeSet` to remove duplicate privileges.
1687    ids.items()
1688        .filter_map(move |id| {
1689            let item = catalog.get_item(id);
1690            if item_types.contains(&item.item_type()) {
1691                let schema_id = item.name().qualifiers.clone().into();
1692                Some([
1693                    (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1694                    (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1695                ])
1696            } else {
1697                None
1698            }
1699        })
1700        .flatten()
1701        .collect()
1702}
1703
1704fn generate_cluster_usage_privileges(
1705    expr_is_const: bool,
1706    target_cluster_id: Option<ClusterId>,
1707    role_id: RoleId,
1708) -> Option<(SystemObjectId, AclMode, RoleId)> {
1709    // TODO(jkosh44) expr hasn't been fully optimized yet, so it might actually be a constant,
1710    //  but we mistakenly think that it's not. For now it's ok to be overly protective.
1711    if !expr_is_const {
1712        if let Some(cluster_id) = target_cluster_id {
1713            return Some((
1714                SystemObjectId::Object(cluster_id.into()),
1715                AclMode::USAGE,
1716                role_id,
1717            ));
1718        }
1719    }
1720
1721    None
1722}
1723
1724fn check_object_privileges(
1725    catalog: &impl SessionCatalog,
1726    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1727    role_membership: BTreeSet<RoleId>,
1728    current_role_id: RoleId,
1729) -> Result<(), UnauthorizedError> {
1730    let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1731    role_memberships.insert(current_role_id, role_membership);
1732    for (object_id, acl_mode, role_id) in privileges {
1733        let role_membership = role_memberships
1734            .entry(role_id)
1735            .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1736        let object_privileges = catalog
1737            .get_privileges(&object_id)
1738            .expect("only object types with privileges will generate required privileges");
1739        let role_privileges = role_membership
1740            .iter()
1741            .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1742            .map(|mz_acl_item| mz_acl_item.acl_mode)
1743            .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1744        if !role_privileges.contains(acl_mode) {
1745            let role_name = catalog.get_role(&role_id).name().to_string();
1746            let privileges = acl_mode.to_error_string();
1747            return Err(UnauthorizedError::Privilege {
1748                object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1749                role_name,
1750                privileges,
1751            });
1752        }
1753    }
1754
1755    Ok(())
1756}
1757
1758pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1759    const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1760        .union(AclMode::SELECT)
1761        .union(AclMode::UPDATE)
1762        .union(AclMode::DELETE);
1763    const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1764    const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1765        .union(AclMode::CREATE_DB)
1766        .union(AclMode::CREATE_CLUSTER)
1767        .union(AclMode::CREATE_NETWORK_POLICY);
1768
1769    const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1770    match object_type {
1771        SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1772        SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1773        SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1774        SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1775        SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1776        SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1777        SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1778        SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1779        SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1780        SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1781        SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1782        SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1783        SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1784        SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1785        SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1786        SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1787        SystemObjectType::Object(ObjectType::ContinualTask) => AclMode::SELECT,
1788        SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1789    }
1790}
1791
1792pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1793    MzAclItem {
1794        grantee: owner_id,
1795        grantor: owner_id,
1796        acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1797    }
1798}
1799
1800const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1801    match object_type {
1802        ObjectType::Table
1803        | ObjectType::View
1804        | ObjectType::MaterializedView
1805        | ObjectType::Source
1806        | ObjectType::ContinualTask => AclMode::SELECT,
1807        ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1808        ObjectType::Sink
1809        | ObjectType::Index
1810        | ObjectType::Role
1811        | ObjectType::Cluster
1812        | ObjectType::ClusterReplica
1813        | ObjectType::Secret
1814        | ObjectType::Connection
1815        | ObjectType::Database
1816        | ObjectType::Func
1817        | ObjectType::NetworkPolicy => AclMode::empty(),
1818    }
1819}
1820
1821pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1822    let acl_mode = default_builtin_object_acl_mode(object_type);
1823    MzAclItem {
1824        grantee: MZ_SUPPORT_ROLE_ID,
1825        grantor: MZ_SYSTEM_ROLE_ID,
1826        acl_mode,
1827    }
1828}
1829
1830pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1831    let acl_mode = default_builtin_object_acl_mode(object_type);
1832    MzAclItem {
1833        grantee: RoleId::Public,
1834        grantor: MZ_SYSTEM_ROLE_ID,
1835        acl_mode,
1836    }
1837}