1use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26 CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29 CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30 SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34 DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40fn rbac_check_preamble(
42 catalog: &impl SessionCatalog,
43 session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45 if catalog
50 .try_get_role(&session_meta.role_metadata().current_role)
51 .is_none()
52 {
53 return Err(UnauthorizedError::ConcurrentRoleDrop(
54 session_meta.role_metadata().current_role.clone(),
55 ));
56 };
57 if catalog
58 .try_get_role(&session_meta.role_metadata().session_role)
59 .is_none()
60 {
61 return Err(UnauthorizedError::ConcurrentRoleDrop(
62 session_meta.role_metadata().session_role.clone(),
63 ));
64 };
65 if catalog
66 .try_get_role(&session_meta.role_metadata().authenticated_role)
67 .is_none()
68 {
69 return Err(UnauthorizedError::ConcurrentRoleDrop(
70 session_meta.role_metadata().authenticated_role.clone(),
71 ));
72 };
73
74 Ok(())
75}
76
77fn filter_requirements(
79 catalog: &impl SessionCatalog,
80 session_meta: &dyn SessionMetadata,
81 rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83 let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86 && !session_meta.role_metadata().current_role.is_system()
87 && !session_meta.role_metadata().session_role.is_system();
88 let is_superuser = session_meta.is_superuser();
90 if is_rbac_disabled || is_superuser {
91 return rbac_requirements.filter_to_mandatory_requirements();
92 }
93
94 rbac_requirements
95}
96
97static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99 btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104 let mut items = DEFAULT_ITEM_USAGE.clone();
105 items.insert(CatalogItemType::Type);
106 items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
/// Errors returned when a session is not authorized to perform an action.
///
/// The `Display` text of each variant is the short, primary message; additional
/// explanation, where available, comes from [`UnauthorizedError::detail`].
#[derive(Debug, thiserror::Error)]
pub enum UnauthorizedError {
    /// The action can only be performed by a superuser.
    #[error("permission denied to {action}")]
    Superuser { action: String },
    /// The session does not own one or more required objects. Each entry pairs
    /// the object's type with its name for display.
    #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
    Ownership { objects: Vec<(ObjectType, String)> },
    /// The session is not a member of one or more required roles.
    #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
    RoleMembership { role_names: Vec<String> },
    /// A role lacks privileges on an object. Only `object_description` appears in
    /// the primary message; `role_name` and `privileges` are reported through
    /// [`UnauthorizedError::detail`].
    #[error("permission denied for {object_description}")]
    Privilege {
        object_description: ErrorMessageObjectDescription,
        role_name: String,
        privileges: String,
    },
    /// The action is restricted to the system user role (see the detail message).
    #[error("permission denied to {action}")]
    MzSystem { action: String },
    /// The action was denied to the support user role, which has very limited
    /// privileges (see the detail message).
    #[error("permission denied to {action}")]
    MzSupport { action: String },
    /// A role associated with the session no longer exists in the catalog.
    #[error("role {0} was concurrently dropped")]
    ConcurrentRoleDrop(RoleId),
}
141
142impl UnauthorizedError {
143 pub fn detail(&self) -> Option<String> {
144 match &self {
145 UnauthorizedError::Superuser { action } => {
146 Some(format!("You must be a superuser to {}", action))
147 }
148 UnauthorizedError::Privilege {
149 object_description,
150 role_name,
151 privileges,
152 } => Some(format!(
153 "The '{role_name}' role needs {privileges} privileges on {object_description}"
154 )),
155 UnauthorizedError::MzSystem { .. } => {
156 Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157 }
158 UnauthorizedError::MzSupport { .. } => Some(format!(
159 "The '{}' role has very limited privileges",
160 SUPPORT_USER.name
161 )),
162 UnauthorizedError::ConcurrentRoleDrop(_) => {
163 Some("Please disconnect and re-connect with a valid role.".into())
164 }
165 UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166 }
167 }
168}
169
/// The set of authorization requirements a session must satisfy to execute a plan.
///
/// Produced per plan and checked by `RbacRequirements::validate`.
#[derive(Debug)]
struct RbacRequirements {
    /// Roles the session's current role must be a member of.
    role_membership: BTreeSet<RoleId>,
    /// Objects the session must own (checked through `check_owner_roles`).
    ownership: Vec<ObjectId>,
    /// Privileges to check, as `(object, required ACL mode, role)` triples handed
    /// to `check_object_privileges`.
    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
    /// Item types forwarded to `check_usage` when validating the resolved IDs of
    /// the statement.
    item_usage: &'static BTreeSet<CatalogItemType>,
    /// When `Some`, validation fails with `UnauthorizedError::Superuser` carrying
    /// this action description. `filter_to_mandatory_requirements` resets it to
    /// `None`.
    superuser_action: Option<String>,
}
188
189impl RbacRequirements {
190 fn empty() -> RbacRequirements {
191 RbacRequirements {
192 role_membership: BTreeSet::new(),
193 ownership: Vec::new(),
194 privileges: Vec::new(),
195 item_usage: &EMPTY_ITEM_USAGE,
196 superuser_action: None,
197 }
198 }
199
200 fn validate(
201 self,
202 catalog: &impl SessionCatalog,
203 session: &dyn SessionMetadata,
204 resolved_ids: &ResolvedIds,
205 ) -> Result<(), UnauthorizedError> {
206 let role_membership =
208 catalog.collect_role_membership(&session.role_metadata().current_role);
209
210 check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212 let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215 if !unheld_membership.is_empty() {
216 let role_names = unheld_membership
217 .into_iter()
218 .map(|role_id| {
219 catalog
221 .try_get_role(role_id)
222 .map(|role| role.name().to_string())
223 .unwrap_or_else(|| role_id.to_string())
224 })
225 .collect();
226 return Err(UnauthorizedError::RoleMembership { role_names });
227 }
228
229 let unheld_ownership = self
232 .ownership
233 .into_iter()
234 .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235 .collect();
236 ownership_err(unheld_ownership, catalog)?;
237
238 check_object_privileges(
239 catalog,
240 self.privileges,
241 role_membership,
242 session.role_metadata().current_role,
243 )?;
244
245 if let Some(action) = self.superuser_action {
246 return Err(UnauthorizedError::Superuser { action });
247 }
248
249 Ok(())
250 }
251
252 fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253 let RbacRequirements {
254 role_membership,
255 ownership,
256 privileges,
257 item_usage,
258 superuser_action: _,
259 } = self;
260 let role_membership = role_membership
261 .into_iter()
262 .filter(|id| id.is_system())
263 .collect();
264 let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265 let privileges = privileges
266 .into_iter()
267 .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268 .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270 .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271 .collect();
272 let superuser_action = None;
273 RbacRequirements {
274 role_membership,
275 ownership,
276 privileges,
277 item_usage,
278 superuser_action,
279 }
280 }
281}
282
283impl Default for RbacRequirements {
284 fn default() -> Self {
285 RbacRequirements {
286 role_membership: BTreeSet::new(),
287 ownership: Vec::new(),
288 privileges: Vec::new(),
289 item_usage: &DEFAULT_ITEM_USAGE,
290 superuser_action: None,
291 }
292 }
293}
294
295pub fn check_usage(
297 catalog: &impl SessionCatalog,
298 session: &dyn SessionMetadata,
299 resolved_ids: &ResolvedIds,
300 item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302 rbac_check_preamble(catalog, session)?;
303
304 let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307 let existing_resolved_ids =
310 resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312 let required_privileges = generate_usage_privileges(
313 catalog,
314 &existing_resolved_ids,
315 session.role_metadata().current_role,
316 item_types,
317 )
318 .into_iter()
319 .collect();
320
321 let mut rbac_requirements = RbacRequirements::empty();
322 rbac_requirements.privileges = required_privileges;
323 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324 let required_privileges = rbac_requirements.privileges;
325
326 check_object_privileges(
327 catalog,
328 required_privileges,
329 role_membership,
330 session.role_metadata().current_role,
331 )?;
332
333 Ok(())
334}
335
336pub fn check_plan(
338 catalog: &impl SessionCatalog,
339 active_conns: impl FnOnce(u32) -> Option<RoleId>,
341 session: &dyn SessionMetadata,
342 plan: &Plan,
343 target_cluster_id: Option<ClusterId>,
344 resolved_ids: &ResolvedIds,
345) -> Result<(), UnauthorizedError> {
346 rbac_check_preamble(catalog, session)?;
347
348 let rbac_requirements = generate_rbac_requirements(
349 catalog,
350 plan,
351 active_conns,
352 target_cluster_id,
353 session.role_metadata().current_role,
354 );
355 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
356 debug!(
357 "rbac requirements {rbac_requirements:?} for plan {:?}",
358 PlanKind::from(plan)
359 );
360 rbac_requirements.validate(catalog, session, resolved_ids)
361}
362
363pub fn is_rbac_enabled_for_session(
365 system_vars: &SystemVars,
366 session: &dyn SessionMetadata,
367) -> bool {
368 let server_enabled = system_vars.enable_rbac_checks();
369 let session_enabled = session.enable_session_rbac_checks();
370
371 server_enabled || session_enabled
374}
375
376fn generate_rbac_requirements(
378 catalog: &impl SessionCatalog,
379 plan: &Plan,
380 active_conns: impl FnOnce(u32) -> Option<RoleId>,
381 target_cluster_id: Option<ClusterId>,
382 role_id: RoleId,
383) -> RbacRequirements {
384 match plan {
385 Plan::CreateConnection(plan::CreateConnectionPlan {
386 name,
387 if_not_exists: _,
388 connection: _,
389 validate: _,
390 }) => RbacRequirements {
391 privileges: vec![(
392 SystemObjectId::Object(name.qualifiers.clone().into()),
393 AclMode::CREATE,
394 role_id,
395 )],
396 item_usage: &CREATE_ITEM_USAGE,
397 ..Default::default()
398 },
399 Plan::CreateDatabase(plan::CreateDatabasePlan {
400 name: _,
401 if_not_exists: _,
402 }) => RbacRequirements {
403 privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
404 item_usage: &CREATE_ITEM_USAGE,
405 ..Default::default()
406 },
407 Plan::CreateSchema(plan::CreateSchemaPlan {
408 database_spec,
409 schema_name: _,
410 if_not_exists: _,
411 }) => {
412 let privileges = match database_spec {
413 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
414 ResolvedDatabaseSpecifier::Id(database_id) => {
415 vec![(
416 SystemObjectId::Object(database_id.into()),
417 AclMode::CREATE,
418 role_id,
419 )]
420 }
421 };
422 RbacRequirements {
423 privileges,
424 item_usage: &CREATE_ITEM_USAGE,
425 ..Default::default()
426 }
427 }
428 Plan::CreateRole(plan::CreateRolePlan {
429 name: _,
430 attributes,
431 }) => {
432 if attributes.superuser.unwrap_or(false) {
433 RbacRequirements {
434 superuser_action: Some("create superuser role".to_string()),
435 ..Default::default()
436 }
437 } else {
438 RbacRequirements {
439 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
440 item_usage: &CREATE_ITEM_USAGE,
441 ..Default::default()
442 }
443 }
444 }
445 Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
446 privileges: vec![(
447 SystemObjectId::System,
448 AclMode::CREATE_NETWORK_POLICY,
449 role_id,
450 )],
451 item_usage: &CREATE_ITEM_USAGE,
452 ..Default::default()
453 },
454 Plan::CreateCluster(plan::CreateClusterPlan {
455 name: _,
456 variant: _,
457 workload_class: _,
458 }) => RbacRequirements {
459 privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
460 item_usage: &CREATE_ITEM_USAGE,
461 ..Default::default()
462 },
463 Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
464 cluster_id,
465 name: _,
466 config: _,
467 }) => RbacRequirements {
468 ownership: vec![ObjectId::Cluster(*cluster_id)],
469 item_usage: &CREATE_ITEM_USAGE,
470 ..Default::default()
471 },
472 Plan::CreateSource(plan::CreateSourcePlan {
473 name,
474 source,
475 if_not_exists: _,
476 timeline: _,
477 in_cluster,
478 }) => RbacRequirements {
479 privileges: generate_required_source_privileges(
480 name,
481 &source.data_source,
482 *in_cluster,
483 role_id,
484 ),
485 item_usage: &CREATE_ITEM_USAGE,
486 ..Default::default()
487 },
488 Plan::CreateSources(plans) => RbacRequirements {
489 privileges: plans
490 .iter()
491 .flat_map(
492 |plan::CreateSourcePlanBundle {
493 item_id: _,
494 global_id: _,
495 plan:
496 plan::CreateSourcePlan {
497 name,
498 source,
499 if_not_exists: _,
500 timeline: _,
501 in_cluster,
502 },
503 resolved_ids: _,
504 available_source_references: _,
505 }| {
506 generate_required_source_privileges(
507 name,
508 &source.data_source,
509 *in_cluster,
510 role_id,
511 )
512 .into_iter()
513 },
514 )
515 .collect(),
516 item_usage: &CREATE_ITEM_USAGE,
517 ..Default::default()
518 },
519 Plan::CreateSecret(plan::CreateSecretPlan {
520 name,
521 secret: _,
522 if_not_exists: _,
523 }) => RbacRequirements {
524 privileges: vec![(
525 SystemObjectId::Object(name.qualifiers.clone().into()),
526 AclMode::CREATE,
527 role_id,
528 )],
529 item_usage: &CREATE_ITEM_USAGE,
530 ..Default::default()
531 },
532 Plan::CreateSink(plan::CreateSinkPlan {
533 name,
534 sink,
535 with_snapshot: _,
536 if_not_exists: _,
537 in_cluster,
538 }) => {
539 let mut privileges = vec![(
540 SystemObjectId::Object(name.qualifiers.clone().into()),
541 AclMode::CREATE,
542 role_id,
543 )];
544 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
545 privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
546 privileges.push((
547 SystemObjectId::Object(in_cluster.into()),
548 AclMode::CREATE,
549 role_id,
550 ));
551 RbacRequirements {
552 privileges,
553 item_usage: &CREATE_ITEM_USAGE,
554 ..Default::default()
555 }
556 }
557 Plan::CreateTable(plan::CreateTablePlan {
558 name,
559 table: _,
560 if_not_exists: _,
561 }) => RbacRequirements {
562 privileges: vec![(
563 SystemObjectId::Object(name.qualifiers.clone().into()),
564 AclMode::CREATE,
565 role_id,
566 )],
567 item_usage: &CREATE_ITEM_USAGE,
568 ..Default::default()
569 },
570 Plan::CreateView(plan::CreateViewPlan {
571 name,
572 view: _,
573 replace,
574 drop_ids: _,
575 if_not_exists: _,
576 ambiguous_columns: _,
577 }) => RbacRequirements {
578 ownership: replace
579 .map(|id| vec![ObjectId::Item(id)])
580 .unwrap_or_default(),
581 privileges: vec![(
582 SystemObjectId::Object(name.qualifiers.clone().into()),
583 AclMode::CREATE,
584 role_id,
585 )],
586 item_usage: &CREATE_ITEM_USAGE,
587 ..Default::default()
588 },
589 Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
590 name,
591 materialized_view,
592 replace,
593 drop_ids: _,
594 if_not_exists: _,
595 ambiguous_columns: _,
596 }) => RbacRequirements {
597 ownership: replace
598 .map(|id| vec![ObjectId::Item(id)])
599 .unwrap_or_default(),
600 privileges: vec![
601 (
602 SystemObjectId::Object(name.qualifiers.clone().into()),
603 AclMode::CREATE,
604 role_id,
605 ),
606 (
607 SystemObjectId::Object(materialized_view.cluster_id.into()),
608 AclMode::CREATE,
609 role_id,
610 ),
611 ],
612 item_usage: &CREATE_ITEM_USAGE,
613 ..Default::default()
614 },
615 Plan::CreateContinualTask(plan::CreateContinualTaskPlan {
616 name,
617 placeholder_id: _,
618 desc: _,
619 input_id: _,
620 with_snapshot: _,
621 continual_task,
622 }) => RbacRequirements {
623 privileges: vec![
624 (
625 SystemObjectId::Object(name.qualifiers.clone().into()),
626 AclMode::CREATE,
627 role_id,
628 ),
629 (
630 SystemObjectId::Object(continual_task.cluster_id.into()),
631 AclMode::CREATE,
632 role_id,
633 ),
634 ],
635 item_usage: &CREATE_ITEM_USAGE,
636 ..Default::default()
637 },
638 Plan::CreateIndex(plan::CreateIndexPlan {
639 name,
640 index,
641 if_not_exists: _,
642 }) => {
643 let index_on_item = catalog.resolve_item_id(&index.on);
644 RbacRequirements {
645 ownership: vec![ObjectId::Item(index_on_item)],
646 privileges: vec![
647 (
648 SystemObjectId::Object(name.qualifiers.clone().into()),
649 AclMode::CREATE,
650 role_id,
651 ),
652 (
653 SystemObjectId::Object(index.cluster_id.into()),
654 AclMode::CREATE,
655 role_id,
656 ),
657 ],
658 item_usage: &CREATE_ITEM_USAGE,
659 ..Default::default()
660 }
661 }
662 Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
663 privileges: vec![(
664 SystemObjectId::Object(name.qualifiers.clone().into()),
665 AclMode::CREATE,
666 role_id,
667 )],
668 item_usage: &CREATE_ITEM_USAGE,
669 ..Default::default()
670 },
671 Plan::Comment(plan::CommentPlan {
672 object_id,
673 sub_component: _,
674 comment: _,
675 }) => {
676 let (ownership, privileges) = match object_id {
677 CommentObjectId::Role(_) => (
680 Vec::new(),
681 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
682 ),
683 _ => (vec![ObjectId::from(*object_id)], Vec::new()),
684 };
685 RbacRequirements {
686 ownership,
687 privileges,
688 ..Default::default()
689 }
690 }
691 Plan::DropObjects(plan::DropObjectsPlan {
692 referenced_ids,
693 drop_ids: _,
694 object_type,
695 }) => {
696 let privileges = if object_type == &ObjectType::Role {
697 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
698 } else {
699 referenced_ids
700 .iter()
701 .filter_map(|id| match id {
702 ObjectId::ClusterReplica((cluster_id, _)) => Some((
703 SystemObjectId::Object(cluster_id.into()),
704 AclMode::USAGE,
705 role_id,
706 )),
707 ObjectId::Schema((database_spec, _)) => match database_spec {
708 ResolvedDatabaseSpecifier::Ambient => None,
709 ResolvedDatabaseSpecifier::Id(database_id) => Some((
710 SystemObjectId::Object(database_id.into()),
711 AclMode::USAGE,
712 role_id,
713 )),
714 },
715 ObjectId::Item(item_id) => {
716 let item = catalog.get_item(item_id);
717 Some((
718 SystemObjectId::Object(item.name().qualifiers.clone().into()),
719 AclMode::USAGE,
720 role_id,
721 ))
722 }
723 ObjectId::Cluster(_)
724 | ObjectId::Database(_)
725 | ObjectId::Role(_)
726 | ObjectId::NetworkPolicy(_) => None,
727 })
728 .collect()
729 };
730 RbacRequirements {
731 ownership: referenced_ids.clone(),
733 privileges,
734 ..Default::default()
735 }
736 }
737 Plan::DropOwned(plan::DropOwnedPlan {
738 role_ids,
739 drop_ids: _,
740 privilege_revokes: _,
741 default_privilege_revokes: _,
742 }) => RbacRequirements {
743 role_membership: role_ids.into_iter().cloned().collect(),
744 ..Default::default()
745 },
746 Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
747 let container_id = match id {
748 ObjectId::Item(id) => Some(SystemObjectId::Object(
749 catalog.get_item(id).name().qualifiers.clone().into(),
750 )),
751 ObjectId::Schema((database_id, _schema_id)) => match database_id {
752 ResolvedDatabaseSpecifier::Ambient => None,
753 ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
754 },
755 ObjectId::Cluster(_)
756 | ObjectId::ClusterReplica(_)
757 | ObjectId::Database(_)
758 | ObjectId::Role(_)
759 | ObjectId::NetworkPolicy(_) => None,
760 };
761 let privileges = match container_id {
762 Some(id) => vec![(id, AclMode::USAGE, role_id)],
763 None => Vec::new(),
764 };
765 RbacRequirements {
766 privileges,
767 item_usage: &EMPTY_ITEM_USAGE,
768 ..Default::default()
769 }
770 }
771 Plan::ShowColumns(plan::ShowColumnsPlan {
772 id,
773 select_plan,
774 new_resolved_ids: _,
775 }) => {
776 let mut privileges = vec![(
777 SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
778 AclMode::USAGE,
779 role_id,
780 )];
781
782 for privilege in generate_rbac_requirements(
783 catalog,
784 &Plan::Select(select_plan.clone()),
785 active_conns,
786 target_cluster_id,
787 role_id,
788 )
789 .privileges
790 {
791 privileges.push(privilege);
792 }
793 RbacRequirements {
794 privileges,
795 ..Default::default()
796 }
797 }
798 Plan::Select(plan::SelectPlan {
799 source,
800 select: _,
801 when: _,
802 finishing: _,
803 copy_to: _,
804 }) => {
805 let items = source
806 .depends_on()
807 .into_iter()
808 .map(|gid| catalog.resolve_item_id(&gid));
809 let mut privileges = generate_read_privileges(catalog, items, role_id);
810 if let Some(privilege) = generate_cluster_usage_privileges(
811 source.as_const().is_some(),
812 target_cluster_id,
813 role_id,
814 ) {
815 privileges.push(privilege);
816 }
817 RbacRequirements {
818 privileges,
819 ..Default::default()
820 }
821 }
822 Plan::Subscribe(plan::SubscribePlan {
823 from,
824 with_snapshot: _,
825 when: _,
826 up_to: _,
827 copy_to: _,
828 emit_progress: _,
829 output: _,
830 }) => {
831 let items = from
832 .depends_on()
833 .into_iter()
834 .map(|gid| catalog.resolve_item_id(&gid));
835 let mut privileges = generate_read_privileges(catalog, items, role_id);
836 if let Some(cluster_id) = target_cluster_id {
837 privileges.push((
838 SystemObjectId::Object(cluster_id.into()),
839 AclMode::USAGE,
840 role_id,
841 ));
842 }
843 RbacRequirements {
844 privileges,
845 ..Default::default()
846 }
847 }
848 Plan::CopyFrom(plan::CopyFromPlan {
849 target_name: _,
850 target_id,
851 source: _,
852 columns: _,
853 source_desc: _,
854 mfp: _,
855 params: _,
856 filter: _,
857 }) => RbacRequirements {
858 privileges: vec![
859 (
860 SystemObjectId::Object(
861 catalog.get_item(target_id).name().qualifiers.clone().into(),
862 ),
863 AclMode::USAGE,
864 role_id,
865 ),
866 (
867 SystemObjectId::Object(target_id.into()),
868 AclMode::INSERT,
869 role_id,
870 ),
871 ],
872 ..Default::default()
873 },
874 Plan::CopyTo(plan::CopyToPlan {
875 select_plan,
876 desc: _,
877 to: _,
878 connection: _,
879 connection_id: _,
880 format: _,
881 max_file_size: _,
882 }) => {
883 let items = select_plan
884 .source
885 .depends_on()
886 .into_iter()
887 .map(|gid| catalog.resolve_item_id(&gid));
888 let mut privileges = generate_read_privileges(catalog, items, role_id);
889 if let Some(cluster_id) = target_cluster_id {
890 privileges.push((
891 SystemObjectId::Object(cluster_id.into()),
892 AclMode::USAGE,
893 role_id,
894 ));
895 }
896 RbacRequirements {
897 privileges,
898 ..Default::default()
899 }
900 }
901 Plan::ExplainPlan(plan::ExplainPlanPlan {
902 stage: _,
903 format: _,
904 config: _,
905 explainee,
906 })
907 | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
908 privileges: match explainee {
909 Explainee::View(id)
910 | Explainee::MaterializedView(id)
911 | Explainee::Index(id)
912 | Explainee::ReplanView(id)
913 | Explainee::ReplanMaterializedView(id)
914 | Explainee::ReplanIndex(id) => {
915 let item = catalog.get_item(id);
916 let schema_id: ObjectId = item.name().qualifiers.clone().into();
917 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
918 }
919 Explainee::Statement(stmt) => stmt
920 .depends_on()
921 .into_iter()
922 .map(|id| {
923 let item = catalog.get_item_by_global_id(&id);
924 let schema_id: ObjectId = item.name().qualifiers.clone().into();
925 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
926 })
927 .collect(),
928 },
929 item_usage: match explainee {
930 Explainee::View(..)
931 | Explainee::MaterializedView(..)
932 | Explainee::Index(..)
933 | Explainee::ReplanView(..)
934 | Explainee::ReplanMaterializedView(..)
935 | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
936 Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
937 },
938 ..Default::default()
939 },
940 Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
941 RbacRequirements {
942 privileges: {
943 let item = catalog.get_item_by_global_id(sink_from);
944 let schema_id: ObjectId = item.name().qualifiers.clone().into();
945 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
946 },
947 item_usage: &EMPTY_ITEM_USAGE,
948 ..Default::default()
949 }
950 }
951 Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
952 format: _,
953 raw_plan,
954 when: _,
955 }) => RbacRequirements {
956 privileges: raw_plan
957 .depends_on()
958 .into_iter()
959 .map(|id| {
960 let item = catalog.get_item_by_global_id(&id);
961 let schema_id: ObjectId = item.name().qualifiers.clone().into();
962 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
963 })
964 .collect(),
965 ..Default::default()
966 },
967 Plan::Insert(plan::InsertPlan {
968 id,
969 values,
970 returning,
971 }) => {
972 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
973 let mut privileges = vec![
974 (
975 SystemObjectId::Object(schema_id.clone()),
976 AclMode::USAGE,
977 role_id,
978 ),
979 (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
980 ];
981 let mut seen = BTreeSet::from([(schema_id, role_id)]);
982
983 if returning
986 .iter()
987 .any(|assignment| assignment.contains_column())
988 {
989 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
990 seen.insert((id.into(), role_id));
991 }
992
993 let items = values
994 .depends_on()
995 .into_iter()
996 .map(|gid| catalog.resolve_item_id(&gid));
997 privileges.extend_from_slice(&generate_read_privileges_inner(
998 catalog, items, role_id, &mut seen,
999 ));
1000
1001 if let Some(privilege) = generate_cluster_usage_privileges(
1002 values.as_const().is_some(),
1003 target_cluster_id,
1004 role_id,
1005 ) {
1006 privileges.push(privilege);
1007 } else if !returning.is_empty() {
1008 if let Some(cluster_id) = target_cluster_id {
1011 privileges.push((
1012 SystemObjectId::Object(cluster_id.into()),
1013 AclMode::USAGE,
1014 role_id,
1015 ));
1016 }
1017 }
1018 RbacRequirements {
1019 privileges,
1020 ..Default::default()
1021 }
1022 }
1023 Plan::AlterCluster(plan::AlterClusterPlan {
1024 id,
1025 name: _,
1026 options: _,
1027 strategy: _,
1028 }) => RbacRequirements {
1029 ownership: vec![ObjectId::Cluster(*id)],
1030 item_usage: &CREATE_ITEM_USAGE,
1031 ..Default::default()
1032 },
1033 Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1034 ownership: vec![ObjectId::Item(*id)],
1035 privileges: vec![(
1036 SystemObjectId::Object(set_cluster.into()),
1037 AclMode::CREATE,
1038 role_id,
1039 )],
1040 item_usage: &CREATE_ITEM_USAGE,
1041 ..Default::default()
1042 },
1043 Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1044 id,
1045 window: _,
1046 value: _,
1047 object_type: _,
1048 }) => RbacRequirements {
1049 ownership: vec![ObjectId::Item(*id)],
1050 item_usage: &CREATE_ITEM_USAGE,
1051 ..Default::default()
1052 },
1053 Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1054 ownership: vec![ObjectId::Item(*id)],
1055 ..Default::default()
1056 },
1057 Plan::AlterSource(plan::AlterSourcePlan {
1058 item_id,
1059 ingestion_id: _,
1060 action: _,
1061 }) => RbacRequirements {
1062 ownership: vec![ObjectId::Item(*item_id)],
1063 item_usage: &CREATE_ITEM_USAGE,
1064 ..Default::default()
1065 },
1066 Plan::AlterSink(plan::AlterSinkPlan {
1067 item_id,
1068 global_id: _,
1069 sink,
1070 with_snapshot: _,
1071 in_cluster,
1072 }) => {
1073 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1074 let mut privileges = generate_read_privileges(catalog, items, role_id);
1075 privileges.push((
1076 SystemObjectId::Object(in_cluster.into()),
1077 AclMode::CREATE,
1078 role_id,
1079 ));
1080 RbacRequirements {
1081 ownership: vec![ObjectId::Item(*item_id)],
1082 privileges,
1083 item_usage: &CREATE_ITEM_USAGE,
1084 ..Default::default()
1085 }
1086 }
1087 Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1088 id,
1089 name: _,
1090 to_name: _,
1091 }) => RbacRequirements {
1092 ownership: vec![ObjectId::Cluster(*id)],
1093 ..Default::default()
1094 },
1095 Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1096 id_a,
1097 id_b,
1098 name_a: _,
1099 name_b: _,
1100 name_temp: _,
1101 }) => RbacRequirements {
1102 ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1103 ..Default::default()
1104 },
1105 Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1106 cluster_id,
1107 replica_id,
1108 name: _,
1109 to_name: _,
1110 }) => RbacRequirements {
1111 ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1112 ..Default::default()
1113 },
1114 Plan::AlterItemRename(plan::AlterItemRenamePlan {
1115 id,
1116 current_full_name: _,
1117 to_name: _,
1118 object_type: _,
1119 }) => RbacRequirements {
1120 ownership: vec![ObjectId::Item(*id)],
1121 ..Default::default()
1122 },
1123 Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1124 cur_schema_spec,
1125 new_schema_name: _,
1126 }) => {
1127 let privileges = match cur_schema_spec.0 {
1128 ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1129 SystemObjectId::Object(ObjectId::Database(db_id)),
1130 AclMode::CREATE,
1131 role_id,
1132 )],
1133 ResolvedDatabaseSpecifier::Ambient => vec![],
1134 };
1135
1136 RbacRequirements {
1137 ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1138 privileges,
1139 ..Default::default()
1140 }
1141 }
1142 Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1143 schema_a_spec,
1144 schema_a_name: _,
1145 schema_b_spec,
1146 schema_b_name: _,
1147 name_temp: _,
1148 }) => {
1149 let mut privileges = vec![];
1150 if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1151 privileges.push((
1152 SystemObjectId::Object(ObjectId::Database(id_a)),
1153 AclMode::CREATE,
1154 role_id,
1155 ));
1156 }
1157 if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1158 privileges.push((
1159 SystemObjectId::Object(ObjectId::Database(id_b)),
1160 AclMode::CREATE,
1161 role_id,
1162 ));
1163 }
1164
1165 RbacRequirements {
1166 ownership: vec![
1167 ObjectId::Schema(*schema_a_spec),
1168 ObjectId::Schema(*schema_b_spec),
1169 ],
1170 privileges,
1171 ..Default::default()
1172 }
1173 }
1174 Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1175 ownership: vec![ObjectId::Item(*id)],
1176 item_usage: &CREATE_ITEM_USAGE,
1177 ..Default::default()
1178 },
1179 Plan::AlterRole(plan::AlterRolePlan {
1180 id,
1181 name: _,
1182 option,
1183 }) => match option {
1184 plan::PlannedAlterRoleOption::Attributes(attributes)
1186 if attributes.superuser.unwrap_or(false) =>
1187 {
1188 RbacRequirements {
1189 superuser_action: Some("alter superuser role".to_string()),
1190 ..Default::default()
1191 }
1192 }
1193 plan::PlannedAlterRoleOption::Attributes(attributes)
1195 if attributes.password.is_some() && role_id == *id =>
1196 {
1197 RbacRequirements::default()
1198 }
1199 plan::PlannedAlterRoleOption::Attributes(attributes)
1201 if attributes.password.is_some() =>
1202 {
1203 RbacRequirements {
1204 superuser_action: Some("alter password of role".to_string()),
1205 ..Default::default()
1206 }
1207 }
1208 plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1210 RbacRequirements::default()
1211 }
1212 _ => RbacRequirements {
1214 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1215 item_usage: &CREATE_ITEM_USAGE,
1216 ..Default::default()
1217 },
1218 },
1219 Plan::AlterOwner(plan::AlterOwnerPlan {
1220 id,
1221 object_type: _,
1222 new_owner,
1223 }) => {
1224 let privileges = match id {
1225 ObjectId::ClusterReplica((cluster_id, _)) => {
1226 vec![(
1227 SystemObjectId::Object(cluster_id.into()),
1228 AclMode::CREATE,
1229 role_id,
1230 )]
1231 }
1232 ObjectId::Schema((database_spec, _)) => match database_spec {
1233 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1234 ResolvedDatabaseSpecifier::Id(database_id) => {
1235 vec![(
1236 SystemObjectId::Object(database_id.into()),
1237 AclMode::CREATE,
1238 role_id,
1239 )]
1240 }
1241 },
1242 ObjectId::Item(item_id) => {
1243 let item = catalog.get_item(item_id);
1244 vec![(
1245 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1246 AclMode::CREATE,
1247 role_id,
1248 )]
1249 }
1250 ObjectId::Cluster(_)
1251 | ObjectId::Database(_)
1252 | ObjectId::Role(_)
1253 | ObjectId::NetworkPolicy(_) => Vec::new(),
1254 };
1255 RbacRequirements {
1256 role_membership: BTreeSet::from([*new_owner]),
1257 ownership: vec![id.clone()],
1258 privileges,
1259 ..Default::default()
1260 }
1261 }
1262 Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1263 ownership: vec![ObjectId::Item(*relation_id)],
1264 item_usage: &CREATE_ITEM_USAGE,
1265 ..Default::default()
1266 },
1267 Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1268 ownership: vec![ObjectId::NetworkPolicy(*id)],
1269 item_usage: &CREATE_ITEM_USAGE,
1270 ..Default::default()
1271 },
1272 Plan::ReadThenWrite(plan::ReadThenWritePlan {
1273 id,
1274 selection,
1275 finishing: _,
1276 assignments,
1277 kind,
1278 returning,
1279 }) => {
1280 let acl_mode = match kind {
1281 MutationKind::Insert => AclMode::INSERT,
1282 MutationKind::Update => AclMode::UPDATE,
1283 MutationKind::Delete => AclMode::DELETE,
1284 };
1285 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1286 let mut privileges = vec![
1287 (
1288 SystemObjectId::Object(schema_id.clone()),
1289 AclMode::USAGE,
1290 role_id,
1291 ),
1292 (SystemObjectId::Object(id.into()), acl_mode, role_id),
1293 ];
1294 let mut seen = BTreeSet::from([(schema_id, role_id)]);
1295
1296 if assignments
1299 .values()
1300 .chain(returning.iter())
1301 .any(|assignment| assignment.contains_column())
1302 {
1303 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1304 seen.insert((id.into(), role_id));
1305 }
1306
1307 let items = selection
1314 .depends_on()
1315 .into_iter()
1316 .map(|gid| catalog.resolve_item_id(&gid));
1317 privileges.extend_from_slice(&generate_read_privileges_inner(
1318 catalog, items, role_id, &mut seen,
1319 ));
1320
1321 if let Some(privilege) = generate_cluster_usage_privileges(
1322 selection.as_const().is_some(),
1323 target_cluster_id,
1324 role_id,
1325 ) {
1326 privileges.push(privilege);
1327 }
1328 RbacRequirements {
1329 privileges,
1330 ..Default::default()
1331 }
1332 }
1333 Plan::GrantRole(plan::GrantRolePlan {
1334 role_ids: _,
1335 member_ids: _,
1336 grantor_id: _,
1337 })
1338 | Plan::RevokeRole(plan::RevokeRolePlan {
1339 role_ids: _,
1340 member_ids: _,
1341 grantor_id: _,
1342 }) => RbacRequirements {
1343 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1344 ..Default::default()
1345 },
1346 Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1347 update_privileges,
1348 grantees: _,
1349 })
1350 | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1351 update_privileges,
1352 revokees: _,
1353 }) => {
1354 let mut privileges = Vec::with_capacity(update_privileges.len());
1355 for UpdatePrivilege { target_id, .. } in update_privileges {
1356 match target_id {
1357 SystemObjectId::Object(object_id) => match object_id {
1358 ObjectId::ClusterReplica((cluster_id, _)) => {
1359 privileges.push((
1360 SystemObjectId::Object(cluster_id.into()),
1361 AclMode::USAGE,
1362 role_id,
1363 ));
1364 }
1365 ObjectId::Schema((database_spec, _)) => match database_spec {
1366 ResolvedDatabaseSpecifier::Ambient => {}
1367 ResolvedDatabaseSpecifier::Id(database_id) => {
1368 privileges.push((
1369 SystemObjectId::Object(database_id.into()),
1370 AclMode::USAGE,
1371 role_id,
1372 ));
1373 }
1374 },
1375 ObjectId::Item(item_id) => {
1376 let item = catalog.get_item(item_id);
1377 privileges.push((
1378 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1379 AclMode::USAGE,
1380 role_id,
1381 ))
1382 }
1383 ObjectId::Cluster(_)
1384 | ObjectId::Database(_)
1385 | ObjectId::Role(_)
1386 | ObjectId::NetworkPolicy(_) => {}
1387 },
1388 SystemObjectId::System => {}
1389 }
1390 }
1391 RbacRequirements {
1392 ownership: update_privileges
1393 .iter()
1394 .filter_map(|update_privilege| update_privilege.target_id.object_id())
1395 .cloned()
1396 .collect(),
1397 privileges,
1398 superuser_action: if update_privileges
1403 .iter()
1404 .any(|update_privilege| update_privilege.target_id.is_system())
1405 {
1406 Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1407 } else {
1408 None
1409 },
1410 ..Default::default()
1411 }
1412 }
1413 Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1414 privilege_objects,
1415 privilege_acl_items: _,
1416 is_grant: _,
1417 }) => RbacRequirements {
1418 role_membership: privilege_objects
1419 .iter()
1420 .map(|privilege_object| privilege_object.role_id)
1421 .collect(),
1422 privileges: privilege_objects
1423 .into_iter()
1424 .filter_map(|privilege_object| {
1425 if let (Some(database_id), Some(_)) =
1426 (privilege_object.database_id, privilege_object.schema_id)
1427 {
1428 Some((
1429 SystemObjectId::Object(database_id.into()),
1430 AclMode::USAGE,
1431 role_id,
1432 ))
1433 } else {
1434 None
1435 }
1436 })
1437 .collect(),
1438 superuser_action: if privilege_objects
1444 .iter()
1445 .any(|privilege_object| privilege_object.role_id.is_public())
1446 {
1447 Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1448 } else {
1449 None
1450 },
1451 ..Default::default()
1452 },
1453 Plan::ReassignOwned(plan::ReassignOwnedPlan {
1454 old_roles,
1455 new_role,
1456 reassign_ids: _,
1457 }) => RbacRequirements {
1458 role_membership: old_roles
1459 .into_iter()
1460 .cloned()
1461 .chain(iter::once(*new_role))
1462 .collect(),
1463 ..Default::default()
1464 },
1465 Plan::SideEffectingFunc(func) => {
1466 let role_membership = match func {
1467 SideEffectingFunc::PgCancelBackend { connection_id } => {
1468 active_conns(*connection_id)
1469 .map(|x| [x].into())
1470 .unwrap_or_default()
1471 }
1472 };
1473 RbacRequirements {
1474 role_membership,
1475 ..Default::default()
1476 }
1477 }
1478 Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1479 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1480 RbacRequirements {
1481 privileges: vec![
1482 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1483 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1484 ],
1485 ..Default::default()
1486 }
1487 }
1488 Plan::DiscardTemp
1489 | Plan::DiscardAll
1490 | Plan::EmptyQuery
1491 | Plan::ShowAllVariables
1492 | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1493 | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1494 | Plan::SetVariable(plan::SetVariablePlan {
1495 name: _,
1496 value: _,
1497 local: _,
1498 })
1499 | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1500 | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1501 | Plan::StartTransaction(plan::StartTransactionPlan {
1502 access: _,
1503 isolation_level: _,
1504 })
1505 | Plan::CommitTransaction(plan::CommitTransactionPlan {
1506 transaction_type: _,
1507 })
1508 | Plan::AbortTransaction(plan::AbortTransactionPlan {
1509 transaction_type: _,
1510 })
1511 | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1512 | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1513 | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1514 | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1515 | Plan::Declare(plan::DeclarePlan {
1516 name: _,
1517 stmt: _,
1518 sql: _,
1519 params: _,
1520 })
1521 | Plan::Fetch(plan::FetchPlan {
1522 name: _,
1523 count: _,
1524 timeout: _,
1525 })
1526 | Plan::Close(plan::ClosePlan { name: _ })
1527 | Plan::Prepare(plan::PreparePlan {
1528 name: _,
1529 stmt: _,
1530 desc: _,
1531 sql: _,
1532 })
1533 | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1534 | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1535 | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1536 }
1537}
1538
1539fn check_owner_roles(
1541 object_id: &ObjectId,
1542 role_ids: &BTreeSet<RoleId>,
1543 catalog: &impl SessionCatalog,
1544) -> bool {
1545 if let Some(owner_id) = catalog.get_owner_id(object_id) {
1546 role_ids.contains(&owner_id)
1547 } else {
1548 true
1549 }
1550}
1551
1552fn ownership_err(
1553 unheld_ownership: Vec<ObjectId>,
1554 catalog: &impl SessionCatalog,
1555) -> Result<(), UnauthorizedError> {
1556 if !unheld_ownership.is_empty() {
1557 let objects = unheld_ownership
1558 .into_iter()
1559 .map(|ownership| match ownership {
1560 ObjectId::Cluster(id) => (
1561 ObjectType::Cluster,
1562 catalog.get_cluster(id).name().to_string(),
1563 ),
1564 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1565 let cluster = catalog.get_cluster(cluster_id);
1566 let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1567 let name = QualifiedReplica {
1570 cluster: Ident::new_unchecked(cluster.name()),
1571 replica: Ident::new_unchecked(replica.name()),
1572 };
1573 (ObjectType::ClusterReplica, name.to_string())
1574 }
1575 ObjectId::Database(id) => (
1576 ObjectType::Database,
1577 catalog.get_database(&id).name().to_string(),
1578 ),
1579 ObjectId::Schema((database_spec, schema_spec)) => {
1580 let schema = catalog.get_schema(&database_spec, &schema_spec);
1581 let name = catalog.resolve_full_schema_name(schema.name());
1582 (ObjectType::Schema, name.to_string())
1583 }
1584 ObjectId::Item(id) => {
1585 let item = catalog.get_item(&id);
1586 let name = catalog.resolve_full_name(item.name());
1587 (item.item_type().into(), name.to_string())
1588 }
1589 ObjectId::NetworkPolicy(id) => (
1590 ObjectType::NetworkPolicy,
1591 catalog.get_network_policy(&id).name().to_string(),
1592 ),
1593 ObjectId::Role(_) => unreachable!("roles have no owner"),
1594 })
1595 .collect();
1596 Err(UnauthorizedError::Ownership { objects })
1597 } else {
1598 Ok(())
1599 }
1600}
1601
1602fn generate_required_source_privileges(
1603 name: &QualifiedItemName,
1604 data_source: &DataSourceDesc,
1605 in_cluster: Option<ClusterId>,
1606 role_id: RoleId,
1607) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1608 let mut privileges = vec![(
1609 SystemObjectId::Object(name.qualifiers.clone().into()),
1610 AclMode::CREATE,
1611 role_id,
1612 )];
1613 match (data_source, in_cluster) {
1614 (_, Some(id)) => {
1615 privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1616 }
1617 (DataSourceDesc::Ingestion(_), None) => {
1618 privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1619 }
1620 (_, None) => {}
1625 }
1626 privileges
1627}
1628
1629fn generate_read_privileges(
1637 catalog: &impl SessionCatalog,
1638 ids: impl Iterator<Item = CatalogItemId>,
1639 role_id: RoleId,
1640) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1641 generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1642}
1643
1644fn generate_read_privileges_inner(
1645 catalog: &impl SessionCatalog,
1646 ids: impl Iterator<Item = CatalogItemId>,
1647 role_id: RoleId,
1648 seen: &mut BTreeSet<(ObjectId, RoleId)>,
1649) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1650 let mut privileges = Vec::new();
1651 let mut views = Vec::new();
1652
1653 for id in ids {
1654 if seen.insert((id.into(), role_id)) {
1655 let item = catalog.get_item(&id);
1656 let schema_id: ObjectId = item.name().qualifiers.clone().into();
1657 if seen.insert((schema_id.clone(), role_id)) {
1658 privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1659 }
1660 match item.item_type() {
1661 CatalogItemType::View
1662 | CatalogItemType::MaterializedView
1663 | CatalogItemType::ContinualTask => {
1664 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1665 views.push((item.references().items().copied(), item.owner_id()));
1666 }
1667 CatalogItemType::Table | CatalogItemType::Source => {
1668 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1669 }
1670 CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1671 privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1672 }
1673 CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1674 }
1675 }
1676 }
1677
1678 for (view_ids, view_owner) in views {
1679 privileges.extend_from_slice(&generate_read_privileges_inner(
1680 catalog, view_ids, view_owner, seen,
1681 ));
1682 }
1683
1684 privileges
1685}
1686
1687fn generate_usage_privileges(
1688 catalog: &impl SessionCatalog,
1689 ids: &ResolvedIds,
1690 role_id: RoleId,
1691 item_types: &BTreeSet<CatalogItemType>,
1692) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1693 ids.items()
1695 .filter_map(move |id| {
1696 let item = catalog.get_item(id);
1697 if item_types.contains(&item.item_type()) {
1698 let schema_id = item.name().qualifiers.clone().into();
1699 Some([
1700 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1701 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1702 ])
1703 } else {
1704 None
1705 }
1706 })
1707 .flatten()
1708 .collect()
1709}
1710
1711fn generate_cluster_usage_privileges(
1712 expr_is_const: bool,
1713 target_cluster_id: Option<ClusterId>,
1714 role_id: RoleId,
1715) -> Option<(SystemObjectId, AclMode, RoleId)> {
1716 if !expr_is_const {
1719 if let Some(cluster_id) = target_cluster_id {
1720 return Some((
1721 SystemObjectId::Object(cluster_id.into()),
1722 AclMode::USAGE,
1723 role_id,
1724 ));
1725 }
1726 }
1727
1728 None
1729}
1730
1731fn check_object_privileges(
1732 catalog: &impl SessionCatalog,
1733 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1734 role_membership: BTreeSet<RoleId>,
1735 current_role_id: RoleId,
1736) -> Result<(), UnauthorizedError> {
1737 let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1738 role_memberships.insert(current_role_id, role_membership);
1739 for (object_id, acl_mode, role_id) in privileges {
1740 let role_membership = role_memberships
1741 .entry(role_id)
1742 .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1743 let object_privileges = catalog
1744 .get_privileges(&object_id)
1745 .expect("only object types with privileges will generate required privileges");
1746 let role_privileges = role_membership
1747 .iter()
1748 .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1749 .map(|mz_acl_item| mz_acl_item.acl_mode)
1750 .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1751 if !role_privileges.contains(acl_mode) {
1752 let role_name = catalog.get_role(&role_id).name().to_string();
1753 let privileges = acl_mode.to_error_string();
1754 return Err(UnauthorizedError::Privilege {
1755 object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1756 role_name,
1757 privileges,
1758 });
1759 }
1760 }
1761
1762 Ok(())
1763}
1764
1765pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1766 const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1767 .union(AclMode::SELECT)
1768 .union(AclMode::UPDATE)
1769 .union(AclMode::DELETE);
1770 const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1771 const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1772 .union(AclMode::CREATE_DB)
1773 .union(AclMode::CREATE_CLUSTER)
1774 .union(AclMode::CREATE_NETWORK_POLICY);
1775
1776 const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1777 match object_type {
1778 SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1779 SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1780 SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1781 SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1782 SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1783 SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1784 SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1785 SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1786 SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1787 SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1788 SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1789 SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1790 SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1791 SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1792 SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1793 SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1794 SystemObjectType::Object(ObjectType::ContinualTask) => AclMode::SELECT,
1795 SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1796 }
1797}
1798
1799pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1800 MzAclItem {
1801 grantee: owner_id,
1802 grantor: owner_id,
1803 acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1804 }
1805}
1806
1807const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1808 match object_type {
1809 ObjectType::Table
1810 | ObjectType::View
1811 | ObjectType::MaterializedView
1812 | ObjectType::Source
1813 | ObjectType::ContinualTask => AclMode::SELECT,
1814 ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1815 ObjectType::Sink
1816 | ObjectType::Index
1817 | ObjectType::Role
1818 | ObjectType::Cluster
1819 | ObjectType::ClusterReplica
1820 | ObjectType::Secret
1821 | ObjectType::Connection
1822 | ObjectType::Database
1823 | ObjectType::Func
1824 | ObjectType::NetworkPolicy => AclMode::empty(),
1825 }
1826}
1827
1828pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1829 let acl_mode = default_builtin_object_acl_mode(object_type);
1830 MzAclItem {
1831 grantee: MZ_SUPPORT_ROLE_ID,
1832 grantor: MZ_SYSTEM_ROLE_ID,
1833 acl_mode,
1834 }
1835}
1836
1837pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1838 let acl_mode = default_builtin_object_acl_mode(object_type);
1839 MzAclItem {
1840 grantee: RoleId::Public,
1841 grantor: MZ_SYSTEM_ROLE_ID,
1842 acl_mode,
1843 }
1844}