1use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26 CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29 CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30 SchemaSpecifier, SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34 DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40fn rbac_check_preamble(
42 catalog: &impl SessionCatalog,
43 session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45 if catalog
50 .try_get_role(&session_meta.role_metadata().current_role)
51 .is_none()
52 {
53 return Err(UnauthorizedError::ConcurrentRoleDrop(
54 session_meta.role_metadata().current_role.clone(),
55 ));
56 };
57 if catalog
58 .try_get_role(&session_meta.role_metadata().session_role)
59 .is_none()
60 {
61 return Err(UnauthorizedError::ConcurrentRoleDrop(
62 session_meta.role_metadata().session_role.clone(),
63 ));
64 };
65 if catalog
66 .try_get_role(&session_meta.role_metadata().authenticated_role)
67 .is_none()
68 {
69 return Err(UnauthorizedError::ConcurrentRoleDrop(
70 session_meta.role_metadata().authenticated_role.clone(),
71 ));
72 };
73
74 Ok(())
75}
76
77fn filter_requirements(
79 catalog: &impl SessionCatalog,
80 session_meta: &dyn SessionMetadata,
81 rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83 let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86 && !session_meta.role_metadata().current_role.is_system()
87 && !session_meta.role_metadata().session_role.is_system();
88 let is_superuser = session_meta.is_superuser();
90 if is_rbac_disabled || is_superuser {
91 return rbac_requirements.filter_to_mandatory_requirements();
92 }
93
94 rbac_requirements
95}
96
97static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99 btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104 let mut items = DEFAULT_ITEM_USAGE.clone();
105 items.insert(CatalogItemType::Type);
106 items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
110#[derive(Debug, thiserror::Error)]
112pub enum UnauthorizedError {
113 #[error("permission denied to {action}")]
115 Superuser { action: String },
116 #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
118 Ownership { objects: Vec<(ObjectType, String)> },
119 #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
121 RoleMembership { role_names: Vec<String> },
122 #[error("permission denied for {object_description}")]
124 Privilege {
125 object_description: ErrorMessageObjectDescription,
126 role_name: String,
127 privileges: String,
128 },
129 #[error("permission denied to {action}")]
133 MzSystem { action: String },
134 #[error("permission denied to {action}")]
136 MzSupport { action: String },
137 #[error("role {0} was concurrently dropped")]
139 ConcurrentRoleDrop(RoleId),
140}
141
142impl UnauthorizedError {
143 pub fn detail(&self) -> Option<String> {
144 match &self {
145 UnauthorizedError::Superuser { action } => {
146 Some(format!("You must be a superuser to {}", action))
147 }
148 UnauthorizedError::Privilege {
149 object_description,
150 role_name,
151 privileges,
152 } => Some(format!(
153 "The '{role_name}' role needs {privileges} privileges on {object_description}"
154 )),
155 UnauthorizedError::MzSystem { .. } => {
156 Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157 }
158 UnauthorizedError::MzSupport { .. } => Some(format!(
159 "The '{}' role has very limited privileges",
160 SUPPORT_USER.name
161 )),
162 UnauthorizedError::ConcurrentRoleDrop(_) => {
163 Some("Please disconnect and re-connect with a valid role.".into())
164 }
165 UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166 }
167 }
168}
169
170#[derive(Debug)]
172struct RbacRequirements {
173 role_membership: BTreeSet<RoleId>,
175 ownership: Vec<ObjectId>,
177 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
180 item_usage: &'static BTreeSet<CatalogItemType>,
185 superuser_action: Option<String>,
187}
188
189impl RbacRequirements {
190 fn empty() -> RbacRequirements {
191 RbacRequirements {
192 role_membership: BTreeSet::new(),
193 ownership: Vec::new(),
194 privileges: Vec::new(),
195 item_usage: &EMPTY_ITEM_USAGE,
196 superuser_action: None,
197 }
198 }
199
200 fn validate(
201 self,
202 catalog: &impl SessionCatalog,
203 session: &dyn SessionMetadata,
204 resolved_ids: &ResolvedIds,
205 ) -> Result<(), UnauthorizedError> {
206 let role_membership =
208 catalog.collect_role_membership(&session.role_metadata().current_role);
209
210 check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212 let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215 if !unheld_membership.is_empty() {
216 let role_names = unheld_membership
217 .into_iter()
218 .map(|role_id| {
219 catalog
221 .try_get_role(role_id)
222 .map(|role| role.name().to_string())
223 .unwrap_or_else(|| role_id.to_string())
224 })
225 .collect();
226 return Err(UnauthorizedError::RoleMembership { role_names });
227 }
228
229 let unheld_ownership = self
232 .ownership
233 .into_iter()
234 .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235 .collect();
236 ownership_err(unheld_ownership, catalog)?;
237
238 check_object_privileges(
239 catalog,
240 self.privileges,
241 role_membership,
242 session.role_metadata().current_role,
243 )?;
244
245 if let Some(action) = self.superuser_action {
246 return Err(UnauthorizedError::Superuser { action });
247 }
248
249 Ok(())
250 }
251
252 fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253 let RbacRequirements {
254 role_membership,
255 ownership,
256 privileges,
257 item_usage,
258 superuser_action: _,
259 } = self;
260 let role_membership = role_membership
261 .into_iter()
262 .filter(|id| id.is_system())
263 .collect();
264 let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265 let privileges = privileges
266 .into_iter()
267 .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268 .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270 .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271 .collect();
272 let superuser_action = None;
273 RbacRequirements {
274 role_membership,
275 ownership,
276 privileges,
277 item_usage,
278 superuser_action,
279 }
280 }
281}
282
283impl Default for RbacRequirements {
284 fn default() -> Self {
285 RbacRequirements {
286 role_membership: BTreeSet::new(),
287 ownership: Vec::new(),
288 privileges: Vec::new(),
289 item_usage: &DEFAULT_ITEM_USAGE,
290 superuser_action: None,
291 }
292 }
293}
294
295pub fn check_usage(
297 catalog: &impl SessionCatalog,
298 session: &dyn SessionMetadata,
299 resolved_ids: &ResolvedIds,
300 item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302 rbac_check_preamble(catalog, session)?;
303
304 let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307 let existing_resolved_ids =
310 resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312 let required_privileges = generate_usage_privileges(
313 catalog,
314 &existing_resolved_ids,
315 session.role_metadata().current_role,
316 item_types,
317 )
318 .into_iter()
319 .collect();
320
321 let mut rbac_requirements = RbacRequirements::empty();
322 rbac_requirements.privileges = required_privileges;
323 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324 let required_privileges = rbac_requirements.privileges;
325
326 check_object_privileges(
327 catalog,
328 required_privileges,
329 role_membership,
330 session.role_metadata().current_role,
331 )?;
332
333 Ok(())
334}
335
336pub fn check_plan(
338 catalog: &impl SessionCatalog,
339 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
345 session: &dyn SessionMetadata,
346 plan: &Plan,
347 target_cluster_id: Option<ClusterId>,
348 resolved_ids: &ResolvedIds,
349) -> Result<(), UnauthorizedError> {
350 rbac_check_preamble(catalog, session)?;
351
352 let rbac_requirements = generate_rbac_requirements(
353 catalog,
354 plan,
355 active_conns,
356 target_cluster_id,
357 session.role_metadata().current_role,
358 );
359 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
360 debug!(
361 "rbac requirements {rbac_requirements:?} for plan {:?}",
362 PlanKind::from(plan)
363 );
364 rbac_requirements.validate(catalog, session, resolved_ids)
365}
366
367pub fn is_rbac_enabled_for_session(
369 system_vars: &SystemVars,
370 session: &dyn SessionMetadata,
371) -> bool {
372 let server_enabled = system_vars.enable_rbac_checks();
373 let session_enabled = session.enable_session_rbac_checks();
374
375 server_enabled || session_enabled
378}
379
380fn generate_rbac_requirements(
382 catalog: &impl SessionCatalog,
383 plan: &Plan,
384 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
385 target_cluster_id: Option<ClusterId>,
386 role_id: RoleId,
387) -> RbacRequirements {
388 match plan {
389 Plan::CreateConnection(plan::CreateConnectionPlan {
390 name,
391 if_not_exists: _,
392 connection: _,
393 validate: _,
394 }) => RbacRequirements {
395 privileges: vec![(
396 SystemObjectId::Object(name.qualifiers.clone().into()),
397 AclMode::CREATE,
398 role_id,
399 )],
400 item_usage: &CREATE_ITEM_USAGE,
401 ..Default::default()
402 },
403 Plan::CreateDatabase(plan::CreateDatabasePlan {
404 name: _,
405 if_not_exists: _,
406 }) => RbacRequirements {
407 privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
408 item_usage: &CREATE_ITEM_USAGE,
409 ..Default::default()
410 },
411 Plan::CreateSchema(plan::CreateSchemaPlan {
412 database_spec,
413 schema_name: _,
414 if_not_exists: _,
415 }) => {
416 let privileges = match database_spec {
417 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
418 ResolvedDatabaseSpecifier::Id(database_id) => {
419 vec![(
420 SystemObjectId::Object(database_id.into()),
421 AclMode::CREATE,
422 role_id,
423 )]
424 }
425 };
426 RbacRequirements {
427 privileges,
428 item_usage: &CREATE_ITEM_USAGE,
429 ..Default::default()
430 }
431 }
432 Plan::CreateRole(plan::CreateRolePlan {
433 name: _,
434 attributes,
435 }) => {
436 if attributes.superuser.unwrap_or(false) {
437 RbacRequirements {
438 superuser_action: Some("create superuser role".to_string()),
439 ..Default::default()
440 }
441 } else {
442 RbacRequirements {
443 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
444 item_usage: &CREATE_ITEM_USAGE,
445 ..Default::default()
446 }
447 }
448 }
449 Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
450 privileges: vec![(
451 SystemObjectId::System,
452 AclMode::CREATE_NETWORK_POLICY,
453 role_id,
454 )],
455 item_usage: &CREATE_ITEM_USAGE,
456 ..Default::default()
457 },
458 Plan::CreateCluster(plan::CreateClusterPlan {
459 name: _,
460 variant: _,
461 workload_class: _,
462 }) => RbacRequirements {
463 privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
464 item_usage: &CREATE_ITEM_USAGE,
465 ..Default::default()
466 },
467 Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
468 cluster_id,
469 name: _,
470 config: _,
471 }) => RbacRequirements {
472 ownership: vec![ObjectId::Cluster(*cluster_id)],
473 item_usage: &CREATE_ITEM_USAGE,
474 ..Default::default()
475 },
476 Plan::CreateSource(plan::CreateSourcePlan {
477 name,
478 source,
479 if_not_exists: _,
480 timeline: _,
481 in_cluster,
482 }) => RbacRequirements {
483 privileges: generate_required_source_privileges(
484 name,
485 &source.data_source,
486 *in_cluster,
487 role_id,
488 ),
489 item_usage: &CREATE_ITEM_USAGE,
490 ..Default::default()
491 },
492 Plan::CreateSources(plans) => RbacRequirements {
493 privileges: plans
494 .iter()
495 .flat_map(
496 |plan::CreateSourcePlanBundle {
497 item_id: _,
498 global_id: _,
499 plan:
500 plan::CreateSourcePlan {
501 name,
502 source,
503 if_not_exists: _,
504 timeline: _,
505 in_cluster,
506 },
507 resolved_ids: _,
508 available_source_references: _,
509 }| {
510 generate_required_source_privileges(
511 name,
512 &source.data_source,
513 *in_cluster,
514 role_id,
515 )
516 .into_iter()
517 },
518 )
519 .collect(),
520 item_usage: &CREATE_ITEM_USAGE,
521 ..Default::default()
522 },
523 Plan::CreateSecret(plan::CreateSecretPlan {
524 name,
525 secret: _,
526 if_not_exists: _,
527 }) => RbacRequirements {
528 privileges: vec![(
529 SystemObjectId::Object(name.qualifiers.clone().into()),
530 AclMode::CREATE,
531 role_id,
532 )],
533 item_usage: &CREATE_ITEM_USAGE,
534 ..Default::default()
535 },
536 Plan::CreateSink(plan::CreateSinkPlan {
537 name,
538 sink,
539 with_snapshot: _,
540 if_not_exists: _,
541 in_cluster,
542 }) => {
543 let mut privileges = vec![(
544 SystemObjectId::Object(name.qualifiers.clone().into()),
545 AclMode::CREATE,
546 role_id,
547 )];
548 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
549 privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
550 privileges.push((
551 SystemObjectId::Object(in_cluster.into()),
552 AclMode::CREATE,
553 role_id,
554 ));
555 RbacRequirements {
556 privileges,
557 item_usage: &CREATE_ITEM_USAGE,
558 ..Default::default()
559 }
560 }
561 Plan::CreateTable(plan::CreateTablePlan {
562 name,
563 table: _,
564 if_not_exists: _,
565 }) => RbacRequirements {
566 privileges: vec![(
567 SystemObjectId::Object(name.qualifiers.clone().into()),
568 AclMode::CREATE,
569 role_id,
570 )],
571 item_usage: &CREATE_ITEM_USAGE,
572 ..Default::default()
573 },
574 Plan::CreateView(plan::CreateViewPlan {
575 name,
576 view: _,
577 replace,
578 drop_ids: _,
579 if_not_exists: _,
580 ambiguous_columns: _,
581 }) => RbacRequirements {
582 ownership: replace
583 .map(|id| vec![ObjectId::Item(id)])
584 .unwrap_or_default(),
585 privileges: vec![(
586 SystemObjectId::Object(name.qualifiers.clone().into()),
587 AclMode::CREATE,
588 role_id,
589 )],
590 item_usage: &CREATE_ITEM_USAGE,
591 ..Default::default()
592 },
593 Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
594 name,
595 materialized_view,
596 replace,
597 drop_ids: _,
598 if_not_exists: _,
599 ambiguous_columns: _,
600 }) => RbacRequirements {
601 ownership: replace
602 .map(|id| vec![ObjectId::Item(id)])
603 .unwrap_or_default(),
604 privileges: vec![
605 (
606 SystemObjectId::Object(name.qualifiers.clone().into()),
607 AclMode::CREATE,
608 role_id,
609 ),
610 (
611 SystemObjectId::Object(materialized_view.cluster_id.into()),
612 AclMode::CREATE,
613 role_id,
614 ),
615 ],
616 item_usage: &CREATE_ITEM_USAGE,
617 ..Default::default()
618 },
619 Plan::CreateContinualTask(plan::CreateContinualTaskPlan {
620 name,
621 placeholder_id: _,
622 desc: _,
623 input_id: _,
624 with_snapshot: _,
625 continual_task,
626 }) => RbacRequirements {
627 privileges: vec![
628 (
629 SystemObjectId::Object(name.qualifiers.clone().into()),
630 AclMode::CREATE,
631 role_id,
632 ),
633 (
634 SystemObjectId::Object(continual_task.cluster_id.into()),
635 AclMode::CREATE,
636 role_id,
637 ),
638 ],
639 item_usage: &CREATE_ITEM_USAGE,
640 ..Default::default()
641 },
642 Plan::CreateIndex(plan::CreateIndexPlan {
643 name,
644 index,
645 if_not_exists: _,
646 }) => {
647 let index_on_item = catalog.resolve_item_id(&index.on);
648 RbacRequirements {
649 ownership: vec![ObjectId::Item(index_on_item)],
650 privileges: vec![
651 (
652 SystemObjectId::Object(name.qualifiers.clone().into()),
653 AclMode::CREATE,
654 role_id,
655 ),
656 (
657 SystemObjectId::Object(index.cluster_id.into()),
658 AclMode::CREATE,
659 role_id,
660 ),
661 ],
662 item_usage: &CREATE_ITEM_USAGE,
663 ..Default::default()
664 }
665 }
666 Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
667 privileges: vec![(
668 SystemObjectId::Object(name.qualifiers.clone().into()),
669 AclMode::CREATE,
670 role_id,
671 )],
672 item_usage: &CREATE_ITEM_USAGE,
673 ..Default::default()
674 },
675 Plan::Comment(plan::CommentPlan {
676 object_id,
677 sub_component: _,
678 comment: _,
679 }) => {
680 let (ownership, privileges) = match object_id {
681 CommentObjectId::Role(_) => (
684 Vec::new(),
685 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
686 ),
687 _ => (vec![ObjectId::from(*object_id)], Vec::new()),
688 };
689 RbacRequirements {
690 ownership,
691 privileges,
692 ..Default::default()
693 }
694 }
695 Plan::DropObjects(plan::DropObjectsPlan {
696 referenced_ids,
697 drop_ids: _,
698 object_type,
699 }) => {
700 let privileges = if object_type == &ObjectType::Role {
701 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
702 } else {
703 referenced_ids
704 .iter()
705 .filter_map(|id| match id {
706 ObjectId::ClusterReplica((cluster_id, _)) => Some((
707 SystemObjectId::Object(cluster_id.into()),
708 AclMode::USAGE,
709 role_id,
710 )),
711 ObjectId::Schema((database_spec, _)) => match database_spec {
712 ResolvedDatabaseSpecifier::Ambient => None,
713 ResolvedDatabaseSpecifier::Id(database_id) => Some((
714 SystemObjectId::Object(database_id.into()),
715 AclMode::USAGE,
716 role_id,
717 )),
718 },
719 ObjectId::Item(item_id) => {
720 let item = catalog.get_item(item_id);
721 Some((
722 SystemObjectId::Object(item.name().qualifiers.clone().into()),
723 AclMode::USAGE,
724 role_id,
725 ))
726 }
727 ObjectId::Cluster(_)
728 | ObjectId::Database(_)
729 | ObjectId::Role(_)
730 | ObjectId::NetworkPolicy(_) => None,
731 })
732 .collect()
733 };
734 RbacRequirements {
735 ownership: referenced_ids.clone(),
737 privileges,
738 ..Default::default()
739 }
740 }
741 Plan::DropOwned(plan::DropOwnedPlan {
742 role_ids,
743 drop_ids: _,
744 privilege_revokes: _,
745 default_privilege_revokes: _,
746 }) => RbacRequirements {
747 role_membership: role_ids.into_iter().cloned().collect(),
748 ..Default::default()
749 },
750 Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
751 let container_id = match id {
752 ObjectId::Item(id) => Some(SystemObjectId::Object(
753 catalog.get_item(id).name().qualifiers.clone().into(),
754 )),
755 ObjectId::Schema((database_id, _schema_id)) => match database_id {
756 ResolvedDatabaseSpecifier::Ambient => None,
757 ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
758 },
759 ObjectId::Cluster(_)
760 | ObjectId::ClusterReplica(_)
761 | ObjectId::Database(_)
762 | ObjectId::Role(_)
763 | ObjectId::NetworkPolicy(_) => None,
764 };
765 let privileges = match container_id {
766 Some(id) => vec![(id, AclMode::USAGE, role_id)],
767 None => Vec::new(),
768 };
769 RbacRequirements {
770 privileges,
771 item_usage: &EMPTY_ITEM_USAGE,
772 ..Default::default()
773 }
774 }
775 Plan::ShowColumns(plan::ShowColumnsPlan {
776 id,
777 select_plan,
778 new_resolved_ids: _,
779 }) => {
780 let mut privileges = vec![(
781 SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
782 AclMode::USAGE,
783 role_id,
784 )];
785
786 for privilege in generate_rbac_requirements(
787 catalog,
788 &Plan::Select(select_plan.clone()),
789 active_conns,
790 target_cluster_id,
791 role_id,
792 )
793 .privileges
794 {
795 privileges.push(privilege);
796 }
797 RbacRequirements {
798 privileges,
799 ..Default::default()
800 }
801 }
802 Plan::Select(plan::SelectPlan {
803 source,
804 select: _,
805 when: _,
806 finishing: _,
807 copy_to: _,
808 }) => {
809 let items = source
810 .depends_on()
811 .into_iter()
812 .map(|gid| catalog.resolve_item_id(&gid));
813 let mut privileges = generate_read_privileges(catalog, items, role_id);
814 if let Some(privilege) = generate_cluster_usage_privileges(
815 source.as_const().is_some(),
816 target_cluster_id,
817 role_id,
818 ) {
819 privileges.push(privilege);
820 }
821 RbacRequirements {
822 privileges,
823 ..Default::default()
824 }
825 }
826 Plan::Subscribe(plan::SubscribePlan {
827 from,
828 with_snapshot: _,
829 when: _,
830 up_to: _,
831 copy_to: _,
832 emit_progress: _,
833 output: _,
834 }) => {
835 let items = from
836 .depends_on()
837 .into_iter()
838 .map(|gid| catalog.resolve_item_id(&gid));
839 let mut privileges = generate_read_privileges(catalog, items, role_id);
840 if let Some(cluster_id) = target_cluster_id {
841 privileges.push((
842 SystemObjectId::Object(cluster_id.into()),
843 AclMode::USAGE,
844 role_id,
845 ));
846 }
847 RbacRequirements {
848 privileges,
849 ..Default::default()
850 }
851 }
852 Plan::CopyFrom(plan::CopyFromPlan {
853 target_name: _,
854 target_id,
855 source: _,
856 columns: _,
857 source_desc: _,
858 mfp: _,
859 params: _,
860 filter: _,
861 }) => RbacRequirements {
862 privileges: vec![
863 (
864 SystemObjectId::Object(
865 catalog.get_item(target_id).name().qualifiers.clone().into(),
866 ),
867 AclMode::USAGE,
868 role_id,
869 ),
870 (
871 SystemObjectId::Object(target_id.into()),
872 AclMode::INSERT,
873 role_id,
874 ),
875 ],
876 ..Default::default()
877 },
878 Plan::CopyTo(plan::CopyToPlan {
879 select_plan,
880 desc: _,
881 to: _,
882 connection: _,
883 connection_id: _,
884 format: _,
885 max_file_size: _,
886 }) => {
887 let items = select_plan
888 .source
889 .depends_on()
890 .into_iter()
891 .map(|gid| catalog.resolve_item_id(&gid));
892 let mut privileges = generate_read_privileges(catalog, items, role_id);
893 if let Some(cluster_id) = target_cluster_id {
894 privileges.push((
895 SystemObjectId::Object(cluster_id.into()),
896 AclMode::USAGE,
897 role_id,
898 ));
899 }
900 RbacRequirements {
901 privileges,
902 ..Default::default()
903 }
904 }
905 Plan::ExplainPlan(plan::ExplainPlanPlan {
906 stage: _,
907 format: _,
908 config: _,
909 explainee,
910 })
911 | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
912 privileges: match explainee {
913 Explainee::View(id)
914 | Explainee::MaterializedView(id)
915 | Explainee::Index(id)
916 | Explainee::ReplanView(id)
917 | Explainee::ReplanMaterializedView(id)
918 | Explainee::ReplanIndex(id) => {
919 let item = catalog.get_item(id);
920 let schema_id: ObjectId = item.name().qualifiers.clone().into();
921 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
922 }
923 Explainee::Statement(stmt) => stmt
924 .depends_on()
925 .into_iter()
926 .map(|id| {
927 let item = catalog.get_item_by_global_id(&id);
928 let schema_id: ObjectId = item.name().qualifiers.clone().into();
929 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
930 })
931 .collect(),
932 },
933 item_usage: match explainee {
934 Explainee::View(..)
935 | Explainee::MaterializedView(..)
936 | Explainee::Index(..)
937 | Explainee::ReplanView(..)
938 | Explainee::ReplanMaterializedView(..)
939 | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
940 Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
941 },
942 ..Default::default()
943 },
944 Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
945 RbacRequirements {
946 privileges: {
947 let item = catalog.get_item_by_global_id(sink_from);
948 let schema_id: ObjectId = item.name().qualifiers.clone().into();
949 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
950 },
951 item_usage: &EMPTY_ITEM_USAGE,
952 ..Default::default()
953 }
954 }
955 Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
956 format: _,
957 raw_plan,
958 when: _,
959 }) => RbacRequirements {
960 privileges: raw_plan
961 .depends_on()
962 .into_iter()
963 .map(|id| {
964 let item = catalog.get_item_by_global_id(&id);
965 let schema_id: ObjectId = item.name().qualifiers.clone().into();
966 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
967 })
968 .collect(),
969 ..Default::default()
970 },
971 Plan::Insert(plan::InsertPlan {
972 id,
973 values,
974 returning,
975 }) => {
976 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
977 let mut privileges = vec![
978 (
979 SystemObjectId::Object(schema_id.clone()),
980 AclMode::USAGE,
981 role_id,
982 ),
983 (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
984 ];
985 let mut seen = BTreeSet::from([(schema_id, role_id)]);
986
987 if returning
990 .iter()
991 .any(|assignment| assignment.contains_column())
992 {
993 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
994 seen.insert((id.into(), role_id));
995 }
996
997 let items = values
998 .depends_on()
999 .into_iter()
1000 .map(|gid| catalog.resolve_item_id(&gid));
1001 privileges.extend_from_slice(&generate_read_privileges_inner(
1002 catalog, items, role_id, &mut seen,
1003 ));
1004
1005 if let Some(privilege) = generate_cluster_usage_privileges(
1006 values.as_const().is_some(),
1007 target_cluster_id,
1008 role_id,
1009 ) {
1010 privileges.push(privilege);
1011 } else if !returning.is_empty() {
1012 if let Some(cluster_id) = target_cluster_id {
1015 privileges.push((
1016 SystemObjectId::Object(cluster_id.into()),
1017 AclMode::USAGE,
1018 role_id,
1019 ));
1020 }
1021 }
1022 RbacRequirements {
1023 privileges,
1024 ..Default::default()
1025 }
1026 }
1027 Plan::AlterCluster(plan::AlterClusterPlan {
1028 id,
1029 name: _,
1030 options: _,
1031 strategy: _,
1032 }) => RbacRequirements {
1033 ownership: vec![ObjectId::Cluster(*id)],
1034 item_usage: &CREATE_ITEM_USAGE,
1035 ..Default::default()
1036 },
1037 Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1038 ownership: vec![ObjectId::Item(*id)],
1039 privileges: vec![(
1040 SystemObjectId::Object(set_cluster.into()),
1041 AclMode::CREATE,
1042 role_id,
1043 )],
1044 item_usage: &CREATE_ITEM_USAGE,
1045 ..Default::default()
1046 },
1047 Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1048 id,
1049 window: _,
1050 value: _,
1051 object_type: _,
1052 }) => RbacRequirements {
1053 ownership: vec![ObjectId::Item(*id)],
1054 item_usage: &CREATE_ITEM_USAGE,
1055 ..Default::default()
1056 },
1057 Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1058 ownership: vec![ObjectId::Item(*id)],
1059 ..Default::default()
1060 },
1061 Plan::AlterSource(plan::AlterSourcePlan {
1062 item_id,
1063 ingestion_id: _,
1064 action: _,
1065 }) => RbacRequirements {
1066 ownership: vec![ObjectId::Item(*item_id)],
1067 item_usage: &CREATE_ITEM_USAGE,
1068 ..Default::default()
1069 },
1070 Plan::AlterSink(plan::AlterSinkPlan {
1071 item_id,
1072 global_id: _,
1073 sink,
1074 with_snapshot: _,
1075 in_cluster,
1076 }) => {
1077 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1078 let mut privileges = generate_read_privileges(catalog, items, role_id);
1079 privileges.push((
1080 SystemObjectId::Object(in_cluster.into()),
1081 AclMode::CREATE,
1082 role_id,
1083 ));
1084 RbacRequirements {
1085 ownership: vec![ObjectId::Item(*item_id)],
1086 privileges,
1087 item_usage: &CREATE_ITEM_USAGE,
1088 ..Default::default()
1089 }
1090 }
1091 Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1092 id,
1093 name: _,
1094 to_name: _,
1095 }) => RbacRequirements {
1096 ownership: vec![ObjectId::Cluster(*id)],
1097 ..Default::default()
1098 },
1099 Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1100 id_a,
1101 id_b,
1102 name_a: _,
1103 name_b: _,
1104 name_temp: _,
1105 }) => RbacRequirements {
1106 ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1107 ..Default::default()
1108 },
1109 Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1110 cluster_id,
1111 replica_id,
1112 name: _,
1113 to_name: _,
1114 }) => RbacRequirements {
1115 ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1116 ..Default::default()
1117 },
1118 Plan::AlterItemRename(plan::AlterItemRenamePlan {
1119 id,
1120 current_full_name: _,
1121 to_name: _,
1122 object_type: _,
1123 }) => RbacRequirements {
1124 ownership: vec![ObjectId::Item(*id)],
1125 ..Default::default()
1126 },
1127 Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1128 cur_schema_spec,
1129 new_schema_name: _,
1130 }) => {
1131 let privileges = match cur_schema_spec.0 {
1132 ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1133 SystemObjectId::Object(ObjectId::Database(db_id)),
1134 AclMode::CREATE,
1135 role_id,
1136 )],
1137 ResolvedDatabaseSpecifier::Ambient => vec![],
1138 };
1139
1140 RbacRequirements {
1141 ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1142 privileges,
1143 ..Default::default()
1144 }
1145 }
1146 Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1147 schema_a_spec,
1148 schema_a_name: _,
1149 schema_b_spec,
1150 schema_b_name: _,
1151 name_temp: _,
1152 }) => {
1153 let mut privileges = vec![];
1154 if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1155 privileges.push((
1156 SystemObjectId::Object(ObjectId::Database(id_a)),
1157 AclMode::CREATE,
1158 role_id,
1159 ));
1160 }
1161 if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1162 privileges.push((
1163 SystemObjectId::Object(ObjectId::Database(id_b)),
1164 AclMode::CREATE,
1165 role_id,
1166 ));
1167 }
1168
1169 RbacRequirements {
1170 ownership: vec![
1171 ObjectId::Schema(*schema_a_spec),
1172 ObjectId::Schema(*schema_b_spec),
1173 ],
1174 privileges,
1175 ..Default::default()
1176 }
1177 }
1178 Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1179 ownership: vec![ObjectId::Item(*id)],
1180 item_usage: &CREATE_ITEM_USAGE,
1181 ..Default::default()
1182 },
1183 Plan::AlterRole(plan::AlterRolePlan {
1184 id,
1185 name: _,
1186 option,
1187 }) => match option {
1188 plan::PlannedAlterRoleOption::Attributes(attributes)
1190 if attributes.superuser.unwrap_or(false) =>
1191 {
1192 RbacRequirements {
1193 superuser_action: Some("alter superuser role".to_string()),
1194 ..Default::default()
1195 }
1196 }
1197 plan::PlannedAlterRoleOption::Attributes(attributes)
1199 if attributes.password.is_some() && role_id == *id =>
1200 {
1201 RbacRequirements::default()
1202 }
1203 plan::PlannedAlterRoleOption::Attributes(attributes)
1205 if attributes.password.is_some() =>
1206 {
1207 RbacRequirements {
1208 superuser_action: Some("alter password of role".to_string()),
1209 ..Default::default()
1210 }
1211 }
1212 plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1214 RbacRequirements::default()
1215 }
1216 _ => RbacRequirements {
1218 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1219 item_usage: &CREATE_ITEM_USAGE,
1220 ..Default::default()
1221 },
1222 },
1223 Plan::AlterOwner(plan::AlterOwnerPlan {
1224 id,
1225 object_type: _,
1226 new_owner,
1227 }) => {
1228 let privileges = match id {
1229 ObjectId::ClusterReplica((cluster_id, _)) => {
1230 vec![(
1231 SystemObjectId::Object(cluster_id.into()),
1232 AclMode::CREATE,
1233 role_id,
1234 )]
1235 }
1236 ObjectId::Schema((database_spec, _)) => match database_spec {
1237 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1238 ResolvedDatabaseSpecifier::Id(database_id) => {
1239 vec![(
1240 SystemObjectId::Object(database_id.into()),
1241 AclMode::CREATE,
1242 role_id,
1243 )]
1244 }
1245 },
1246 ObjectId::Item(item_id) => {
1247 let item = catalog.get_item(item_id);
1248 vec![(
1249 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1250 AclMode::CREATE,
1251 role_id,
1252 )]
1253 }
1254 ObjectId::Cluster(_)
1255 | ObjectId::Database(_)
1256 | ObjectId::Role(_)
1257 | ObjectId::NetworkPolicy(_) => Vec::new(),
1258 };
1259 RbacRequirements {
1260 role_membership: BTreeSet::from([*new_owner]),
1261 ownership: vec![id.clone()],
1262 privileges,
1263 ..Default::default()
1264 }
1265 }
1266 Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1267 ownership: vec![ObjectId::Item(*relation_id)],
1268 item_usage: &CREATE_ITEM_USAGE,
1269 ..Default::default()
1270 },
1271 Plan::AlterMaterializedViewApplyReplacement(
1272 plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1273 ) => RbacRequirements {
1274 ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1275 item_usage: &CREATE_ITEM_USAGE,
1276 ..Default::default()
1277 },
1278 Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1279 ownership: vec![ObjectId::NetworkPolicy(*id)],
1280 item_usage: &CREATE_ITEM_USAGE,
1281 ..Default::default()
1282 },
1283 Plan::ReadThenWrite(plan::ReadThenWritePlan {
1284 id,
1285 selection,
1286 finishing: _,
1287 assignments,
1288 kind,
1289 returning,
1290 }) => {
1291 let acl_mode = match kind {
1292 MutationKind::Insert => AclMode::INSERT,
1293 MutationKind::Update => AclMode::UPDATE,
1294 MutationKind::Delete => AclMode::DELETE,
1295 };
1296 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1297 let mut privileges = vec![
1298 (
1299 SystemObjectId::Object(schema_id.clone()),
1300 AclMode::USAGE,
1301 role_id,
1302 ),
1303 (SystemObjectId::Object(id.into()), acl_mode, role_id),
1304 ];
1305 let mut seen = BTreeSet::from([(schema_id, role_id)]);
1306
1307 if assignments
1310 .values()
1311 .chain(returning.iter())
1312 .any(|assignment| assignment.contains_column())
1313 {
1314 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1315 seen.insert((id.into(), role_id));
1316 }
1317
1318 let items = selection
1325 .depends_on()
1326 .into_iter()
1327 .map(|gid| catalog.resolve_item_id(&gid));
1328 privileges.extend_from_slice(&generate_read_privileges_inner(
1329 catalog, items, role_id, &mut seen,
1330 ));
1331
1332 if let Some(privilege) = generate_cluster_usage_privileges(
1333 selection.as_const().is_some(),
1334 target_cluster_id,
1335 role_id,
1336 ) {
1337 privileges.push(privilege);
1338 }
1339 RbacRequirements {
1340 privileges,
1341 ..Default::default()
1342 }
1343 }
1344 Plan::GrantRole(plan::GrantRolePlan {
1345 role_ids: _,
1346 member_ids: _,
1347 grantor_id: _,
1348 })
1349 | Plan::RevokeRole(plan::RevokeRolePlan {
1350 role_ids: _,
1351 member_ids: _,
1352 grantor_id: _,
1353 }) => RbacRequirements {
1354 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1355 ..Default::default()
1356 },
1357 Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1358 update_privileges,
1359 grantees: _,
1360 })
1361 | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1362 update_privileges,
1363 revokees: _,
1364 }) => {
1365 let mut privileges = Vec::with_capacity(update_privileges.len());
1366 for UpdatePrivilege { target_id, .. } in update_privileges {
1367 match target_id {
1368 SystemObjectId::Object(object_id) => match object_id {
1369 ObjectId::ClusterReplica((cluster_id, _)) => {
1370 privileges.push((
1371 SystemObjectId::Object(cluster_id.into()),
1372 AclMode::USAGE,
1373 role_id,
1374 ));
1375 }
1376 ObjectId::Schema((database_spec, _)) => match database_spec {
1377 ResolvedDatabaseSpecifier::Ambient => {}
1378 ResolvedDatabaseSpecifier::Id(database_id) => {
1379 privileges.push((
1380 SystemObjectId::Object(database_id.into()),
1381 AclMode::USAGE,
1382 role_id,
1383 ));
1384 }
1385 },
1386 ObjectId::Item(item_id) => {
1387 let item = catalog.get_item(item_id);
1388 privileges.push((
1389 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1390 AclMode::USAGE,
1391 role_id,
1392 ))
1393 }
1394 ObjectId::Cluster(_)
1395 | ObjectId::Database(_)
1396 | ObjectId::Role(_)
1397 | ObjectId::NetworkPolicy(_) => {}
1398 },
1399 SystemObjectId::System => {}
1400 }
1401 }
1402 RbacRequirements {
1403 ownership: update_privileges
1404 .iter()
1405 .filter_map(|update_privilege| update_privilege.target_id.object_id())
1406 .cloned()
1407 .collect(),
1408 privileges,
1409 superuser_action: if update_privileges
1414 .iter()
1415 .any(|update_privilege| update_privilege.target_id.is_system())
1416 {
1417 Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1418 } else {
1419 None
1420 },
1421 ..Default::default()
1422 }
1423 }
1424 Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1425 privilege_objects,
1426 privilege_acl_items: _,
1427 is_grant: _,
1428 }) => RbacRequirements {
1429 role_membership: privilege_objects
1430 .iter()
1431 .map(|privilege_object| privilege_object.role_id)
1432 .collect(),
1433 privileges: privilege_objects
1434 .into_iter()
1435 .filter_map(|privilege_object| {
1436 if let (Some(database_id), Some(_)) =
1437 (privilege_object.database_id, privilege_object.schema_id)
1438 {
1439 Some((
1440 SystemObjectId::Object(database_id.into()),
1441 AclMode::USAGE,
1442 role_id,
1443 ))
1444 } else {
1445 None
1446 }
1447 })
1448 .collect(),
1449 superuser_action: if privilege_objects
1455 .iter()
1456 .any(|privilege_object| privilege_object.role_id.is_public())
1457 {
1458 Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1459 } else {
1460 None
1461 },
1462 ..Default::default()
1463 },
1464 Plan::ReassignOwned(plan::ReassignOwnedPlan {
1465 old_roles,
1466 new_role,
1467 reassign_ids: _,
1468 }) => RbacRequirements {
1469 role_membership: old_roles
1470 .into_iter()
1471 .cloned()
1472 .chain(iter::once(*new_role))
1473 .collect(),
1474 ..Default::default()
1475 },
1476 Plan::SideEffectingFunc(func) => {
1477 let role_membership = match func {
1478 SideEffectingFunc::PgCancelBackend { connection_id } => active_conns
1479 .expect("active_conns is required for Plan::SideEffectingFunc")(
1480 *connection_id
1481 )
1482 .map(|x| [x].into())
1483 .unwrap_or_default(),
1484 };
1485 RbacRequirements {
1486 role_membership,
1487 ..Default::default()
1488 }
1489 }
1490 Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1491 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1492 RbacRequirements {
1493 privileges: vec![
1494 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1495 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1496 ],
1497 ..Default::default()
1498 }
1499 }
1500 Plan::DiscardTemp
1501 | Plan::DiscardAll
1502 | Plan::EmptyQuery
1503 | Plan::ShowAllVariables
1504 | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1505 | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1506 | Plan::SetVariable(plan::SetVariablePlan {
1507 name: _,
1508 value: _,
1509 local: _,
1510 })
1511 | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1512 | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1513 | Plan::StartTransaction(plan::StartTransactionPlan {
1514 access: _,
1515 isolation_level: _,
1516 })
1517 | Plan::CommitTransaction(plan::CommitTransactionPlan {
1518 transaction_type: _,
1519 })
1520 | Plan::AbortTransaction(plan::AbortTransactionPlan {
1521 transaction_type: _,
1522 })
1523 | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1524 | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1525 | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1526 | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1527 | Plan::Declare(plan::DeclarePlan {
1528 name: _,
1529 stmt: _,
1530 sql: _,
1531 params: _,
1532 })
1533 | Plan::Fetch(plan::FetchPlan {
1534 name: _,
1535 count: _,
1536 timeout: _,
1537 })
1538 | Plan::Close(plan::ClosePlan { name: _ })
1539 | Plan::Prepare(plan::PreparePlan {
1540 name: _,
1541 stmt: _,
1542 desc: _,
1543 sql: _,
1544 })
1545 | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1546 | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1547 | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1548 }
1549}
1550
1551fn check_owner_roles(
1553 object_id: &ObjectId,
1554 role_ids: &BTreeSet<RoleId>,
1555 catalog: &impl SessionCatalog,
1556) -> bool {
1557 if let Some(owner_id) = catalog.get_owner_id(object_id) {
1558 role_ids.contains(&owner_id)
1559 } else {
1560 true
1561 }
1562}
1563
1564fn ownership_err(
1565 unheld_ownership: Vec<ObjectId>,
1566 catalog: &impl SessionCatalog,
1567) -> Result<(), UnauthorizedError> {
1568 if !unheld_ownership.is_empty() {
1569 let objects = unheld_ownership
1570 .into_iter()
1571 .map(|ownership| match ownership {
1572 ObjectId::Cluster(id) => (
1573 ObjectType::Cluster,
1574 catalog.get_cluster(id).name().to_string(),
1575 ),
1576 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1577 let cluster = catalog.get_cluster(cluster_id);
1578 let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1579 let name = QualifiedReplica {
1582 cluster: Ident::new_unchecked(cluster.name()),
1583 replica: Ident::new_unchecked(replica.name()),
1584 };
1585 (ObjectType::ClusterReplica, name.to_string())
1586 }
1587 ObjectId::Database(id) => (
1588 ObjectType::Database,
1589 catalog.get_database(&id).name().to_string(),
1590 ),
1591 ObjectId::Schema((database_spec, schema_spec)) => {
1592 let schema = catalog.get_schema(&database_spec, &schema_spec);
1593 let name = catalog.resolve_full_schema_name(schema.name());
1594 (ObjectType::Schema, name.to_string())
1595 }
1596 ObjectId::Item(id) => {
1597 let item = catalog.get_item(&id);
1598 let name = catalog.resolve_full_name(item.name());
1599 (item.item_type().into(), name.to_string())
1600 }
1601 ObjectId::NetworkPolicy(id) => (
1602 ObjectType::NetworkPolicy,
1603 catalog.get_network_policy(&id).name().to_string(),
1604 ),
1605 ObjectId::Role(_) => unreachable!("roles have no owner"),
1606 })
1607 .collect();
1608 Err(UnauthorizedError::Ownership { objects })
1609 } else {
1610 Ok(())
1611 }
1612}
1613
1614fn generate_required_source_privileges(
1615 name: &QualifiedItemName,
1616 data_source: &DataSourceDesc,
1617 in_cluster: Option<ClusterId>,
1618 role_id: RoleId,
1619) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1620 let mut privileges = vec![(
1621 SystemObjectId::Object(name.qualifiers.clone().into()),
1622 AclMode::CREATE,
1623 role_id,
1624 )];
1625 match (data_source, in_cluster) {
1626 (_, Some(id)) => {
1627 privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1628 }
1629 (DataSourceDesc::Ingestion(_), None) => {
1630 privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1631 }
1632 (_, None) => {}
1637 }
1638 privileges
1639}
1640
1641fn generate_read_privileges(
1649 catalog: &impl SessionCatalog,
1650 ids: impl Iterator<Item = CatalogItemId>,
1651 role_id: RoleId,
1652) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1653 generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1654}
1655
1656fn generate_read_privileges_inner(
1657 catalog: &impl SessionCatalog,
1658 ids: impl Iterator<Item = CatalogItemId>,
1659 role_id: RoleId,
1660 seen: &mut BTreeSet<(ObjectId, RoleId)>,
1661) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1662 let mut privileges = Vec::new();
1663 let mut views = Vec::new();
1664
1665 for id in ids {
1666 if seen.insert((id.into(), role_id)) {
1667 let item = catalog.get_item(&id);
1668 let schema_id: ObjectId = item.name().qualifiers.clone().into();
1669 if seen.insert((schema_id.clone(), role_id)) {
1670 privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1671 }
1672 match item.item_type() {
1673 CatalogItemType::View
1674 | CatalogItemType::MaterializedView
1675 | CatalogItemType::ContinualTask => {
1676 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1677 views.push((item.references().items().copied(), item.owner_id()));
1678 }
1679 CatalogItemType::Table | CatalogItemType::Source => {
1680 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1681 }
1682 CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1683 privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1684 }
1685 CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1686 }
1687 }
1688 }
1689
1690 for (view_ids, view_owner) in views {
1691 privileges.extend_from_slice(&generate_read_privileges_inner(
1692 catalog, view_ids, view_owner, seen,
1693 ));
1694 }
1695
1696 privileges
1697}
1698
1699fn generate_usage_privileges(
1700 catalog: &impl SessionCatalog,
1701 ids: &ResolvedIds,
1702 role_id: RoleId,
1703 item_types: &BTreeSet<CatalogItemType>,
1704) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1705 ids.items()
1707 .filter_map(move |id| {
1708 let item = catalog.get_item(id);
1709 if item_types.contains(&item.item_type()) {
1710 let schema_id = item.name().qualifiers.clone().into();
1711 Some([
1712 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1713 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1714 ])
1715 } else {
1716 None
1717 }
1718 })
1719 .flatten()
1720 .collect()
1721}
1722
1723fn generate_cluster_usage_privileges(
1724 expr_is_const: bool,
1725 target_cluster_id: Option<ClusterId>,
1726 role_id: RoleId,
1727) -> Option<(SystemObjectId, AclMode, RoleId)> {
1728 if !expr_is_const {
1731 if let Some(cluster_id) = target_cluster_id {
1732 return Some((
1733 SystemObjectId::Object(cluster_id.into()),
1734 AclMode::USAGE,
1735 role_id,
1736 ));
1737 }
1738 }
1739
1740 None
1741}
1742
1743fn check_object_privileges(
1744 catalog: &impl SessionCatalog,
1745 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1746 role_membership: BTreeSet<RoleId>,
1747 current_role_id: RoleId,
1748) -> Result<(), UnauthorizedError> {
1749 let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1750 role_memberships.insert(current_role_id, role_membership);
1751 for (object_id, acl_mode, role_id) in privileges {
1752 if matches!(
1756 &object_id,
1757 SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary)))
1758 ) {
1759 continue;
1760 }
1761
1762 let role_membership = role_memberships
1763 .entry(role_id)
1764 .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1765 let object_privileges = catalog
1766 .get_privileges(&object_id)
1767 .expect("only object types with privileges will generate required privileges");
1768 let role_privileges = role_membership
1769 .iter()
1770 .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1771 .map(|mz_acl_item| mz_acl_item.acl_mode)
1772 .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1773 if !role_privileges.contains(acl_mode) {
1774 let role_name = catalog.get_role(&role_id).name().to_string();
1775 let privileges = acl_mode.to_error_string();
1776 return Err(UnauthorizedError::Privilege {
1777 object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1778 role_name,
1779 privileges,
1780 });
1781 }
1782 }
1783
1784 Ok(())
1785}
1786
1787pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1788 const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1789 .union(AclMode::SELECT)
1790 .union(AclMode::UPDATE)
1791 .union(AclMode::DELETE);
1792 const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1793 const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1794 .union(AclMode::CREATE_DB)
1795 .union(AclMode::CREATE_CLUSTER)
1796 .union(AclMode::CREATE_NETWORK_POLICY);
1797
1798 const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1799 match object_type {
1800 SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1801 SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1802 SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1803 SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1804 SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1805 SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1806 SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1807 SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1808 SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1809 SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1810 SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1811 SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1812 SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1813 SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1814 SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1815 SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1816 SystemObjectType::Object(ObjectType::ContinualTask) => AclMode::SELECT,
1817 SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1818 }
1819}
1820
1821pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1822 MzAclItem {
1823 grantee: owner_id,
1824 grantor: owner_id,
1825 acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1826 }
1827}
1828
1829const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1830 match object_type {
1831 ObjectType::Table
1832 | ObjectType::View
1833 | ObjectType::MaterializedView
1834 | ObjectType::Source
1835 | ObjectType::ContinualTask => AclMode::SELECT,
1836 ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1837 ObjectType::Sink
1838 | ObjectType::Index
1839 | ObjectType::Role
1840 | ObjectType::Cluster
1841 | ObjectType::ClusterReplica
1842 | ObjectType::Secret
1843 | ObjectType::Connection
1844 | ObjectType::Database
1845 | ObjectType::Func
1846 | ObjectType::NetworkPolicy => AclMode::empty(),
1847 }
1848}
1849
1850pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1851 let acl_mode = default_builtin_object_acl_mode(object_type);
1852 MzAclItem {
1853 grantee: MZ_SUPPORT_ROLE_ID,
1854 grantor: MZ_SYSTEM_ROLE_ID,
1855 acl_mode,
1856 }
1857}
1858
1859pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1860 let acl_mode = default_builtin_object_acl_mode(object_type);
1861 MzAclItem {
1862 grantee: RoleId::Public,
1863 grantor: MZ_SYSTEM_ROLE_ID,
1864 acl_mode,
1865 }
1866}