1use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26 CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29 CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30 SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34 DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40fn rbac_check_preamble(
42 catalog: &impl SessionCatalog,
43 session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45 if catalog
50 .try_get_role(&session_meta.role_metadata().current_role)
51 .is_none()
52 {
53 return Err(UnauthorizedError::ConcurrentRoleDrop(
54 session_meta.role_metadata().current_role.clone(),
55 ));
56 };
57 if catalog
58 .try_get_role(&session_meta.role_metadata().session_role)
59 .is_none()
60 {
61 return Err(UnauthorizedError::ConcurrentRoleDrop(
62 session_meta.role_metadata().session_role.clone(),
63 ));
64 };
65 if catalog
66 .try_get_role(&session_meta.role_metadata().authenticated_role)
67 .is_none()
68 {
69 return Err(UnauthorizedError::ConcurrentRoleDrop(
70 session_meta.role_metadata().authenticated_role.clone(),
71 ));
72 };
73
74 Ok(())
75}
76
77fn filter_requirements(
79 catalog: &impl SessionCatalog,
80 session_meta: &dyn SessionMetadata,
81 rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83 let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86 && !session_meta.role_metadata().current_role.is_system()
87 && !session_meta.role_metadata().session_role.is_system();
88 let is_superuser = session_meta.is_superuser();
90 if is_rbac_disabled || is_superuser {
91 return rbac_requirements.filter_to_mandatory_requirements();
92 }
93
94 rbac_requirements
95}
96
97static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99 btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104 let mut items = DEFAULT_ITEM_USAGE.clone();
105 items.insert(CatalogItemType::Type);
106 items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
110#[derive(Debug, thiserror::Error)]
112pub enum UnauthorizedError {
113 #[error("permission denied to {action}")]
115 Superuser { action: String },
116 #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
118 Ownership { objects: Vec<(ObjectType, String)> },
119 #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
121 RoleMembership { role_names: Vec<String> },
122 #[error("permission denied for {object_description}")]
124 Privilege {
125 object_description: ErrorMessageObjectDescription,
126 role_name: String,
127 privileges: String,
128 },
129 #[error("permission denied to {action}")]
133 MzSystem { action: String },
134 #[error("permission denied to {action}")]
136 MzSupport { action: String },
137 #[error("role {0} was concurrently dropped")]
139 ConcurrentRoleDrop(RoleId),
140}
141
142impl UnauthorizedError {
143 pub fn detail(&self) -> Option<String> {
144 match &self {
145 UnauthorizedError::Superuser { action } => {
146 Some(format!("You must be a superuser to {}", action))
147 }
148 UnauthorizedError::Privilege {
149 object_description,
150 role_name,
151 privileges,
152 } => Some(format!(
153 "The '{role_name}' role needs {privileges} privileges on {object_description}"
154 )),
155 UnauthorizedError::MzSystem { .. } => {
156 Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157 }
158 UnauthorizedError::MzSupport { .. } => Some(format!(
159 "The '{}' role has very limited privileges",
160 SUPPORT_USER.name
161 )),
162 UnauthorizedError::ConcurrentRoleDrop(_) => {
163 Some("Please disconnect and re-connect with a valid role.".into())
164 }
165 UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166 }
167 }
168}
169
170#[derive(Debug)]
172struct RbacRequirements {
173 role_membership: BTreeSet<RoleId>,
175 ownership: Vec<ObjectId>,
177 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
180 item_usage: &'static BTreeSet<CatalogItemType>,
185 superuser_action: Option<String>,
187}
188
189impl RbacRequirements {
190 fn empty() -> RbacRequirements {
191 RbacRequirements {
192 role_membership: BTreeSet::new(),
193 ownership: Vec::new(),
194 privileges: Vec::new(),
195 item_usage: &EMPTY_ITEM_USAGE,
196 superuser_action: None,
197 }
198 }
199
200 fn validate(
201 self,
202 catalog: &impl SessionCatalog,
203 session: &dyn SessionMetadata,
204 resolved_ids: &ResolvedIds,
205 ) -> Result<(), UnauthorizedError> {
206 let role_membership =
208 catalog.collect_role_membership(&session.role_metadata().current_role);
209
210 check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212 let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215 if !unheld_membership.is_empty() {
216 let role_names = unheld_membership
217 .into_iter()
218 .map(|role_id| {
219 catalog
221 .try_get_role(role_id)
222 .map(|role| role.name().to_string())
223 .unwrap_or(role_id.to_string())
224 })
225 .collect();
226 return Err(UnauthorizedError::RoleMembership { role_names });
227 }
228
229 let unheld_ownership = self
232 .ownership
233 .into_iter()
234 .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235 .collect();
236 ownership_err(unheld_ownership, catalog)?;
237
238 check_object_privileges(
239 catalog,
240 self.privileges,
241 role_membership,
242 session.role_metadata().current_role,
243 )?;
244
245 if let Some(action) = self.superuser_action {
246 return Err(UnauthorizedError::Superuser { action });
247 }
248
249 Ok(())
250 }
251
252 fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253 let RbacRequirements {
254 role_membership,
255 ownership,
256 privileges,
257 item_usage,
258 superuser_action: _,
259 } = self;
260 let role_membership = role_membership
261 .into_iter()
262 .filter(|id| id.is_system())
263 .collect();
264 let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265 let privileges = privileges
266 .into_iter()
267 .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268 .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270 .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271 .collect();
272 let superuser_action = None;
273 RbacRequirements {
274 role_membership,
275 ownership,
276 privileges,
277 item_usage,
278 superuser_action,
279 }
280 }
281}
282
283impl Default for RbacRequirements {
284 fn default() -> Self {
285 RbacRequirements {
286 role_membership: BTreeSet::new(),
287 ownership: Vec::new(),
288 privileges: Vec::new(),
289 item_usage: &DEFAULT_ITEM_USAGE,
290 superuser_action: None,
291 }
292 }
293}
294
295pub fn check_usage(
297 catalog: &impl SessionCatalog,
298 session: &dyn SessionMetadata,
299 resolved_ids: &ResolvedIds,
300 item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302 rbac_check_preamble(catalog, session)?;
303
304 let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307 let existing_resolved_ids =
310 resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312 let required_privileges = generate_usage_privileges(
313 catalog,
314 &existing_resolved_ids,
315 session.role_metadata().current_role,
316 item_types,
317 )
318 .into_iter()
319 .collect();
320
321 let mut rbac_requirements = RbacRequirements::empty();
322 rbac_requirements.privileges = required_privileges;
323 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324 let required_privileges = rbac_requirements.privileges;
325
326 check_object_privileges(
327 catalog,
328 required_privileges,
329 role_membership,
330 session.role_metadata().current_role,
331 )?;
332
333 Ok(())
334}
335
336pub fn check_plan(
338 catalog: &impl SessionCatalog,
339 active_conns: impl FnOnce(u32) -> Option<RoleId>,
341 session: &dyn SessionMetadata,
342 plan: &Plan,
343 target_cluster_id: Option<ClusterId>,
344 resolved_ids: &ResolvedIds,
345) -> Result<(), UnauthorizedError> {
346 rbac_check_preamble(catalog, session)?;
347
348 let rbac_requirements = generate_rbac_requirements(
349 catalog,
350 plan,
351 active_conns,
352 target_cluster_id,
353 session.role_metadata().current_role,
354 );
355 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
356 debug!(
357 "rbac requirements {rbac_requirements:?} for plan {:?}",
358 PlanKind::from(plan)
359 );
360 rbac_requirements.validate(catalog, session, resolved_ids)
361}
362
363pub fn is_rbac_enabled_for_session(
365 system_vars: &SystemVars,
366 session: &dyn SessionMetadata,
367) -> bool {
368 let server_enabled = system_vars.enable_rbac_checks();
369 let session_enabled = session.enable_session_rbac_checks();
370
371 server_enabled || session_enabled
374}
375
376fn generate_rbac_requirements(
378 catalog: &impl SessionCatalog,
379 plan: &Plan,
380 active_conns: impl FnOnce(u32) -> Option<RoleId>,
381 target_cluster_id: Option<ClusterId>,
382 role_id: RoleId,
383) -> RbacRequirements {
384 match plan {
385 Plan::CreateConnection(plan::CreateConnectionPlan {
386 name,
387 if_not_exists: _,
388 connection: _,
389 validate: _,
390 }) => RbacRequirements {
391 privileges: vec![(
392 SystemObjectId::Object(name.qualifiers.clone().into()),
393 AclMode::CREATE,
394 role_id,
395 )],
396 item_usage: &CREATE_ITEM_USAGE,
397 ..Default::default()
398 },
399 Plan::CreateDatabase(plan::CreateDatabasePlan {
400 name: _,
401 if_not_exists: _,
402 }) => RbacRequirements {
403 privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
404 item_usage: &CREATE_ITEM_USAGE,
405 ..Default::default()
406 },
407 Plan::CreateSchema(plan::CreateSchemaPlan {
408 database_spec,
409 schema_name: _,
410 if_not_exists: _,
411 }) => {
412 let privileges = match database_spec {
413 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
414 ResolvedDatabaseSpecifier::Id(database_id) => {
415 vec![(
416 SystemObjectId::Object(database_id.into()),
417 AclMode::CREATE,
418 role_id,
419 )]
420 }
421 };
422 RbacRequirements {
423 privileges,
424 item_usage: &CREATE_ITEM_USAGE,
425 ..Default::default()
426 }
427 }
428 Plan::CreateRole(plan::CreateRolePlan {
429 name: _,
430 attributes,
431 }) => {
432 if attributes.superuser.unwrap_or(false) {
433 RbacRequirements {
434 superuser_action: Some("create superuser role".to_string()),
435 ..Default::default()
436 }
437 } else {
438 RbacRequirements {
439 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
440 item_usage: &CREATE_ITEM_USAGE,
441 ..Default::default()
442 }
443 }
444 }
445 Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
446 privileges: vec![(
447 SystemObjectId::System,
448 AclMode::CREATE_NETWORK_POLICY,
449 role_id,
450 )],
451 item_usage: &CREATE_ITEM_USAGE,
452 ..Default::default()
453 },
454 Plan::CreateCluster(plan::CreateClusterPlan {
455 name: _,
456 variant: _,
457 workload_class: _,
458 }) => RbacRequirements {
459 privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
460 item_usage: &CREATE_ITEM_USAGE,
461 ..Default::default()
462 },
463 Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
464 cluster_id,
465 name: _,
466 config: _,
467 }) => RbacRequirements {
468 ownership: vec![ObjectId::Cluster(*cluster_id)],
469 item_usage: &CREATE_ITEM_USAGE,
470 ..Default::default()
471 },
472 Plan::CreateSource(plan::CreateSourcePlan {
473 name,
474 source,
475 if_not_exists: _,
476 timeline: _,
477 in_cluster,
478 }) => RbacRequirements {
479 privileges: generate_required_source_privileges(
480 name,
481 &source.data_source,
482 *in_cluster,
483 role_id,
484 ),
485 item_usage: &CREATE_ITEM_USAGE,
486 ..Default::default()
487 },
488 Plan::CreateSources(plans) => RbacRequirements {
489 privileges: plans
490 .iter()
491 .flat_map(
492 |plan::CreateSourcePlanBundle {
493 item_id: _,
494 global_id: _,
495 plan:
496 plan::CreateSourcePlan {
497 name,
498 source,
499 if_not_exists: _,
500 timeline: _,
501 in_cluster,
502 },
503 resolved_ids: _,
504 available_source_references: _,
505 }| {
506 generate_required_source_privileges(
507 name,
508 &source.data_source,
509 *in_cluster,
510 role_id,
511 )
512 .into_iter()
513 },
514 )
515 .collect(),
516 item_usage: &CREATE_ITEM_USAGE,
517 ..Default::default()
518 },
519 Plan::CreateSecret(plan::CreateSecretPlan {
520 name,
521 secret: _,
522 if_not_exists: _,
523 }) => RbacRequirements {
524 privileges: vec![(
525 SystemObjectId::Object(name.qualifiers.clone().into()),
526 AclMode::CREATE,
527 role_id,
528 )],
529 item_usage: &CREATE_ITEM_USAGE,
530 ..Default::default()
531 },
532 Plan::CreateSink(plan::CreateSinkPlan {
533 name,
534 sink,
535 with_snapshot: _,
536 if_not_exists: _,
537 in_cluster,
538 }) => {
539 let mut privileges = vec![(
540 SystemObjectId::Object(name.qualifiers.clone().into()),
541 AclMode::CREATE,
542 role_id,
543 )];
544 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
545 privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
546 privileges.push((
547 SystemObjectId::Object(in_cluster.into()),
548 AclMode::CREATE,
549 role_id,
550 ));
551 RbacRequirements {
552 privileges,
553 item_usage: &CREATE_ITEM_USAGE,
554 ..Default::default()
555 }
556 }
557 Plan::CreateTable(plan::CreateTablePlan {
558 name,
559 table: _,
560 if_not_exists: _,
561 }) => RbacRequirements {
562 privileges: vec![(
563 SystemObjectId::Object(name.qualifiers.clone().into()),
564 AclMode::CREATE,
565 role_id,
566 )],
567 item_usage: &CREATE_ITEM_USAGE,
568 ..Default::default()
569 },
570 Plan::CreateView(plan::CreateViewPlan {
571 name,
572 view: _,
573 replace,
574 drop_ids: _,
575 if_not_exists: _,
576 ambiguous_columns: _,
577 }) => RbacRequirements {
578 ownership: replace
579 .map(|id| vec![ObjectId::Item(id)])
580 .unwrap_or_default(),
581 privileges: vec![(
582 SystemObjectId::Object(name.qualifiers.clone().into()),
583 AclMode::CREATE,
584 role_id,
585 )],
586 item_usage: &CREATE_ITEM_USAGE,
587 ..Default::default()
588 },
589 Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
590 name,
591 materialized_view,
592 replace,
593 drop_ids: _,
594 if_not_exists: _,
595 ambiguous_columns: _,
596 }) => RbacRequirements {
597 ownership: replace
598 .map(|id| vec![ObjectId::Item(id)])
599 .unwrap_or_default(),
600 privileges: vec![
601 (
602 SystemObjectId::Object(name.qualifiers.clone().into()),
603 AclMode::CREATE,
604 role_id,
605 ),
606 (
607 SystemObjectId::Object(materialized_view.cluster_id.into()),
608 AclMode::CREATE,
609 role_id,
610 ),
611 ],
612 item_usage: &CREATE_ITEM_USAGE,
613 ..Default::default()
614 },
615 Plan::CreateContinualTask(plan::CreateContinualTaskPlan {
616 name,
617 placeholder_id: _,
618 desc: _,
619 input_id: _,
620 with_snapshot: _,
621 continual_task,
622 }) => RbacRequirements {
623 privileges: vec![
624 (
625 SystemObjectId::Object(name.qualifiers.clone().into()),
626 AclMode::CREATE,
627 role_id,
628 ),
629 (
630 SystemObjectId::Object(continual_task.cluster_id.into()),
631 AclMode::CREATE,
632 role_id,
633 ),
634 ],
635 item_usage: &CREATE_ITEM_USAGE,
636 ..Default::default()
637 },
638 Plan::CreateIndex(plan::CreateIndexPlan {
639 name,
640 index,
641 if_not_exists: _,
642 }) => {
643 let index_on_item = catalog.resolve_item_id(&index.on);
644 RbacRequirements {
645 ownership: vec![ObjectId::Item(index_on_item)],
646 privileges: vec![
647 (
648 SystemObjectId::Object(name.qualifiers.clone().into()),
649 AclMode::CREATE,
650 role_id,
651 ),
652 (
653 SystemObjectId::Object(index.cluster_id.into()),
654 AclMode::CREATE,
655 role_id,
656 ),
657 ],
658 item_usage: &CREATE_ITEM_USAGE,
659 ..Default::default()
660 }
661 }
662 Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
663 privileges: vec![(
664 SystemObjectId::Object(name.qualifiers.clone().into()),
665 AclMode::CREATE,
666 role_id,
667 )],
668 item_usage: &CREATE_ITEM_USAGE,
669 ..Default::default()
670 },
671 Plan::Comment(plan::CommentPlan {
672 object_id,
673 sub_component: _,
674 comment: _,
675 }) => {
676 let (ownership, privileges) = match object_id {
677 CommentObjectId::Role(_) => (
680 Vec::new(),
681 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
682 ),
683 _ => (vec![ObjectId::from(*object_id)], Vec::new()),
684 };
685 RbacRequirements {
686 ownership,
687 privileges,
688 ..Default::default()
689 }
690 }
691 Plan::DropObjects(plan::DropObjectsPlan {
692 referenced_ids,
693 drop_ids: _,
694 object_type,
695 }) => {
696 let privileges = if object_type == &ObjectType::Role {
697 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
698 } else {
699 referenced_ids
700 .iter()
701 .filter_map(|id| match id {
702 ObjectId::ClusterReplica((cluster_id, _)) => Some((
703 SystemObjectId::Object(cluster_id.into()),
704 AclMode::USAGE,
705 role_id,
706 )),
707 ObjectId::Schema((database_spec, _)) => match database_spec {
708 ResolvedDatabaseSpecifier::Ambient => None,
709 ResolvedDatabaseSpecifier::Id(database_id) => Some((
710 SystemObjectId::Object(database_id.into()),
711 AclMode::USAGE,
712 role_id,
713 )),
714 },
715 ObjectId::Item(item_id) => {
716 let item = catalog.get_item(item_id);
717 Some((
718 SystemObjectId::Object(item.name().qualifiers.clone().into()),
719 AclMode::USAGE,
720 role_id,
721 ))
722 }
723 ObjectId::Cluster(_)
724 | ObjectId::Database(_)
725 | ObjectId::Role(_)
726 | ObjectId::NetworkPolicy(_) => None,
727 })
728 .collect()
729 };
730 RbacRequirements {
731 ownership: referenced_ids.clone(),
733 privileges,
734 ..Default::default()
735 }
736 }
737 Plan::DropOwned(plan::DropOwnedPlan {
738 role_ids,
739 drop_ids: _,
740 privilege_revokes: _,
741 default_privilege_revokes: _,
742 }) => RbacRequirements {
743 role_membership: role_ids.into_iter().cloned().collect(),
744 ..Default::default()
745 },
746 Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
747 let container_id = match id {
748 ObjectId::Item(id) => Some(SystemObjectId::Object(
749 catalog.get_item(id).name().qualifiers.clone().into(),
750 )),
751 ObjectId::Schema((database_id, _schema_id)) => match database_id {
752 ResolvedDatabaseSpecifier::Ambient => None,
753 ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
754 },
755 ObjectId::Cluster(_)
756 | ObjectId::ClusterReplica(_)
757 | ObjectId::Database(_)
758 | ObjectId::Role(_)
759 | ObjectId::NetworkPolicy(_) => None,
760 };
761 let privileges = match container_id {
762 Some(id) => vec![(id, AclMode::USAGE, role_id)],
763 None => Vec::new(),
764 };
765 RbacRequirements {
766 privileges,
767 item_usage: &EMPTY_ITEM_USAGE,
768 ..Default::default()
769 }
770 }
771 Plan::ShowColumns(plan::ShowColumnsPlan {
772 id,
773 select_plan,
774 new_resolved_ids: _,
775 }) => {
776 let mut privileges = vec![(
777 SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
778 AclMode::USAGE,
779 role_id,
780 )];
781
782 for privilege in generate_rbac_requirements(
783 catalog,
784 &Plan::Select(select_plan.clone()),
785 active_conns,
786 target_cluster_id,
787 role_id,
788 )
789 .privileges
790 {
791 privileges.push(privilege);
792 }
793 RbacRequirements {
794 privileges,
795 ..Default::default()
796 }
797 }
798 Plan::Select(plan::SelectPlan {
799 source,
800 select: _,
801 when: _,
802 finishing: _,
803 copy_to: _,
804 }) => {
805 let items = source
806 .depends_on()
807 .into_iter()
808 .map(|gid| catalog.resolve_item_id(&gid));
809 let mut privileges = generate_read_privileges(catalog, items, role_id);
810 if let Some(privilege) = generate_cluster_usage_privileges(
811 source.as_const().is_some(),
812 target_cluster_id,
813 role_id,
814 ) {
815 privileges.push(privilege);
816 }
817 RbacRequirements {
818 privileges,
819 ..Default::default()
820 }
821 }
822 Plan::Subscribe(plan::SubscribePlan {
823 from,
824 with_snapshot: _,
825 when: _,
826 up_to: _,
827 copy_to: _,
828 emit_progress: _,
829 output: _,
830 }) => {
831 let items = from
832 .depends_on()
833 .into_iter()
834 .map(|gid| catalog.resolve_item_id(&gid));
835 let mut privileges = generate_read_privileges(catalog, items, role_id);
836 if let Some(cluster_id) = target_cluster_id {
837 privileges.push((
838 SystemObjectId::Object(cluster_id.into()),
839 AclMode::USAGE,
840 role_id,
841 ));
842 }
843 RbacRequirements {
844 privileges,
845 ..Default::default()
846 }
847 }
848 Plan::CopyFrom(plan::CopyFromPlan {
849 id,
850 source: _,
851 columns: _,
852 source_desc: _,
853 mfp: _,
854 params: _,
855 filter: _,
856 }) => RbacRequirements {
857 privileges: vec![
858 (
859 SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
860 AclMode::USAGE,
861 role_id,
862 ),
863 (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
864 ],
865 ..Default::default()
866 },
867 Plan::CopyTo(plan::CopyToPlan {
868 select_plan,
869 desc: _,
870 to: _,
871 connection: _,
872 connection_id: _,
873 format: _,
874 max_file_size: _,
875 }) => {
876 let items = select_plan
877 .source
878 .depends_on()
879 .into_iter()
880 .map(|gid| catalog.resolve_item_id(&gid));
881 let mut privileges = generate_read_privileges(catalog, items, role_id);
882 if let Some(cluster_id) = target_cluster_id {
883 privileges.push((
884 SystemObjectId::Object(cluster_id.into()),
885 AclMode::USAGE,
886 role_id,
887 ));
888 }
889 RbacRequirements {
890 privileges,
891 ..Default::default()
892 }
893 }
894 Plan::ExplainPlan(plan::ExplainPlanPlan {
895 stage: _,
896 format: _,
897 config: _,
898 explainee,
899 })
900 | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
901 privileges: match explainee {
902 Explainee::View(id)
903 | Explainee::MaterializedView(id)
904 | Explainee::Index(id)
905 | Explainee::ReplanView(id)
906 | Explainee::ReplanMaterializedView(id)
907 | Explainee::ReplanIndex(id) => {
908 let item = catalog.get_item(id);
909 let schema_id: ObjectId = item.name().qualifiers.clone().into();
910 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
911 }
912 Explainee::Statement(stmt) => stmt
913 .depends_on()
914 .into_iter()
915 .map(|id| {
916 let item = catalog.get_item_by_global_id(&id);
917 let schema_id: ObjectId = item.name().qualifiers.clone().into();
918 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
919 })
920 .collect(),
921 },
922 item_usage: match explainee {
923 Explainee::View(..)
924 | Explainee::MaterializedView(..)
925 | Explainee::Index(..)
926 | Explainee::ReplanView(..)
927 | Explainee::ReplanMaterializedView(..)
928 | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
929 Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
930 },
931 ..Default::default()
932 },
933 Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
934 RbacRequirements {
935 privileges: {
936 let item = catalog.get_item_by_global_id(sink_from);
937 let schema_id: ObjectId = item.name().qualifiers.clone().into();
938 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
939 },
940 item_usage: &EMPTY_ITEM_USAGE,
941 ..Default::default()
942 }
943 }
944 Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
945 format: _,
946 raw_plan,
947 when: _,
948 }) => RbacRequirements {
949 privileges: raw_plan
950 .depends_on()
951 .into_iter()
952 .map(|id| {
953 let item = catalog.get_item_by_global_id(&id);
954 let schema_id: ObjectId = item.name().qualifiers.clone().into();
955 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
956 })
957 .collect(),
958 ..Default::default()
959 },
960 Plan::Insert(plan::InsertPlan {
961 id,
962 values,
963 returning,
964 }) => {
965 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
966 let mut privileges = vec![
967 (
968 SystemObjectId::Object(schema_id.clone()),
969 AclMode::USAGE,
970 role_id,
971 ),
972 (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
973 ];
974 let mut seen = BTreeSet::from([(schema_id, role_id)]);
975
976 if returning
979 .iter()
980 .any(|assignment| assignment.contains_column())
981 {
982 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
983 seen.insert((id.into(), role_id));
984 }
985
986 let items = values
987 .depends_on()
988 .into_iter()
989 .map(|gid| catalog.resolve_item_id(&gid));
990 privileges.extend_from_slice(&generate_read_privileges_inner(
991 catalog, items, role_id, &mut seen,
992 ));
993
994 if let Some(privilege) = generate_cluster_usage_privileges(
995 values.as_const().is_some(),
996 target_cluster_id,
997 role_id,
998 ) {
999 privileges.push(privilege);
1000 } else if !returning.is_empty() {
1001 if let Some(cluster_id) = target_cluster_id {
1004 privileges.push((
1005 SystemObjectId::Object(cluster_id.into()),
1006 AclMode::USAGE,
1007 role_id,
1008 ));
1009 }
1010 }
1011 RbacRequirements {
1012 privileges,
1013 ..Default::default()
1014 }
1015 }
1016 Plan::AlterCluster(plan::AlterClusterPlan {
1017 id,
1018 name: _,
1019 options: _,
1020 strategy: _,
1021 }) => RbacRequirements {
1022 ownership: vec![ObjectId::Cluster(*id)],
1023 item_usage: &CREATE_ITEM_USAGE,
1024 ..Default::default()
1025 },
1026 Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1027 ownership: vec![ObjectId::Item(*id)],
1028 privileges: vec![(
1029 SystemObjectId::Object(set_cluster.into()),
1030 AclMode::CREATE,
1031 role_id,
1032 )],
1033 item_usage: &CREATE_ITEM_USAGE,
1034 ..Default::default()
1035 },
1036 Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1037 id,
1038 window: _,
1039 value: _,
1040 object_type: _,
1041 }) => RbacRequirements {
1042 ownership: vec![ObjectId::Item(*id)],
1043 item_usage: &CREATE_ITEM_USAGE,
1044 ..Default::default()
1045 },
1046 Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1047 ownership: vec![ObjectId::Item(*id)],
1048 ..Default::default()
1049 },
1050 Plan::AlterSource(plan::AlterSourcePlan {
1051 item_id,
1052 ingestion_id: _,
1053 action: _,
1054 }) => RbacRequirements {
1055 ownership: vec![ObjectId::Item(*item_id)],
1056 item_usage: &CREATE_ITEM_USAGE,
1057 ..Default::default()
1058 },
1059 Plan::AlterSink(plan::AlterSinkPlan {
1060 item_id,
1061 global_id: _,
1062 sink,
1063 with_snapshot: _,
1064 in_cluster,
1065 }) => {
1066 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1067 let mut privileges = generate_read_privileges(catalog, items, role_id);
1068 privileges.push((
1069 SystemObjectId::Object(in_cluster.into()),
1070 AclMode::CREATE,
1071 role_id,
1072 ));
1073 RbacRequirements {
1074 ownership: vec![ObjectId::Item(*item_id)],
1075 privileges,
1076 item_usage: &CREATE_ITEM_USAGE,
1077 ..Default::default()
1078 }
1079 }
1080 Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1081 id,
1082 name: _,
1083 to_name: _,
1084 }) => RbacRequirements {
1085 ownership: vec![ObjectId::Cluster(*id)],
1086 ..Default::default()
1087 },
1088 Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1089 id_a,
1090 id_b,
1091 name_a: _,
1092 name_b: _,
1093 name_temp: _,
1094 }) => RbacRequirements {
1095 ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1096 ..Default::default()
1097 },
1098 Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1099 cluster_id,
1100 replica_id,
1101 name: _,
1102 to_name: _,
1103 }) => RbacRequirements {
1104 ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1105 ..Default::default()
1106 },
1107 Plan::AlterItemRename(plan::AlterItemRenamePlan {
1108 id,
1109 current_full_name: _,
1110 to_name: _,
1111 object_type: _,
1112 }) => RbacRequirements {
1113 ownership: vec![ObjectId::Item(*id)],
1114 ..Default::default()
1115 },
1116 Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1117 cur_schema_spec,
1118 new_schema_name: _,
1119 }) => {
1120 let privileges = match cur_schema_spec.0 {
1121 ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1122 SystemObjectId::Object(ObjectId::Database(db_id)),
1123 AclMode::CREATE,
1124 role_id,
1125 )],
1126 ResolvedDatabaseSpecifier::Ambient => vec![],
1127 };
1128
1129 RbacRequirements {
1130 ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1131 privileges,
1132 ..Default::default()
1133 }
1134 }
1135 Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1136 schema_a_spec,
1137 schema_a_name: _,
1138 schema_b_spec,
1139 schema_b_name: _,
1140 name_temp: _,
1141 }) => {
1142 let mut privileges = vec![];
1143 if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1144 privileges.push((
1145 SystemObjectId::Object(ObjectId::Database(id_a)),
1146 AclMode::CREATE,
1147 role_id,
1148 ));
1149 }
1150 if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1151 privileges.push((
1152 SystemObjectId::Object(ObjectId::Database(id_b)),
1153 AclMode::CREATE,
1154 role_id,
1155 ));
1156 }
1157
1158 RbacRequirements {
1159 ownership: vec![
1160 ObjectId::Schema(*schema_a_spec),
1161 ObjectId::Schema(*schema_b_spec),
1162 ],
1163 privileges,
1164 ..Default::default()
1165 }
1166 }
1167 Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1168 ownership: vec![ObjectId::Item(*id)],
1169 item_usage: &CREATE_ITEM_USAGE,
1170 ..Default::default()
1171 },
1172 Plan::AlterRole(plan::AlterRolePlan {
1173 id,
1174 name: _,
1175 option,
1176 }) => match option {
1177 plan::PlannedAlterRoleOption::Attributes(attributes)
1179 if attributes.superuser.unwrap_or(false) =>
1180 {
1181 RbacRequirements {
1182 superuser_action: Some("alter superuser role".to_string()),
1183 ..Default::default()
1184 }
1185 }
1186 plan::PlannedAlterRoleOption::Attributes(attributes)
1188 if attributes.password.is_some() && role_id == *id =>
1189 {
1190 RbacRequirements::default()
1191 }
1192 plan::PlannedAlterRoleOption::Attributes(attributes)
1194 if attributes.password.is_some() =>
1195 {
1196 RbacRequirements {
1197 superuser_action: Some("alter password of role".to_string()),
1198 ..Default::default()
1199 }
1200 }
1201 plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1203 RbacRequirements::default()
1204 }
1205 _ => RbacRequirements {
1207 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1208 item_usage: &CREATE_ITEM_USAGE,
1209 ..Default::default()
1210 },
1211 },
1212 Plan::AlterOwner(plan::AlterOwnerPlan {
1213 id,
1214 object_type: _,
1215 new_owner,
1216 }) => {
1217 let privileges = match id {
1218 ObjectId::ClusterReplica((cluster_id, _)) => {
1219 vec![(
1220 SystemObjectId::Object(cluster_id.into()),
1221 AclMode::CREATE,
1222 role_id,
1223 )]
1224 }
1225 ObjectId::Schema((database_spec, _)) => match database_spec {
1226 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1227 ResolvedDatabaseSpecifier::Id(database_id) => {
1228 vec![(
1229 SystemObjectId::Object(database_id.into()),
1230 AclMode::CREATE,
1231 role_id,
1232 )]
1233 }
1234 },
1235 ObjectId::Item(item_id) => {
1236 let item = catalog.get_item(item_id);
1237 vec![(
1238 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1239 AclMode::CREATE,
1240 role_id,
1241 )]
1242 }
1243 ObjectId::Cluster(_)
1244 | ObjectId::Database(_)
1245 | ObjectId::Role(_)
1246 | ObjectId::NetworkPolicy(_) => Vec::new(),
1247 };
1248 RbacRequirements {
1249 role_membership: BTreeSet::from([*new_owner]),
1250 ownership: vec![id.clone()],
1251 privileges,
1252 ..Default::default()
1253 }
1254 }
1255 Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1256 ownership: vec![ObjectId::Item(*relation_id)],
1257 item_usage: &CREATE_ITEM_USAGE,
1258 ..Default::default()
1259 },
1260 Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1261 ownership: vec![ObjectId::NetworkPolicy(*id)],
1262 item_usage: &CREATE_ITEM_USAGE,
1263 ..Default::default()
1264 },
1265 Plan::ReadThenWrite(plan::ReadThenWritePlan {
1266 id,
1267 selection,
1268 finishing: _,
1269 assignments,
1270 kind,
1271 returning,
1272 }) => {
1273 let acl_mode = match kind {
1274 MutationKind::Insert => AclMode::INSERT,
1275 MutationKind::Update => AclMode::UPDATE,
1276 MutationKind::Delete => AclMode::DELETE,
1277 };
1278 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1279 let mut privileges = vec![
1280 (
1281 SystemObjectId::Object(schema_id.clone()),
1282 AclMode::USAGE,
1283 role_id,
1284 ),
1285 (SystemObjectId::Object(id.into()), acl_mode, role_id),
1286 ];
1287 let mut seen = BTreeSet::from([(schema_id, role_id)]);
1288
1289 if assignments
1292 .values()
1293 .chain(returning.iter())
1294 .any(|assignment| assignment.contains_column())
1295 {
1296 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1297 seen.insert((id.into(), role_id));
1298 }
1299
1300 let items = selection
1307 .depends_on()
1308 .into_iter()
1309 .map(|gid| catalog.resolve_item_id(&gid));
1310 privileges.extend_from_slice(&generate_read_privileges_inner(
1311 catalog, items, role_id, &mut seen,
1312 ));
1313
1314 if let Some(privilege) = generate_cluster_usage_privileges(
1315 selection.as_const().is_some(),
1316 target_cluster_id,
1317 role_id,
1318 ) {
1319 privileges.push(privilege);
1320 }
1321 RbacRequirements {
1322 privileges,
1323 ..Default::default()
1324 }
1325 }
1326 Plan::GrantRole(plan::GrantRolePlan {
1327 role_ids: _,
1328 member_ids: _,
1329 grantor_id: _,
1330 })
1331 | Plan::RevokeRole(plan::RevokeRolePlan {
1332 role_ids: _,
1333 member_ids: _,
1334 grantor_id: _,
1335 }) => RbacRequirements {
1336 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1337 ..Default::default()
1338 },
1339 Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1340 update_privileges,
1341 grantees: _,
1342 })
1343 | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1344 update_privileges,
1345 revokees: _,
1346 }) => {
1347 let mut privileges = Vec::with_capacity(update_privileges.len());
1348 for UpdatePrivilege { target_id, .. } in update_privileges {
1349 match target_id {
1350 SystemObjectId::Object(object_id) => match object_id {
1351 ObjectId::ClusterReplica((cluster_id, _)) => {
1352 privileges.push((
1353 SystemObjectId::Object(cluster_id.into()),
1354 AclMode::USAGE,
1355 role_id,
1356 ));
1357 }
1358 ObjectId::Schema((database_spec, _)) => match database_spec {
1359 ResolvedDatabaseSpecifier::Ambient => {}
1360 ResolvedDatabaseSpecifier::Id(database_id) => {
1361 privileges.push((
1362 SystemObjectId::Object(database_id.into()),
1363 AclMode::USAGE,
1364 role_id,
1365 ));
1366 }
1367 },
1368 ObjectId::Item(item_id) => {
1369 let item = catalog.get_item(item_id);
1370 privileges.push((
1371 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1372 AclMode::USAGE,
1373 role_id,
1374 ))
1375 }
1376 ObjectId::Cluster(_)
1377 | ObjectId::Database(_)
1378 | ObjectId::Role(_)
1379 | ObjectId::NetworkPolicy(_) => {}
1380 },
1381 SystemObjectId::System => {}
1382 }
1383 }
1384 RbacRequirements {
1385 ownership: update_privileges
1386 .iter()
1387 .filter_map(|update_privilege| update_privilege.target_id.object_id())
1388 .cloned()
1389 .collect(),
1390 privileges,
1391 superuser_action: if update_privileges
1396 .iter()
1397 .any(|update_privilege| update_privilege.target_id.is_system())
1398 {
1399 Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1400 } else {
1401 None
1402 },
1403 ..Default::default()
1404 }
1405 }
1406 Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1407 privilege_objects,
1408 privilege_acl_items: _,
1409 is_grant: _,
1410 }) => RbacRequirements {
1411 role_membership: privilege_objects
1412 .iter()
1413 .map(|privilege_object| privilege_object.role_id)
1414 .collect(),
1415 privileges: privilege_objects
1416 .into_iter()
1417 .filter_map(|privilege_object| {
1418 if let (Some(database_id), Some(_)) =
1419 (privilege_object.database_id, privilege_object.schema_id)
1420 {
1421 Some((
1422 SystemObjectId::Object(database_id.into()),
1423 AclMode::USAGE,
1424 role_id,
1425 ))
1426 } else {
1427 None
1428 }
1429 })
1430 .collect(),
1431 superuser_action: if privilege_objects
1437 .iter()
1438 .any(|privilege_object| privilege_object.role_id.is_public())
1439 {
1440 Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1441 } else {
1442 None
1443 },
1444 ..Default::default()
1445 },
1446 Plan::ReassignOwned(plan::ReassignOwnedPlan {
1447 old_roles,
1448 new_role,
1449 reassign_ids: _,
1450 }) => RbacRequirements {
1451 role_membership: old_roles
1452 .into_iter()
1453 .cloned()
1454 .chain(iter::once(*new_role))
1455 .collect(),
1456 ..Default::default()
1457 },
1458 Plan::SideEffectingFunc(func) => {
1459 let role_membership = match func {
1460 SideEffectingFunc::PgCancelBackend { connection_id } => {
1461 active_conns(*connection_id)
1462 .map(|x| [x].into())
1463 .unwrap_or_default()
1464 }
1465 };
1466 RbacRequirements {
1467 role_membership,
1468 ..Default::default()
1469 }
1470 }
1471 Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1472 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1473 RbacRequirements {
1474 privileges: vec![
1475 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1476 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1477 ],
1478 ..Default::default()
1479 }
1480 }
1481 Plan::DiscardTemp
1482 | Plan::DiscardAll
1483 | Plan::EmptyQuery
1484 | Plan::ShowAllVariables
1485 | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1486 | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1487 | Plan::SetVariable(plan::SetVariablePlan {
1488 name: _,
1489 value: _,
1490 local: _,
1491 })
1492 | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1493 | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1494 | Plan::StartTransaction(plan::StartTransactionPlan {
1495 access: _,
1496 isolation_level: _,
1497 })
1498 | Plan::CommitTransaction(plan::CommitTransactionPlan {
1499 transaction_type: _,
1500 })
1501 | Plan::AbortTransaction(plan::AbortTransactionPlan {
1502 transaction_type: _,
1503 })
1504 | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1505 | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1506 | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1507 | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1508 | Plan::Declare(plan::DeclarePlan {
1509 name: _,
1510 stmt: _,
1511 sql: _,
1512 params: _,
1513 })
1514 | Plan::Fetch(plan::FetchPlan {
1515 name: _,
1516 count: _,
1517 timeout: _,
1518 })
1519 | Plan::Close(plan::ClosePlan { name: _ })
1520 | Plan::Prepare(plan::PreparePlan {
1521 name: _,
1522 stmt: _,
1523 desc: _,
1524 sql: _,
1525 })
1526 | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1527 | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1528 | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1529 }
1530}
1531
1532fn check_owner_roles(
1534 object_id: &ObjectId,
1535 role_ids: &BTreeSet<RoleId>,
1536 catalog: &impl SessionCatalog,
1537) -> bool {
1538 if let Some(owner_id) = catalog.get_owner_id(object_id) {
1539 role_ids.contains(&owner_id)
1540 } else {
1541 true
1542 }
1543}
1544
1545fn ownership_err(
1546 unheld_ownership: Vec<ObjectId>,
1547 catalog: &impl SessionCatalog,
1548) -> Result<(), UnauthorizedError> {
1549 if !unheld_ownership.is_empty() {
1550 let objects = unheld_ownership
1551 .into_iter()
1552 .map(|ownership| match ownership {
1553 ObjectId::Cluster(id) => (
1554 ObjectType::Cluster,
1555 catalog.get_cluster(id).name().to_string(),
1556 ),
1557 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1558 let cluster = catalog.get_cluster(cluster_id);
1559 let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1560 let name = QualifiedReplica {
1563 cluster: Ident::new_unchecked(cluster.name()),
1564 replica: Ident::new_unchecked(replica.name()),
1565 };
1566 (ObjectType::ClusterReplica, name.to_string())
1567 }
1568 ObjectId::Database(id) => (
1569 ObjectType::Database,
1570 catalog.get_database(&id).name().to_string(),
1571 ),
1572 ObjectId::Schema((database_spec, schema_spec)) => {
1573 let schema = catalog.get_schema(&database_spec, &schema_spec);
1574 let name = catalog.resolve_full_schema_name(schema.name());
1575 (ObjectType::Schema, name.to_string())
1576 }
1577 ObjectId::Item(id) => {
1578 let item = catalog.get_item(&id);
1579 let name = catalog.resolve_full_name(item.name());
1580 (item.item_type().into(), name.to_string())
1581 }
1582 ObjectId::NetworkPolicy(id) => (
1583 ObjectType::NetworkPolicy,
1584 catalog.get_network_policy(&id).name().to_string(),
1585 ),
1586 ObjectId::Role(_) => unreachable!("roles have no owner"),
1587 })
1588 .collect();
1589 Err(UnauthorizedError::Ownership { objects })
1590 } else {
1591 Ok(())
1592 }
1593}
1594
1595fn generate_required_source_privileges(
1596 name: &QualifiedItemName,
1597 data_source: &DataSourceDesc,
1598 in_cluster: Option<ClusterId>,
1599 role_id: RoleId,
1600) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1601 let mut privileges = vec![(
1602 SystemObjectId::Object(name.qualifiers.clone().into()),
1603 AclMode::CREATE,
1604 role_id,
1605 )];
1606 match (data_source, in_cluster) {
1607 (_, Some(id)) => {
1608 privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1609 }
1610 (DataSourceDesc::Ingestion(_), None) => {
1611 privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1612 }
1613 (_, None) => {}
1618 }
1619 privileges
1620}
1621
1622fn generate_read_privileges(
1630 catalog: &impl SessionCatalog,
1631 ids: impl Iterator<Item = CatalogItemId>,
1632 role_id: RoleId,
1633) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1634 generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1635}
1636
1637fn generate_read_privileges_inner(
1638 catalog: &impl SessionCatalog,
1639 ids: impl Iterator<Item = CatalogItemId>,
1640 role_id: RoleId,
1641 seen: &mut BTreeSet<(ObjectId, RoleId)>,
1642) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1643 let mut privileges = Vec::new();
1644 let mut views = Vec::new();
1645
1646 for id in ids {
1647 if seen.insert((id.into(), role_id)) {
1648 let item = catalog.get_item(&id);
1649 let schema_id: ObjectId = item.name().qualifiers.clone().into();
1650 if seen.insert((schema_id.clone(), role_id)) {
1651 privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1652 }
1653 match item.item_type() {
1654 CatalogItemType::View
1655 | CatalogItemType::MaterializedView
1656 | CatalogItemType::ContinualTask => {
1657 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1658 views.push((item.references().items().copied(), item.owner_id()));
1659 }
1660 CatalogItemType::Table | CatalogItemType::Source => {
1661 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1662 }
1663 CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1664 privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1665 }
1666 CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1667 }
1668 }
1669 }
1670
1671 for (view_ids, view_owner) in views {
1672 privileges.extend_from_slice(&generate_read_privileges_inner(
1673 catalog, view_ids, view_owner, seen,
1674 ));
1675 }
1676
1677 privileges
1678}
1679
1680fn generate_usage_privileges(
1681 catalog: &impl SessionCatalog,
1682 ids: &ResolvedIds,
1683 role_id: RoleId,
1684 item_types: &BTreeSet<CatalogItemType>,
1685) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1686 ids.items()
1688 .filter_map(move |id| {
1689 let item = catalog.get_item(id);
1690 if item_types.contains(&item.item_type()) {
1691 let schema_id = item.name().qualifiers.clone().into();
1692 Some([
1693 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1694 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1695 ])
1696 } else {
1697 None
1698 }
1699 })
1700 .flatten()
1701 .collect()
1702}
1703
1704fn generate_cluster_usage_privileges(
1705 expr_is_const: bool,
1706 target_cluster_id: Option<ClusterId>,
1707 role_id: RoleId,
1708) -> Option<(SystemObjectId, AclMode, RoleId)> {
1709 if !expr_is_const {
1712 if let Some(cluster_id) = target_cluster_id {
1713 return Some((
1714 SystemObjectId::Object(cluster_id.into()),
1715 AclMode::USAGE,
1716 role_id,
1717 ));
1718 }
1719 }
1720
1721 None
1722}
1723
1724fn check_object_privileges(
1725 catalog: &impl SessionCatalog,
1726 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1727 role_membership: BTreeSet<RoleId>,
1728 current_role_id: RoleId,
1729) -> Result<(), UnauthorizedError> {
1730 let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1731 role_memberships.insert(current_role_id, role_membership);
1732 for (object_id, acl_mode, role_id) in privileges {
1733 let role_membership = role_memberships
1734 .entry(role_id)
1735 .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1736 let object_privileges = catalog
1737 .get_privileges(&object_id)
1738 .expect("only object types with privileges will generate required privileges");
1739 let role_privileges = role_membership
1740 .iter()
1741 .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1742 .map(|mz_acl_item| mz_acl_item.acl_mode)
1743 .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1744 if !role_privileges.contains(acl_mode) {
1745 let role_name = catalog.get_role(&role_id).name().to_string();
1746 let privileges = acl_mode.to_error_string();
1747 return Err(UnauthorizedError::Privilege {
1748 object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1749 role_name,
1750 privileges,
1751 });
1752 }
1753 }
1754
1755 Ok(())
1756}
1757
1758pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1759 const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1760 .union(AclMode::SELECT)
1761 .union(AclMode::UPDATE)
1762 .union(AclMode::DELETE);
1763 const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1764 const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1765 .union(AclMode::CREATE_DB)
1766 .union(AclMode::CREATE_CLUSTER)
1767 .union(AclMode::CREATE_NETWORK_POLICY);
1768
1769 const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1770 match object_type {
1771 SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1772 SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1773 SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1774 SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1775 SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1776 SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1777 SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1778 SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1779 SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1780 SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1781 SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1782 SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1783 SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1784 SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1785 SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1786 SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1787 SystemObjectType::Object(ObjectType::ContinualTask) => AclMode::SELECT,
1788 SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1789 }
1790}
1791
1792pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1793 MzAclItem {
1794 grantee: owner_id,
1795 grantor: owner_id,
1796 acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1797 }
1798}
1799
1800const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1801 match object_type {
1802 ObjectType::Table
1803 | ObjectType::View
1804 | ObjectType::MaterializedView
1805 | ObjectType::Source
1806 | ObjectType::ContinualTask => AclMode::SELECT,
1807 ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1808 ObjectType::Sink
1809 | ObjectType::Index
1810 | ObjectType::Role
1811 | ObjectType::Cluster
1812 | ObjectType::ClusterReplica
1813 | ObjectType::Secret
1814 | ObjectType::Connection
1815 | ObjectType::Database
1816 | ObjectType::Func
1817 | ObjectType::NetworkPolicy => AclMode::empty(),
1818 }
1819}
1820
1821pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1822 let acl_mode = default_builtin_object_acl_mode(object_type);
1823 MzAclItem {
1824 grantee: MZ_SUPPORT_ROLE_ID,
1825 grantor: MZ_SYSTEM_ROLE_ID,
1826 acl_mode,
1827 }
1828}
1829
1830pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1831 let acl_mode = default_builtin_object_acl_mode(object_type);
1832 MzAclItem {
1833 grantee: RoleId::Public,
1834 grantor: MZ_SYSTEM_ROLE_ID,
1835 acl_mode,
1836 }
1837}