1use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26 CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29 CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30 SchemaSpecifier, SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34 DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40fn rbac_check_preamble(
42 catalog: &impl SessionCatalog,
43 session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45 if catalog
50 .try_get_role(&session_meta.role_metadata().current_role)
51 .is_none()
52 {
53 return Err(UnauthorizedError::ConcurrentRoleDrop(
54 session_meta.role_metadata().current_role.clone(),
55 ));
56 };
57 if catalog
58 .try_get_role(&session_meta.role_metadata().session_role)
59 .is_none()
60 {
61 return Err(UnauthorizedError::ConcurrentRoleDrop(
62 session_meta.role_metadata().session_role.clone(),
63 ));
64 };
65 if catalog
66 .try_get_role(&session_meta.role_metadata().authenticated_role)
67 .is_none()
68 {
69 return Err(UnauthorizedError::ConcurrentRoleDrop(
70 session_meta.role_metadata().authenticated_role.clone(),
71 ));
72 };
73
74 Ok(())
75}
76
77fn filter_requirements(
79 catalog: &impl SessionCatalog,
80 session_meta: &dyn SessionMetadata,
81 rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83 let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86 && !session_meta.role_metadata().current_role.is_system()
87 && !session_meta.role_metadata().session_role.is_system();
88 let is_superuser = session_meta.is_superuser();
90 if is_rbac_disabled || is_superuser {
91 return rbac_requirements.filter_to_mandatory_requirements();
92 }
93
94 rbac_requirements
95}
96
97static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99 btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104 let mut items = DEFAULT_ITEM_USAGE.clone();
105 items.insert(CatalogItemType::Type);
106 items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
110static RESTRICT_TO_USER_OBJECTS_ALLOWED_OIDS: LazyLock<BTreeSet<u32>> = LazyLock::new(|| {
119 use mz_pgrepr::oid;
120 btreeset! {
121 oid::VIEW_MZ_MCP_DATA_PRODUCTS_OID,
122 oid::VIEW_MZ_MCP_DATA_PRODUCT_DETAILS_OID,
123 oid::VIEW_MZ_SHOW_MY_CLUSTER_PRIVILEGES_OID,
124 }
125});
126
/// Errors produced when a session is not authorized to perform an action.
///
/// The `Display` text of each variant is the short message; a longer
/// explanation, where one exists, is available from `UnauthorizedError::detail`.
#[derive(Debug, thiserror::Error)]
pub enum UnauthorizedError {
    /// The action may only be performed by a superuser.
    #[error("permission denied to {action}")]
    Superuser { action: String },
    /// The action requires ownership of the listed objects, given as
    /// (object type, object name) pairs.
    #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
    Ownership { objects: Vec<(ObjectType, String)> },
    /// The action requires membership in the listed roles.
    #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
    RoleMembership { role_names: Vec<String> },
    /// `role_name` lacks `privileges` on the described object.
    #[error("permission denied for {object_description}")]
    Privilege {
        object_description: ErrorMessageObjectDescription,
        role_name: String,
        privileges: String,
    },
    /// The action may only be performed by the system user's role
    /// (`SYSTEM_USER`).
    #[error("permission denied to {action}")]
    MzSystem { action: String },
    /// The action was denied to the support user's role (`SUPPORT_USER`),
    /// which has very limited privileges.
    #[error("permission denied to {action}")]
    MzSupport { action: String },
    /// A role the session depends on no longer exists in the catalog.
    #[error("role {0} was concurrently dropped")]
    ConcurrentRoleDrop(RoleId),
    /// The session is restricted to user objects and referenced the named
    /// system object.
    #[error("access to system object {object_name} is restricted")]
    RestrictedSystemObject { object_name: String },
}
161
162impl UnauthorizedError {
163 pub fn detail(&self) -> Option<String> {
164 match &self {
165 UnauthorizedError::Superuser { action } => {
166 Some(format!("You must be a superuser to {}", action))
167 }
168 UnauthorizedError::Privilege {
169 object_description,
170 role_name,
171 privileges,
172 } => Some(format!(
173 "The '{role_name}' role needs {privileges} privileges on {object_description}"
174 )),
175 UnauthorizedError::MzSystem { .. } => {
176 Some(format!("You must be the '{}' role", SYSTEM_USER.name))
177 }
178 UnauthorizedError::MzSupport { .. } => Some(format!(
179 "The '{}' role has very limited privileges",
180 SUPPORT_USER.name
181 )),
182 UnauthorizedError::ConcurrentRoleDrop(_) => {
183 Some("Please disconnect and re-connect with a valid role.".into())
184 }
185 UnauthorizedError::RestrictedSystemObject { .. } => Some(
186 "Access to system catalog objects is restricted for this role. \
187 Contact your administrator if you need access."
188 .into(),
189 ),
190 UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
191 }
192 }
193}
194
/// The RBAC requirements that must be satisfied for an action to be allowed.
///
/// Checked by `RbacRequirements::validate`.
#[derive(Debug)]
struct RbacRequirements {
    /// Roles that must all be present in the role membership collected for
    /// the session's current role.
    role_membership: BTreeSet<RoleId>,
    /// Objects that must be owned, as decided by `check_owner_roles` against
    /// the collected role membership.
    ownership: Vec<ObjectId>,
    /// Required privileges as (object, privilege mode, role) tuples; handed
    /// to `check_object_privileges`.
    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
    /// Catalog item types passed to `check_usage` for the IDs referenced by
    /// the statement.
    item_usage: &'static BTreeSet<CatalogItemType>,
    /// When `Some`, validation fails with `UnauthorizedError::Superuser`
    /// naming this action; `filter_to_mandatory_requirements` clears it.
    superuser_action: Option<String>,
}
213
214impl RbacRequirements {
215 fn empty() -> RbacRequirements {
216 RbacRequirements {
217 role_membership: BTreeSet::new(),
218 ownership: Vec::new(),
219 privileges: Vec::new(),
220 item_usage: &EMPTY_ITEM_USAGE,
221 superuser_action: None,
222 }
223 }
224
225 fn validate(
226 self,
227 catalog: &impl SessionCatalog,
228 session: &dyn SessionMetadata,
229 resolved_ids: &ResolvedIds,
230 ) -> Result<(), UnauthorizedError> {
231 let role_membership =
233 catalog.collect_role_membership(&session.role_metadata().current_role);
234
235 check_usage(catalog, session, resolved_ids, self.item_usage)?;
236
237 let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
240 if !unheld_membership.is_empty() {
241 let role_names = unheld_membership
242 .into_iter()
243 .map(|role_id| {
244 catalog
246 .try_get_role(role_id)
247 .map(|role| role.name().to_string())
248 .unwrap_or_else(|| role_id.to_string())
249 })
250 .collect();
251 return Err(UnauthorizedError::RoleMembership { role_names });
252 }
253
254 let unheld_ownership = self
257 .ownership
258 .into_iter()
259 .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
260 .collect();
261 ownership_err(unheld_ownership, catalog)?;
262
263 check_object_privileges(
264 catalog,
265 self.privileges,
266 role_membership,
267 session.role_metadata().current_role,
268 )?;
269
270 if let Some(action) = self.superuser_action {
271 return Err(UnauthorizedError::Superuser { action });
272 }
273
274 Ok(())
275 }
276
277 fn filter_to_mandatory_requirements(self) -> RbacRequirements {
278 let RbacRequirements {
279 role_membership,
280 ownership,
281 privileges,
282 item_usage,
283 superuser_action: _,
284 } = self;
285 let role_membership = role_membership
286 .into_iter()
287 .filter(|id| id.is_system())
288 .collect();
289 let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
290 let privileges = privileges
291 .into_iter()
292 .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
293 .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
295 .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
296 .collect();
297 let superuser_action = None;
298 RbacRequirements {
299 role_membership,
300 ownership,
301 privileges,
302 item_usage,
303 superuser_action,
304 }
305 }
306}
307
308impl Default for RbacRequirements {
309 fn default() -> Self {
310 RbacRequirements {
311 role_membership: BTreeSet::new(),
312 ownership: Vec::new(),
313 privileges: Vec::new(),
314 item_usage: &DEFAULT_ITEM_USAGE,
315 superuser_action: None,
316 }
317 }
318}
319
320fn check_restrict_to_user_objects(
328 catalog: &impl SessionCatalog,
329 session: &dyn SessionMetadata,
330 resolved_ids: &ResolvedIds,
331) -> Result<(), UnauthorizedError> {
332 if !session.restrict_to_user_objects() {
333 return Ok(());
334 }
335 for item_id in resolved_ids.items() {
336 if item_id.is_system() {
337 if let Some(item) = catalog.try_get_item(item_id) {
338 match item.item_type() {
339 CatalogItemType::Func | CatalogItemType::Type => {}
340 _ => {
341 if RESTRICT_TO_USER_OBJECTS_ALLOWED_OIDS.contains(&item.oid()) {
342 continue;
343 }
344 return Err(UnauthorizedError::RestrictedSystemObject {
345 object_name: item.name().item.clone(),
346 });
347 }
348 }
349 }
350 }
351 }
352 Ok(())
353}
354
355pub fn check_usage(
357 catalog: &impl SessionCatalog,
358 session: &dyn SessionMetadata,
359 resolved_ids: &ResolvedIds,
360 item_types: &BTreeSet<CatalogItemType>,
361) -> Result<(), UnauthorizedError> {
362 rbac_check_preamble(catalog, session)?;
363
364 check_restrict_to_user_objects(catalog, session, resolved_ids)?;
366
367 let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
369
370 let existing_resolved_ids =
373 resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
374
375 let required_privileges = generate_usage_privileges(
376 catalog,
377 &existing_resolved_ids,
378 session.role_metadata().current_role,
379 item_types,
380 )
381 .into_iter()
382 .collect();
383
384 let mut rbac_requirements = RbacRequirements::empty();
385 rbac_requirements.privileges = required_privileges;
386 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
387 let required_privileges = rbac_requirements.privileges;
388
389 check_object_privileges(
390 catalog,
391 required_privileges,
392 role_membership,
393 session.role_metadata().current_role,
394 )?;
395
396 Ok(())
397}
398
399pub fn check_plan(
406 catalog: &impl SessionCatalog,
407 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
413 session: &dyn SessionMetadata,
414 plan: &Plan,
415 target_cluster_id: Option<ClusterId>,
416 resolved_ids: &ResolvedIds,
417 sql_impl_resolved_ids: &ResolvedIds,
418) -> Result<(), UnauthorizedError> {
419 rbac_check_preamble(catalog, session)?;
420
421 check_restrict_to_user_objects(catalog, session, sql_impl_resolved_ids)?;
425
426 let rbac_requirements = generate_rbac_requirements(
427 catalog,
428 plan,
429 active_conns,
430 target_cluster_id,
431 session.role_metadata().current_role,
432 );
433 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
434 debug!(
435 "rbac requirements {rbac_requirements:?} for plan {:?}",
436 PlanKind::from(plan)
437 );
438 rbac_requirements.validate(catalog, session, resolved_ids)
439}
440
441pub fn is_rbac_enabled_for_session(
443 system_vars: &SystemVars,
444 session: &dyn SessionMetadata,
445) -> bool {
446 let server_enabled = system_vars.enable_rbac_checks();
447 let session_enabled = session.enable_session_rbac_checks();
448
449 server_enabled || session_enabled
452}
453
454fn generate_rbac_requirements(
456 catalog: &impl SessionCatalog,
457 plan: &Plan,
458 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
459 target_cluster_id: Option<ClusterId>,
460 role_id: RoleId,
461) -> RbacRequirements {
462 match plan {
463 Plan::CreateConnection(plan::CreateConnectionPlan {
464 name,
465 if_not_exists: _,
466 connection: _,
467 validate: _,
468 }) => RbacRequirements {
469 privileges: vec![(
470 SystemObjectId::Object(name.qualifiers.clone().into()),
471 AclMode::CREATE,
472 role_id,
473 )],
474 item_usage: &CREATE_ITEM_USAGE,
475 ..Default::default()
476 },
477 Plan::CreateDatabase(plan::CreateDatabasePlan {
478 name: _,
479 if_not_exists: _,
480 }) => RbacRequirements {
481 privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
482 item_usage: &CREATE_ITEM_USAGE,
483 ..Default::default()
484 },
485 Plan::CreateSchema(plan::CreateSchemaPlan {
486 database_spec,
487 schema_name: _,
488 if_not_exists: _,
489 }) => {
490 let privileges = match database_spec {
491 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
492 ResolvedDatabaseSpecifier::Id(database_id) => {
493 vec![(
494 SystemObjectId::Object(database_id.into()),
495 AclMode::CREATE,
496 role_id,
497 )]
498 }
499 };
500 RbacRequirements {
501 privileges,
502 item_usage: &CREATE_ITEM_USAGE,
503 ..Default::default()
504 }
505 }
506 Plan::CreateRole(plan::CreateRolePlan {
507 name: _,
508 attributes,
509 }) => {
510 if attributes.superuser.unwrap_or(false) {
511 RbacRequirements {
512 superuser_action: Some("create superuser role".to_string()),
513 ..Default::default()
514 }
515 } else {
516 RbacRequirements {
517 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
518 item_usage: &CREATE_ITEM_USAGE,
519 ..Default::default()
520 }
521 }
522 }
523 Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
524 privileges: vec![(
525 SystemObjectId::System,
526 AclMode::CREATE_NETWORK_POLICY,
527 role_id,
528 )],
529 item_usage: &CREATE_ITEM_USAGE,
530 ..Default::default()
531 },
532 Plan::CreateCluster(plan::CreateClusterPlan {
533 name: _,
534 variant: _,
535 workload_class: _,
536 }) => RbacRequirements {
537 privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
538 item_usage: &CREATE_ITEM_USAGE,
539 ..Default::default()
540 },
541 Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
542 cluster_id,
543 name: _,
544 config: _,
545 }) => RbacRequirements {
546 ownership: vec![ObjectId::Cluster(*cluster_id)],
547 item_usage: &CREATE_ITEM_USAGE,
548 ..Default::default()
549 },
550 Plan::CreateSource(plan::CreateSourcePlan {
551 name,
552 source,
553 if_not_exists: _,
554 timeline: _,
555 in_cluster,
556 }) => RbacRequirements {
557 privileges: generate_required_source_privileges(
558 name,
559 &source.data_source,
560 *in_cluster,
561 role_id,
562 ),
563 item_usage: &CREATE_ITEM_USAGE,
564 ..Default::default()
565 },
566 Plan::CreateSources(plans) => RbacRequirements {
567 privileges: plans
568 .iter()
569 .flat_map(
570 |plan::CreateSourcePlanBundle {
571 item_id: _,
572 global_id: _,
573 plan:
574 plan::CreateSourcePlan {
575 name,
576 source,
577 if_not_exists: _,
578 timeline: _,
579 in_cluster,
580 },
581 resolved_ids: _,
582 available_source_references: _,
583 }| {
584 generate_required_source_privileges(
585 name,
586 &source.data_source,
587 *in_cluster,
588 role_id,
589 )
590 .into_iter()
591 },
592 )
593 .collect(),
594 item_usage: &CREATE_ITEM_USAGE,
595 ..Default::default()
596 },
597 Plan::CreateSecret(plan::CreateSecretPlan {
598 name,
599 secret: _,
600 if_not_exists: _,
601 }) => RbacRequirements {
602 privileges: vec![(
603 SystemObjectId::Object(name.qualifiers.clone().into()),
604 AclMode::CREATE,
605 role_id,
606 )],
607 item_usage: &CREATE_ITEM_USAGE,
608 ..Default::default()
609 },
610 Plan::CreateSink(plan::CreateSinkPlan {
611 name,
612 sink,
613 with_snapshot: _,
614 if_not_exists: _,
615 in_cluster,
616 }) => {
617 let mut privileges = vec![(
618 SystemObjectId::Object(name.qualifiers.clone().into()),
619 AclMode::CREATE,
620 role_id,
621 )];
622 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
623 privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
624 privileges.push((
625 SystemObjectId::Object(in_cluster.into()),
626 AclMode::CREATE,
627 role_id,
628 ));
629 RbacRequirements {
630 privileges,
631 item_usage: &CREATE_ITEM_USAGE,
632 ..Default::default()
633 }
634 }
635 Plan::CreateTable(plan::CreateTablePlan {
636 name,
637 table: _,
638 if_not_exists: _,
639 }) => RbacRequirements {
640 privileges: vec![(
641 SystemObjectId::Object(name.qualifiers.clone().into()),
642 AclMode::CREATE,
643 role_id,
644 )],
645 item_usage: &CREATE_ITEM_USAGE,
646 ..Default::default()
647 },
648 Plan::CreateView(plan::CreateViewPlan {
649 name,
650 view: _,
651 replace,
652 drop_ids: _,
653 if_not_exists: _,
654 ambiguous_columns: _,
655 }) => RbacRequirements {
656 ownership: replace
657 .map(|id| vec![ObjectId::Item(id)])
658 .unwrap_or_default(),
659 privileges: vec![(
660 SystemObjectId::Object(name.qualifiers.clone().into()),
661 AclMode::CREATE,
662 role_id,
663 )],
664 item_usage: &CREATE_ITEM_USAGE,
665 ..Default::default()
666 },
667 Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
668 name,
669 materialized_view,
670 replace,
671 drop_ids: _,
672 if_not_exists: _,
673 ambiguous_columns: _,
674 }) => RbacRequirements {
675 ownership: replace
676 .map(|id| vec![ObjectId::Item(id)])
677 .unwrap_or_default(),
678 privileges: vec![
679 (
680 SystemObjectId::Object(name.qualifiers.clone().into()),
681 AclMode::CREATE,
682 role_id,
683 ),
684 (
685 SystemObjectId::Object(materialized_view.cluster_id.into()),
686 AclMode::CREATE,
687 role_id,
688 ),
689 ],
690 item_usage: &CREATE_ITEM_USAGE,
691 ..Default::default()
692 },
693 Plan::CreateIndex(plan::CreateIndexPlan {
694 name,
695 index,
696 if_not_exists: _,
697 }) => {
698 let index_on_item = catalog.resolve_item_id(&index.on);
699 RbacRequirements {
700 ownership: vec![ObjectId::Item(index_on_item)],
701 privileges: vec![
702 (
703 SystemObjectId::Object(name.qualifiers.clone().into()),
704 AclMode::CREATE,
705 role_id,
706 ),
707 (
708 SystemObjectId::Object(index.cluster_id.into()),
709 AclMode::CREATE,
710 role_id,
711 ),
712 ],
713 item_usage: &CREATE_ITEM_USAGE,
714 ..Default::default()
715 }
716 }
717 Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
718 privileges: vec![(
719 SystemObjectId::Object(name.qualifiers.clone().into()),
720 AclMode::CREATE,
721 role_id,
722 )],
723 item_usage: &CREATE_ITEM_USAGE,
724 ..Default::default()
725 },
726 Plan::Comment(plan::CommentPlan {
727 object_id,
728 sub_component: _,
729 comment: _,
730 }) => {
731 let (ownership, privileges) = match object_id {
732 CommentObjectId::Role(_) => (
735 Vec::new(),
736 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
737 ),
738 _ => (vec![ObjectId::from(*object_id)], Vec::new()),
739 };
740 RbacRequirements {
741 ownership,
742 privileges,
743 ..Default::default()
744 }
745 }
746 Plan::DropObjects(plan::DropObjectsPlan {
747 referenced_ids,
748 drop_ids: _,
749 object_type,
750 }) => {
751 let privileges = if object_type == &ObjectType::Role {
752 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
753 } else {
754 referenced_ids
755 .iter()
756 .filter_map(|id| match id {
757 ObjectId::ClusterReplica((cluster_id, _)) => Some((
758 SystemObjectId::Object(cluster_id.into()),
759 AclMode::USAGE,
760 role_id,
761 )),
762 ObjectId::Schema((database_spec, _)) => match database_spec {
763 ResolvedDatabaseSpecifier::Ambient => None,
764 ResolvedDatabaseSpecifier::Id(database_id) => Some((
765 SystemObjectId::Object(database_id.into()),
766 AclMode::USAGE,
767 role_id,
768 )),
769 },
770 ObjectId::Item(item_id) => {
771 let item = catalog.get_item(item_id);
772 Some((
773 SystemObjectId::Object(item.name().qualifiers.clone().into()),
774 AclMode::USAGE,
775 role_id,
776 ))
777 }
778 ObjectId::Cluster(_)
779 | ObjectId::Database(_)
780 | ObjectId::Role(_)
781 | ObjectId::NetworkPolicy(_) => None,
782 })
783 .collect()
784 };
785 RbacRequirements {
786 ownership: referenced_ids.clone(),
788 privileges,
789 ..Default::default()
790 }
791 }
792 Plan::DropOwned(plan::DropOwnedPlan {
793 role_ids,
794 drop_ids: _,
795 privilege_revokes: _,
796 default_privilege_revokes: _,
797 }) => RbacRequirements {
798 role_membership: role_ids.into_iter().cloned().collect(),
799 ..Default::default()
800 },
801 Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
802 let container_id = match id {
803 ObjectId::Item(id) => Some(SystemObjectId::Object(
804 catalog.get_item(id).name().qualifiers.clone().into(),
805 )),
806 ObjectId::Schema((database_id, _schema_id)) => match database_id {
807 ResolvedDatabaseSpecifier::Ambient => None,
808 ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
809 },
810 ObjectId::Cluster(_)
811 | ObjectId::ClusterReplica(_)
812 | ObjectId::Database(_)
813 | ObjectId::Role(_)
814 | ObjectId::NetworkPolicy(_) => None,
815 };
816 let privileges = match container_id {
817 Some(id) => vec![(id, AclMode::USAGE, role_id)],
818 None => Vec::new(),
819 };
820 RbacRequirements {
821 privileges,
822 item_usage: &EMPTY_ITEM_USAGE,
823 ..Default::default()
824 }
825 }
826 Plan::ShowColumns(plan::ShowColumnsPlan {
827 id,
828 select_plan,
829 new_resolved_ids: _,
830 }) => {
831 let mut privileges = vec![(
832 SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
833 AclMode::USAGE,
834 role_id,
835 )];
836
837 for privilege in generate_rbac_requirements(
838 catalog,
839 &Plan::Select(select_plan.clone()),
840 active_conns,
841 target_cluster_id,
842 role_id,
843 )
844 .privileges
845 {
846 privileges.push(privilege);
847 }
848 RbacRequirements {
849 privileges,
850 ..Default::default()
851 }
852 }
853 Plan::Select(plan::SelectPlan {
854 source,
855 select: _,
856 when: _,
857 finishing: _,
858 copy_to: _,
859 }) => {
860 let items = source
861 .depends_on()
862 .into_iter()
863 .map(|gid| catalog.resolve_item_id(&gid));
864 let mut privileges = generate_read_privileges(catalog, items, role_id);
865 if let Some(privilege) = generate_cluster_usage_privileges(
866 source.as_const().is_some(),
867 target_cluster_id,
868 role_id,
869 ) {
870 privileges.push(privilege);
871 }
872 RbacRequirements {
873 privileges,
874 ..Default::default()
875 }
876 }
877 Plan::Subscribe(plan::SubscribePlan {
878 from,
879 with_snapshot: _,
880 when: _,
881 up_to: _,
882 copy_to: _,
883 emit_progress: _,
884 output: _,
885 }) => {
886 let items = from
887 .depends_on()
888 .into_iter()
889 .map(|gid| catalog.resolve_item_id(&gid));
890 let mut privileges = generate_read_privileges(catalog, items, role_id);
891 if let Some(cluster_id) = target_cluster_id {
892 privileges.push((
893 SystemObjectId::Object(cluster_id.into()),
894 AclMode::USAGE,
895 role_id,
896 ));
897 }
898 RbacRequirements {
899 privileges,
900 ..Default::default()
901 }
902 }
903 Plan::CopyFrom(plan::CopyFromPlan {
904 target_name: _,
905 target_id,
906 source: _,
907 columns: _,
908 source_desc: _,
909 mfp: _,
910 params: _,
911 filter: _,
912 }) => RbacRequirements {
913 privileges: vec![
914 (
915 SystemObjectId::Object(
916 catalog.get_item(target_id).name().qualifiers.clone().into(),
917 ),
918 AclMode::USAGE,
919 role_id,
920 ),
921 (
922 SystemObjectId::Object(target_id.into()),
923 AclMode::INSERT,
924 role_id,
925 ),
926 ],
927 ..Default::default()
928 },
929 Plan::CopyTo(plan::CopyToPlan {
930 select_plan,
931 desc: _,
932 to: _,
933 connection: _,
934 connection_id: _,
935 format: _,
936 max_file_size: _,
937 }) => {
938 let items = select_plan
939 .source
940 .depends_on()
941 .into_iter()
942 .map(|gid| catalog.resolve_item_id(&gid));
943 let mut privileges = generate_read_privileges(catalog, items, role_id);
944 if let Some(cluster_id) = target_cluster_id {
945 privileges.push((
946 SystemObjectId::Object(cluster_id.into()),
947 AclMode::USAGE,
948 role_id,
949 ));
950 }
951 RbacRequirements {
952 privileges,
953 ..Default::default()
954 }
955 }
956 Plan::ExplainPlan(plan::ExplainPlanPlan {
957 stage: _,
958 format: _,
959 config: _,
960 explainee,
961 })
962 | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
963 privileges: match explainee {
964 Explainee::View(id)
965 | Explainee::MaterializedView(id)
966 | Explainee::Index(id)
967 | Explainee::ReplanView(id)
968 | Explainee::ReplanMaterializedView(id)
969 | Explainee::ReplanIndex(id) => {
970 let item = catalog.get_item(id);
971 let schema_id: ObjectId = item.name().qualifiers.clone().into();
972 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
973 }
974 Explainee::Statement(stmt) => stmt
975 .depends_on()
976 .into_iter()
977 .map(|id| {
978 let item = catalog.get_item_by_global_id(&id);
979 let schema_id: ObjectId = item.name().qualifiers.clone().into();
980 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
981 })
982 .collect(),
983 },
984 item_usage: match explainee {
985 Explainee::View(..)
986 | Explainee::MaterializedView(..)
987 | Explainee::Index(..)
988 | Explainee::ReplanView(..)
989 | Explainee::ReplanMaterializedView(..)
990 | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
991 Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
992 },
993 ..Default::default()
994 },
995 Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
996 RbacRequirements {
997 privileges: {
998 let item = catalog.get_item_by_global_id(sink_from);
999 let schema_id: ObjectId = item.name().qualifiers.clone().into();
1000 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
1001 },
1002 item_usage: &EMPTY_ITEM_USAGE,
1003 ..Default::default()
1004 }
1005 }
1006 Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
1007 format: _,
1008 raw_plan,
1009 when: _,
1010 }) => RbacRequirements {
1011 privileges: raw_plan
1012 .depends_on()
1013 .into_iter()
1014 .map(|id| {
1015 let item = catalog.get_item_by_global_id(&id);
1016 let schema_id: ObjectId = item.name().qualifiers.clone().into();
1017 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
1018 })
1019 .collect(),
1020 ..Default::default()
1021 },
1022 Plan::Insert(plan::InsertPlan {
1023 id,
1024 values,
1025 returning,
1026 }) => {
1027 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1028 let mut privileges = vec![
1029 (
1030 SystemObjectId::Object(schema_id.clone()),
1031 AclMode::USAGE,
1032 role_id,
1033 ),
1034 (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
1035 ];
1036 let mut seen = BTreeSet::from([(schema_id, role_id)]);
1037
1038 if returning
1041 .iter()
1042 .any(|assignment| assignment.contains_column())
1043 {
1044 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1045 seen.insert((id.into(), role_id));
1046 }
1047
1048 let items = values
1049 .depends_on()
1050 .into_iter()
1051 .map(|gid| catalog.resolve_item_id(&gid));
1052 privileges.extend_from_slice(&generate_read_privileges_inner(
1053 catalog, items, role_id, &mut seen,
1054 ));
1055
1056 if let Some(privilege) = generate_cluster_usage_privileges(
1057 values.as_const().is_some(),
1058 target_cluster_id,
1059 role_id,
1060 ) {
1061 privileges.push(privilege);
1062 } else if !returning.is_empty() {
1063 if let Some(cluster_id) = target_cluster_id {
1066 privileges.push((
1067 SystemObjectId::Object(cluster_id.into()),
1068 AclMode::USAGE,
1069 role_id,
1070 ));
1071 }
1072 }
1073 RbacRequirements {
1074 privileges,
1075 ..Default::default()
1076 }
1077 }
1078 Plan::AlterCluster(plan::AlterClusterPlan {
1079 id,
1080 name: _,
1081 options: _,
1082 strategy: _,
1083 }) => RbacRequirements {
1084 ownership: vec![ObjectId::Cluster(*id)],
1085 item_usage: &CREATE_ITEM_USAGE,
1086 ..Default::default()
1087 },
1088 Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1089 ownership: vec![ObjectId::Item(*id)],
1090 privileges: vec![(
1091 SystemObjectId::Object(set_cluster.into()),
1092 AclMode::CREATE,
1093 role_id,
1094 )],
1095 item_usage: &CREATE_ITEM_USAGE,
1096 ..Default::default()
1097 },
1098 Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1099 id,
1100 window: _,
1101 value: _,
1102 object_type: _,
1103 }) => RbacRequirements {
1104 ownership: vec![ObjectId::Item(*id)],
1105 item_usage: &CREATE_ITEM_USAGE,
1106 ..Default::default()
1107 },
1108 Plan::AlterSourceTimestampInterval(plan::AlterSourceTimestampIntervalPlan {
1109 id,
1110 value: _,
1111 interval: _,
1112 }) => RbacRequirements {
1113 ownership: vec![ObjectId::Item(*id)],
1114 item_usage: &CREATE_ITEM_USAGE,
1115 ..Default::default()
1116 },
1117 Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1118 ownership: vec![ObjectId::Item(*id)],
1119 ..Default::default()
1120 },
1121 Plan::AlterSource(plan::AlterSourcePlan {
1122 item_id,
1123 ingestion_id: _,
1124 action: _,
1125 }) => RbacRequirements {
1126 ownership: vec![ObjectId::Item(*item_id)],
1127 item_usage: &CREATE_ITEM_USAGE,
1128 ..Default::default()
1129 },
1130 Plan::AlterSink(plan::AlterSinkPlan {
1131 item_id,
1132 global_id: _,
1133 sink,
1134 with_snapshot: _,
1135 in_cluster,
1136 }) => {
1137 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1138 let mut privileges = generate_read_privileges(catalog, items, role_id);
1139 privileges.push((
1140 SystemObjectId::Object(in_cluster.into()),
1141 AclMode::CREATE,
1142 role_id,
1143 ));
1144 RbacRequirements {
1145 ownership: vec![ObjectId::Item(*item_id)],
1146 privileges,
1147 item_usage: &CREATE_ITEM_USAGE,
1148 ..Default::default()
1149 }
1150 }
1151 Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1152 id,
1153 name: _,
1154 to_name: _,
1155 }) => RbacRequirements {
1156 ownership: vec![ObjectId::Cluster(*id)],
1157 ..Default::default()
1158 },
1159 Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1160 id_a,
1161 id_b,
1162 name_a: _,
1163 name_b: _,
1164 name_temp: _,
1165 }) => RbacRequirements {
1166 ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1167 ..Default::default()
1168 },
1169 Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1170 cluster_id,
1171 replica_id,
1172 name: _,
1173 to_name: _,
1174 }) => RbacRequirements {
1175 ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1176 ..Default::default()
1177 },
1178 Plan::AlterItemRename(plan::AlterItemRenamePlan {
1179 id,
1180 current_full_name: _,
1181 to_name: _,
1182 object_type: _,
1183 }) => RbacRequirements {
1184 ownership: vec![ObjectId::Item(*id)],
1185 ..Default::default()
1186 },
1187 Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1188 cur_schema_spec,
1189 new_schema_name: _,
1190 }) => {
1191 let privileges = match cur_schema_spec.0 {
1192 ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1193 SystemObjectId::Object(ObjectId::Database(db_id)),
1194 AclMode::CREATE,
1195 role_id,
1196 )],
1197 ResolvedDatabaseSpecifier::Ambient => vec![],
1198 };
1199
1200 RbacRequirements {
1201 ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1202 privileges,
1203 ..Default::default()
1204 }
1205 }
1206 Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1207 schema_a_spec,
1208 schema_a_name: _,
1209 schema_b_spec,
1210 schema_b_name: _,
1211 name_temp: _,
1212 }) => {
1213 let mut privileges = vec![];
1214 if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1215 privileges.push((
1216 SystemObjectId::Object(ObjectId::Database(id_a)),
1217 AclMode::CREATE,
1218 role_id,
1219 ));
1220 }
1221 if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1222 privileges.push((
1223 SystemObjectId::Object(ObjectId::Database(id_b)),
1224 AclMode::CREATE,
1225 role_id,
1226 ));
1227 }
1228
1229 RbacRequirements {
1230 ownership: vec![
1231 ObjectId::Schema(*schema_a_spec),
1232 ObjectId::Schema(*schema_b_spec),
1233 ],
1234 privileges,
1235 ..Default::default()
1236 }
1237 }
1238 Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1239 ownership: vec![ObjectId::Item(*id)],
1240 item_usage: &CREATE_ITEM_USAGE,
1241 ..Default::default()
1242 },
1243 Plan::AlterRole(plan::AlterRolePlan {
1244 id,
1245 name: _,
1246 option,
1247 }) => match option {
1248 plan::PlannedAlterRoleOption::Attributes(attributes)
1250 if attributes.superuser.is_some() =>
1251 {
1252 RbacRequirements {
1253 superuser_action: Some("alter superuser role".to_string()),
1254 ..Default::default()
1255 }
1256 }
1257 plan::PlannedAlterRoleOption::Attributes(plan::PlannedRoleAttributes {
1260 password,
1261 scram_iterations: _,
1264 nopassword: _,
1265 superuser: None,
1268 inherit: None,
1269 login: None,
1270 }) if password.is_some() && role_id == *id => RbacRequirements::default(),
1271 plan::PlannedAlterRoleOption::Attributes(attributes)
1273 if attributes.password.is_some() && role_id != *id =>
1274 {
1275 RbacRequirements {
1276 superuser_action: Some("alter password of role".to_string()),
1277 ..Default::default()
1278 }
1279 }
1280 plan::PlannedAlterRoleOption::Variable(var)
1285 if var.name().eq_ignore_ascii_case("restrict_to_user_objects") =>
1286 {
1287 RbacRequirements {
1288 superuser_action: Some("set restrict_to_user_objects".to_string()),
1289 ..Default::default()
1290 }
1291 }
1292 plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1294 RbacRequirements::default()
1295 }
1296 _ => RbacRequirements {
1298 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1299 item_usage: &CREATE_ITEM_USAGE,
1300 ..Default::default()
1301 },
1302 },
1303 Plan::AlterOwner(plan::AlterOwnerPlan {
1304 id,
1305 object_type: _,
1306 new_owner,
1307 }) => {
1308 let privileges = match id {
1309 ObjectId::ClusterReplica((cluster_id, _)) => {
1310 vec![(
1311 SystemObjectId::Object(cluster_id.into()),
1312 AclMode::CREATE,
1313 role_id,
1314 )]
1315 }
1316 ObjectId::Schema((database_spec, _)) => match database_spec {
1317 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1318 ResolvedDatabaseSpecifier::Id(database_id) => {
1319 vec![(
1320 SystemObjectId::Object(database_id.into()),
1321 AclMode::CREATE,
1322 role_id,
1323 )]
1324 }
1325 },
1326 ObjectId::Item(item_id) => {
1327 let item = catalog.get_item(item_id);
1328 vec![(
1329 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1330 AclMode::CREATE,
1331 role_id,
1332 )]
1333 }
1334 ObjectId::Cluster(_)
1335 | ObjectId::Database(_)
1336 | ObjectId::Role(_)
1337 | ObjectId::NetworkPolicy(_) => Vec::new(),
1338 };
1339 RbacRequirements {
1340 role_membership: BTreeSet::from([*new_owner]),
1341 ownership: vec![id.clone()],
1342 privileges,
1343 ..Default::default()
1344 }
1345 }
1346 Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1347 ownership: vec![ObjectId::Item(*relation_id)],
1348 item_usage: &CREATE_ITEM_USAGE,
1349 ..Default::default()
1350 },
1351 Plan::AlterMaterializedViewApplyReplacement(
1352 plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1353 ) => RbacRequirements {
1354 ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1355 item_usage: &CREATE_ITEM_USAGE,
1356 ..Default::default()
1357 },
1358 Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1359 ownership: vec![ObjectId::NetworkPolicy(*id)],
1360 item_usage: &CREATE_ITEM_USAGE,
1361 ..Default::default()
1362 },
1363 Plan::ReadThenWrite(plan::ReadThenWritePlan {
1364 id,
1365 selection,
1366 finishing: _,
1367 assignments,
1368 kind,
1369 returning,
1370 }) => {
1371 let acl_mode = match kind {
1372 MutationKind::Insert => AclMode::INSERT,
1373 MutationKind::Update => AclMode::UPDATE,
1374 MutationKind::Delete => AclMode::DELETE,
1375 };
1376 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1377 let mut privileges = vec![
1378 (
1379 SystemObjectId::Object(schema_id.clone()),
1380 AclMode::USAGE,
1381 role_id,
1382 ),
1383 (SystemObjectId::Object(id.into()), acl_mode, role_id),
1384 ];
1385 let mut seen = BTreeSet::from([(schema_id, role_id)]);
1386
1387 if assignments
1390 .values()
1391 .chain(returning.iter())
1392 .any(|assignment| assignment.contains_column())
1393 {
1394 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1395 seen.insert((id.into(), role_id));
1396 }
1397
1398 let items = selection
1405 .depends_on()
1406 .into_iter()
1407 .map(|gid| catalog.resolve_item_id(&gid));
1408 privileges.extend_from_slice(&generate_read_privileges_inner(
1409 catalog, items, role_id, &mut seen,
1410 ));
1411
1412 if let Some(privilege) = generate_cluster_usage_privileges(
1413 selection.as_const().is_some(),
1414 target_cluster_id,
1415 role_id,
1416 ) {
1417 privileges.push(privilege);
1418 }
1419 RbacRequirements {
1420 privileges,
1421 ..Default::default()
1422 }
1423 }
1424 Plan::GrantRole(plan::GrantRolePlan {
1425 role_ids: _,
1426 member_ids: _,
1427 grantor_id: _,
1428 })
1429 | Plan::RevokeRole(plan::RevokeRolePlan {
1430 role_ids: _,
1431 member_ids: _,
1432 grantor_id: _,
1433 }) => RbacRequirements {
1434 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1435 ..Default::default()
1436 },
1437 Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1438 update_privileges,
1439 grantees: _,
1440 })
1441 | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1442 update_privileges,
1443 revokees: _,
1444 }) => {
1445 let mut privileges = Vec::with_capacity(update_privileges.len());
1446 for UpdatePrivilege { target_id, .. } in update_privileges {
1447 match target_id {
1448 SystemObjectId::Object(object_id) => match object_id {
1449 ObjectId::ClusterReplica((cluster_id, _)) => {
1450 privileges.push((
1451 SystemObjectId::Object(cluster_id.into()),
1452 AclMode::USAGE,
1453 role_id,
1454 ));
1455 }
1456 ObjectId::Schema((database_spec, _)) => match database_spec {
1457 ResolvedDatabaseSpecifier::Ambient => {}
1458 ResolvedDatabaseSpecifier::Id(database_id) => {
1459 privileges.push((
1460 SystemObjectId::Object(database_id.into()),
1461 AclMode::USAGE,
1462 role_id,
1463 ));
1464 }
1465 },
1466 ObjectId::Item(item_id) => {
1467 let item = catalog.get_item(item_id);
1468 privileges.push((
1469 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1470 AclMode::USAGE,
1471 role_id,
1472 ))
1473 }
1474 ObjectId::Cluster(_)
1475 | ObjectId::Database(_)
1476 | ObjectId::Role(_)
1477 | ObjectId::NetworkPolicy(_) => {}
1478 },
1479 SystemObjectId::System => {}
1480 }
1481 }
1482 RbacRequirements {
1483 ownership: update_privileges
1484 .iter()
1485 .filter_map(|update_privilege| update_privilege.target_id.object_id())
1486 .cloned()
1487 .collect(),
1488 privileges,
1489 superuser_action: if update_privileges
1494 .iter()
1495 .any(|update_privilege| update_privilege.target_id.is_system())
1496 {
1497 Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1498 } else {
1499 None
1500 },
1501 ..Default::default()
1502 }
1503 }
1504 Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1505 privilege_objects,
1506 privilege_acl_items: _,
1507 is_grant: _,
1508 }) => RbacRequirements {
1509 role_membership: privilege_objects
1510 .iter()
1511 .map(|privilege_object| privilege_object.role_id)
1512 .collect(),
1513 privileges: privilege_objects
1514 .into_iter()
1515 .filter_map(|privilege_object| {
1516 if let (Some(database_id), Some(_)) =
1517 (privilege_object.database_id, privilege_object.schema_id)
1518 {
1519 Some((
1520 SystemObjectId::Object(database_id.into()),
1521 AclMode::USAGE,
1522 role_id,
1523 ))
1524 } else {
1525 None
1526 }
1527 })
1528 .collect(),
1529 superuser_action: if privilege_objects
1535 .iter()
1536 .any(|privilege_object| privilege_object.role_id.is_public())
1537 {
1538 Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1539 } else {
1540 None
1541 },
1542 ..Default::default()
1543 },
1544 Plan::ReassignOwned(plan::ReassignOwnedPlan {
1545 old_roles,
1546 new_role,
1547 reassign_ids: _,
1548 }) => RbacRequirements {
1549 role_membership: old_roles
1550 .into_iter()
1551 .cloned()
1552 .chain(iter::once(*new_role))
1553 .collect(),
1554 ..Default::default()
1555 },
1556 Plan::SideEffectingFunc(func) => {
1557 let role_membership = match func {
1558 SideEffectingFunc::PgCancelBackend {
1561 connection_id: None,
1562 } => BTreeSet::new(),
1563 SideEffectingFunc::PgCancelBackend {
1564 connection_id: Some(connection_id),
1565 } => active_conns.expect("active_conns is required for Plan::SideEffectingFunc")(
1566 *connection_id,
1567 )
1568 .map(|x| [x].into())
1569 .unwrap_or_default(),
1570 };
1571 RbacRequirements {
1572 role_membership,
1573 ..Default::default()
1574 }
1575 }
1576 Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1577 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1578 RbacRequirements {
1579 privileges: vec![
1580 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1581 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1582 ],
1583 ..Default::default()
1584 }
1585 }
1586 Plan::DiscardTemp
1587 | Plan::DiscardAll
1588 | Plan::EmptyQuery
1589 | Plan::ShowAllVariables
1590 | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1591 | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1592 | Plan::SetVariable(plan::SetVariablePlan {
1593 name: _,
1594 value: _,
1595 local: _,
1596 })
1597 | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1598 | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1599 | Plan::StartTransaction(plan::StartTransactionPlan {
1600 access: _,
1601 isolation_level: _,
1602 })
1603 | Plan::CommitTransaction(plan::CommitTransactionPlan {
1604 transaction_type: _,
1605 })
1606 | Plan::AbortTransaction(plan::AbortTransactionPlan {
1607 transaction_type: _,
1608 })
1609 | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1610 | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1611 | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1612 | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1613 | Plan::Declare(plan::DeclarePlan {
1614 name: _,
1615 stmt: _,
1616 sql: _,
1617 params: _,
1618 })
1619 | Plan::Fetch(plan::FetchPlan {
1620 name: _,
1621 count: _,
1622 timeout: _,
1623 })
1624 | Plan::Close(plan::ClosePlan { name: _ })
1625 | Plan::Prepare(plan::PreparePlan {
1626 name: _,
1627 stmt: _,
1628 desc: _,
1629 sql: _,
1630 })
1631 | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1632 | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1633 | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1634 }
1635}
1636
1637fn check_owner_roles(
1639 object_id: &ObjectId,
1640 role_ids: &BTreeSet<RoleId>,
1641 catalog: &impl SessionCatalog,
1642) -> bool {
1643 if let Some(owner_id) = catalog.get_owner_id(object_id) {
1644 role_ids.contains(&owner_id)
1645 } else {
1646 true
1647 }
1648}
1649
1650fn ownership_err(
1651 unheld_ownership: Vec<ObjectId>,
1652 catalog: &impl SessionCatalog,
1653) -> Result<(), UnauthorizedError> {
1654 if !unheld_ownership.is_empty() {
1655 let objects = unheld_ownership
1656 .into_iter()
1657 .map(|ownership| match ownership {
1658 ObjectId::Cluster(id) => (
1659 ObjectType::Cluster,
1660 catalog.get_cluster(id).name().to_string(),
1661 ),
1662 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1663 let cluster = catalog.get_cluster(cluster_id);
1664 let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1665 let name = QualifiedReplica {
1668 cluster: Ident::new_unchecked(cluster.name()),
1669 replica: Ident::new_unchecked(replica.name()),
1670 };
1671 (ObjectType::ClusterReplica, name.to_string())
1672 }
1673 ObjectId::Database(id) => (
1674 ObjectType::Database,
1675 catalog.get_database(&id).name().to_string(),
1676 ),
1677 ObjectId::Schema((database_spec, schema_spec)) => {
1678 let schema = catalog.get_schema(&database_spec, &schema_spec);
1679 let name = catalog.resolve_full_schema_name(schema.name());
1680 (ObjectType::Schema, name.to_string())
1681 }
1682 ObjectId::Item(id) => {
1683 let item = catalog.get_item(&id);
1684 let name = catalog.resolve_full_name(item.name());
1685 (item.item_type().into(), name.to_string())
1686 }
1687 ObjectId::NetworkPolicy(id) => (
1688 ObjectType::NetworkPolicy,
1689 catalog.get_network_policy(&id).name().to_string(),
1690 ),
1691 ObjectId::Role(_) => unreachable!("roles have no owner"),
1692 })
1693 .collect();
1694 Err(UnauthorizedError::Ownership { objects })
1695 } else {
1696 Ok(())
1697 }
1698}
1699
1700fn generate_required_source_privileges(
1701 name: &QualifiedItemName,
1702 data_source: &DataSourceDesc,
1703 in_cluster: Option<ClusterId>,
1704 role_id: RoleId,
1705) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1706 let mut privileges = vec![(
1707 SystemObjectId::Object(name.qualifiers.clone().into()),
1708 AclMode::CREATE,
1709 role_id,
1710 )];
1711 match (data_source, in_cluster) {
1712 (_, Some(id)) => {
1713 privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1714 }
1715 (DataSourceDesc::Ingestion(_), None) => {
1716 privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1717 }
1718 (_, None) => {}
1723 }
1724 privileges
1725}
1726
1727fn generate_read_privileges(
1735 catalog: &impl SessionCatalog,
1736 ids: impl Iterator<Item = CatalogItemId>,
1737 role_id: RoleId,
1738) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1739 generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1740}
1741
1742fn generate_read_privileges_inner(
1743 catalog: &impl SessionCatalog,
1744 ids: impl Iterator<Item = CatalogItemId>,
1745 role_id: RoleId,
1746 seen: &mut BTreeSet<(ObjectId, RoleId)>,
1747) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1748 let mut privileges = Vec::new();
1749 let mut views = Vec::new();
1750
1751 for id in ids {
1752 if seen.insert((id.into(), role_id)) {
1753 let item = catalog.get_item(&id);
1754 let schema_id: ObjectId = item.name().qualifiers.clone().into();
1755 if seen.insert((schema_id.clone(), role_id)) {
1756 privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1757 }
1758 match item.item_type() {
1759 CatalogItemType::View | CatalogItemType::MaterializedView => {
1760 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1761 views.push((item.references().items().copied(), item.owner_id()));
1762 }
1763 CatalogItemType::Table | CatalogItemType::Source => {
1764 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1765 }
1766 CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1767 privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1768 }
1769 CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1770 }
1771 }
1772 }
1773
1774 for (view_ids, view_owner) in views {
1775 privileges.extend_from_slice(&generate_read_privileges_inner(
1776 catalog, view_ids, view_owner, seen,
1777 ));
1778 }
1779
1780 privileges
1781}
1782
1783fn generate_usage_privileges(
1784 catalog: &impl SessionCatalog,
1785 ids: &ResolvedIds,
1786 role_id: RoleId,
1787 item_types: &BTreeSet<CatalogItemType>,
1788) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1789 ids.items()
1791 .filter_map(move |id| {
1792 let item = catalog.get_item(id);
1793 if item_types.contains(&item.item_type()) {
1794 let schema_id = item.name().qualifiers.clone().into();
1795 Some([
1796 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1797 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1798 ])
1799 } else {
1800 None
1801 }
1802 })
1803 .flatten()
1804 .collect()
1805}
1806
1807fn generate_cluster_usage_privileges(
1808 expr_is_const: bool,
1809 target_cluster_id: Option<ClusterId>,
1810 role_id: RoleId,
1811) -> Option<(SystemObjectId, AclMode, RoleId)> {
1812 if !expr_is_const {
1815 if let Some(cluster_id) = target_cluster_id {
1816 return Some((
1817 SystemObjectId::Object(cluster_id.into()),
1818 AclMode::USAGE,
1819 role_id,
1820 ));
1821 }
1822 }
1823
1824 None
1825}
1826
1827fn check_object_privileges(
1828 catalog: &impl SessionCatalog,
1829 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1830 role_membership: BTreeSet<RoleId>,
1831 current_role_id: RoleId,
1832) -> Result<(), UnauthorizedError> {
1833 let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1834 role_memberships.insert(current_role_id, role_membership);
1835 for (object_id, acl_mode, role_id) in privileges {
1836 if matches!(
1840 &object_id,
1841 SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary)))
1842 ) {
1843 continue;
1844 }
1845
1846 let role_membership = role_memberships
1847 .entry(role_id)
1848 .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1849 let object_privileges = catalog
1850 .get_privileges(&object_id)
1851 .expect("only object types with privileges will generate required privileges");
1852 let role_privileges = role_membership
1853 .iter()
1854 .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1855 .map(|mz_acl_item| mz_acl_item.acl_mode)
1856 .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1857 if !role_privileges.contains(acl_mode) {
1858 let role_name = catalog.get_role(&role_id).name().to_string();
1859 let privileges = acl_mode.to_error_string();
1860 return Err(UnauthorizedError::Privilege {
1861 object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1862 role_name,
1863 privileges,
1864 });
1865 }
1866 }
1867
1868 Ok(())
1869}
1870
1871pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1872 const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1873 .union(AclMode::SELECT)
1874 .union(AclMode::UPDATE)
1875 .union(AclMode::DELETE);
1876 const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1877 const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1878 .union(AclMode::CREATE_DB)
1879 .union(AclMode::CREATE_CLUSTER)
1880 .union(AclMode::CREATE_NETWORK_POLICY);
1881
1882 const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1883 match object_type {
1884 SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1885 SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1886 SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1887 SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1888 SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1889 SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1890 SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1891 SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1892 SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1893 SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1894 SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1895 SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1896 SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1897 SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1898 SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1899 SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1900 SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1901 }
1902}
1903
1904pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1905 MzAclItem {
1906 grantee: owner_id,
1907 grantor: owner_id,
1908 acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1909 }
1910}
1911
1912const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1913 match object_type {
1914 ObjectType::Table
1915 | ObjectType::View
1916 | ObjectType::MaterializedView
1917 | ObjectType::Source => AclMode::SELECT,
1918 ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1919 ObjectType::Sink
1920 | ObjectType::Index
1921 | ObjectType::Role
1922 | ObjectType::Cluster
1923 | ObjectType::ClusterReplica
1924 | ObjectType::Secret
1925 | ObjectType::Connection
1926 | ObjectType::Database
1927 | ObjectType::Func
1928 | ObjectType::NetworkPolicy => AclMode::empty(),
1929 }
1930}
1931
1932pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1933 let acl_mode = default_builtin_object_acl_mode(object_type);
1934 MzAclItem {
1935 grantee: MZ_SUPPORT_ROLE_ID,
1936 grantor: MZ_SYSTEM_ROLE_ID,
1937 acl_mode,
1938 }
1939}
1940
1941pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1942 let acl_mode = default_builtin_object_acl_mode(object_type);
1943 MzAclItem {
1944 grantee: RoleId::Public,
1945 grantor: MZ_SYSTEM_ROLE_ID,
1946 acl_mode,
1947 }
1948}