1use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26 CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29 CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30 SchemaSpecifier, SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34 DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40fn rbac_check_preamble(
42 catalog: &impl SessionCatalog,
43 session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45 if catalog
50 .try_get_role(&session_meta.role_metadata().current_role)
51 .is_none()
52 {
53 return Err(UnauthorizedError::ConcurrentRoleDrop(
54 session_meta.role_metadata().current_role.clone(),
55 ));
56 };
57 if catalog
58 .try_get_role(&session_meta.role_metadata().session_role)
59 .is_none()
60 {
61 return Err(UnauthorizedError::ConcurrentRoleDrop(
62 session_meta.role_metadata().session_role.clone(),
63 ));
64 };
65 if catalog
66 .try_get_role(&session_meta.role_metadata().authenticated_role)
67 .is_none()
68 {
69 return Err(UnauthorizedError::ConcurrentRoleDrop(
70 session_meta.role_metadata().authenticated_role.clone(),
71 ));
72 };
73
74 Ok(())
75}
76
77fn filter_requirements(
79 catalog: &impl SessionCatalog,
80 session_meta: &dyn SessionMetadata,
81 rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83 let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86 && !session_meta.role_metadata().current_role.is_system()
87 && !session_meta.role_metadata().session_role.is_system();
88 let is_superuser = session_meta.is_superuser();
90 if is_rbac_disabled || is_superuser {
91 return rbac_requirements.filter_to_mandatory_requirements();
92 }
93
94 rbac_requirements
95}
96
97static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99 btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104 let mut items = DEFAULT_ITEM_USAGE.clone();
105 items.insert(CatalogItemType::Type);
106 items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
110static RESTRICT_TO_USER_OBJECTS_ALLOWED_OIDS: LazyLock<BTreeSet<u32>> = LazyLock::new(|| {
116 use mz_pgrepr::oid;
117 btreeset! {
118 oid::VIEW_MZ_MCP_DATA_PRODUCTS_OID,
119 oid::VIEW_MZ_MCP_DATA_PRODUCT_DETAILS_OID,
120 }
121});
122
123#[derive(Debug, thiserror::Error)]
125pub enum UnauthorizedError {
126 #[error("permission denied to {action}")]
128 Superuser { action: String },
129 #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
131 Ownership { objects: Vec<(ObjectType, String)> },
132 #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
134 RoleMembership { role_names: Vec<String> },
135 #[error("permission denied for {object_description}")]
137 Privilege {
138 object_description: ErrorMessageObjectDescription,
139 role_name: String,
140 privileges: String,
141 },
142 #[error("permission denied to {action}")]
146 MzSystem { action: String },
147 #[error("permission denied to {action}")]
149 MzSupport { action: String },
150 #[error("role {0} was concurrently dropped")]
152 ConcurrentRoleDrop(RoleId),
153 #[error("access to system object {object_name} is restricted")]
155 RestrictedSystemObject { object_name: String },
156}
157
158impl UnauthorizedError {
159 pub fn detail(&self) -> Option<String> {
160 match &self {
161 UnauthorizedError::Superuser { action } => {
162 Some(format!("You must be a superuser to {}", action))
163 }
164 UnauthorizedError::Privilege {
165 object_description,
166 role_name,
167 privileges,
168 } => Some(format!(
169 "The '{role_name}' role needs {privileges} privileges on {object_description}"
170 )),
171 UnauthorizedError::MzSystem { .. } => {
172 Some(format!("You must be the '{}' role", SYSTEM_USER.name))
173 }
174 UnauthorizedError::MzSupport { .. } => Some(format!(
175 "The '{}' role has very limited privileges",
176 SUPPORT_USER.name
177 )),
178 UnauthorizedError::ConcurrentRoleDrop(_) => {
179 Some("Please disconnect and re-connect with a valid role.".into())
180 }
181 UnauthorizedError::RestrictedSystemObject { .. } => Some(
182 "Access to system catalog objects is restricted for this role. \
183 Contact your administrator if you need access."
184 .into(),
185 ),
186 UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
187 }
188 }
189}
190
191#[derive(Debug)]
193struct RbacRequirements {
194 role_membership: BTreeSet<RoleId>,
196 ownership: Vec<ObjectId>,
198 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
201 item_usage: &'static BTreeSet<CatalogItemType>,
206 superuser_action: Option<String>,
208}
209
210impl RbacRequirements {
211 fn empty() -> RbacRequirements {
212 RbacRequirements {
213 role_membership: BTreeSet::new(),
214 ownership: Vec::new(),
215 privileges: Vec::new(),
216 item_usage: &EMPTY_ITEM_USAGE,
217 superuser_action: None,
218 }
219 }
220
221 fn validate(
222 self,
223 catalog: &impl SessionCatalog,
224 session: &dyn SessionMetadata,
225 resolved_ids: &ResolvedIds,
226 ) -> Result<(), UnauthorizedError> {
227 let role_membership =
229 catalog.collect_role_membership(&session.role_metadata().current_role);
230
231 check_usage(catalog, session, resolved_ids, self.item_usage)?;
232
233 let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
236 if !unheld_membership.is_empty() {
237 let role_names = unheld_membership
238 .into_iter()
239 .map(|role_id| {
240 catalog
242 .try_get_role(role_id)
243 .map(|role| role.name().to_string())
244 .unwrap_or_else(|| role_id.to_string())
245 })
246 .collect();
247 return Err(UnauthorizedError::RoleMembership { role_names });
248 }
249
250 let unheld_ownership = self
253 .ownership
254 .into_iter()
255 .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
256 .collect();
257 ownership_err(unheld_ownership, catalog)?;
258
259 check_object_privileges(
260 catalog,
261 self.privileges,
262 role_membership,
263 session.role_metadata().current_role,
264 )?;
265
266 if let Some(action) = self.superuser_action {
267 return Err(UnauthorizedError::Superuser { action });
268 }
269
270 Ok(())
271 }
272
273 fn filter_to_mandatory_requirements(self) -> RbacRequirements {
274 let RbacRequirements {
275 role_membership,
276 ownership,
277 privileges,
278 item_usage,
279 superuser_action: _,
280 } = self;
281 let role_membership = role_membership
282 .into_iter()
283 .filter(|id| id.is_system())
284 .collect();
285 let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
286 let privileges = privileges
287 .into_iter()
288 .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
289 .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
291 .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
292 .collect();
293 let superuser_action = None;
294 RbacRequirements {
295 role_membership,
296 ownership,
297 privileges,
298 item_usage,
299 superuser_action,
300 }
301 }
302}
303
304impl Default for RbacRequirements {
305 fn default() -> Self {
306 RbacRequirements {
307 role_membership: BTreeSet::new(),
308 ownership: Vec::new(),
309 privileges: Vec::new(),
310 item_usage: &DEFAULT_ITEM_USAGE,
311 superuser_action: None,
312 }
313 }
314}
315
316fn check_restrict_to_user_objects(
324 catalog: &impl SessionCatalog,
325 session: &dyn SessionMetadata,
326 resolved_ids: &ResolvedIds,
327) -> Result<(), UnauthorizedError> {
328 if !session.restrict_to_user_objects() {
329 return Ok(());
330 }
331 for item_id in resolved_ids.items() {
332 if item_id.is_system() {
333 if let Some(item) = catalog.try_get_item(item_id) {
334 match item.item_type() {
335 CatalogItemType::Func | CatalogItemType::Type => {}
336 _ => {
337 if RESTRICT_TO_USER_OBJECTS_ALLOWED_OIDS.contains(&item.oid()) {
338 continue;
339 }
340 return Err(UnauthorizedError::RestrictedSystemObject {
341 object_name: item.name().item.clone(),
342 });
343 }
344 }
345 }
346 }
347 }
348 Ok(())
349}
350
351pub fn check_usage(
353 catalog: &impl SessionCatalog,
354 session: &dyn SessionMetadata,
355 resolved_ids: &ResolvedIds,
356 item_types: &BTreeSet<CatalogItemType>,
357) -> Result<(), UnauthorizedError> {
358 rbac_check_preamble(catalog, session)?;
359
360 check_restrict_to_user_objects(catalog, session, resolved_ids)?;
362
363 let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
365
366 let existing_resolved_ids =
369 resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
370
371 let required_privileges = generate_usage_privileges(
372 catalog,
373 &existing_resolved_ids,
374 session.role_metadata().current_role,
375 item_types,
376 )
377 .into_iter()
378 .collect();
379
380 let mut rbac_requirements = RbacRequirements::empty();
381 rbac_requirements.privileges = required_privileges;
382 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
383 let required_privileges = rbac_requirements.privileges;
384
385 check_object_privileges(
386 catalog,
387 required_privileges,
388 role_membership,
389 session.role_metadata().current_role,
390 )?;
391
392 Ok(())
393}
394
395pub fn check_plan(
402 catalog: &impl SessionCatalog,
403 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
409 session: &dyn SessionMetadata,
410 plan: &Plan,
411 target_cluster_id: Option<ClusterId>,
412 resolved_ids: &ResolvedIds,
413 sql_impl_resolved_ids: &ResolvedIds,
414) -> Result<(), UnauthorizedError> {
415 rbac_check_preamble(catalog, session)?;
416
417 check_restrict_to_user_objects(catalog, session, sql_impl_resolved_ids)?;
421
422 let rbac_requirements = generate_rbac_requirements(
423 catalog,
424 plan,
425 active_conns,
426 target_cluster_id,
427 session.role_metadata().current_role,
428 );
429 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
430 debug!(
431 "rbac requirements {rbac_requirements:?} for plan {:?}",
432 PlanKind::from(plan)
433 );
434 rbac_requirements.validate(catalog, session, resolved_ids)
435}
436
437pub fn is_rbac_enabled_for_session(
439 system_vars: &SystemVars,
440 session: &dyn SessionMetadata,
441) -> bool {
442 let server_enabled = system_vars.enable_rbac_checks();
443 let session_enabled = session.enable_session_rbac_checks();
444
445 server_enabled || session_enabled
448}
449
450fn generate_rbac_requirements(
452 catalog: &impl SessionCatalog,
453 plan: &Plan,
454 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
455 target_cluster_id: Option<ClusterId>,
456 role_id: RoleId,
457) -> RbacRequirements {
458 match plan {
459 Plan::CreateConnection(plan::CreateConnectionPlan {
460 name,
461 if_not_exists: _,
462 connection: _,
463 validate: _,
464 }) => RbacRequirements {
465 privileges: vec![(
466 SystemObjectId::Object(name.qualifiers.clone().into()),
467 AclMode::CREATE,
468 role_id,
469 )],
470 item_usage: &CREATE_ITEM_USAGE,
471 ..Default::default()
472 },
473 Plan::CreateDatabase(plan::CreateDatabasePlan {
474 name: _,
475 if_not_exists: _,
476 }) => RbacRequirements {
477 privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
478 item_usage: &CREATE_ITEM_USAGE,
479 ..Default::default()
480 },
481 Plan::CreateSchema(plan::CreateSchemaPlan {
482 database_spec,
483 schema_name: _,
484 if_not_exists: _,
485 }) => {
486 let privileges = match database_spec {
487 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
488 ResolvedDatabaseSpecifier::Id(database_id) => {
489 vec![(
490 SystemObjectId::Object(database_id.into()),
491 AclMode::CREATE,
492 role_id,
493 )]
494 }
495 };
496 RbacRequirements {
497 privileges,
498 item_usage: &CREATE_ITEM_USAGE,
499 ..Default::default()
500 }
501 }
502 Plan::CreateRole(plan::CreateRolePlan {
503 name: _,
504 attributes,
505 }) => {
506 if attributes.superuser.unwrap_or(false) {
507 RbacRequirements {
508 superuser_action: Some("create superuser role".to_string()),
509 ..Default::default()
510 }
511 } else {
512 RbacRequirements {
513 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
514 item_usage: &CREATE_ITEM_USAGE,
515 ..Default::default()
516 }
517 }
518 }
519 Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
520 privileges: vec![(
521 SystemObjectId::System,
522 AclMode::CREATE_NETWORK_POLICY,
523 role_id,
524 )],
525 item_usage: &CREATE_ITEM_USAGE,
526 ..Default::default()
527 },
528 Plan::CreateCluster(plan::CreateClusterPlan {
529 name: _,
530 variant: _,
531 workload_class: _,
532 }) => RbacRequirements {
533 privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
534 item_usage: &CREATE_ITEM_USAGE,
535 ..Default::default()
536 },
537 Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
538 cluster_id,
539 name: _,
540 config: _,
541 }) => RbacRequirements {
542 ownership: vec![ObjectId::Cluster(*cluster_id)],
543 item_usage: &CREATE_ITEM_USAGE,
544 ..Default::default()
545 },
546 Plan::CreateSource(plan::CreateSourcePlan {
547 name,
548 source,
549 if_not_exists: _,
550 timeline: _,
551 in_cluster,
552 }) => RbacRequirements {
553 privileges: generate_required_source_privileges(
554 name,
555 &source.data_source,
556 *in_cluster,
557 role_id,
558 ),
559 item_usage: &CREATE_ITEM_USAGE,
560 ..Default::default()
561 },
562 Plan::CreateSources(plans) => RbacRequirements {
563 privileges: plans
564 .iter()
565 .flat_map(
566 |plan::CreateSourcePlanBundle {
567 item_id: _,
568 global_id: _,
569 plan:
570 plan::CreateSourcePlan {
571 name,
572 source,
573 if_not_exists: _,
574 timeline: _,
575 in_cluster,
576 },
577 resolved_ids: _,
578 available_source_references: _,
579 }| {
580 generate_required_source_privileges(
581 name,
582 &source.data_source,
583 *in_cluster,
584 role_id,
585 )
586 .into_iter()
587 },
588 )
589 .collect(),
590 item_usage: &CREATE_ITEM_USAGE,
591 ..Default::default()
592 },
593 Plan::CreateSecret(plan::CreateSecretPlan {
594 name,
595 secret: _,
596 if_not_exists: _,
597 }) => RbacRequirements {
598 privileges: vec![(
599 SystemObjectId::Object(name.qualifiers.clone().into()),
600 AclMode::CREATE,
601 role_id,
602 )],
603 item_usage: &CREATE_ITEM_USAGE,
604 ..Default::default()
605 },
606 Plan::CreateSink(plan::CreateSinkPlan {
607 name,
608 sink,
609 with_snapshot: _,
610 if_not_exists: _,
611 in_cluster,
612 }) => {
613 let mut privileges = vec![(
614 SystemObjectId::Object(name.qualifiers.clone().into()),
615 AclMode::CREATE,
616 role_id,
617 )];
618 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
619 privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
620 privileges.push((
621 SystemObjectId::Object(in_cluster.into()),
622 AclMode::CREATE,
623 role_id,
624 ));
625 RbacRequirements {
626 privileges,
627 item_usage: &CREATE_ITEM_USAGE,
628 ..Default::default()
629 }
630 }
631 Plan::CreateTable(plan::CreateTablePlan {
632 name,
633 table: _,
634 if_not_exists: _,
635 }) => RbacRequirements {
636 privileges: vec![(
637 SystemObjectId::Object(name.qualifiers.clone().into()),
638 AclMode::CREATE,
639 role_id,
640 )],
641 item_usage: &CREATE_ITEM_USAGE,
642 ..Default::default()
643 },
644 Plan::CreateView(plan::CreateViewPlan {
645 name,
646 view: _,
647 replace,
648 drop_ids: _,
649 if_not_exists: _,
650 ambiguous_columns: _,
651 }) => RbacRequirements {
652 ownership: replace
653 .map(|id| vec![ObjectId::Item(id)])
654 .unwrap_or_default(),
655 privileges: vec![(
656 SystemObjectId::Object(name.qualifiers.clone().into()),
657 AclMode::CREATE,
658 role_id,
659 )],
660 item_usage: &CREATE_ITEM_USAGE,
661 ..Default::default()
662 },
663 Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
664 name,
665 materialized_view,
666 replace,
667 drop_ids: _,
668 if_not_exists: _,
669 ambiguous_columns: _,
670 }) => RbacRequirements {
671 ownership: replace
672 .map(|id| vec![ObjectId::Item(id)])
673 .unwrap_or_default(),
674 privileges: vec![
675 (
676 SystemObjectId::Object(name.qualifiers.clone().into()),
677 AclMode::CREATE,
678 role_id,
679 ),
680 (
681 SystemObjectId::Object(materialized_view.cluster_id.into()),
682 AclMode::CREATE,
683 role_id,
684 ),
685 ],
686 item_usage: &CREATE_ITEM_USAGE,
687 ..Default::default()
688 },
689 Plan::CreateIndex(plan::CreateIndexPlan {
690 name,
691 index,
692 if_not_exists: _,
693 }) => {
694 let index_on_item = catalog.resolve_item_id(&index.on);
695 RbacRequirements {
696 ownership: vec![ObjectId::Item(index_on_item)],
697 privileges: vec![
698 (
699 SystemObjectId::Object(name.qualifiers.clone().into()),
700 AclMode::CREATE,
701 role_id,
702 ),
703 (
704 SystemObjectId::Object(index.cluster_id.into()),
705 AclMode::CREATE,
706 role_id,
707 ),
708 ],
709 item_usage: &CREATE_ITEM_USAGE,
710 ..Default::default()
711 }
712 }
713 Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
714 privileges: vec![(
715 SystemObjectId::Object(name.qualifiers.clone().into()),
716 AclMode::CREATE,
717 role_id,
718 )],
719 item_usage: &CREATE_ITEM_USAGE,
720 ..Default::default()
721 },
722 Plan::Comment(plan::CommentPlan {
723 object_id,
724 sub_component: _,
725 comment: _,
726 }) => {
727 let (ownership, privileges) = match object_id {
728 CommentObjectId::Role(_) => (
731 Vec::new(),
732 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
733 ),
734 _ => (vec![ObjectId::from(*object_id)], Vec::new()),
735 };
736 RbacRequirements {
737 ownership,
738 privileges,
739 ..Default::default()
740 }
741 }
742 Plan::DropObjects(plan::DropObjectsPlan {
743 referenced_ids,
744 drop_ids: _,
745 object_type,
746 }) => {
747 let privileges = if object_type == &ObjectType::Role {
748 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
749 } else {
750 referenced_ids
751 .iter()
752 .filter_map(|id| match id {
753 ObjectId::ClusterReplica((cluster_id, _)) => Some((
754 SystemObjectId::Object(cluster_id.into()),
755 AclMode::USAGE,
756 role_id,
757 )),
758 ObjectId::Schema((database_spec, _)) => match database_spec {
759 ResolvedDatabaseSpecifier::Ambient => None,
760 ResolvedDatabaseSpecifier::Id(database_id) => Some((
761 SystemObjectId::Object(database_id.into()),
762 AclMode::USAGE,
763 role_id,
764 )),
765 },
766 ObjectId::Item(item_id) => {
767 let item = catalog.get_item(item_id);
768 Some((
769 SystemObjectId::Object(item.name().qualifiers.clone().into()),
770 AclMode::USAGE,
771 role_id,
772 ))
773 }
774 ObjectId::Cluster(_)
775 | ObjectId::Database(_)
776 | ObjectId::Role(_)
777 | ObjectId::NetworkPolicy(_) => None,
778 })
779 .collect()
780 };
781 RbacRequirements {
782 ownership: referenced_ids.clone(),
784 privileges,
785 ..Default::default()
786 }
787 }
788 Plan::DropOwned(plan::DropOwnedPlan {
789 role_ids,
790 drop_ids: _,
791 privilege_revokes: _,
792 default_privilege_revokes: _,
793 }) => RbacRequirements {
794 role_membership: role_ids.into_iter().cloned().collect(),
795 ..Default::default()
796 },
797 Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
798 let container_id = match id {
799 ObjectId::Item(id) => Some(SystemObjectId::Object(
800 catalog.get_item(id).name().qualifiers.clone().into(),
801 )),
802 ObjectId::Schema((database_id, _schema_id)) => match database_id {
803 ResolvedDatabaseSpecifier::Ambient => None,
804 ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
805 },
806 ObjectId::Cluster(_)
807 | ObjectId::ClusterReplica(_)
808 | ObjectId::Database(_)
809 | ObjectId::Role(_)
810 | ObjectId::NetworkPolicy(_) => None,
811 };
812 let privileges = match container_id {
813 Some(id) => vec![(id, AclMode::USAGE, role_id)],
814 None => Vec::new(),
815 };
816 RbacRequirements {
817 privileges,
818 item_usage: &EMPTY_ITEM_USAGE,
819 ..Default::default()
820 }
821 }
822 Plan::ShowColumns(plan::ShowColumnsPlan {
823 id,
824 select_plan,
825 new_resolved_ids: _,
826 }) => {
827 let mut privileges = vec![(
828 SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
829 AclMode::USAGE,
830 role_id,
831 )];
832
833 for privilege in generate_rbac_requirements(
834 catalog,
835 &Plan::Select(select_plan.clone()),
836 active_conns,
837 target_cluster_id,
838 role_id,
839 )
840 .privileges
841 {
842 privileges.push(privilege);
843 }
844 RbacRequirements {
845 privileges,
846 ..Default::default()
847 }
848 }
849 Plan::Select(plan::SelectPlan {
850 source,
851 select: _,
852 when: _,
853 finishing: _,
854 copy_to: _,
855 }) => {
856 let items = source
857 .depends_on()
858 .into_iter()
859 .map(|gid| catalog.resolve_item_id(&gid));
860 let mut privileges = generate_read_privileges(catalog, items, role_id);
861 if let Some(privilege) = generate_cluster_usage_privileges(
862 source.as_const().is_some(),
863 target_cluster_id,
864 role_id,
865 ) {
866 privileges.push(privilege);
867 }
868 RbacRequirements {
869 privileges,
870 ..Default::default()
871 }
872 }
873 Plan::Subscribe(plan::SubscribePlan {
874 from,
875 with_snapshot: _,
876 when: _,
877 up_to: _,
878 copy_to: _,
879 emit_progress: _,
880 output: _,
881 }) => {
882 let items = from
883 .depends_on()
884 .into_iter()
885 .map(|gid| catalog.resolve_item_id(&gid));
886 let mut privileges = generate_read_privileges(catalog, items, role_id);
887 if let Some(cluster_id) = target_cluster_id {
888 privileges.push((
889 SystemObjectId::Object(cluster_id.into()),
890 AclMode::USAGE,
891 role_id,
892 ));
893 }
894 RbacRequirements {
895 privileges,
896 ..Default::default()
897 }
898 }
899 Plan::CopyFrom(plan::CopyFromPlan {
900 target_name: _,
901 target_id,
902 source: _,
903 columns: _,
904 source_desc: _,
905 mfp: _,
906 params: _,
907 filter: _,
908 }) => RbacRequirements {
909 privileges: vec![
910 (
911 SystemObjectId::Object(
912 catalog.get_item(target_id).name().qualifiers.clone().into(),
913 ),
914 AclMode::USAGE,
915 role_id,
916 ),
917 (
918 SystemObjectId::Object(target_id.into()),
919 AclMode::INSERT,
920 role_id,
921 ),
922 ],
923 ..Default::default()
924 },
925 Plan::CopyTo(plan::CopyToPlan {
926 select_plan,
927 desc: _,
928 to: _,
929 connection: _,
930 connection_id: _,
931 format: _,
932 max_file_size: _,
933 }) => {
934 let items = select_plan
935 .source
936 .depends_on()
937 .into_iter()
938 .map(|gid| catalog.resolve_item_id(&gid));
939 let mut privileges = generate_read_privileges(catalog, items, role_id);
940 if let Some(cluster_id) = target_cluster_id {
941 privileges.push((
942 SystemObjectId::Object(cluster_id.into()),
943 AclMode::USAGE,
944 role_id,
945 ));
946 }
947 RbacRequirements {
948 privileges,
949 ..Default::default()
950 }
951 }
952 Plan::ExplainPlan(plan::ExplainPlanPlan {
953 stage: _,
954 format: _,
955 config: _,
956 explainee,
957 })
958 | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
959 privileges: match explainee {
960 Explainee::View(id)
961 | Explainee::MaterializedView(id)
962 | Explainee::Index(id)
963 | Explainee::ReplanView(id)
964 | Explainee::ReplanMaterializedView(id)
965 | Explainee::ReplanIndex(id) => {
966 let item = catalog.get_item(id);
967 let schema_id: ObjectId = item.name().qualifiers.clone().into();
968 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
969 }
970 Explainee::Statement(stmt) => stmt
971 .depends_on()
972 .into_iter()
973 .map(|id| {
974 let item = catalog.get_item_by_global_id(&id);
975 let schema_id: ObjectId = item.name().qualifiers.clone().into();
976 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
977 })
978 .collect(),
979 },
980 item_usage: match explainee {
981 Explainee::View(..)
982 | Explainee::MaterializedView(..)
983 | Explainee::Index(..)
984 | Explainee::ReplanView(..)
985 | Explainee::ReplanMaterializedView(..)
986 | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
987 Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
988 },
989 ..Default::default()
990 },
991 Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
992 RbacRequirements {
993 privileges: {
994 let item = catalog.get_item_by_global_id(sink_from);
995 let schema_id: ObjectId = item.name().qualifiers.clone().into();
996 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
997 },
998 item_usage: &EMPTY_ITEM_USAGE,
999 ..Default::default()
1000 }
1001 }
1002 Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
1003 format: _,
1004 raw_plan,
1005 when: _,
1006 }) => RbacRequirements {
1007 privileges: raw_plan
1008 .depends_on()
1009 .into_iter()
1010 .map(|id| {
1011 let item = catalog.get_item_by_global_id(&id);
1012 let schema_id: ObjectId = item.name().qualifiers.clone().into();
1013 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
1014 })
1015 .collect(),
1016 ..Default::default()
1017 },
1018 Plan::Insert(plan::InsertPlan {
1019 id,
1020 values,
1021 returning,
1022 }) => {
1023 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1024 let mut privileges = vec![
1025 (
1026 SystemObjectId::Object(schema_id.clone()),
1027 AclMode::USAGE,
1028 role_id,
1029 ),
1030 (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
1031 ];
1032 let mut seen = BTreeSet::from([(schema_id, role_id)]);
1033
1034 if returning
1037 .iter()
1038 .any(|assignment| assignment.contains_column())
1039 {
1040 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1041 seen.insert((id.into(), role_id));
1042 }
1043
1044 let items = values
1045 .depends_on()
1046 .into_iter()
1047 .map(|gid| catalog.resolve_item_id(&gid));
1048 privileges.extend_from_slice(&generate_read_privileges_inner(
1049 catalog, items, role_id, &mut seen,
1050 ));
1051
1052 if let Some(privilege) = generate_cluster_usage_privileges(
1053 values.as_const().is_some(),
1054 target_cluster_id,
1055 role_id,
1056 ) {
1057 privileges.push(privilege);
1058 } else if !returning.is_empty() {
1059 if let Some(cluster_id) = target_cluster_id {
1062 privileges.push((
1063 SystemObjectId::Object(cluster_id.into()),
1064 AclMode::USAGE,
1065 role_id,
1066 ));
1067 }
1068 }
1069 RbacRequirements {
1070 privileges,
1071 ..Default::default()
1072 }
1073 }
1074 Plan::AlterCluster(plan::AlterClusterPlan {
1075 id,
1076 name: _,
1077 options: _,
1078 strategy: _,
1079 }) => RbacRequirements {
1080 ownership: vec![ObjectId::Cluster(*id)],
1081 item_usage: &CREATE_ITEM_USAGE,
1082 ..Default::default()
1083 },
1084 Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1085 ownership: vec![ObjectId::Item(*id)],
1086 privileges: vec![(
1087 SystemObjectId::Object(set_cluster.into()),
1088 AclMode::CREATE,
1089 role_id,
1090 )],
1091 item_usage: &CREATE_ITEM_USAGE,
1092 ..Default::default()
1093 },
1094 Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1095 id,
1096 window: _,
1097 value: _,
1098 object_type: _,
1099 }) => RbacRequirements {
1100 ownership: vec![ObjectId::Item(*id)],
1101 item_usage: &CREATE_ITEM_USAGE,
1102 ..Default::default()
1103 },
1104 Plan::AlterSourceTimestampInterval(plan::AlterSourceTimestampIntervalPlan {
1105 id,
1106 value: _,
1107 interval: _,
1108 }) => RbacRequirements {
1109 ownership: vec![ObjectId::Item(*id)],
1110 item_usage: &CREATE_ITEM_USAGE,
1111 ..Default::default()
1112 },
1113 Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1114 ownership: vec![ObjectId::Item(*id)],
1115 ..Default::default()
1116 },
1117 Plan::AlterSource(plan::AlterSourcePlan {
1118 item_id,
1119 ingestion_id: _,
1120 action: _,
1121 }) => RbacRequirements {
1122 ownership: vec![ObjectId::Item(*item_id)],
1123 item_usage: &CREATE_ITEM_USAGE,
1124 ..Default::default()
1125 },
1126 Plan::AlterSink(plan::AlterSinkPlan {
1127 item_id,
1128 global_id: _,
1129 sink,
1130 with_snapshot: _,
1131 in_cluster,
1132 }) => {
1133 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1134 let mut privileges = generate_read_privileges(catalog, items, role_id);
1135 privileges.push((
1136 SystemObjectId::Object(in_cluster.into()),
1137 AclMode::CREATE,
1138 role_id,
1139 ));
1140 RbacRequirements {
1141 ownership: vec![ObjectId::Item(*item_id)],
1142 privileges,
1143 item_usage: &CREATE_ITEM_USAGE,
1144 ..Default::default()
1145 }
1146 }
1147 Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1148 id,
1149 name: _,
1150 to_name: _,
1151 }) => RbacRequirements {
1152 ownership: vec![ObjectId::Cluster(*id)],
1153 ..Default::default()
1154 },
1155 Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1156 id_a,
1157 id_b,
1158 name_a: _,
1159 name_b: _,
1160 name_temp: _,
1161 }) => RbacRequirements {
1162 ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1163 ..Default::default()
1164 },
1165 Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1166 cluster_id,
1167 replica_id,
1168 name: _,
1169 to_name: _,
1170 }) => RbacRequirements {
1171 ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1172 ..Default::default()
1173 },
1174 Plan::AlterItemRename(plan::AlterItemRenamePlan {
1175 id,
1176 current_full_name: _,
1177 to_name: _,
1178 object_type: _,
1179 }) => RbacRequirements {
1180 ownership: vec![ObjectId::Item(*id)],
1181 ..Default::default()
1182 },
1183 Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1184 cur_schema_spec,
1185 new_schema_name: _,
1186 }) => {
1187 let privileges = match cur_schema_spec.0 {
1188 ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1189 SystemObjectId::Object(ObjectId::Database(db_id)),
1190 AclMode::CREATE,
1191 role_id,
1192 )],
1193 ResolvedDatabaseSpecifier::Ambient => vec![],
1194 };
1195
1196 RbacRequirements {
1197 ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1198 privileges,
1199 ..Default::default()
1200 }
1201 }
1202 Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1203 schema_a_spec,
1204 schema_a_name: _,
1205 schema_b_spec,
1206 schema_b_name: _,
1207 name_temp: _,
1208 }) => {
1209 let mut privileges = vec![];
1210 if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1211 privileges.push((
1212 SystemObjectId::Object(ObjectId::Database(id_a)),
1213 AclMode::CREATE,
1214 role_id,
1215 ));
1216 }
1217 if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1218 privileges.push((
1219 SystemObjectId::Object(ObjectId::Database(id_b)),
1220 AclMode::CREATE,
1221 role_id,
1222 ));
1223 }
1224
1225 RbacRequirements {
1226 ownership: vec![
1227 ObjectId::Schema(*schema_a_spec),
1228 ObjectId::Schema(*schema_b_spec),
1229 ],
1230 privileges,
1231 ..Default::default()
1232 }
1233 }
1234 Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1235 ownership: vec![ObjectId::Item(*id)],
1236 item_usage: &CREATE_ITEM_USAGE,
1237 ..Default::default()
1238 },
1239 Plan::AlterRole(plan::AlterRolePlan {
1240 id,
1241 name: _,
1242 option,
1243 }) => match option {
1244 plan::PlannedAlterRoleOption::Attributes(attributes)
1246 if attributes.superuser.is_some() =>
1247 {
1248 RbacRequirements {
1249 superuser_action: Some("alter superuser role".to_string()),
1250 ..Default::default()
1251 }
1252 }
1253 plan::PlannedAlterRoleOption::Attributes(plan::PlannedRoleAttributes {
1256 password,
1257 scram_iterations: _,
1260 nopassword: _,
1261 superuser: None,
1264 inherit: None,
1265 login: None,
1266 }) if password.is_some() && role_id == *id => RbacRequirements::default(),
1267 plan::PlannedAlterRoleOption::Attributes(attributes)
1269 if attributes.password.is_some() && role_id != *id =>
1270 {
1271 RbacRequirements {
1272 superuser_action: Some("alter password of role".to_string()),
1273 ..Default::default()
1274 }
1275 }
1276 plan::PlannedAlterRoleOption::Variable(var)
1281 if var.name().eq_ignore_ascii_case("restrict_to_user_objects") =>
1282 {
1283 RbacRequirements {
1284 superuser_action: Some("set restrict_to_user_objects".to_string()),
1285 ..Default::default()
1286 }
1287 }
1288 plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1290 RbacRequirements::default()
1291 }
1292 _ => RbacRequirements {
1294 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1295 item_usage: &CREATE_ITEM_USAGE,
1296 ..Default::default()
1297 },
1298 },
1299 Plan::AlterOwner(plan::AlterOwnerPlan {
1300 id,
1301 object_type: _,
1302 new_owner,
1303 }) => {
1304 let privileges = match id {
1305 ObjectId::ClusterReplica((cluster_id, _)) => {
1306 vec![(
1307 SystemObjectId::Object(cluster_id.into()),
1308 AclMode::CREATE,
1309 role_id,
1310 )]
1311 }
1312 ObjectId::Schema((database_spec, _)) => match database_spec {
1313 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1314 ResolvedDatabaseSpecifier::Id(database_id) => {
1315 vec![(
1316 SystemObjectId::Object(database_id.into()),
1317 AclMode::CREATE,
1318 role_id,
1319 )]
1320 }
1321 },
1322 ObjectId::Item(item_id) => {
1323 let item = catalog.get_item(item_id);
1324 vec![(
1325 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1326 AclMode::CREATE,
1327 role_id,
1328 )]
1329 }
1330 ObjectId::Cluster(_)
1331 | ObjectId::Database(_)
1332 | ObjectId::Role(_)
1333 | ObjectId::NetworkPolicy(_) => Vec::new(),
1334 };
1335 RbacRequirements {
1336 role_membership: BTreeSet::from([*new_owner]),
1337 ownership: vec![id.clone()],
1338 privileges,
1339 ..Default::default()
1340 }
1341 }
1342 Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1343 ownership: vec![ObjectId::Item(*relation_id)],
1344 item_usage: &CREATE_ITEM_USAGE,
1345 ..Default::default()
1346 },
1347 Plan::AlterMaterializedViewApplyReplacement(
1348 plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1349 ) => RbacRequirements {
1350 ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1351 item_usage: &CREATE_ITEM_USAGE,
1352 ..Default::default()
1353 },
1354 Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1355 ownership: vec![ObjectId::NetworkPolicy(*id)],
1356 item_usage: &CREATE_ITEM_USAGE,
1357 ..Default::default()
1358 },
1359 Plan::ReadThenWrite(plan::ReadThenWritePlan {
1360 id,
1361 selection,
1362 finishing: _,
1363 assignments,
1364 kind,
1365 returning,
1366 }) => {
1367 let acl_mode = match kind {
1368 MutationKind::Insert => AclMode::INSERT,
1369 MutationKind::Update => AclMode::UPDATE,
1370 MutationKind::Delete => AclMode::DELETE,
1371 };
1372 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1373 let mut privileges = vec![
1374 (
1375 SystemObjectId::Object(schema_id.clone()),
1376 AclMode::USAGE,
1377 role_id,
1378 ),
1379 (SystemObjectId::Object(id.into()), acl_mode, role_id),
1380 ];
1381 let mut seen = BTreeSet::from([(schema_id, role_id)]);
1382
1383 if assignments
1386 .values()
1387 .chain(returning.iter())
1388 .any(|assignment| assignment.contains_column())
1389 {
1390 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1391 seen.insert((id.into(), role_id));
1392 }
1393
1394 let items = selection
1401 .depends_on()
1402 .into_iter()
1403 .map(|gid| catalog.resolve_item_id(&gid));
1404 privileges.extend_from_slice(&generate_read_privileges_inner(
1405 catalog, items, role_id, &mut seen,
1406 ));
1407
1408 if let Some(privilege) = generate_cluster_usage_privileges(
1409 selection.as_const().is_some(),
1410 target_cluster_id,
1411 role_id,
1412 ) {
1413 privileges.push(privilege);
1414 }
1415 RbacRequirements {
1416 privileges,
1417 ..Default::default()
1418 }
1419 }
1420 Plan::GrantRole(plan::GrantRolePlan {
1421 role_ids: _,
1422 member_ids: _,
1423 grantor_id: _,
1424 })
1425 | Plan::RevokeRole(plan::RevokeRolePlan {
1426 role_ids: _,
1427 member_ids: _,
1428 grantor_id: _,
1429 }) => RbacRequirements {
1430 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1431 ..Default::default()
1432 },
1433 Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1434 update_privileges,
1435 grantees: _,
1436 })
1437 | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1438 update_privileges,
1439 revokees: _,
1440 }) => {
1441 let mut privileges = Vec::with_capacity(update_privileges.len());
1442 for UpdatePrivilege { target_id, .. } in update_privileges {
1443 match target_id {
1444 SystemObjectId::Object(object_id) => match object_id {
1445 ObjectId::ClusterReplica((cluster_id, _)) => {
1446 privileges.push((
1447 SystemObjectId::Object(cluster_id.into()),
1448 AclMode::USAGE,
1449 role_id,
1450 ));
1451 }
1452 ObjectId::Schema((database_spec, _)) => match database_spec {
1453 ResolvedDatabaseSpecifier::Ambient => {}
1454 ResolvedDatabaseSpecifier::Id(database_id) => {
1455 privileges.push((
1456 SystemObjectId::Object(database_id.into()),
1457 AclMode::USAGE,
1458 role_id,
1459 ));
1460 }
1461 },
1462 ObjectId::Item(item_id) => {
1463 let item = catalog.get_item(item_id);
1464 privileges.push((
1465 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1466 AclMode::USAGE,
1467 role_id,
1468 ))
1469 }
1470 ObjectId::Cluster(_)
1471 | ObjectId::Database(_)
1472 | ObjectId::Role(_)
1473 | ObjectId::NetworkPolicy(_) => {}
1474 },
1475 SystemObjectId::System => {}
1476 }
1477 }
1478 RbacRequirements {
1479 ownership: update_privileges
1480 .iter()
1481 .filter_map(|update_privilege| update_privilege.target_id.object_id())
1482 .cloned()
1483 .collect(),
1484 privileges,
1485 superuser_action: if update_privileges
1490 .iter()
1491 .any(|update_privilege| update_privilege.target_id.is_system())
1492 {
1493 Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1494 } else {
1495 None
1496 },
1497 ..Default::default()
1498 }
1499 }
1500 Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1501 privilege_objects,
1502 privilege_acl_items: _,
1503 is_grant: _,
1504 }) => RbacRequirements {
1505 role_membership: privilege_objects
1506 .iter()
1507 .map(|privilege_object| privilege_object.role_id)
1508 .collect(),
1509 privileges: privilege_objects
1510 .into_iter()
1511 .filter_map(|privilege_object| {
1512 if let (Some(database_id), Some(_)) =
1513 (privilege_object.database_id, privilege_object.schema_id)
1514 {
1515 Some((
1516 SystemObjectId::Object(database_id.into()),
1517 AclMode::USAGE,
1518 role_id,
1519 ))
1520 } else {
1521 None
1522 }
1523 })
1524 .collect(),
1525 superuser_action: if privilege_objects
1531 .iter()
1532 .any(|privilege_object| privilege_object.role_id.is_public())
1533 {
1534 Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1535 } else {
1536 None
1537 },
1538 ..Default::default()
1539 },
1540 Plan::ReassignOwned(plan::ReassignOwnedPlan {
1541 old_roles,
1542 new_role,
1543 reassign_ids: _,
1544 }) => RbacRequirements {
1545 role_membership: old_roles
1546 .into_iter()
1547 .cloned()
1548 .chain(iter::once(*new_role))
1549 .collect(),
1550 ..Default::default()
1551 },
1552 Plan::SideEffectingFunc(func) => {
1553 let role_membership = match func {
1554 SideEffectingFunc::PgCancelBackend { connection_id } => active_conns
1555 .expect("active_conns is required for Plan::SideEffectingFunc")(
1556 *connection_id
1557 )
1558 .map(|x| [x].into())
1559 .unwrap_or_default(),
1560 };
1561 RbacRequirements {
1562 role_membership,
1563 ..Default::default()
1564 }
1565 }
1566 Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1567 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1568 RbacRequirements {
1569 privileges: vec![
1570 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1571 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1572 ],
1573 ..Default::default()
1574 }
1575 }
1576 Plan::DiscardTemp
1577 | Plan::DiscardAll
1578 | Plan::EmptyQuery
1579 | Plan::ShowAllVariables
1580 | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1581 | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1582 | Plan::SetVariable(plan::SetVariablePlan {
1583 name: _,
1584 value: _,
1585 local: _,
1586 })
1587 | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1588 | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1589 | Plan::StartTransaction(plan::StartTransactionPlan {
1590 access: _,
1591 isolation_level: _,
1592 })
1593 | Plan::CommitTransaction(plan::CommitTransactionPlan {
1594 transaction_type: _,
1595 })
1596 | Plan::AbortTransaction(plan::AbortTransactionPlan {
1597 transaction_type: _,
1598 })
1599 | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1600 | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1601 | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1602 | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1603 | Plan::Declare(plan::DeclarePlan {
1604 name: _,
1605 stmt: _,
1606 sql: _,
1607 params: _,
1608 })
1609 | Plan::Fetch(plan::FetchPlan {
1610 name: _,
1611 count: _,
1612 timeout: _,
1613 })
1614 | Plan::Close(plan::ClosePlan { name: _ })
1615 | Plan::Prepare(plan::PreparePlan {
1616 name: _,
1617 stmt: _,
1618 desc: _,
1619 sql: _,
1620 })
1621 | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1622 | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1623 | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1624 }
1625}
1626
1627fn check_owner_roles(
1629 object_id: &ObjectId,
1630 role_ids: &BTreeSet<RoleId>,
1631 catalog: &impl SessionCatalog,
1632) -> bool {
1633 if let Some(owner_id) = catalog.get_owner_id(object_id) {
1634 role_ids.contains(&owner_id)
1635 } else {
1636 true
1637 }
1638}
1639
1640fn ownership_err(
1641 unheld_ownership: Vec<ObjectId>,
1642 catalog: &impl SessionCatalog,
1643) -> Result<(), UnauthorizedError> {
1644 if !unheld_ownership.is_empty() {
1645 let objects = unheld_ownership
1646 .into_iter()
1647 .map(|ownership| match ownership {
1648 ObjectId::Cluster(id) => (
1649 ObjectType::Cluster,
1650 catalog.get_cluster(id).name().to_string(),
1651 ),
1652 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1653 let cluster = catalog.get_cluster(cluster_id);
1654 let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1655 let name = QualifiedReplica {
1658 cluster: Ident::new_unchecked(cluster.name()),
1659 replica: Ident::new_unchecked(replica.name()),
1660 };
1661 (ObjectType::ClusterReplica, name.to_string())
1662 }
1663 ObjectId::Database(id) => (
1664 ObjectType::Database,
1665 catalog.get_database(&id).name().to_string(),
1666 ),
1667 ObjectId::Schema((database_spec, schema_spec)) => {
1668 let schema = catalog.get_schema(&database_spec, &schema_spec);
1669 let name = catalog.resolve_full_schema_name(schema.name());
1670 (ObjectType::Schema, name.to_string())
1671 }
1672 ObjectId::Item(id) => {
1673 let item = catalog.get_item(&id);
1674 let name = catalog.resolve_full_name(item.name());
1675 (item.item_type().into(), name.to_string())
1676 }
1677 ObjectId::NetworkPolicy(id) => (
1678 ObjectType::NetworkPolicy,
1679 catalog.get_network_policy(&id).name().to_string(),
1680 ),
1681 ObjectId::Role(_) => unreachable!("roles have no owner"),
1682 })
1683 .collect();
1684 Err(UnauthorizedError::Ownership { objects })
1685 } else {
1686 Ok(())
1687 }
1688}
1689
1690fn generate_required_source_privileges(
1691 name: &QualifiedItemName,
1692 data_source: &DataSourceDesc,
1693 in_cluster: Option<ClusterId>,
1694 role_id: RoleId,
1695) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1696 let mut privileges = vec![(
1697 SystemObjectId::Object(name.qualifiers.clone().into()),
1698 AclMode::CREATE,
1699 role_id,
1700 )];
1701 match (data_source, in_cluster) {
1702 (_, Some(id)) => {
1703 privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1704 }
1705 (DataSourceDesc::Ingestion(_), None) => {
1706 privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1707 }
1708 (_, None) => {}
1713 }
1714 privileges
1715}
1716
1717fn generate_read_privileges(
1725 catalog: &impl SessionCatalog,
1726 ids: impl Iterator<Item = CatalogItemId>,
1727 role_id: RoleId,
1728) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1729 generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1730}
1731
1732fn generate_read_privileges_inner(
1733 catalog: &impl SessionCatalog,
1734 ids: impl Iterator<Item = CatalogItemId>,
1735 role_id: RoleId,
1736 seen: &mut BTreeSet<(ObjectId, RoleId)>,
1737) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1738 let mut privileges = Vec::new();
1739 let mut views = Vec::new();
1740
1741 for id in ids {
1742 if seen.insert((id.into(), role_id)) {
1743 let item = catalog.get_item(&id);
1744 let schema_id: ObjectId = item.name().qualifiers.clone().into();
1745 if seen.insert((schema_id.clone(), role_id)) {
1746 privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1747 }
1748 match item.item_type() {
1749 CatalogItemType::View | CatalogItemType::MaterializedView => {
1750 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1751 views.push((item.references().items().copied(), item.owner_id()));
1752 }
1753 CatalogItemType::Table | CatalogItemType::Source => {
1754 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1755 }
1756 CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1757 privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1758 }
1759 CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1760 }
1761 }
1762 }
1763
1764 for (view_ids, view_owner) in views {
1765 privileges.extend_from_slice(&generate_read_privileges_inner(
1766 catalog, view_ids, view_owner, seen,
1767 ));
1768 }
1769
1770 privileges
1771}
1772
1773fn generate_usage_privileges(
1774 catalog: &impl SessionCatalog,
1775 ids: &ResolvedIds,
1776 role_id: RoleId,
1777 item_types: &BTreeSet<CatalogItemType>,
1778) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1779 ids.items()
1781 .filter_map(move |id| {
1782 let item = catalog.get_item(id);
1783 if item_types.contains(&item.item_type()) {
1784 let schema_id = item.name().qualifiers.clone().into();
1785 Some([
1786 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1787 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1788 ])
1789 } else {
1790 None
1791 }
1792 })
1793 .flatten()
1794 .collect()
1795}
1796
1797fn generate_cluster_usage_privileges(
1798 expr_is_const: bool,
1799 target_cluster_id: Option<ClusterId>,
1800 role_id: RoleId,
1801) -> Option<(SystemObjectId, AclMode, RoleId)> {
1802 if !expr_is_const {
1805 if let Some(cluster_id) = target_cluster_id {
1806 return Some((
1807 SystemObjectId::Object(cluster_id.into()),
1808 AclMode::USAGE,
1809 role_id,
1810 ));
1811 }
1812 }
1813
1814 None
1815}
1816
1817fn check_object_privileges(
1818 catalog: &impl SessionCatalog,
1819 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1820 role_membership: BTreeSet<RoleId>,
1821 current_role_id: RoleId,
1822) -> Result<(), UnauthorizedError> {
1823 let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1824 role_memberships.insert(current_role_id, role_membership);
1825 for (object_id, acl_mode, role_id) in privileges {
1826 if matches!(
1830 &object_id,
1831 SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary)))
1832 ) {
1833 continue;
1834 }
1835
1836 let role_membership = role_memberships
1837 .entry(role_id)
1838 .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1839 let object_privileges = catalog
1840 .get_privileges(&object_id)
1841 .expect("only object types with privileges will generate required privileges");
1842 let role_privileges = role_membership
1843 .iter()
1844 .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1845 .map(|mz_acl_item| mz_acl_item.acl_mode)
1846 .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1847 if !role_privileges.contains(acl_mode) {
1848 let role_name = catalog.get_role(&role_id).name().to_string();
1849 let privileges = acl_mode.to_error_string();
1850 return Err(UnauthorizedError::Privilege {
1851 object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1852 role_name,
1853 privileges,
1854 });
1855 }
1856 }
1857
1858 Ok(())
1859}
1860
1861pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1862 const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1863 .union(AclMode::SELECT)
1864 .union(AclMode::UPDATE)
1865 .union(AclMode::DELETE);
1866 const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1867 const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1868 .union(AclMode::CREATE_DB)
1869 .union(AclMode::CREATE_CLUSTER)
1870 .union(AclMode::CREATE_NETWORK_POLICY);
1871
1872 const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1873 match object_type {
1874 SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1875 SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1876 SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1877 SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1878 SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1879 SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1880 SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1881 SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1882 SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1883 SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1884 SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1885 SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1886 SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1887 SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1888 SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1889 SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1890 SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1891 }
1892}
1893
1894pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1895 MzAclItem {
1896 grantee: owner_id,
1897 grantor: owner_id,
1898 acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1899 }
1900}
1901
1902const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1903 match object_type {
1904 ObjectType::Table
1905 | ObjectType::View
1906 | ObjectType::MaterializedView
1907 | ObjectType::Source => AclMode::SELECT,
1908 ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1909 ObjectType::Sink
1910 | ObjectType::Index
1911 | ObjectType::Role
1912 | ObjectType::Cluster
1913 | ObjectType::ClusterReplica
1914 | ObjectType::Secret
1915 | ObjectType::Connection
1916 | ObjectType::Database
1917 | ObjectType::Func
1918 | ObjectType::NetworkPolicy => AclMode::empty(),
1919 }
1920}
1921
1922pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1923 let acl_mode = default_builtin_object_acl_mode(object_type);
1924 MzAclItem {
1925 grantee: MZ_SUPPORT_ROLE_ID,
1926 grantor: MZ_SYSTEM_ROLE_ID,
1927 acl_mode,
1928 }
1929}
1930
1931pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1932 let acl_mode = default_builtin_object_acl_mode(object_type);
1933 MzAclItem {
1934 grantee: RoleId::Public,
1935 grantor: MZ_SYSTEM_ROLE_ID,
1936 acl_mode,
1937 }
1938}