1use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26 CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29 CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30 SchemaSpecifier, SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34 DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40fn rbac_check_preamble(
42 catalog: &impl SessionCatalog,
43 session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45 if catalog
50 .try_get_role(&session_meta.role_metadata().current_role)
51 .is_none()
52 {
53 return Err(UnauthorizedError::ConcurrentRoleDrop(
54 session_meta.role_metadata().current_role.clone(),
55 ));
56 };
57 if catalog
58 .try_get_role(&session_meta.role_metadata().session_role)
59 .is_none()
60 {
61 return Err(UnauthorizedError::ConcurrentRoleDrop(
62 session_meta.role_metadata().session_role.clone(),
63 ));
64 };
65 if catalog
66 .try_get_role(&session_meta.role_metadata().authenticated_role)
67 .is_none()
68 {
69 return Err(UnauthorizedError::ConcurrentRoleDrop(
70 session_meta.role_metadata().authenticated_role.clone(),
71 ));
72 };
73
74 Ok(())
75}
76
77fn filter_requirements(
79 catalog: &impl SessionCatalog,
80 session_meta: &dyn SessionMetadata,
81 rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83 let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86 && !session_meta.role_metadata().current_role.is_system()
87 && !session_meta.role_metadata().session_role.is_system();
88 let is_superuser = session_meta.is_superuser();
90 if is_rbac_disabled || is_superuser {
91 return rbac_requirements.filter_to_mandatory_requirements();
92 }
93
94 rbac_requirements
95}
96
97static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99 btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104 let mut items = DEFAULT_ITEM_USAGE.clone();
105 items.insert(CatalogItemType::Type);
106 items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
/// Errors reported when a session is not authorized to perform an action.
///
/// The `#[error]` strings below are the primary messages; additional text for some
/// variants is produced by [`UnauthorizedError::detail`].
#[derive(Debug, thiserror::Error)]
pub enum UnauthorizedError {
    /// The action requires a superuser. `action` is interpolated into the message.
    #[error("permission denied to {action}")]
    Superuser { action: String },
    /// The action requires ownership of the listed objects. Each entry pairs an object
    /// type with an object name; entries are rendered as `"{type} {name}"` and
    /// comma-separated.
    #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
    Ownership { objects: Vec<(ObjectType, String)> },
    /// The action requires membership of the listed roles. Role names are quoted and
    /// comma-separated in the message.
    #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
    RoleMembership { role_names: Vec<String> },
    /// The action requires privileges on an object. Only `object_description` appears in
    /// the message; `role_name` and `privileges` are used by `detail`.
    #[error("permission denied for {object_description}")]
    Privilege {
        object_description: ErrorMessageObjectDescription,
        role_name: String,
        privileges: String,
    },
    /// The action is denied; `detail` reports that the `SYSTEM_USER` role is required.
    #[error("permission denied to {action}")]
    MzSystem { action: String },
    /// The action is denied; `detail` reports that the `SUPPORT_USER` role has very
    /// limited privileges.
    #[error("permission denied to {action}")]
    MzSupport { action: String },
    /// A role could not be found in the catalog; the message describes it as having been
    /// concurrently dropped. Carries the ID of that role.
    #[error("role {0} was concurrently dropped")]
    ConcurrentRoleDrop(RoleId),
}
141
142impl UnauthorizedError {
143 pub fn detail(&self) -> Option<String> {
144 match &self {
145 UnauthorizedError::Superuser { action } => {
146 Some(format!("You must be a superuser to {}", action))
147 }
148 UnauthorizedError::Privilege {
149 object_description,
150 role_name,
151 privileges,
152 } => Some(format!(
153 "The '{role_name}' role needs {privileges} privileges on {object_description}"
154 )),
155 UnauthorizedError::MzSystem { .. } => {
156 Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157 }
158 UnauthorizedError::MzSupport { .. } => Some(format!(
159 "The '{}' role has very limited privileges",
160 SUPPORT_USER.name
161 )),
162 UnauthorizedError::ConcurrentRoleDrop(_) => {
163 Some("Please disconnect and re-connect with a valid role.".into())
164 }
165 UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166 }
167 }
168}
169
/// The set of authorization requirements that `validate` checks for a session.
#[derive(Debug)]
struct RbacRequirements {
    /// Roles that must all be contained in the role membership collected for the
    /// session's current role.
    role_membership: BTreeSet<RoleId>,
    /// Objects that must each pass the `check_owner_roles` check.
    ownership: Vec<ObjectId>,
    /// Required privileges, as `(object the privilege is on, privilege mode, role)` tuples.
    /// Throughout this file the role is the session's current role.
    privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
    /// Item types handed to `check_usage` when these requirements are validated.
    item_usage: &'static BTreeSet<CatalogItemType>,
    /// When `Some`, validation fails with `UnauthorizedError::Superuser` for this action;
    /// `filter_to_mandatory_requirements` resets it to `None`.
    superuser_action: Option<String>,
}
188
189impl RbacRequirements {
190 fn empty() -> RbacRequirements {
191 RbacRequirements {
192 role_membership: BTreeSet::new(),
193 ownership: Vec::new(),
194 privileges: Vec::new(),
195 item_usage: &EMPTY_ITEM_USAGE,
196 superuser_action: None,
197 }
198 }
199
200 fn validate(
201 self,
202 catalog: &impl SessionCatalog,
203 session: &dyn SessionMetadata,
204 resolved_ids: &ResolvedIds,
205 ) -> Result<(), UnauthorizedError> {
206 let role_membership =
208 catalog.collect_role_membership(&session.role_metadata().current_role);
209
210 check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212 let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215 if !unheld_membership.is_empty() {
216 let role_names = unheld_membership
217 .into_iter()
218 .map(|role_id| {
219 catalog
221 .try_get_role(role_id)
222 .map(|role| role.name().to_string())
223 .unwrap_or_else(|| role_id.to_string())
224 })
225 .collect();
226 return Err(UnauthorizedError::RoleMembership { role_names });
227 }
228
229 let unheld_ownership = self
232 .ownership
233 .into_iter()
234 .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235 .collect();
236 ownership_err(unheld_ownership, catalog)?;
237
238 check_object_privileges(
239 catalog,
240 self.privileges,
241 role_membership,
242 session.role_metadata().current_role,
243 )?;
244
245 if let Some(action) = self.superuser_action {
246 return Err(UnauthorizedError::Superuser { action });
247 }
248
249 Ok(())
250 }
251
252 fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253 let RbacRequirements {
254 role_membership,
255 ownership,
256 privileges,
257 item_usage,
258 superuser_action: _,
259 } = self;
260 let role_membership = role_membership
261 .into_iter()
262 .filter(|id| id.is_system())
263 .collect();
264 let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265 let privileges = privileges
266 .into_iter()
267 .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268 .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270 .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271 .collect();
272 let superuser_action = None;
273 RbacRequirements {
274 role_membership,
275 ownership,
276 privileges,
277 item_usage,
278 superuser_action,
279 }
280 }
281}
282
283impl Default for RbacRequirements {
284 fn default() -> Self {
285 RbacRequirements {
286 role_membership: BTreeSet::new(),
287 ownership: Vec::new(),
288 privileges: Vec::new(),
289 item_usage: &DEFAULT_ITEM_USAGE,
290 superuser_action: None,
291 }
292 }
293}
294
295pub fn check_usage(
297 catalog: &impl SessionCatalog,
298 session: &dyn SessionMetadata,
299 resolved_ids: &ResolvedIds,
300 item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302 rbac_check_preamble(catalog, session)?;
303
304 let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307 let existing_resolved_ids =
310 resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312 let required_privileges = generate_usage_privileges(
313 catalog,
314 &existing_resolved_ids,
315 session.role_metadata().current_role,
316 item_types,
317 )
318 .into_iter()
319 .collect();
320
321 let mut rbac_requirements = RbacRequirements::empty();
322 rbac_requirements.privileges = required_privileges;
323 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324 let required_privileges = rbac_requirements.privileges;
325
326 check_object_privileges(
327 catalog,
328 required_privileges,
329 role_membership,
330 session.role_metadata().current_role,
331 )?;
332
333 Ok(())
334}
335
336pub fn check_plan(
338 catalog: &impl SessionCatalog,
339 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
345 session: &dyn SessionMetadata,
346 plan: &Plan,
347 target_cluster_id: Option<ClusterId>,
348 resolved_ids: &ResolvedIds,
349) -> Result<(), UnauthorizedError> {
350 rbac_check_preamble(catalog, session)?;
351
352 let rbac_requirements = generate_rbac_requirements(
353 catalog,
354 plan,
355 active_conns,
356 target_cluster_id,
357 session.role_metadata().current_role,
358 );
359 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
360 debug!(
361 "rbac requirements {rbac_requirements:?} for plan {:?}",
362 PlanKind::from(plan)
363 );
364 rbac_requirements.validate(catalog, session, resolved_ids)
365}
366
367pub fn is_rbac_enabled_for_session(
369 system_vars: &SystemVars,
370 session: &dyn SessionMetadata,
371) -> bool {
372 let server_enabled = system_vars.enable_rbac_checks();
373 let session_enabled = session.enable_session_rbac_checks();
374
375 server_enabled || session_enabled
378}
379
380fn generate_rbac_requirements(
382 catalog: &impl SessionCatalog,
383 plan: &Plan,
384 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
385 target_cluster_id: Option<ClusterId>,
386 role_id: RoleId,
387) -> RbacRequirements {
388 match plan {
389 Plan::CreateConnection(plan::CreateConnectionPlan {
390 name,
391 if_not_exists: _,
392 connection: _,
393 validate: _,
394 }) => RbacRequirements {
395 privileges: vec![(
396 SystemObjectId::Object(name.qualifiers.clone().into()),
397 AclMode::CREATE,
398 role_id,
399 )],
400 item_usage: &CREATE_ITEM_USAGE,
401 ..Default::default()
402 },
403 Plan::CreateDatabase(plan::CreateDatabasePlan {
404 name: _,
405 if_not_exists: _,
406 }) => RbacRequirements {
407 privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
408 item_usage: &CREATE_ITEM_USAGE,
409 ..Default::default()
410 },
411 Plan::CreateSchema(plan::CreateSchemaPlan {
412 database_spec,
413 schema_name: _,
414 if_not_exists: _,
415 }) => {
416 let privileges = match database_spec {
417 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
418 ResolvedDatabaseSpecifier::Id(database_id) => {
419 vec![(
420 SystemObjectId::Object(database_id.into()),
421 AclMode::CREATE,
422 role_id,
423 )]
424 }
425 };
426 RbacRequirements {
427 privileges,
428 item_usage: &CREATE_ITEM_USAGE,
429 ..Default::default()
430 }
431 }
432 Plan::CreateRole(plan::CreateRolePlan {
433 name: _,
434 attributes,
435 }) => {
436 if attributes.superuser.unwrap_or(false) {
437 RbacRequirements {
438 superuser_action: Some("create superuser role".to_string()),
439 ..Default::default()
440 }
441 } else {
442 RbacRequirements {
443 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
444 item_usage: &CREATE_ITEM_USAGE,
445 ..Default::default()
446 }
447 }
448 }
449 Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
450 privileges: vec![(
451 SystemObjectId::System,
452 AclMode::CREATE_NETWORK_POLICY,
453 role_id,
454 )],
455 item_usage: &CREATE_ITEM_USAGE,
456 ..Default::default()
457 },
458 Plan::CreateCluster(plan::CreateClusterPlan {
459 name: _,
460 variant: _,
461 workload_class: _,
462 }) => RbacRequirements {
463 privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
464 item_usage: &CREATE_ITEM_USAGE,
465 ..Default::default()
466 },
467 Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
468 cluster_id,
469 name: _,
470 config: _,
471 }) => RbacRequirements {
472 ownership: vec![ObjectId::Cluster(*cluster_id)],
473 item_usage: &CREATE_ITEM_USAGE,
474 ..Default::default()
475 },
476 Plan::CreateSource(plan::CreateSourcePlan {
477 name,
478 source,
479 if_not_exists: _,
480 timeline: _,
481 in_cluster,
482 }) => RbacRequirements {
483 privileges: generate_required_source_privileges(
484 name,
485 &source.data_source,
486 *in_cluster,
487 role_id,
488 ),
489 item_usage: &CREATE_ITEM_USAGE,
490 ..Default::default()
491 },
492 Plan::CreateSources(plans) => RbacRequirements {
493 privileges: plans
494 .iter()
495 .flat_map(
496 |plan::CreateSourcePlanBundle {
497 item_id: _,
498 global_id: _,
499 plan:
500 plan::CreateSourcePlan {
501 name,
502 source,
503 if_not_exists: _,
504 timeline: _,
505 in_cluster,
506 },
507 resolved_ids: _,
508 available_source_references: _,
509 }| {
510 generate_required_source_privileges(
511 name,
512 &source.data_source,
513 *in_cluster,
514 role_id,
515 )
516 .into_iter()
517 },
518 )
519 .collect(),
520 item_usage: &CREATE_ITEM_USAGE,
521 ..Default::default()
522 },
523 Plan::CreateSecret(plan::CreateSecretPlan {
524 name,
525 secret: _,
526 if_not_exists: _,
527 }) => RbacRequirements {
528 privileges: vec![(
529 SystemObjectId::Object(name.qualifiers.clone().into()),
530 AclMode::CREATE,
531 role_id,
532 )],
533 item_usage: &CREATE_ITEM_USAGE,
534 ..Default::default()
535 },
536 Plan::CreateSink(plan::CreateSinkPlan {
537 name,
538 sink,
539 with_snapshot: _,
540 if_not_exists: _,
541 in_cluster,
542 }) => {
543 let mut privileges = vec![(
544 SystemObjectId::Object(name.qualifiers.clone().into()),
545 AclMode::CREATE,
546 role_id,
547 )];
548 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
549 privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
550 privileges.push((
551 SystemObjectId::Object(in_cluster.into()),
552 AclMode::CREATE,
553 role_id,
554 ));
555 RbacRequirements {
556 privileges,
557 item_usage: &CREATE_ITEM_USAGE,
558 ..Default::default()
559 }
560 }
561 Plan::CreateTable(plan::CreateTablePlan {
562 name,
563 table: _,
564 if_not_exists: _,
565 }) => RbacRequirements {
566 privileges: vec![(
567 SystemObjectId::Object(name.qualifiers.clone().into()),
568 AclMode::CREATE,
569 role_id,
570 )],
571 item_usage: &CREATE_ITEM_USAGE,
572 ..Default::default()
573 },
574 Plan::CreateView(plan::CreateViewPlan {
575 name,
576 view: _,
577 replace,
578 drop_ids: _,
579 if_not_exists: _,
580 ambiguous_columns: _,
581 }) => RbacRequirements {
582 ownership: replace
583 .map(|id| vec![ObjectId::Item(id)])
584 .unwrap_or_default(),
585 privileges: vec![(
586 SystemObjectId::Object(name.qualifiers.clone().into()),
587 AclMode::CREATE,
588 role_id,
589 )],
590 item_usage: &CREATE_ITEM_USAGE,
591 ..Default::default()
592 },
593 Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
594 name,
595 materialized_view,
596 replace,
597 drop_ids: _,
598 if_not_exists: _,
599 ambiguous_columns: _,
600 }) => RbacRequirements {
601 ownership: replace
602 .map(|id| vec![ObjectId::Item(id)])
603 .unwrap_or_default(),
604 privileges: vec![
605 (
606 SystemObjectId::Object(name.qualifiers.clone().into()),
607 AclMode::CREATE,
608 role_id,
609 ),
610 (
611 SystemObjectId::Object(materialized_view.cluster_id.into()),
612 AclMode::CREATE,
613 role_id,
614 ),
615 ],
616 item_usage: &CREATE_ITEM_USAGE,
617 ..Default::default()
618 },
619 Plan::CreateContinualTask(plan::CreateContinualTaskPlan {
620 name,
621 placeholder_id: _,
622 desc: _,
623 input_id: _,
624 with_snapshot: _,
625 continual_task,
626 }) => RbacRequirements {
627 privileges: vec![
628 (
629 SystemObjectId::Object(name.qualifiers.clone().into()),
630 AclMode::CREATE,
631 role_id,
632 ),
633 (
634 SystemObjectId::Object(continual_task.cluster_id.into()),
635 AclMode::CREATE,
636 role_id,
637 ),
638 ],
639 item_usage: &CREATE_ITEM_USAGE,
640 ..Default::default()
641 },
642 Plan::CreateIndex(plan::CreateIndexPlan {
643 name,
644 index,
645 if_not_exists: _,
646 }) => {
647 let index_on_item = catalog.resolve_item_id(&index.on);
648 RbacRequirements {
649 ownership: vec![ObjectId::Item(index_on_item)],
650 privileges: vec![
651 (
652 SystemObjectId::Object(name.qualifiers.clone().into()),
653 AclMode::CREATE,
654 role_id,
655 ),
656 (
657 SystemObjectId::Object(index.cluster_id.into()),
658 AclMode::CREATE,
659 role_id,
660 ),
661 ],
662 item_usage: &CREATE_ITEM_USAGE,
663 ..Default::default()
664 }
665 }
666 Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
667 privileges: vec![(
668 SystemObjectId::Object(name.qualifiers.clone().into()),
669 AclMode::CREATE,
670 role_id,
671 )],
672 item_usage: &CREATE_ITEM_USAGE,
673 ..Default::default()
674 },
675 Plan::Comment(plan::CommentPlan {
676 object_id,
677 sub_component: _,
678 comment: _,
679 }) => {
680 let (ownership, privileges) = match object_id {
681 CommentObjectId::Role(_) => (
684 Vec::new(),
685 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
686 ),
687 _ => (vec![ObjectId::from(*object_id)], Vec::new()),
688 };
689 RbacRequirements {
690 ownership,
691 privileges,
692 ..Default::default()
693 }
694 }
695 Plan::DropObjects(plan::DropObjectsPlan {
696 referenced_ids,
697 drop_ids: _,
698 object_type,
699 }) => {
700 let privileges = if object_type == &ObjectType::Role {
701 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
702 } else {
703 referenced_ids
704 .iter()
705 .filter_map(|id| match id {
706 ObjectId::ClusterReplica((cluster_id, _)) => Some((
707 SystemObjectId::Object(cluster_id.into()),
708 AclMode::USAGE,
709 role_id,
710 )),
711 ObjectId::Schema((database_spec, _)) => match database_spec {
712 ResolvedDatabaseSpecifier::Ambient => None,
713 ResolvedDatabaseSpecifier::Id(database_id) => Some((
714 SystemObjectId::Object(database_id.into()),
715 AclMode::USAGE,
716 role_id,
717 )),
718 },
719 ObjectId::Item(item_id) => {
720 let item = catalog.get_item(item_id);
721 Some((
722 SystemObjectId::Object(item.name().qualifiers.clone().into()),
723 AclMode::USAGE,
724 role_id,
725 ))
726 }
727 ObjectId::Cluster(_)
728 | ObjectId::Database(_)
729 | ObjectId::Role(_)
730 | ObjectId::NetworkPolicy(_) => None,
731 })
732 .collect()
733 };
734 RbacRequirements {
735 ownership: referenced_ids.clone(),
737 privileges,
738 ..Default::default()
739 }
740 }
741 Plan::DropOwned(plan::DropOwnedPlan {
742 role_ids,
743 drop_ids: _,
744 privilege_revokes: _,
745 default_privilege_revokes: _,
746 }) => RbacRequirements {
747 role_membership: role_ids.into_iter().cloned().collect(),
748 ..Default::default()
749 },
750 Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
751 let container_id = match id {
752 ObjectId::Item(id) => Some(SystemObjectId::Object(
753 catalog.get_item(id).name().qualifiers.clone().into(),
754 )),
755 ObjectId::Schema((database_id, _schema_id)) => match database_id {
756 ResolvedDatabaseSpecifier::Ambient => None,
757 ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
758 },
759 ObjectId::Cluster(_)
760 | ObjectId::ClusterReplica(_)
761 | ObjectId::Database(_)
762 | ObjectId::Role(_)
763 | ObjectId::NetworkPolicy(_) => None,
764 };
765 let privileges = match container_id {
766 Some(id) => vec![(id, AclMode::USAGE, role_id)],
767 None => Vec::new(),
768 };
769 RbacRequirements {
770 privileges,
771 item_usage: &EMPTY_ITEM_USAGE,
772 ..Default::default()
773 }
774 }
775 Plan::ShowColumns(plan::ShowColumnsPlan {
776 id,
777 select_plan,
778 new_resolved_ids: _,
779 }) => {
780 let mut privileges = vec![(
781 SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
782 AclMode::USAGE,
783 role_id,
784 )];
785
786 for privilege in generate_rbac_requirements(
787 catalog,
788 &Plan::Select(select_plan.clone()),
789 active_conns,
790 target_cluster_id,
791 role_id,
792 )
793 .privileges
794 {
795 privileges.push(privilege);
796 }
797 RbacRequirements {
798 privileges,
799 ..Default::default()
800 }
801 }
802 Plan::Select(plan::SelectPlan {
803 source,
804 select: _,
805 when: _,
806 finishing: _,
807 copy_to: _,
808 }) => {
809 let items = source
810 .depends_on()
811 .into_iter()
812 .map(|gid| catalog.resolve_item_id(&gid));
813 let mut privileges = generate_read_privileges(catalog, items, role_id);
814 if let Some(privilege) = generate_cluster_usage_privileges(
815 source.as_const().is_some(),
816 target_cluster_id,
817 role_id,
818 ) {
819 privileges.push(privilege);
820 }
821 RbacRequirements {
822 privileges,
823 ..Default::default()
824 }
825 }
826 Plan::Subscribe(plan::SubscribePlan {
827 from,
828 with_snapshot: _,
829 when: _,
830 up_to: _,
831 copy_to: _,
832 emit_progress: _,
833 output: _,
834 }) => {
835 let items = from
836 .depends_on()
837 .into_iter()
838 .map(|gid| catalog.resolve_item_id(&gid));
839 let mut privileges = generate_read_privileges(catalog, items, role_id);
840 if let Some(cluster_id) = target_cluster_id {
841 privileges.push((
842 SystemObjectId::Object(cluster_id.into()),
843 AclMode::USAGE,
844 role_id,
845 ));
846 }
847 RbacRequirements {
848 privileges,
849 ..Default::default()
850 }
851 }
852 Plan::CopyFrom(plan::CopyFromPlan {
853 target_name: _,
854 target_id,
855 source: _,
856 columns: _,
857 source_desc: _,
858 mfp: _,
859 params: _,
860 filter: _,
861 }) => RbacRequirements {
862 privileges: vec![
863 (
864 SystemObjectId::Object(
865 catalog.get_item(target_id).name().qualifiers.clone().into(),
866 ),
867 AclMode::USAGE,
868 role_id,
869 ),
870 (
871 SystemObjectId::Object(target_id.into()),
872 AclMode::INSERT,
873 role_id,
874 ),
875 ],
876 ..Default::default()
877 },
878 Plan::CopyTo(plan::CopyToPlan {
879 select_plan,
880 desc: _,
881 to: _,
882 connection: _,
883 connection_id: _,
884 format: _,
885 max_file_size: _,
886 }) => {
887 let items = select_plan
888 .source
889 .depends_on()
890 .into_iter()
891 .map(|gid| catalog.resolve_item_id(&gid));
892 let mut privileges = generate_read_privileges(catalog, items, role_id);
893 if let Some(cluster_id) = target_cluster_id {
894 privileges.push((
895 SystemObjectId::Object(cluster_id.into()),
896 AclMode::USAGE,
897 role_id,
898 ));
899 }
900 RbacRequirements {
901 privileges,
902 ..Default::default()
903 }
904 }
905 Plan::ExplainPlan(plan::ExplainPlanPlan {
906 stage: _,
907 format: _,
908 config: _,
909 explainee,
910 })
911 | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
912 privileges: match explainee {
913 Explainee::View(id)
914 | Explainee::MaterializedView(id)
915 | Explainee::Index(id)
916 | Explainee::ReplanView(id)
917 | Explainee::ReplanMaterializedView(id)
918 | Explainee::ReplanIndex(id) => {
919 let item = catalog.get_item(id);
920 let schema_id: ObjectId = item.name().qualifiers.clone().into();
921 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
922 }
923 Explainee::Statement(stmt) => stmt
924 .depends_on()
925 .into_iter()
926 .map(|id| {
927 let item = catalog.get_item_by_global_id(&id);
928 let schema_id: ObjectId = item.name().qualifiers.clone().into();
929 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
930 })
931 .collect(),
932 },
933 item_usage: match explainee {
934 Explainee::View(..)
935 | Explainee::MaterializedView(..)
936 | Explainee::Index(..)
937 | Explainee::ReplanView(..)
938 | Explainee::ReplanMaterializedView(..)
939 | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
940 Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
941 },
942 ..Default::default()
943 },
944 Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
945 RbacRequirements {
946 privileges: {
947 let item = catalog.get_item_by_global_id(sink_from);
948 let schema_id: ObjectId = item.name().qualifiers.clone().into();
949 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
950 },
951 item_usage: &EMPTY_ITEM_USAGE,
952 ..Default::default()
953 }
954 }
955 Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
956 format: _,
957 raw_plan,
958 when: _,
959 }) => RbacRequirements {
960 privileges: raw_plan
961 .depends_on()
962 .into_iter()
963 .map(|id| {
964 let item = catalog.get_item_by_global_id(&id);
965 let schema_id: ObjectId = item.name().qualifiers.clone().into();
966 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
967 })
968 .collect(),
969 ..Default::default()
970 },
971 Plan::Insert(plan::InsertPlan {
972 id,
973 values,
974 returning,
975 }) => {
976 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
977 let mut privileges = vec![
978 (
979 SystemObjectId::Object(schema_id.clone()),
980 AclMode::USAGE,
981 role_id,
982 ),
983 (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
984 ];
985 let mut seen = BTreeSet::from([(schema_id, role_id)]);
986
987 if returning
990 .iter()
991 .any(|assignment| assignment.contains_column())
992 {
993 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
994 seen.insert((id.into(), role_id));
995 }
996
997 let items = values
998 .depends_on()
999 .into_iter()
1000 .map(|gid| catalog.resolve_item_id(&gid));
1001 privileges.extend_from_slice(&generate_read_privileges_inner(
1002 catalog, items, role_id, &mut seen,
1003 ));
1004
1005 if let Some(privilege) = generate_cluster_usage_privileges(
1006 values.as_const().is_some(),
1007 target_cluster_id,
1008 role_id,
1009 ) {
1010 privileges.push(privilege);
1011 } else if !returning.is_empty() {
1012 if let Some(cluster_id) = target_cluster_id {
1015 privileges.push((
1016 SystemObjectId::Object(cluster_id.into()),
1017 AclMode::USAGE,
1018 role_id,
1019 ));
1020 }
1021 }
1022 RbacRequirements {
1023 privileges,
1024 ..Default::default()
1025 }
1026 }
1027 Plan::AlterCluster(plan::AlterClusterPlan {
1028 id,
1029 name: _,
1030 options: _,
1031 strategy: _,
1032 }) => RbacRequirements {
1033 ownership: vec![ObjectId::Cluster(*id)],
1034 item_usage: &CREATE_ITEM_USAGE,
1035 ..Default::default()
1036 },
1037 Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1038 ownership: vec![ObjectId::Item(*id)],
1039 privileges: vec![(
1040 SystemObjectId::Object(set_cluster.into()),
1041 AclMode::CREATE,
1042 role_id,
1043 )],
1044 item_usage: &CREATE_ITEM_USAGE,
1045 ..Default::default()
1046 },
1047 Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1048 id,
1049 window: _,
1050 value: _,
1051 object_type: _,
1052 }) => RbacRequirements {
1053 ownership: vec![ObjectId::Item(*id)],
1054 item_usage: &CREATE_ITEM_USAGE,
1055 ..Default::default()
1056 },
1057 Plan::AlterSourceTimestampInterval(plan::AlterSourceTimestampIntervalPlan {
1058 id,
1059 value: _,
1060 interval: _,
1061 }) => RbacRequirements {
1062 ownership: vec![ObjectId::Item(*id)],
1063 item_usage: &CREATE_ITEM_USAGE,
1064 ..Default::default()
1065 },
1066 Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1067 ownership: vec![ObjectId::Item(*id)],
1068 ..Default::default()
1069 },
1070 Plan::AlterSource(plan::AlterSourcePlan {
1071 item_id,
1072 ingestion_id: _,
1073 action: _,
1074 }) => RbacRequirements {
1075 ownership: vec![ObjectId::Item(*item_id)],
1076 item_usage: &CREATE_ITEM_USAGE,
1077 ..Default::default()
1078 },
1079 Plan::AlterSink(plan::AlterSinkPlan {
1080 item_id,
1081 global_id: _,
1082 sink,
1083 with_snapshot: _,
1084 in_cluster,
1085 }) => {
1086 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1087 let mut privileges = generate_read_privileges(catalog, items, role_id);
1088 privileges.push((
1089 SystemObjectId::Object(in_cluster.into()),
1090 AclMode::CREATE,
1091 role_id,
1092 ));
1093 RbacRequirements {
1094 ownership: vec![ObjectId::Item(*item_id)],
1095 privileges,
1096 item_usage: &CREATE_ITEM_USAGE,
1097 ..Default::default()
1098 }
1099 }
1100 Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1101 id,
1102 name: _,
1103 to_name: _,
1104 }) => RbacRequirements {
1105 ownership: vec![ObjectId::Cluster(*id)],
1106 ..Default::default()
1107 },
1108 Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1109 id_a,
1110 id_b,
1111 name_a: _,
1112 name_b: _,
1113 name_temp: _,
1114 }) => RbacRequirements {
1115 ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1116 ..Default::default()
1117 },
1118 Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1119 cluster_id,
1120 replica_id,
1121 name: _,
1122 to_name: _,
1123 }) => RbacRequirements {
1124 ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1125 ..Default::default()
1126 },
1127 Plan::AlterItemRename(plan::AlterItemRenamePlan {
1128 id,
1129 current_full_name: _,
1130 to_name: _,
1131 object_type: _,
1132 }) => RbacRequirements {
1133 ownership: vec![ObjectId::Item(*id)],
1134 ..Default::default()
1135 },
1136 Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1137 cur_schema_spec,
1138 new_schema_name: _,
1139 }) => {
1140 let privileges = match cur_schema_spec.0 {
1141 ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1142 SystemObjectId::Object(ObjectId::Database(db_id)),
1143 AclMode::CREATE,
1144 role_id,
1145 )],
1146 ResolvedDatabaseSpecifier::Ambient => vec![],
1147 };
1148
1149 RbacRequirements {
1150 ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1151 privileges,
1152 ..Default::default()
1153 }
1154 }
1155 Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1156 schema_a_spec,
1157 schema_a_name: _,
1158 schema_b_spec,
1159 schema_b_name: _,
1160 name_temp: _,
1161 }) => {
1162 let mut privileges = vec![];
1163 if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1164 privileges.push((
1165 SystemObjectId::Object(ObjectId::Database(id_a)),
1166 AclMode::CREATE,
1167 role_id,
1168 ));
1169 }
1170 if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1171 privileges.push((
1172 SystemObjectId::Object(ObjectId::Database(id_b)),
1173 AclMode::CREATE,
1174 role_id,
1175 ));
1176 }
1177
1178 RbacRequirements {
1179 ownership: vec![
1180 ObjectId::Schema(*schema_a_spec),
1181 ObjectId::Schema(*schema_b_spec),
1182 ],
1183 privileges,
1184 ..Default::default()
1185 }
1186 }
1187 Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1188 ownership: vec![ObjectId::Item(*id)],
1189 item_usage: &CREATE_ITEM_USAGE,
1190 ..Default::default()
1191 },
1192 Plan::AlterRole(plan::AlterRolePlan {
1193 id,
1194 name: _,
1195 option,
1196 }) => match option {
1197 plan::PlannedAlterRoleOption::Attributes(attributes)
1199 if attributes.superuser.is_some() =>
1200 {
1201 RbacRequirements {
1202 superuser_action: Some("alter superuser role".to_string()),
1203 ..Default::default()
1204 }
1205 }
1206 plan::PlannedAlterRoleOption::Attributes(attributes)
1208 if attributes.password.is_some() && role_id == *id =>
1209 {
1210 RbacRequirements::default()
1211 }
1212 plan::PlannedAlterRoleOption::Attributes(attributes)
1214 if attributes.password.is_some() =>
1215 {
1216 RbacRequirements {
1217 superuser_action: Some("alter password of role".to_string()),
1218 ..Default::default()
1219 }
1220 }
1221 plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1223 RbacRequirements::default()
1224 }
1225 _ => RbacRequirements {
1227 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1228 item_usage: &CREATE_ITEM_USAGE,
1229 ..Default::default()
1230 },
1231 },
1232 Plan::AlterOwner(plan::AlterOwnerPlan {
1233 id,
1234 object_type: _,
1235 new_owner,
1236 }) => {
1237 let privileges = match id {
1238 ObjectId::ClusterReplica((cluster_id, _)) => {
1239 vec![(
1240 SystemObjectId::Object(cluster_id.into()),
1241 AclMode::CREATE,
1242 role_id,
1243 )]
1244 }
1245 ObjectId::Schema((database_spec, _)) => match database_spec {
1246 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1247 ResolvedDatabaseSpecifier::Id(database_id) => {
1248 vec![(
1249 SystemObjectId::Object(database_id.into()),
1250 AclMode::CREATE,
1251 role_id,
1252 )]
1253 }
1254 },
1255 ObjectId::Item(item_id) => {
1256 let item = catalog.get_item(item_id);
1257 vec![(
1258 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1259 AclMode::CREATE,
1260 role_id,
1261 )]
1262 }
1263 ObjectId::Cluster(_)
1264 | ObjectId::Database(_)
1265 | ObjectId::Role(_)
1266 | ObjectId::NetworkPolicy(_) => Vec::new(),
1267 };
1268 RbacRequirements {
1269 role_membership: BTreeSet::from([*new_owner]),
1270 ownership: vec![id.clone()],
1271 privileges,
1272 ..Default::default()
1273 }
1274 }
1275 Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1276 ownership: vec![ObjectId::Item(*relation_id)],
1277 item_usage: &CREATE_ITEM_USAGE,
1278 ..Default::default()
1279 },
1280 Plan::AlterMaterializedViewApplyReplacement(
1281 plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1282 ) => RbacRequirements {
1283 ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1284 item_usage: &CREATE_ITEM_USAGE,
1285 ..Default::default()
1286 },
1287 Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1288 ownership: vec![ObjectId::NetworkPolicy(*id)],
1289 item_usage: &CREATE_ITEM_USAGE,
1290 ..Default::default()
1291 },
1292 Plan::ReadThenWrite(plan::ReadThenWritePlan {
1293 id,
1294 selection,
1295 finishing: _,
1296 assignments,
1297 kind,
1298 returning,
1299 }) => {
1300 let acl_mode = match kind {
1301 MutationKind::Insert => AclMode::INSERT,
1302 MutationKind::Update => AclMode::UPDATE,
1303 MutationKind::Delete => AclMode::DELETE,
1304 };
1305 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1306 let mut privileges = vec![
1307 (
1308 SystemObjectId::Object(schema_id.clone()),
1309 AclMode::USAGE,
1310 role_id,
1311 ),
1312 (SystemObjectId::Object(id.into()), acl_mode, role_id),
1313 ];
1314 let mut seen = BTreeSet::from([(schema_id, role_id)]);
1315
1316 if assignments
1319 .values()
1320 .chain(returning.iter())
1321 .any(|assignment| assignment.contains_column())
1322 {
1323 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1324 seen.insert((id.into(), role_id));
1325 }
1326
1327 let items = selection
1334 .depends_on()
1335 .into_iter()
1336 .map(|gid| catalog.resolve_item_id(&gid));
1337 privileges.extend_from_slice(&generate_read_privileges_inner(
1338 catalog, items, role_id, &mut seen,
1339 ));
1340
1341 if let Some(privilege) = generate_cluster_usage_privileges(
1342 selection.as_const().is_some(),
1343 target_cluster_id,
1344 role_id,
1345 ) {
1346 privileges.push(privilege);
1347 }
1348 RbacRequirements {
1349 privileges,
1350 ..Default::default()
1351 }
1352 }
1353 Plan::GrantRole(plan::GrantRolePlan {
1354 role_ids: _,
1355 member_ids: _,
1356 grantor_id: _,
1357 })
1358 | Plan::RevokeRole(plan::RevokeRolePlan {
1359 role_ids: _,
1360 member_ids: _,
1361 grantor_id: _,
1362 }) => RbacRequirements {
1363 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1364 ..Default::default()
1365 },
1366 Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1367 update_privileges,
1368 grantees: _,
1369 })
1370 | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1371 update_privileges,
1372 revokees: _,
1373 }) => {
1374 let mut privileges = Vec::with_capacity(update_privileges.len());
1375 for UpdatePrivilege { target_id, .. } in update_privileges {
1376 match target_id {
1377 SystemObjectId::Object(object_id) => match object_id {
1378 ObjectId::ClusterReplica((cluster_id, _)) => {
1379 privileges.push((
1380 SystemObjectId::Object(cluster_id.into()),
1381 AclMode::USAGE,
1382 role_id,
1383 ));
1384 }
1385 ObjectId::Schema((database_spec, _)) => match database_spec {
1386 ResolvedDatabaseSpecifier::Ambient => {}
1387 ResolvedDatabaseSpecifier::Id(database_id) => {
1388 privileges.push((
1389 SystemObjectId::Object(database_id.into()),
1390 AclMode::USAGE,
1391 role_id,
1392 ));
1393 }
1394 },
1395 ObjectId::Item(item_id) => {
1396 let item = catalog.get_item(item_id);
1397 privileges.push((
1398 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1399 AclMode::USAGE,
1400 role_id,
1401 ))
1402 }
1403 ObjectId::Cluster(_)
1404 | ObjectId::Database(_)
1405 | ObjectId::Role(_)
1406 | ObjectId::NetworkPolicy(_) => {}
1407 },
1408 SystemObjectId::System => {}
1409 }
1410 }
1411 RbacRequirements {
1412 ownership: update_privileges
1413 .iter()
1414 .filter_map(|update_privilege| update_privilege.target_id.object_id())
1415 .cloned()
1416 .collect(),
1417 privileges,
1418 superuser_action: if update_privileges
1423 .iter()
1424 .any(|update_privilege| update_privilege.target_id.is_system())
1425 {
1426 Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1427 } else {
1428 None
1429 },
1430 ..Default::default()
1431 }
1432 }
1433 Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1434 privilege_objects,
1435 privilege_acl_items: _,
1436 is_grant: _,
1437 }) => RbacRequirements {
1438 role_membership: privilege_objects
1439 .iter()
1440 .map(|privilege_object| privilege_object.role_id)
1441 .collect(),
1442 privileges: privilege_objects
1443 .into_iter()
1444 .filter_map(|privilege_object| {
1445 if let (Some(database_id), Some(_)) =
1446 (privilege_object.database_id, privilege_object.schema_id)
1447 {
1448 Some((
1449 SystemObjectId::Object(database_id.into()),
1450 AclMode::USAGE,
1451 role_id,
1452 ))
1453 } else {
1454 None
1455 }
1456 })
1457 .collect(),
1458 superuser_action: if privilege_objects
1464 .iter()
1465 .any(|privilege_object| privilege_object.role_id.is_public())
1466 {
1467 Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1468 } else {
1469 None
1470 },
1471 ..Default::default()
1472 },
1473 Plan::ReassignOwned(plan::ReassignOwnedPlan {
1474 old_roles,
1475 new_role,
1476 reassign_ids: _,
1477 }) => RbacRequirements {
1478 role_membership: old_roles
1479 .into_iter()
1480 .cloned()
1481 .chain(iter::once(*new_role))
1482 .collect(),
1483 ..Default::default()
1484 },
1485 Plan::SideEffectingFunc(func) => {
1486 let role_membership = match func {
1487 SideEffectingFunc::PgCancelBackend { connection_id } => active_conns
1488 .expect("active_conns is required for Plan::SideEffectingFunc")(
1489 *connection_id
1490 )
1491 .map(|x| [x].into())
1492 .unwrap_or_default(),
1493 };
1494 RbacRequirements {
1495 role_membership,
1496 ..Default::default()
1497 }
1498 }
1499 Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1500 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1501 RbacRequirements {
1502 privileges: vec![
1503 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1504 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1505 ],
1506 ..Default::default()
1507 }
1508 }
1509 Plan::DiscardTemp
1510 | Plan::DiscardAll
1511 | Plan::EmptyQuery
1512 | Plan::ShowAllVariables
1513 | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1514 | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1515 | Plan::SetVariable(plan::SetVariablePlan {
1516 name: _,
1517 value: _,
1518 local: _,
1519 })
1520 | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1521 | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1522 | Plan::StartTransaction(plan::StartTransactionPlan {
1523 access: _,
1524 isolation_level: _,
1525 })
1526 | Plan::CommitTransaction(plan::CommitTransactionPlan {
1527 transaction_type: _,
1528 })
1529 | Plan::AbortTransaction(plan::AbortTransactionPlan {
1530 transaction_type: _,
1531 })
1532 | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1533 | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1534 | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1535 | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1536 | Plan::Declare(plan::DeclarePlan {
1537 name: _,
1538 stmt: _,
1539 sql: _,
1540 params: _,
1541 })
1542 | Plan::Fetch(plan::FetchPlan {
1543 name: _,
1544 count: _,
1545 timeout: _,
1546 })
1547 | Plan::Close(plan::ClosePlan { name: _ })
1548 | Plan::Prepare(plan::PreparePlan {
1549 name: _,
1550 stmt: _,
1551 desc: _,
1552 sql: _,
1553 })
1554 | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1555 | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1556 | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1557 }
1558}
1559
1560fn check_owner_roles(
1562 object_id: &ObjectId,
1563 role_ids: &BTreeSet<RoleId>,
1564 catalog: &impl SessionCatalog,
1565) -> bool {
1566 if let Some(owner_id) = catalog.get_owner_id(object_id) {
1567 role_ids.contains(&owner_id)
1568 } else {
1569 true
1570 }
1571}
1572
1573fn ownership_err(
1574 unheld_ownership: Vec<ObjectId>,
1575 catalog: &impl SessionCatalog,
1576) -> Result<(), UnauthorizedError> {
1577 if !unheld_ownership.is_empty() {
1578 let objects = unheld_ownership
1579 .into_iter()
1580 .map(|ownership| match ownership {
1581 ObjectId::Cluster(id) => (
1582 ObjectType::Cluster,
1583 catalog.get_cluster(id).name().to_string(),
1584 ),
1585 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1586 let cluster = catalog.get_cluster(cluster_id);
1587 let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1588 let name = QualifiedReplica {
1591 cluster: Ident::new_unchecked(cluster.name()),
1592 replica: Ident::new_unchecked(replica.name()),
1593 };
1594 (ObjectType::ClusterReplica, name.to_string())
1595 }
1596 ObjectId::Database(id) => (
1597 ObjectType::Database,
1598 catalog.get_database(&id).name().to_string(),
1599 ),
1600 ObjectId::Schema((database_spec, schema_spec)) => {
1601 let schema = catalog.get_schema(&database_spec, &schema_spec);
1602 let name = catalog.resolve_full_schema_name(schema.name());
1603 (ObjectType::Schema, name.to_string())
1604 }
1605 ObjectId::Item(id) => {
1606 let item = catalog.get_item(&id);
1607 let name = catalog.resolve_full_name(item.name());
1608 (item.item_type().into(), name.to_string())
1609 }
1610 ObjectId::NetworkPolicy(id) => (
1611 ObjectType::NetworkPolicy,
1612 catalog.get_network_policy(&id).name().to_string(),
1613 ),
1614 ObjectId::Role(_) => unreachable!("roles have no owner"),
1615 })
1616 .collect();
1617 Err(UnauthorizedError::Ownership { objects })
1618 } else {
1619 Ok(())
1620 }
1621}
1622
1623fn generate_required_source_privileges(
1624 name: &QualifiedItemName,
1625 data_source: &DataSourceDesc,
1626 in_cluster: Option<ClusterId>,
1627 role_id: RoleId,
1628) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1629 let mut privileges = vec![(
1630 SystemObjectId::Object(name.qualifiers.clone().into()),
1631 AclMode::CREATE,
1632 role_id,
1633 )];
1634 match (data_source, in_cluster) {
1635 (_, Some(id)) => {
1636 privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1637 }
1638 (DataSourceDesc::Ingestion(_), None) => {
1639 privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1640 }
1641 (_, None) => {}
1646 }
1647 privileges
1648}
1649
1650fn generate_read_privileges(
1658 catalog: &impl SessionCatalog,
1659 ids: impl Iterator<Item = CatalogItemId>,
1660 role_id: RoleId,
1661) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1662 generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1663}
1664
1665fn generate_read_privileges_inner(
1666 catalog: &impl SessionCatalog,
1667 ids: impl Iterator<Item = CatalogItemId>,
1668 role_id: RoleId,
1669 seen: &mut BTreeSet<(ObjectId, RoleId)>,
1670) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1671 let mut privileges = Vec::new();
1672 let mut views = Vec::new();
1673
1674 for id in ids {
1675 if seen.insert((id.into(), role_id)) {
1676 let item = catalog.get_item(&id);
1677 let schema_id: ObjectId = item.name().qualifiers.clone().into();
1678 if seen.insert((schema_id.clone(), role_id)) {
1679 privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1680 }
1681 match item.item_type() {
1682 CatalogItemType::View
1683 | CatalogItemType::MaterializedView
1684 | CatalogItemType::ContinualTask => {
1685 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1686 views.push((item.references().items().copied(), item.owner_id()));
1687 }
1688 CatalogItemType::Table | CatalogItemType::Source => {
1689 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1690 }
1691 CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1692 privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1693 }
1694 CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1695 }
1696 }
1697 }
1698
1699 for (view_ids, view_owner) in views {
1700 privileges.extend_from_slice(&generate_read_privileges_inner(
1701 catalog, view_ids, view_owner, seen,
1702 ));
1703 }
1704
1705 privileges
1706}
1707
1708fn generate_usage_privileges(
1709 catalog: &impl SessionCatalog,
1710 ids: &ResolvedIds,
1711 role_id: RoleId,
1712 item_types: &BTreeSet<CatalogItemType>,
1713) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1714 ids.items()
1716 .filter_map(move |id| {
1717 let item = catalog.get_item(id);
1718 if item_types.contains(&item.item_type()) {
1719 let schema_id = item.name().qualifiers.clone().into();
1720 Some([
1721 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1722 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1723 ])
1724 } else {
1725 None
1726 }
1727 })
1728 .flatten()
1729 .collect()
1730}
1731
1732fn generate_cluster_usage_privileges(
1733 expr_is_const: bool,
1734 target_cluster_id: Option<ClusterId>,
1735 role_id: RoleId,
1736) -> Option<(SystemObjectId, AclMode, RoleId)> {
1737 if !expr_is_const {
1740 if let Some(cluster_id) = target_cluster_id {
1741 return Some((
1742 SystemObjectId::Object(cluster_id.into()),
1743 AclMode::USAGE,
1744 role_id,
1745 ));
1746 }
1747 }
1748
1749 None
1750}
1751
1752fn check_object_privileges(
1753 catalog: &impl SessionCatalog,
1754 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1755 role_membership: BTreeSet<RoleId>,
1756 current_role_id: RoleId,
1757) -> Result<(), UnauthorizedError> {
1758 let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1759 role_memberships.insert(current_role_id, role_membership);
1760 for (object_id, acl_mode, role_id) in privileges {
1761 if matches!(
1765 &object_id,
1766 SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary)))
1767 ) {
1768 continue;
1769 }
1770
1771 let role_membership = role_memberships
1772 .entry(role_id)
1773 .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1774 let object_privileges = catalog
1775 .get_privileges(&object_id)
1776 .expect("only object types with privileges will generate required privileges");
1777 let role_privileges = role_membership
1778 .iter()
1779 .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1780 .map(|mz_acl_item| mz_acl_item.acl_mode)
1781 .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1782 if !role_privileges.contains(acl_mode) {
1783 let role_name = catalog.get_role(&role_id).name().to_string();
1784 let privileges = acl_mode.to_error_string();
1785 return Err(UnauthorizedError::Privilege {
1786 object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1787 role_name,
1788 privileges,
1789 });
1790 }
1791 }
1792
1793 Ok(())
1794}
1795
1796pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1797 const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1798 .union(AclMode::SELECT)
1799 .union(AclMode::UPDATE)
1800 .union(AclMode::DELETE);
1801 const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1802 const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1803 .union(AclMode::CREATE_DB)
1804 .union(AclMode::CREATE_CLUSTER)
1805 .union(AclMode::CREATE_NETWORK_POLICY);
1806
1807 const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1808 match object_type {
1809 SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1810 SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1811 SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1812 SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1813 SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1814 SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1815 SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1816 SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1817 SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1818 SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1819 SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1820 SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1821 SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1822 SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1823 SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1824 SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1825 SystemObjectType::Object(ObjectType::ContinualTask) => AclMode::SELECT,
1826 SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1827 }
1828}
1829
1830pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1831 MzAclItem {
1832 grantee: owner_id,
1833 grantor: owner_id,
1834 acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1835 }
1836}
1837
1838const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1839 match object_type {
1840 ObjectType::Table
1841 | ObjectType::View
1842 | ObjectType::MaterializedView
1843 | ObjectType::Source
1844 | ObjectType::ContinualTask => AclMode::SELECT,
1845 ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1846 ObjectType::Sink
1847 | ObjectType::Index
1848 | ObjectType::Role
1849 | ObjectType::Cluster
1850 | ObjectType::ClusterReplica
1851 | ObjectType::Secret
1852 | ObjectType::Connection
1853 | ObjectType::Database
1854 | ObjectType::Func
1855 | ObjectType::NetworkPolicy => AclMode::empty(),
1856 }
1857}
1858
1859pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1860 let acl_mode = default_builtin_object_acl_mode(object_type);
1861 MzAclItem {
1862 grantee: MZ_SUPPORT_ROLE_ID,
1863 grantor: MZ_SYSTEM_ROLE_ID,
1864 acl_mode,
1865 }
1866}
1867
1868pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1869 let acl_mode = default_builtin_object_acl_mode(object_type);
1870 MzAclItem {
1871 grantee: RoleId::Public,
1872 grantor: MZ_SYSTEM_ROLE_ID,
1873 acl_mode,
1874 }
1875}