1use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26 CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29 CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30 SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34 DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40fn rbac_check_preamble(
42 catalog: &impl SessionCatalog,
43 session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45 if catalog
50 .try_get_role(&session_meta.role_metadata().current_role)
51 .is_none()
52 {
53 return Err(UnauthorizedError::ConcurrentRoleDrop(
54 session_meta.role_metadata().current_role.clone(),
55 ));
56 };
57 if catalog
58 .try_get_role(&session_meta.role_metadata().session_role)
59 .is_none()
60 {
61 return Err(UnauthorizedError::ConcurrentRoleDrop(
62 session_meta.role_metadata().session_role.clone(),
63 ));
64 };
65 if catalog
66 .try_get_role(&session_meta.role_metadata().authenticated_role)
67 .is_none()
68 {
69 return Err(UnauthorizedError::ConcurrentRoleDrop(
70 session_meta.role_metadata().authenticated_role.clone(),
71 ));
72 };
73
74 Ok(())
75}
76
77fn filter_requirements(
79 catalog: &impl SessionCatalog,
80 session_meta: &dyn SessionMetadata,
81 rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83 let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86 && !session_meta.role_metadata().current_role.is_system()
87 && !session_meta.role_metadata().session_role.is_system();
88 let is_superuser = session_meta.is_superuser();
90 if is_rbac_disabled || is_superuser {
91 return rbac_requirements.filter_to_mandatory_requirements();
92 }
93
94 rbac_requirements
95}
96
97static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99 btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104 let mut items = DEFAULT_ITEM_USAGE.clone();
105 items.insert(CatalogItemType::Type);
106 items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
110#[derive(Debug, thiserror::Error)]
112pub enum UnauthorizedError {
113 #[error("permission denied to {action}")]
115 Superuser { action: String },
116 #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
118 Ownership { objects: Vec<(ObjectType, String)> },
119 #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
121 RoleMembership { role_names: Vec<String> },
122 #[error("permission denied for {object_description}")]
124 Privilege {
125 object_description: ErrorMessageObjectDescription,
126 role_name: String,
127 privileges: String,
128 },
129 #[error("permission denied to {action}")]
133 MzSystem { action: String },
134 #[error("permission denied to {action}")]
136 MzSupport { action: String },
137 #[error("role {0} was concurrently dropped")]
139 ConcurrentRoleDrop(RoleId),
140}
141
142impl UnauthorizedError {
143 pub fn detail(&self) -> Option<String> {
144 match &self {
145 UnauthorizedError::Superuser { action } => {
146 Some(format!("You must be a superuser to {}", action))
147 }
148 UnauthorizedError::Privilege {
149 object_description,
150 role_name,
151 privileges,
152 } => Some(format!(
153 "The '{role_name}' role needs {privileges} privileges on {object_description}"
154 )),
155 UnauthorizedError::MzSystem { .. } => {
156 Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157 }
158 UnauthorizedError::MzSupport { .. } => Some(format!(
159 "The '{}' role has very limited privileges",
160 SUPPORT_USER.name
161 )),
162 UnauthorizedError::ConcurrentRoleDrop(_) => {
163 Some("Please disconnect and re-connect with a valid role.".into())
164 }
165 UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166 }
167 }
168}
169
170#[derive(Debug)]
172struct RbacRequirements {
173 role_membership: BTreeSet<RoleId>,
175 ownership: Vec<ObjectId>,
177 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
180 item_usage: &'static BTreeSet<CatalogItemType>,
185 superuser_action: Option<String>,
187}
188
189impl RbacRequirements {
190 fn empty() -> RbacRequirements {
191 RbacRequirements {
192 role_membership: BTreeSet::new(),
193 ownership: Vec::new(),
194 privileges: Vec::new(),
195 item_usage: &EMPTY_ITEM_USAGE,
196 superuser_action: None,
197 }
198 }
199
200 fn validate(
201 self,
202 catalog: &impl SessionCatalog,
203 session: &dyn SessionMetadata,
204 resolved_ids: &ResolvedIds,
205 ) -> Result<(), UnauthorizedError> {
206 let role_membership =
208 catalog.collect_role_membership(&session.role_metadata().current_role);
209
210 check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212 let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215 if !unheld_membership.is_empty() {
216 let role_names = unheld_membership
217 .into_iter()
218 .map(|role_id| {
219 catalog
221 .try_get_role(role_id)
222 .map(|role| role.name().to_string())
223 .unwrap_or_else(|| role_id.to_string())
224 })
225 .collect();
226 return Err(UnauthorizedError::RoleMembership { role_names });
227 }
228
229 let unheld_ownership = self
232 .ownership
233 .into_iter()
234 .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235 .collect();
236 ownership_err(unheld_ownership, catalog)?;
237
238 check_object_privileges(
239 catalog,
240 self.privileges,
241 role_membership,
242 session.role_metadata().current_role,
243 )?;
244
245 if let Some(action) = self.superuser_action {
246 return Err(UnauthorizedError::Superuser { action });
247 }
248
249 Ok(())
250 }
251
252 fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253 let RbacRequirements {
254 role_membership,
255 ownership,
256 privileges,
257 item_usage,
258 superuser_action: _,
259 } = self;
260 let role_membership = role_membership
261 .into_iter()
262 .filter(|id| id.is_system())
263 .collect();
264 let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265 let privileges = privileges
266 .into_iter()
267 .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268 .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270 .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271 .collect();
272 let superuser_action = None;
273 RbacRequirements {
274 role_membership,
275 ownership,
276 privileges,
277 item_usage,
278 superuser_action,
279 }
280 }
281}
282
283impl Default for RbacRequirements {
284 fn default() -> Self {
285 RbacRequirements {
286 role_membership: BTreeSet::new(),
287 ownership: Vec::new(),
288 privileges: Vec::new(),
289 item_usage: &DEFAULT_ITEM_USAGE,
290 superuser_action: None,
291 }
292 }
293}
294
295pub fn check_usage(
297 catalog: &impl SessionCatalog,
298 session: &dyn SessionMetadata,
299 resolved_ids: &ResolvedIds,
300 item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302 rbac_check_preamble(catalog, session)?;
303
304 let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307 let existing_resolved_ids =
310 resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312 let required_privileges = generate_usage_privileges(
313 catalog,
314 &existing_resolved_ids,
315 session.role_metadata().current_role,
316 item_types,
317 )
318 .into_iter()
319 .collect();
320
321 let mut rbac_requirements = RbacRequirements::empty();
322 rbac_requirements.privileges = required_privileges;
323 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324 let required_privileges = rbac_requirements.privileges;
325
326 check_object_privileges(
327 catalog,
328 required_privileges,
329 role_membership,
330 session.role_metadata().current_role,
331 )?;
332
333 Ok(())
334}
335
336pub fn check_plan(
338 catalog: &impl SessionCatalog,
339 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
342 session: &dyn SessionMetadata,
343 plan: &Plan,
344 target_cluster_id: Option<ClusterId>,
345 resolved_ids: &ResolvedIds,
346) -> Result<(), UnauthorizedError> {
347 rbac_check_preamble(catalog, session)?;
348
349 let rbac_requirements = generate_rbac_requirements(
350 catalog,
351 plan,
352 active_conns,
353 target_cluster_id,
354 session.role_metadata().current_role,
355 );
356 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
357 debug!(
358 "rbac requirements {rbac_requirements:?} for plan {:?}",
359 PlanKind::from(plan)
360 );
361 rbac_requirements.validate(catalog, session, resolved_ids)
362}
363
364pub fn is_rbac_enabled_for_session(
366 system_vars: &SystemVars,
367 session: &dyn SessionMetadata,
368) -> bool {
369 let server_enabled = system_vars.enable_rbac_checks();
370 let session_enabled = session.enable_session_rbac_checks();
371
372 server_enabled || session_enabled
375}
376
377fn generate_rbac_requirements(
379 catalog: &impl SessionCatalog,
380 plan: &Plan,
381 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
382 target_cluster_id: Option<ClusterId>,
383 role_id: RoleId,
384) -> RbacRequirements {
385 match plan {
386 Plan::CreateConnection(plan::CreateConnectionPlan {
387 name,
388 if_not_exists: _,
389 connection: _,
390 validate: _,
391 }) => RbacRequirements {
392 privileges: vec![(
393 SystemObjectId::Object(name.qualifiers.clone().into()),
394 AclMode::CREATE,
395 role_id,
396 )],
397 item_usage: &CREATE_ITEM_USAGE,
398 ..Default::default()
399 },
400 Plan::CreateDatabase(plan::CreateDatabasePlan {
401 name: _,
402 if_not_exists: _,
403 }) => RbacRequirements {
404 privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
405 item_usage: &CREATE_ITEM_USAGE,
406 ..Default::default()
407 },
408 Plan::CreateSchema(plan::CreateSchemaPlan {
409 database_spec,
410 schema_name: _,
411 if_not_exists: _,
412 }) => {
413 let privileges = match database_spec {
414 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
415 ResolvedDatabaseSpecifier::Id(database_id) => {
416 vec![(
417 SystemObjectId::Object(database_id.into()),
418 AclMode::CREATE,
419 role_id,
420 )]
421 }
422 };
423 RbacRequirements {
424 privileges,
425 item_usage: &CREATE_ITEM_USAGE,
426 ..Default::default()
427 }
428 }
429 Plan::CreateRole(plan::CreateRolePlan {
430 name: _,
431 attributes,
432 }) => {
433 if attributes.superuser.unwrap_or(false) {
434 RbacRequirements {
435 superuser_action: Some("create superuser role".to_string()),
436 ..Default::default()
437 }
438 } else {
439 RbacRequirements {
440 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
441 item_usage: &CREATE_ITEM_USAGE,
442 ..Default::default()
443 }
444 }
445 }
446 Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
447 privileges: vec![(
448 SystemObjectId::System,
449 AclMode::CREATE_NETWORK_POLICY,
450 role_id,
451 )],
452 item_usage: &CREATE_ITEM_USAGE,
453 ..Default::default()
454 },
455 Plan::CreateCluster(plan::CreateClusterPlan {
456 name: _,
457 variant: _,
458 workload_class: _,
459 }) => RbacRequirements {
460 privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
461 item_usage: &CREATE_ITEM_USAGE,
462 ..Default::default()
463 },
464 Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
465 cluster_id,
466 name: _,
467 config: _,
468 }) => RbacRequirements {
469 ownership: vec![ObjectId::Cluster(*cluster_id)],
470 item_usage: &CREATE_ITEM_USAGE,
471 ..Default::default()
472 },
473 Plan::CreateSource(plan::CreateSourcePlan {
474 name,
475 source,
476 if_not_exists: _,
477 timeline: _,
478 in_cluster,
479 }) => RbacRequirements {
480 privileges: generate_required_source_privileges(
481 name,
482 &source.data_source,
483 *in_cluster,
484 role_id,
485 ),
486 item_usage: &CREATE_ITEM_USAGE,
487 ..Default::default()
488 },
489 Plan::CreateSources(plans) => RbacRequirements {
490 privileges: plans
491 .iter()
492 .flat_map(
493 |plan::CreateSourcePlanBundle {
494 item_id: _,
495 global_id: _,
496 plan:
497 plan::CreateSourcePlan {
498 name,
499 source,
500 if_not_exists: _,
501 timeline: _,
502 in_cluster,
503 },
504 resolved_ids: _,
505 available_source_references: _,
506 }| {
507 generate_required_source_privileges(
508 name,
509 &source.data_source,
510 *in_cluster,
511 role_id,
512 )
513 .into_iter()
514 },
515 )
516 .collect(),
517 item_usage: &CREATE_ITEM_USAGE,
518 ..Default::default()
519 },
520 Plan::CreateSecret(plan::CreateSecretPlan {
521 name,
522 secret: _,
523 if_not_exists: _,
524 }) => RbacRequirements {
525 privileges: vec![(
526 SystemObjectId::Object(name.qualifiers.clone().into()),
527 AclMode::CREATE,
528 role_id,
529 )],
530 item_usage: &CREATE_ITEM_USAGE,
531 ..Default::default()
532 },
533 Plan::CreateSink(plan::CreateSinkPlan {
534 name,
535 sink,
536 with_snapshot: _,
537 if_not_exists: _,
538 in_cluster,
539 }) => {
540 let mut privileges = vec![(
541 SystemObjectId::Object(name.qualifiers.clone().into()),
542 AclMode::CREATE,
543 role_id,
544 )];
545 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
546 privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
547 privileges.push((
548 SystemObjectId::Object(in_cluster.into()),
549 AclMode::CREATE,
550 role_id,
551 ));
552 RbacRequirements {
553 privileges,
554 item_usage: &CREATE_ITEM_USAGE,
555 ..Default::default()
556 }
557 }
558 Plan::CreateTable(plan::CreateTablePlan {
559 name,
560 table: _,
561 if_not_exists: _,
562 }) => RbacRequirements {
563 privileges: vec![(
564 SystemObjectId::Object(name.qualifiers.clone().into()),
565 AclMode::CREATE,
566 role_id,
567 )],
568 item_usage: &CREATE_ITEM_USAGE,
569 ..Default::default()
570 },
571 Plan::CreateView(plan::CreateViewPlan {
572 name,
573 view: _,
574 replace,
575 drop_ids: _,
576 if_not_exists: _,
577 ambiguous_columns: _,
578 }) => RbacRequirements {
579 ownership: replace
580 .map(|id| vec![ObjectId::Item(id)])
581 .unwrap_or_default(),
582 privileges: vec![(
583 SystemObjectId::Object(name.qualifiers.clone().into()),
584 AclMode::CREATE,
585 role_id,
586 )],
587 item_usage: &CREATE_ITEM_USAGE,
588 ..Default::default()
589 },
590 Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
591 name,
592 materialized_view,
593 replace,
594 drop_ids: _,
595 if_not_exists: _,
596 ambiguous_columns: _,
597 }) => RbacRequirements {
598 ownership: replace
599 .map(|id| vec![ObjectId::Item(id)])
600 .unwrap_or_default(),
601 privileges: vec![
602 (
603 SystemObjectId::Object(name.qualifiers.clone().into()),
604 AclMode::CREATE,
605 role_id,
606 ),
607 (
608 SystemObjectId::Object(materialized_view.cluster_id.into()),
609 AclMode::CREATE,
610 role_id,
611 ),
612 ],
613 item_usage: &CREATE_ITEM_USAGE,
614 ..Default::default()
615 },
616 Plan::CreateContinualTask(plan::CreateContinualTaskPlan {
617 name,
618 placeholder_id: _,
619 desc: _,
620 input_id: _,
621 with_snapshot: _,
622 continual_task,
623 }) => RbacRequirements {
624 privileges: vec![
625 (
626 SystemObjectId::Object(name.qualifiers.clone().into()),
627 AclMode::CREATE,
628 role_id,
629 ),
630 (
631 SystemObjectId::Object(continual_task.cluster_id.into()),
632 AclMode::CREATE,
633 role_id,
634 ),
635 ],
636 item_usage: &CREATE_ITEM_USAGE,
637 ..Default::default()
638 },
639 Plan::CreateIndex(plan::CreateIndexPlan {
640 name,
641 index,
642 if_not_exists: _,
643 }) => {
644 let index_on_item = catalog.resolve_item_id(&index.on);
645 RbacRequirements {
646 ownership: vec![ObjectId::Item(index_on_item)],
647 privileges: vec![
648 (
649 SystemObjectId::Object(name.qualifiers.clone().into()),
650 AclMode::CREATE,
651 role_id,
652 ),
653 (
654 SystemObjectId::Object(index.cluster_id.into()),
655 AclMode::CREATE,
656 role_id,
657 ),
658 ],
659 item_usage: &CREATE_ITEM_USAGE,
660 ..Default::default()
661 }
662 }
663 Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
664 privileges: vec![(
665 SystemObjectId::Object(name.qualifiers.clone().into()),
666 AclMode::CREATE,
667 role_id,
668 )],
669 item_usage: &CREATE_ITEM_USAGE,
670 ..Default::default()
671 },
672 Plan::Comment(plan::CommentPlan {
673 object_id,
674 sub_component: _,
675 comment: _,
676 }) => {
677 let (ownership, privileges) = match object_id {
678 CommentObjectId::Role(_) => (
681 Vec::new(),
682 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
683 ),
684 _ => (vec![ObjectId::from(*object_id)], Vec::new()),
685 };
686 RbacRequirements {
687 ownership,
688 privileges,
689 ..Default::default()
690 }
691 }
692 Plan::DropObjects(plan::DropObjectsPlan {
693 referenced_ids,
694 drop_ids: _,
695 object_type,
696 }) => {
697 let privileges = if object_type == &ObjectType::Role {
698 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
699 } else {
700 referenced_ids
701 .iter()
702 .filter_map(|id| match id {
703 ObjectId::ClusterReplica((cluster_id, _)) => Some((
704 SystemObjectId::Object(cluster_id.into()),
705 AclMode::USAGE,
706 role_id,
707 )),
708 ObjectId::Schema((database_spec, _)) => match database_spec {
709 ResolvedDatabaseSpecifier::Ambient => None,
710 ResolvedDatabaseSpecifier::Id(database_id) => Some((
711 SystemObjectId::Object(database_id.into()),
712 AclMode::USAGE,
713 role_id,
714 )),
715 },
716 ObjectId::Item(item_id) => {
717 let item = catalog.get_item(item_id);
718 Some((
719 SystemObjectId::Object(item.name().qualifiers.clone().into()),
720 AclMode::USAGE,
721 role_id,
722 ))
723 }
724 ObjectId::Cluster(_)
725 | ObjectId::Database(_)
726 | ObjectId::Role(_)
727 | ObjectId::NetworkPolicy(_) => None,
728 })
729 .collect()
730 };
731 RbacRequirements {
732 ownership: referenced_ids.clone(),
734 privileges,
735 ..Default::default()
736 }
737 }
738 Plan::DropOwned(plan::DropOwnedPlan {
739 role_ids,
740 drop_ids: _,
741 privilege_revokes: _,
742 default_privilege_revokes: _,
743 }) => RbacRequirements {
744 role_membership: role_ids.into_iter().cloned().collect(),
745 ..Default::default()
746 },
747 Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
748 let container_id = match id {
749 ObjectId::Item(id) => Some(SystemObjectId::Object(
750 catalog.get_item(id).name().qualifiers.clone().into(),
751 )),
752 ObjectId::Schema((database_id, _schema_id)) => match database_id {
753 ResolvedDatabaseSpecifier::Ambient => None,
754 ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
755 },
756 ObjectId::Cluster(_)
757 | ObjectId::ClusterReplica(_)
758 | ObjectId::Database(_)
759 | ObjectId::Role(_)
760 | ObjectId::NetworkPolicy(_) => None,
761 };
762 let privileges = match container_id {
763 Some(id) => vec![(id, AclMode::USAGE, role_id)],
764 None => Vec::new(),
765 };
766 RbacRequirements {
767 privileges,
768 item_usage: &EMPTY_ITEM_USAGE,
769 ..Default::default()
770 }
771 }
772 Plan::ShowColumns(plan::ShowColumnsPlan {
773 id,
774 select_plan,
775 new_resolved_ids: _,
776 }) => {
777 let mut privileges = vec![(
778 SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
779 AclMode::USAGE,
780 role_id,
781 )];
782
783 for privilege in generate_rbac_requirements(
784 catalog,
785 &Plan::Select(select_plan.clone()),
786 active_conns,
787 target_cluster_id,
788 role_id,
789 )
790 .privileges
791 {
792 privileges.push(privilege);
793 }
794 RbacRequirements {
795 privileges,
796 ..Default::default()
797 }
798 }
799 Plan::Select(plan::SelectPlan {
800 source,
801 select: _,
802 when: _,
803 finishing: _,
804 copy_to: _,
805 }) => {
806 let items = source
807 .depends_on()
808 .into_iter()
809 .map(|gid| catalog.resolve_item_id(&gid));
810 let mut privileges = generate_read_privileges(catalog, items, role_id);
811 if let Some(privilege) = generate_cluster_usage_privileges(
812 source.as_const().is_some(),
813 target_cluster_id,
814 role_id,
815 ) {
816 privileges.push(privilege);
817 }
818 RbacRequirements {
819 privileges,
820 ..Default::default()
821 }
822 }
823 Plan::Subscribe(plan::SubscribePlan {
824 from,
825 with_snapshot: _,
826 when: _,
827 up_to: _,
828 copy_to: _,
829 emit_progress: _,
830 output: _,
831 }) => {
832 let items = from
833 .depends_on()
834 .into_iter()
835 .map(|gid| catalog.resolve_item_id(&gid));
836 let mut privileges = generate_read_privileges(catalog, items, role_id);
837 if let Some(cluster_id) = target_cluster_id {
838 privileges.push((
839 SystemObjectId::Object(cluster_id.into()),
840 AclMode::USAGE,
841 role_id,
842 ));
843 }
844 RbacRequirements {
845 privileges,
846 ..Default::default()
847 }
848 }
849 Plan::CopyFrom(plan::CopyFromPlan {
850 target_name: _,
851 target_id,
852 source: _,
853 columns: _,
854 source_desc: _,
855 mfp: _,
856 params: _,
857 filter: _,
858 }) => RbacRequirements {
859 privileges: vec![
860 (
861 SystemObjectId::Object(
862 catalog.get_item(target_id).name().qualifiers.clone().into(),
863 ),
864 AclMode::USAGE,
865 role_id,
866 ),
867 (
868 SystemObjectId::Object(target_id.into()),
869 AclMode::INSERT,
870 role_id,
871 ),
872 ],
873 ..Default::default()
874 },
875 Plan::CopyTo(plan::CopyToPlan {
876 select_plan,
877 desc: _,
878 to: _,
879 connection: _,
880 connection_id: _,
881 format: _,
882 max_file_size: _,
883 }) => {
884 let items = select_plan
885 .source
886 .depends_on()
887 .into_iter()
888 .map(|gid| catalog.resolve_item_id(&gid));
889 let mut privileges = generate_read_privileges(catalog, items, role_id);
890 if let Some(cluster_id) = target_cluster_id {
891 privileges.push((
892 SystemObjectId::Object(cluster_id.into()),
893 AclMode::USAGE,
894 role_id,
895 ));
896 }
897 RbacRequirements {
898 privileges,
899 ..Default::default()
900 }
901 }
902 Plan::ExplainPlan(plan::ExplainPlanPlan {
903 stage: _,
904 format: _,
905 config: _,
906 explainee,
907 })
908 | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
909 privileges: match explainee {
910 Explainee::View(id)
911 | Explainee::MaterializedView(id)
912 | Explainee::Index(id)
913 | Explainee::ReplanView(id)
914 | Explainee::ReplanMaterializedView(id)
915 | Explainee::ReplanIndex(id) => {
916 let item = catalog.get_item(id);
917 let schema_id: ObjectId = item.name().qualifiers.clone().into();
918 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
919 }
920 Explainee::Statement(stmt) => stmt
921 .depends_on()
922 .into_iter()
923 .map(|id| {
924 let item = catalog.get_item_by_global_id(&id);
925 let schema_id: ObjectId = item.name().qualifiers.clone().into();
926 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
927 })
928 .collect(),
929 },
930 item_usage: match explainee {
931 Explainee::View(..)
932 | Explainee::MaterializedView(..)
933 | Explainee::Index(..)
934 | Explainee::ReplanView(..)
935 | Explainee::ReplanMaterializedView(..)
936 | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
937 Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
938 },
939 ..Default::default()
940 },
941 Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
942 RbacRequirements {
943 privileges: {
944 let item = catalog.get_item_by_global_id(sink_from);
945 let schema_id: ObjectId = item.name().qualifiers.clone().into();
946 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
947 },
948 item_usage: &EMPTY_ITEM_USAGE,
949 ..Default::default()
950 }
951 }
952 Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
953 format: _,
954 raw_plan,
955 when: _,
956 }) => RbacRequirements {
957 privileges: raw_plan
958 .depends_on()
959 .into_iter()
960 .map(|id| {
961 let item = catalog.get_item_by_global_id(&id);
962 let schema_id: ObjectId = item.name().qualifiers.clone().into();
963 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
964 })
965 .collect(),
966 ..Default::default()
967 },
968 Plan::Insert(plan::InsertPlan {
969 id,
970 values,
971 returning,
972 }) => {
973 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
974 let mut privileges = vec![
975 (
976 SystemObjectId::Object(schema_id.clone()),
977 AclMode::USAGE,
978 role_id,
979 ),
980 (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
981 ];
982 let mut seen = BTreeSet::from([(schema_id, role_id)]);
983
984 if returning
987 .iter()
988 .any(|assignment| assignment.contains_column())
989 {
990 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
991 seen.insert((id.into(), role_id));
992 }
993
994 let items = values
995 .depends_on()
996 .into_iter()
997 .map(|gid| catalog.resolve_item_id(&gid));
998 privileges.extend_from_slice(&generate_read_privileges_inner(
999 catalog, items, role_id, &mut seen,
1000 ));
1001
1002 if let Some(privilege) = generate_cluster_usage_privileges(
1003 values.as_const().is_some(),
1004 target_cluster_id,
1005 role_id,
1006 ) {
1007 privileges.push(privilege);
1008 } else if !returning.is_empty() {
1009 if let Some(cluster_id) = target_cluster_id {
1012 privileges.push((
1013 SystemObjectId::Object(cluster_id.into()),
1014 AclMode::USAGE,
1015 role_id,
1016 ));
1017 }
1018 }
1019 RbacRequirements {
1020 privileges,
1021 ..Default::default()
1022 }
1023 }
1024 Plan::AlterCluster(plan::AlterClusterPlan {
1025 id,
1026 name: _,
1027 options: _,
1028 strategy: _,
1029 }) => RbacRequirements {
1030 ownership: vec![ObjectId::Cluster(*id)],
1031 item_usage: &CREATE_ITEM_USAGE,
1032 ..Default::default()
1033 },
1034 Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1035 ownership: vec![ObjectId::Item(*id)],
1036 privileges: vec![(
1037 SystemObjectId::Object(set_cluster.into()),
1038 AclMode::CREATE,
1039 role_id,
1040 )],
1041 item_usage: &CREATE_ITEM_USAGE,
1042 ..Default::default()
1043 },
1044 Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1045 id,
1046 window: _,
1047 value: _,
1048 object_type: _,
1049 }) => RbacRequirements {
1050 ownership: vec![ObjectId::Item(*id)],
1051 item_usage: &CREATE_ITEM_USAGE,
1052 ..Default::default()
1053 },
1054 Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1055 ownership: vec![ObjectId::Item(*id)],
1056 ..Default::default()
1057 },
1058 Plan::AlterSource(plan::AlterSourcePlan {
1059 item_id,
1060 ingestion_id: _,
1061 action: _,
1062 }) => RbacRequirements {
1063 ownership: vec![ObjectId::Item(*item_id)],
1064 item_usage: &CREATE_ITEM_USAGE,
1065 ..Default::default()
1066 },
1067 Plan::AlterSink(plan::AlterSinkPlan {
1068 item_id,
1069 global_id: _,
1070 sink,
1071 with_snapshot: _,
1072 in_cluster,
1073 }) => {
1074 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1075 let mut privileges = generate_read_privileges(catalog, items, role_id);
1076 privileges.push((
1077 SystemObjectId::Object(in_cluster.into()),
1078 AclMode::CREATE,
1079 role_id,
1080 ));
1081 RbacRequirements {
1082 ownership: vec![ObjectId::Item(*item_id)],
1083 privileges,
1084 item_usage: &CREATE_ITEM_USAGE,
1085 ..Default::default()
1086 }
1087 }
1088 Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1089 id,
1090 name: _,
1091 to_name: _,
1092 }) => RbacRequirements {
1093 ownership: vec![ObjectId::Cluster(*id)],
1094 ..Default::default()
1095 },
1096 Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1097 id_a,
1098 id_b,
1099 name_a: _,
1100 name_b: _,
1101 name_temp: _,
1102 }) => RbacRequirements {
1103 ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1104 ..Default::default()
1105 },
1106 Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1107 cluster_id,
1108 replica_id,
1109 name: _,
1110 to_name: _,
1111 }) => RbacRequirements {
1112 ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1113 ..Default::default()
1114 },
1115 Plan::AlterItemRename(plan::AlterItemRenamePlan {
1116 id,
1117 current_full_name: _,
1118 to_name: _,
1119 object_type: _,
1120 }) => RbacRequirements {
1121 ownership: vec![ObjectId::Item(*id)],
1122 ..Default::default()
1123 },
1124 Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1125 cur_schema_spec,
1126 new_schema_name: _,
1127 }) => {
1128 let privileges = match cur_schema_spec.0 {
1129 ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1130 SystemObjectId::Object(ObjectId::Database(db_id)),
1131 AclMode::CREATE,
1132 role_id,
1133 )],
1134 ResolvedDatabaseSpecifier::Ambient => vec![],
1135 };
1136
1137 RbacRequirements {
1138 ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1139 privileges,
1140 ..Default::default()
1141 }
1142 }
1143 Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1144 schema_a_spec,
1145 schema_a_name: _,
1146 schema_b_spec,
1147 schema_b_name: _,
1148 name_temp: _,
1149 }) => {
1150 let mut privileges = vec![];
1151 if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1152 privileges.push((
1153 SystemObjectId::Object(ObjectId::Database(id_a)),
1154 AclMode::CREATE,
1155 role_id,
1156 ));
1157 }
1158 if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1159 privileges.push((
1160 SystemObjectId::Object(ObjectId::Database(id_b)),
1161 AclMode::CREATE,
1162 role_id,
1163 ));
1164 }
1165
1166 RbacRequirements {
1167 ownership: vec![
1168 ObjectId::Schema(*schema_a_spec),
1169 ObjectId::Schema(*schema_b_spec),
1170 ],
1171 privileges,
1172 ..Default::default()
1173 }
1174 }
1175 Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1176 ownership: vec![ObjectId::Item(*id)],
1177 item_usage: &CREATE_ITEM_USAGE,
1178 ..Default::default()
1179 },
1180 Plan::AlterRole(plan::AlterRolePlan {
1181 id,
1182 name: _,
1183 option,
1184 }) => match option {
1185 plan::PlannedAlterRoleOption::Attributes(attributes)
1187 if attributes.superuser.unwrap_or(false) =>
1188 {
1189 RbacRequirements {
1190 superuser_action: Some("alter superuser role".to_string()),
1191 ..Default::default()
1192 }
1193 }
1194 plan::PlannedAlterRoleOption::Attributes(attributes)
1196 if attributes.password.is_some() && role_id == *id =>
1197 {
1198 RbacRequirements::default()
1199 }
1200 plan::PlannedAlterRoleOption::Attributes(attributes)
1202 if attributes.password.is_some() =>
1203 {
1204 RbacRequirements {
1205 superuser_action: Some("alter password of role".to_string()),
1206 ..Default::default()
1207 }
1208 }
1209 plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1211 RbacRequirements::default()
1212 }
1213 _ => RbacRequirements {
1215 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1216 item_usage: &CREATE_ITEM_USAGE,
1217 ..Default::default()
1218 },
1219 },
1220 Plan::AlterOwner(plan::AlterOwnerPlan {
1221 id,
1222 object_type: _,
1223 new_owner,
1224 }) => {
1225 let privileges = match id {
1226 ObjectId::ClusterReplica((cluster_id, _)) => {
1227 vec![(
1228 SystemObjectId::Object(cluster_id.into()),
1229 AclMode::CREATE,
1230 role_id,
1231 )]
1232 }
1233 ObjectId::Schema((database_spec, _)) => match database_spec {
1234 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1235 ResolvedDatabaseSpecifier::Id(database_id) => {
1236 vec![(
1237 SystemObjectId::Object(database_id.into()),
1238 AclMode::CREATE,
1239 role_id,
1240 )]
1241 }
1242 },
1243 ObjectId::Item(item_id) => {
1244 let item = catalog.get_item(item_id);
1245 vec![(
1246 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1247 AclMode::CREATE,
1248 role_id,
1249 )]
1250 }
1251 ObjectId::Cluster(_)
1252 | ObjectId::Database(_)
1253 | ObjectId::Role(_)
1254 | ObjectId::NetworkPolicy(_) => Vec::new(),
1255 };
1256 RbacRequirements {
1257 role_membership: BTreeSet::from([*new_owner]),
1258 ownership: vec![id.clone()],
1259 privileges,
1260 ..Default::default()
1261 }
1262 }
1263 Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1264 ownership: vec![ObjectId::Item(*relation_id)],
1265 item_usage: &CREATE_ITEM_USAGE,
1266 ..Default::default()
1267 },
1268 Plan::AlterMaterializedViewApplyReplacement(
1269 plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1270 ) => RbacRequirements {
1271 ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1272 item_usage: &CREATE_ITEM_USAGE,
1273 ..Default::default()
1274 },
1275 Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1276 ownership: vec![ObjectId::NetworkPolicy(*id)],
1277 item_usage: &CREATE_ITEM_USAGE,
1278 ..Default::default()
1279 },
1280 Plan::ReadThenWrite(plan::ReadThenWritePlan {
1281 id,
1282 selection,
1283 finishing: _,
1284 assignments,
1285 kind,
1286 returning,
1287 }) => {
1288 let acl_mode = match kind {
1289 MutationKind::Insert => AclMode::INSERT,
1290 MutationKind::Update => AclMode::UPDATE,
1291 MutationKind::Delete => AclMode::DELETE,
1292 };
1293 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1294 let mut privileges = vec![
1295 (
1296 SystemObjectId::Object(schema_id.clone()),
1297 AclMode::USAGE,
1298 role_id,
1299 ),
1300 (SystemObjectId::Object(id.into()), acl_mode, role_id),
1301 ];
1302 let mut seen = BTreeSet::from([(schema_id, role_id)]);
1303
1304 if assignments
1307 .values()
1308 .chain(returning.iter())
1309 .any(|assignment| assignment.contains_column())
1310 {
1311 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1312 seen.insert((id.into(), role_id));
1313 }
1314
1315 let items = selection
1322 .depends_on()
1323 .into_iter()
1324 .map(|gid| catalog.resolve_item_id(&gid));
1325 privileges.extend_from_slice(&generate_read_privileges_inner(
1326 catalog, items, role_id, &mut seen,
1327 ));
1328
1329 if let Some(privilege) = generate_cluster_usage_privileges(
1330 selection.as_const().is_some(),
1331 target_cluster_id,
1332 role_id,
1333 ) {
1334 privileges.push(privilege);
1335 }
1336 RbacRequirements {
1337 privileges,
1338 ..Default::default()
1339 }
1340 }
1341 Plan::GrantRole(plan::GrantRolePlan {
1342 role_ids: _,
1343 member_ids: _,
1344 grantor_id: _,
1345 })
1346 | Plan::RevokeRole(plan::RevokeRolePlan {
1347 role_ids: _,
1348 member_ids: _,
1349 grantor_id: _,
1350 }) => RbacRequirements {
1351 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1352 ..Default::default()
1353 },
1354 Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1355 update_privileges,
1356 grantees: _,
1357 })
1358 | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1359 update_privileges,
1360 revokees: _,
1361 }) => {
1362 let mut privileges = Vec::with_capacity(update_privileges.len());
1363 for UpdatePrivilege { target_id, .. } in update_privileges {
1364 match target_id {
1365 SystemObjectId::Object(object_id) => match object_id {
1366 ObjectId::ClusterReplica((cluster_id, _)) => {
1367 privileges.push((
1368 SystemObjectId::Object(cluster_id.into()),
1369 AclMode::USAGE,
1370 role_id,
1371 ));
1372 }
1373 ObjectId::Schema((database_spec, _)) => match database_spec {
1374 ResolvedDatabaseSpecifier::Ambient => {}
1375 ResolvedDatabaseSpecifier::Id(database_id) => {
1376 privileges.push((
1377 SystemObjectId::Object(database_id.into()),
1378 AclMode::USAGE,
1379 role_id,
1380 ));
1381 }
1382 },
1383 ObjectId::Item(item_id) => {
1384 let item = catalog.get_item(item_id);
1385 privileges.push((
1386 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1387 AclMode::USAGE,
1388 role_id,
1389 ))
1390 }
1391 ObjectId::Cluster(_)
1392 | ObjectId::Database(_)
1393 | ObjectId::Role(_)
1394 | ObjectId::NetworkPolicy(_) => {}
1395 },
1396 SystemObjectId::System => {}
1397 }
1398 }
1399 RbacRequirements {
1400 ownership: update_privileges
1401 .iter()
1402 .filter_map(|update_privilege| update_privilege.target_id.object_id())
1403 .cloned()
1404 .collect(),
1405 privileges,
1406 superuser_action: if update_privileges
1411 .iter()
1412 .any(|update_privilege| update_privilege.target_id.is_system())
1413 {
1414 Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1415 } else {
1416 None
1417 },
1418 ..Default::default()
1419 }
1420 }
1421 Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1422 privilege_objects,
1423 privilege_acl_items: _,
1424 is_grant: _,
1425 }) => RbacRequirements {
1426 role_membership: privilege_objects
1427 .iter()
1428 .map(|privilege_object| privilege_object.role_id)
1429 .collect(),
1430 privileges: privilege_objects
1431 .into_iter()
1432 .filter_map(|privilege_object| {
1433 if let (Some(database_id), Some(_)) =
1434 (privilege_object.database_id, privilege_object.schema_id)
1435 {
1436 Some((
1437 SystemObjectId::Object(database_id.into()),
1438 AclMode::USAGE,
1439 role_id,
1440 ))
1441 } else {
1442 None
1443 }
1444 })
1445 .collect(),
1446 superuser_action: if privilege_objects
1452 .iter()
1453 .any(|privilege_object| privilege_object.role_id.is_public())
1454 {
1455 Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1456 } else {
1457 None
1458 },
1459 ..Default::default()
1460 },
1461 Plan::ReassignOwned(plan::ReassignOwnedPlan {
1462 old_roles,
1463 new_role,
1464 reassign_ids: _,
1465 }) => RbacRequirements {
1466 role_membership: old_roles
1467 .into_iter()
1468 .cloned()
1469 .chain(iter::once(*new_role))
1470 .collect(),
1471 ..Default::default()
1472 },
1473 Plan::SideEffectingFunc(func) => {
1474 let role_membership = match func {
1475 SideEffectingFunc::PgCancelBackend { connection_id } => active_conns
1476 .expect("active_conns is required for Plan::SideEffectingFunc")(
1477 *connection_id
1478 )
1479 .map(|x| [x].into())
1480 .unwrap_or_default(),
1481 };
1482 RbacRequirements {
1483 role_membership,
1484 ..Default::default()
1485 }
1486 }
1487 Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1488 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1489 RbacRequirements {
1490 privileges: vec![
1491 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1492 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1493 ],
1494 ..Default::default()
1495 }
1496 }
1497 Plan::DiscardTemp
1498 | Plan::DiscardAll
1499 | Plan::EmptyQuery
1500 | Plan::ShowAllVariables
1501 | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1502 | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1503 | Plan::SetVariable(plan::SetVariablePlan {
1504 name: _,
1505 value: _,
1506 local: _,
1507 })
1508 | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1509 | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1510 | Plan::StartTransaction(plan::StartTransactionPlan {
1511 access: _,
1512 isolation_level: _,
1513 })
1514 | Plan::CommitTransaction(plan::CommitTransactionPlan {
1515 transaction_type: _,
1516 })
1517 | Plan::AbortTransaction(plan::AbortTransactionPlan {
1518 transaction_type: _,
1519 })
1520 | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1521 | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1522 | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1523 | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1524 | Plan::Declare(plan::DeclarePlan {
1525 name: _,
1526 stmt: _,
1527 sql: _,
1528 params: _,
1529 })
1530 | Plan::Fetch(plan::FetchPlan {
1531 name: _,
1532 count: _,
1533 timeout: _,
1534 })
1535 | Plan::Close(plan::ClosePlan { name: _ })
1536 | Plan::Prepare(plan::PreparePlan {
1537 name: _,
1538 stmt: _,
1539 desc: _,
1540 sql: _,
1541 })
1542 | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1543 | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1544 | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1545 }
1546}
1547
1548fn check_owner_roles(
1550 object_id: &ObjectId,
1551 role_ids: &BTreeSet<RoleId>,
1552 catalog: &impl SessionCatalog,
1553) -> bool {
1554 if let Some(owner_id) = catalog.get_owner_id(object_id) {
1555 role_ids.contains(&owner_id)
1556 } else {
1557 true
1558 }
1559}
1560
1561fn ownership_err(
1562 unheld_ownership: Vec<ObjectId>,
1563 catalog: &impl SessionCatalog,
1564) -> Result<(), UnauthorizedError> {
1565 if !unheld_ownership.is_empty() {
1566 let objects = unheld_ownership
1567 .into_iter()
1568 .map(|ownership| match ownership {
1569 ObjectId::Cluster(id) => (
1570 ObjectType::Cluster,
1571 catalog.get_cluster(id).name().to_string(),
1572 ),
1573 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1574 let cluster = catalog.get_cluster(cluster_id);
1575 let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1576 let name = QualifiedReplica {
1579 cluster: Ident::new_unchecked(cluster.name()),
1580 replica: Ident::new_unchecked(replica.name()),
1581 };
1582 (ObjectType::ClusterReplica, name.to_string())
1583 }
1584 ObjectId::Database(id) => (
1585 ObjectType::Database,
1586 catalog.get_database(&id).name().to_string(),
1587 ),
1588 ObjectId::Schema((database_spec, schema_spec)) => {
1589 let schema = catalog.get_schema(&database_spec, &schema_spec);
1590 let name = catalog.resolve_full_schema_name(schema.name());
1591 (ObjectType::Schema, name.to_string())
1592 }
1593 ObjectId::Item(id) => {
1594 let item = catalog.get_item(&id);
1595 let name = catalog.resolve_full_name(item.name());
1596 (item.item_type().into(), name.to_string())
1597 }
1598 ObjectId::NetworkPolicy(id) => (
1599 ObjectType::NetworkPolicy,
1600 catalog.get_network_policy(&id).name().to_string(),
1601 ),
1602 ObjectId::Role(_) => unreachable!("roles have no owner"),
1603 })
1604 .collect();
1605 Err(UnauthorizedError::Ownership { objects })
1606 } else {
1607 Ok(())
1608 }
1609}
1610
1611fn generate_required_source_privileges(
1612 name: &QualifiedItemName,
1613 data_source: &DataSourceDesc,
1614 in_cluster: Option<ClusterId>,
1615 role_id: RoleId,
1616) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1617 let mut privileges = vec![(
1618 SystemObjectId::Object(name.qualifiers.clone().into()),
1619 AclMode::CREATE,
1620 role_id,
1621 )];
1622 match (data_source, in_cluster) {
1623 (_, Some(id)) => {
1624 privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1625 }
1626 (DataSourceDesc::Ingestion(_), None) => {
1627 privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1628 }
1629 (_, None) => {}
1634 }
1635 privileges
1636}
1637
1638fn generate_read_privileges(
1646 catalog: &impl SessionCatalog,
1647 ids: impl Iterator<Item = CatalogItemId>,
1648 role_id: RoleId,
1649) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1650 generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1651}
1652
1653fn generate_read_privileges_inner(
1654 catalog: &impl SessionCatalog,
1655 ids: impl Iterator<Item = CatalogItemId>,
1656 role_id: RoleId,
1657 seen: &mut BTreeSet<(ObjectId, RoleId)>,
1658) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1659 let mut privileges = Vec::new();
1660 let mut views = Vec::new();
1661
1662 for id in ids {
1663 if seen.insert((id.into(), role_id)) {
1664 let item = catalog.get_item(&id);
1665 let schema_id: ObjectId = item.name().qualifiers.clone().into();
1666 if seen.insert((schema_id.clone(), role_id)) {
1667 privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1668 }
1669 match item.item_type() {
1670 CatalogItemType::View
1671 | CatalogItemType::MaterializedView
1672 | CatalogItemType::ContinualTask => {
1673 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1674 views.push((item.references().items().copied(), item.owner_id()));
1675 }
1676 CatalogItemType::Table | CatalogItemType::Source => {
1677 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1678 }
1679 CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1680 privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1681 }
1682 CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1683 }
1684 }
1685 }
1686
1687 for (view_ids, view_owner) in views {
1688 privileges.extend_from_slice(&generate_read_privileges_inner(
1689 catalog, view_ids, view_owner, seen,
1690 ));
1691 }
1692
1693 privileges
1694}
1695
1696fn generate_usage_privileges(
1697 catalog: &impl SessionCatalog,
1698 ids: &ResolvedIds,
1699 role_id: RoleId,
1700 item_types: &BTreeSet<CatalogItemType>,
1701) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1702 ids.items()
1704 .filter_map(move |id| {
1705 let item = catalog.get_item(id);
1706 if item_types.contains(&item.item_type()) {
1707 let schema_id = item.name().qualifiers.clone().into();
1708 Some([
1709 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1710 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1711 ])
1712 } else {
1713 None
1714 }
1715 })
1716 .flatten()
1717 .collect()
1718}
1719
1720fn generate_cluster_usage_privileges(
1721 expr_is_const: bool,
1722 target_cluster_id: Option<ClusterId>,
1723 role_id: RoleId,
1724) -> Option<(SystemObjectId, AclMode, RoleId)> {
1725 if !expr_is_const {
1728 if let Some(cluster_id) = target_cluster_id {
1729 return Some((
1730 SystemObjectId::Object(cluster_id.into()),
1731 AclMode::USAGE,
1732 role_id,
1733 ));
1734 }
1735 }
1736
1737 None
1738}
1739
1740fn check_object_privileges(
1741 catalog: &impl SessionCatalog,
1742 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1743 role_membership: BTreeSet<RoleId>,
1744 current_role_id: RoleId,
1745) -> Result<(), UnauthorizedError> {
1746 let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1747 role_memberships.insert(current_role_id, role_membership);
1748 for (object_id, acl_mode, role_id) in privileges {
1749 let role_membership = role_memberships
1750 .entry(role_id)
1751 .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1752 let object_privileges = catalog
1753 .get_privileges(&object_id)
1754 .expect("only object types with privileges will generate required privileges");
1755 let role_privileges = role_membership
1756 .iter()
1757 .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1758 .map(|mz_acl_item| mz_acl_item.acl_mode)
1759 .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1760 if !role_privileges.contains(acl_mode) {
1761 let role_name = catalog.get_role(&role_id).name().to_string();
1762 let privileges = acl_mode.to_error_string();
1763 return Err(UnauthorizedError::Privilege {
1764 object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1765 role_name,
1766 privileges,
1767 });
1768 }
1769 }
1770
1771 Ok(())
1772}
1773
1774pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1775 const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1776 .union(AclMode::SELECT)
1777 .union(AclMode::UPDATE)
1778 .union(AclMode::DELETE);
1779 const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1780 const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1781 .union(AclMode::CREATE_DB)
1782 .union(AclMode::CREATE_CLUSTER)
1783 .union(AclMode::CREATE_NETWORK_POLICY);
1784
1785 const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1786 match object_type {
1787 SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1788 SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1789 SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1790 SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1791 SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1792 SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1793 SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1794 SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1795 SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1796 SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1797 SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1798 SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1799 SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1800 SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1801 SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1802 SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1803 SystemObjectType::Object(ObjectType::ContinualTask) => AclMode::SELECT,
1804 SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1805 }
1806}
1807
1808pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1809 MzAclItem {
1810 grantee: owner_id,
1811 grantor: owner_id,
1812 acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1813 }
1814}
1815
1816const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1817 match object_type {
1818 ObjectType::Table
1819 | ObjectType::View
1820 | ObjectType::MaterializedView
1821 | ObjectType::Source
1822 | ObjectType::ContinualTask => AclMode::SELECT,
1823 ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1824 ObjectType::Sink
1825 | ObjectType::Index
1826 | ObjectType::Role
1827 | ObjectType::Cluster
1828 | ObjectType::ClusterReplica
1829 | ObjectType::Secret
1830 | ObjectType::Connection
1831 | ObjectType::Database
1832 | ObjectType::Func
1833 | ObjectType::NetworkPolicy => AclMode::empty(),
1834 }
1835}
1836
1837pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1838 let acl_mode = default_builtin_object_acl_mode(object_type);
1839 MzAclItem {
1840 grantee: MZ_SUPPORT_ROLE_ID,
1841 grantor: MZ_SYSTEM_ROLE_ID,
1842 acl_mode,
1843 }
1844}
1845
1846pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1847 let acl_mode = default_builtin_object_acl_mode(object_type);
1848 MzAclItem {
1849 grantee: RoleId::Public,
1850 grantor: MZ_SYSTEM_ROLE_ID,
1851 acl_mode,
1852 }
1853}