1use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26 CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29 CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30 SchemaSpecifier, SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34 DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40fn rbac_check_preamble(
42 catalog: &impl SessionCatalog,
43 session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45 if catalog
50 .try_get_role(&session_meta.role_metadata().current_role)
51 .is_none()
52 {
53 return Err(UnauthorizedError::ConcurrentRoleDrop(
54 session_meta.role_metadata().current_role.clone(),
55 ));
56 };
57 if catalog
58 .try_get_role(&session_meta.role_metadata().session_role)
59 .is_none()
60 {
61 return Err(UnauthorizedError::ConcurrentRoleDrop(
62 session_meta.role_metadata().session_role.clone(),
63 ));
64 };
65 if catalog
66 .try_get_role(&session_meta.role_metadata().authenticated_role)
67 .is_none()
68 {
69 return Err(UnauthorizedError::ConcurrentRoleDrop(
70 session_meta.role_metadata().authenticated_role.clone(),
71 ));
72 };
73
74 Ok(())
75}
76
77fn filter_requirements(
79 catalog: &impl SessionCatalog,
80 session_meta: &dyn SessionMetadata,
81 rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83 let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86 && !session_meta.role_metadata().current_role.is_system()
87 && !session_meta.role_metadata().session_role.is_system();
88 let is_superuser = session_meta.is_superuser();
90 if is_rbac_disabled || is_superuser {
91 return rbac_requirements.filter_to_mandatory_requirements();
92 }
93
94 rbac_requirements
95}
96
97static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99 btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104 let mut items = DEFAULT_ITEM_USAGE.clone();
105 items.insert(CatalogItemType::Type);
106 items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
110#[derive(Debug, thiserror::Error)]
112pub enum UnauthorizedError {
113 #[error("permission denied to {action}")]
115 Superuser { action: String },
116 #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
118 Ownership { objects: Vec<(ObjectType, String)> },
119 #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
121 RoleMembership { role_names: Vec<String> },
122 #[error("permission denied for {object_description}")]
124 Privilege {
125 object_description: ErrorMessageObjectDescription,
126 role_name: String,
127 privileges: String,
128 },
129 #[error("permission denied to {action}")]
133 MzSystem { action: String },
134 #[error("permission denied to {action}")]
136 MzSupport { action: String },
137 #[error("role {0} was concurrently dropped")]
139 ConcurrentRoleDrop(RoleId),
140}
141
142impl UnauthorizedError {
143 pub fn detail(&self) -> Option<String> {
144 match &self {
145 UnauthorizedError::Superuser { action } => {
146 Some(format!("You must be a superuser to {}", action))
147 }
148 UnauthorizedError::Privilege {
149 object_description,
150 role_name,
151 privileges,
152 } => Some(format!(
153 "The '{role_name}' role needs {privileges} privileges on {object_description}"
154 )),
155 UnauthorizedError::MzSystem { .. } => {
156 Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157 }
158 UnauthorizedError::MzSupport { .. } => Some(format!(
159 "The '{}' role has very limited privileges",
160 SUPPORT_USER.name
161 )),
162 UnauthorizedError::ConcurrentRoleDrop(_) => {
163 Some("Please disconnect and re-connect with a valid role.".into())
164 }
165 UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166 }
167 }
168}
169
170#[derive(Debug)]
172struct RbacRequirements {
173 role_membership: BTreeSet<RoleId>,
175 ownership: Vec<ObjectId>,
177 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
180 item_usage: &'static BTreeSet<CatalogItemType>,
185 superuser_action: Option<String>,
187}
188
189impl RbacRequirements {
190 fn empty() -> RbacRequirements {
191 RbacRequirements {
192 role_membership: BTreeSet::new(),
193 ownership: Vec::new(),
194 privileges: Vec::new(),
195 item_usage: &EMPTY_ITEM_USAGE,
196 superuser_action: None,
197 }
198 }
199
200 fn validate(
201 self,
202 catalog: &impl SessionCatalog,
203 session: &dyn SessionMetadata,
204 resolved_ids: &ResolvedIds,
205 ) -> Result<(), UnauthorizedError> {
206 let role_membership =
208 catalog.collect_role_membership(&session.role_metadata().current_role);
209
210 check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212 let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215 if !unheld_membership.is_empty() {
216 let role_names = unheld_membership
217 .into_iter()
218 .map(|role_id| {
219 catalog
221 .try_get_role(role_id)
222 .map(|role| role.name().to_string())
223 .unwrap_or_else(|| role_id.to_string())
224 })
225 .collect();
226 return Err(UnauthorizedError::RoleMembership { role_names });
227 }
228
229 let unheld_ownership = self
232 .ownership
233 .into_iter()
234 .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235 .collect();
236 ownership_err(unheld_ownership, catalog)?;
237
238 check_object_privileges(
239 catalog,
240 self.privileges,
241 role_membership,
242 session.role_metadata().current_role,
243 )?;
244
245 if let Some(action) = self.superuser_action {
246 return Err(UnauthorizedError::Superuser { action });
247 }
248
249 Ok(())
250 }
251
252 fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253 let RbacRequirements {
254 role_membership,
255 ownership,
256 privileges,
257 item_usage,
258 superuser_action: _,
259 } = self;
260 let role_membership = role_membership
261 .into_iter()
262 .filter(|id| id.is_system())
263 .collect();
264 let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265 let privileges = privileges
266 .into_iter()
267 .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268 .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270 .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271 .collect();
272 let superuser_action = None;
273 RbacRequirements {
274 role_membership,
275 ownership,
276 privileges,
277 item_usage,
278 superuser_action,
279 }
280 }
281}
282
283impl Default for RbacRequirements {
284 fn default() -> Self {
285 RbacRequirements {
286 role_membership: BTreeSet::new(),
287 ownership: Vec::new(),
288 privileges: Vec::new(),
289 item_usage: &DEFAULT_ITEM_USAGE,
290 superuser_action: None,
291 }
292 }
293}
294
295pub fn check_usage(
297 catalog: &impl SessionCatalog,
298 session: &dyn SessionMetadata,
299 resolved_ids: &ResolvedIds,
300 item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302 rbac_check_preamble(catalog, session)?;
303
304 let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307 let existing_resolved_ids =
310 resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312 let required_privileges = generate_usage_privileges(
313 catalog,
314 &existing_resolved_ids,
315 session.role_metadata().current_role,
316 item_types,
317 )
318 .into_iter()
319 .collect();
320
321 let mut rbac_requirements = RbacRequirements::empty();
322 rbac_requirements.privileges = required_privileges;
323 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324 let required_privileges = rbac_requirements.privileges;
325
326 check_object_privileges(
327 catalog,
328 required_privileges,
329 role_membership,
330 session.role_metadata().current_role,
331 )?;
332
333 Ok(())
334}
335
336pub fn check_plan(
338 catalog: &impl SessionCatalog,
339 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
345 session: &dyn SessionMetadata,
346 plan: &Plan,
347 target_cluster_id: Option<ClusterId>,
348 resolved_ids: &ResolvedIds,
349) -> Result<(), UnauthorizedError> {
350 rbac_check_preamble(catalog, session)?;
351
352 let rbac_requirements = generate_rbac_requirements(
353 catalog,
354 plan,
355 active_conns,
356 target_cluster_id,
357 session.role_metadata().current_role,
358 );
359 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
360 debug!(
361 "rbac requirements {rbac_requirements:?} for plan {:?}",
362 PlanKind::from(plan)
363 );
364 rbac_requirements.validate(catalog, session, resolved_ids)
365}
366
367pub fn is_rbac_enabled_for_session(
369 system_vars: &SystemVars,
370 session: &dyn SessionMetadata,
371) -> bool {
372 let server_enabled = system_vars.enable_rbac_checks();
373 let session_enabled = session.enable_session_rbac_checks();
374
375 server_enabled || session_enabled
378}
379
380fn generate_rbac_requirements(
382 catalog: &impl SessionCatalog,
383 plan: &Plan,
384 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
385 target_cluster_id: Option<ClusterId>,
386 role_id: RoleId,
387) -> RbacRequirements {
388 match plan {
389 Plan::CreateConnection(plan::CreateConnectionPlan {
390 name,
391 if_not_exists: _,
392 connection: _,
393 validate: _,
394 }) => RbacRequirements {
395 privileges: vec![(
396 SystemObjectId::Object(name.qualifiers.clone().into()),
397 AclMode::CREATE,
398 role_id,
399 )],
400 item_usage: &CREATE_ITEM_USAGE,
401 ..Default::default()
402 },
403 Plan::CreateDatabase(plan::CreateDatabasePlan {
404 name: _,
405 if_not_exists: _,
406 }) => RbacRequirements {
407 privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
408 item_usage: &CREATE_ITEM_USAGE,
409 ..Default::default()
410 },
411 Plan::CreateSchema(plan::CreateSchemaPlan {
412 database_spec,
413 schema_name: _,
414 if_not_exists: _,
415 }) => {
416 let privileges = match database_spec {
417 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
418 ResolvedDatabaseSpecifier::Id(database_id) => {
419 vec![(
420 SystemObjectId::Object(database_id.into()),
421 AclMode::CREATE,
422 role_id,
423 )]
424 }
425 };
426 RbacRequirements {
427 privileges,
428 item_usage: &CREATE_ITEM_USAGE,
429 ..Default::default()
430 }
431 }
432 Plan::CreateRole(plan::CreateRolePlan {
433 name: _,
434 attributes,
435 }) => {
436 if attributes.superuser.unwrap_or(false) {
437 RbacRequirements {
438 superuser_action: Some("create superuser role".to_string()),
439 ..Default::default()
440 }
441 } else {
442 RbacRequirements {
443 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
444 item_usage: &CREATE_ITEM_USAGE,
445 ..Default::default()
446 }
447 }
448 }
449 Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
450 privileges: vec![(
451 SystemObjectId::System,
452 AclMode::CREATE_NETWORK_POLICY,
453 role_id,
454 )],
455 item_usage: &CREATE_ITEM_USAGE,
456 ..Default::default()
457 },
458 Plan::CreateCluster(plan::CreateClusterPlan {
459 name: _,
460 variant: _,
461 workload_class: _,
462 }) => RbacRequirements {
463 privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
464 item_usage: &CREATE_ITEM_USAGE,
465 ..Default::default()
466 },
467 Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
468 cluster_id,
469 name: _,
470 config: _,
471 }) => RbacRequirements {
472 ownership: vec![ObjectId::Cluster(*cluster_id)],
473 item_usage: &CREATE_ITEM_USAGE,
474 ..Default::default()
475 },
476 Plan::CreateSource(plan::CreateSourcePlan {
477 name,
478 source,
479 if_not_exists: _,
480 timeline: _,
481 in_cluster,
482 }) => RbacRequirements {
483 privileges: generate_required_source_privileges(
484 name,
485 &source.data_source,
486 *in_cluster,
487 role_id,
488 ),
489 item_usage: &CREATE_ITEM_USAGE,
490 ..Default::default()
491 },
492 Plan::CreateSources(plans) => RbacRequirements {
493 privileges: plans
494 .iter()
495 .flat_map(
496 |plan::CreateSourcePlanBundle {
497 item_id: _,
498 global_id: _,
499 plan:
500 plan::CreateSourcePlan {
501 name,
502 source,
503 if_not_exists: _,
504 timeline: _,
505 in_cluster,
506 },
507 resolved_ids: _,
508 available_source_references: _,
509 }| {
510 generate_required_source_privileges(
511 name,
512 &source.data_source,
513 *in_cluster,
514 role_id,
515 )
516 .into_iter()
517 },
518 )
519 .collect(),
520 item_usage: &CREATE_ITEM_USAGE,
521 ..Default::default()
522 },
523 Plan::CreateSecret(plan::CreateSecretPlan {
524 name,
525 secret: _,
526 if_not_exists: _,
527 }) => RbacRequirements {
528 privileges: vec![(
529 SystemObjectId::Object(name.qualifiers.clone().into()),
530 AclMode::CREATE,
531 role_id,
532 )],
533 item_usage: &CREATE_ITEM_USAGE,
534 ..Default::default()
535 },
536 Plan::CreateSink(plan::CreateSinkPlan {
537 name,
538 sink,
539 with_snapshot: _,
540 if_not_exists: _,
541 in_cluster,
542 }) => {
543 let mut privileges = vec![(
544 SystemObjectId::Object(name.qualifiers.clone().into()),
545 AclMode::CREATE,
546 role_id,
547 )];
548 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
549 privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
550 privileges.push((
551 SystemObjectId::Object(in_cluster.into()),
552 AclMode::CREATE,
553 role_id,
554 ));
555 RbacRequirements {
556 privileges,
557 item_usage: &CREATE_ITEM_USAGE,
558 ..Default::default()
559 }
560 }
561 Plan::CreateTable(plan::CreateTablePlan {
562 name,
563 table: _,
564 if_not_exists: _,
565 }) => RbacRequirements {
566 privileges: vec![(
567 SystemObjectId::Object(name.qualifiers.clone().into()),
568 AclMode::CREATE,
569 role_id,
570 )],
571 item_usage: &CREATE_ITEM_USAGE,
572 ..Default::default()
573 },
574 Plan::CreateView(plan::CreateViewPlan {
575 name,
576 view: _,
577 replace,
578 drop_ids: _,
579 if_not_exists: _,
580 ambiguous_columns: _,
581 }) => RbacRequirements {
582 ownership: replace
583 .map(|id| vec![ObjectId::Item(id)])
584 .unwrap_or_default(),
585 privileges: vec![(
586 SystemObjectId::Object(name.qualifiers.clone().into()),
587 AclMode::CREATE,
588 role_id,
589 )],
590 item_usage: &CREATE_ITEM_USAGE,
591 ..Default::default()
592 },
593 Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
594 name,
595 materialized_view,
596 replace,
597 drop_ids: _,
598 if_not_exists: _,
599 ambiguous_columns: _,
600 }) => RbacRequirements {
601 ownership: replace
602 .map(|id| vec![ObjectId::Item(id)])
603 .unwrap_or_default(),
604 privileges: vec![
605 (
606 SystemObjectId::Object(name.qualifiers.clone().into()),
607 AclMode::CREATE,
608 role_id,
609 ),
610 (
611 SystemObjectId::Object(materialized_view.cluster_id.into()),
612 AclMode::CREATE,
613 role_id,
614 ),
615 ],
616 item_usage: &CREATE_ITEM_USAGE,
617 ..Default::default()
618 },
619 Plan::CreateIndex(plan::CreateIndexPlan {
620 name,
621 index,
622 if_not_exists: _,
623 }) => {
624 let index_on_item = catalog.resolve_item_id(&index.on);
625 RbacRequirements {
626 ownership: vec![ObjectId::Item(index_on_item)],
627 privileges: vec![
628 (
629 SystemObjectId::Object(name.qualifiers.clone().into()),
630 AclMode::CREATE,
631 role_id,
632 ),
633 (
634 SystemObjectId::Object(index.cluster_id.into()),
635 AclMode::CREATE,
636 role_id,
637 ),
638 ],
639 item_usage: &CREATE_ITEM_USAGE,
640 ..Default::default()
641 }
642 }
643 Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
644 privileges: vec![(
645 SystemObjectId::Object(name.qualifiers.clone().into()),
646 AclMode::CREATE,
647 role_id,
648 )],
649 item_usage: &CREATE_ITEM_USAGE,
650 ..Default::default()
651 },
652 Plan::Comment(plan::CommentPlan {
653 object_id,
654 sub_component: _,
655 comment: _,
656 }) => {
657 let (ownership, privileges) = match object_id {
658 CommentObjectId::Role(_) => (
661 Vec::new(),
662 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
663 ),
664 _ => (vec![ObjectId::from(*object_id)], Vec::new()),
665 };
666 RbacRequirements {
667 ownership,
668 privileges,
669 ..Default::default()
670 }
671 }
672 Plan::DropObjects(plan::DropObjectsPlan {
673 referenced_ids,
674 drop_ids: _,
675 object_type,
676 }) => {
677 let privileges = if object_type == &ObjectType::Role {
678 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
679 } else {
680 referenced_ids
681 .iter()
682 .filter_map(|id| match id {
683 ObjectId::ClusterReplica((cluster_id, _)) => Some((
684 SystemObjectId::Object(cluster_id.into()),
685 AclMode::USAGE,
686 role_id,
687 )),
688 ObjectId::Schema((database_spec, _)) => match database_spec {
689 ResolvedDatabaseSpecifier::Ambient => None,
690 ResolvedDatabaseSpecifier::Id(database_id) => Some((
691 SystemObjectId::Object(database_id.into()),
692 AclMode::USAGE,
693 role_id,
694 )),
695 },
696 ObjectId::Item(item_id) => {
697 let item = catalog.get_item(item_id);
698 Some((
699 SystemObjectId::Object(item.name().qualifiers.clone().into()),
700 AclMode::USAGE,
701 role_id,
702 ))
703 }
704 ObjectId::Cluster(_)
705 | ObjectId::Database(_)
706 | ObjectId::Role(_)
707 | ObjectId::NetworkPolicy(_) => None,
708 })
709 .collect()
710 };
711 RbacRequirements {
712 ownership: referenced_ids.clone(),
714 privileges,
715 ..Default::default()
716 }
717 }
718 Plan::DropOwned(plan::DropOwnedPlan {
719 role_ids,
720 drop_ids: _,
721 privilege_revokes: _,
722 default_privilege_revokes: _,
723 }) => RbacRequirements {
724 role_membership: role_ids.into_iter().cloned().collect(),
725 ..Default::default()
726 },
727 Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
728 let container_id = match id {
729 ObjectId::Item(id) => Some(SystemObjectId::Object(
730 catalog.get_item(id).name().qualifiers.clone().into(),
731 )),
732 ObjectId::Schema((database_id, _schema_id)) => match database_id {
733 ResolvedDatabaseSpecifier::Ambient => None,
734 ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
735 },
736 ObjectId::Cluster(_)
737 | ObjectId::ClusterReplica(_)
738 | ObjectId::Database(_)
739 | ObjectId::Role(_)
740 | ObjectId::NetworkPolicy(_) => None,
741 };
742 let privileges = match container_id {
743 Some(id) => vec![(id, AclMode::USAGE, role_id)],
744 None => Vec::new(),
745 };
746 RbacRequirements {
747 privileges,
748 item_usage: &EMPTY_ITEM_USAGE,
749 ..Default::default()
750 }
751 }
752 Plan::ShowColumns(plan::ShowColumnsPlan {
753 id,
754 select_plan,
755 new_resolved_ids: _,
756 }) => {
757 let mut privileges = vec![(
758 SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
759 AclMode::USAGE,
760 role_id,
761 )];
762
763 for privilege in generate_rbac_requirements(
764 catalog,
765 &Plan::Select(select_plan.clone()),
766 active_conns,
767 target_cluster_id,
768 role_id,
769 )
770 .privileges
771 {
772 privileges.push(privilege);
773 }
774 RbacRequirements {
775 privileges,
776 ..Default::default()
777 }
778 }
779 Plan::Select(plan::SelectPlan {
780 source,
781 select: _,
782 when: _,
783 finishing: _,
784 copy_to: _,
785 }) => {
786 let items = source
787 .depends_on()
788 .into_iter()
789 .map(|gid| catalog.resolve_item_id(&gid));
790 let mut privileges = generate_read_privileges(catalog, items, role_id);
791 if let Some(privilege) = generate_cluster_usage_privileges(
792 source.as_const().is_some(),
793 target_cluster_id,
794 role_id,
795 ) {
796 privileges.push(privilege);
797 }
798 RbacRequirements {
799 privileges,
800 ..Default::default()
801 }
802 }
803 Plan::Subscribe(plan::SubscribePlan {
804 from,
805 with_snapshot: _,
806 when: _,
807 up_to: _,
808 copy_to: _,
809 emit_progress: _,
810 output: _,
811 }) => {
812 let items = from
813 .depends_on()
814 .into_iter()
815 .map(|gid| catalog.resolve_item_id(&gid));
816 let mut privileges = generate_read_privileges(catalog, items, role_id);
817 if let Some(cluster_id) = target_cluster_id {
818 privileges.push((
819 SystemObjectId::Object(cluster_id.into()),
820 AclMode::USAGE,
821 role_id,
822 ));
823 }
824 RbacRequirements {
825 privileges,
826 ..Default::default()
827 }
828 }
829 Plan::CopyFrom(plan::CopyFromPlan {
830 target_name: _,
831 target_id,
832 source: _,
833 columns: _,
834 source_desc: _,
835 mfp: _,
836 params: _,
837 filter: _,
838 }) => RbacRequirements {
839 privileges: vec![
840 (
841 SystemObjectId::Object(
842 catalog.get_item(target_id).name().qualifiers.clone().into(),
843 ),
844 AclMode::USAGE,
845 role_id,
846 ),
847 (
848 SystemObjectId::Object(target_id.into()),
849 AclMode::INSERT,
850 role_id,
851 ),
852 ],
853 ..Default::default()
854 },
855 Plan::CopyTo(plan::CopyToPlan {
856 select_plan,
857 desc: _,
858 to: _,
859 connection: _,
860 connection_id: _,
861 format: _,
862 max_file_size: _,
863 }) => {
864 let items = select_plan
865 .source
866 .depends_on()
867 .into_iter()
868 .map(|gid| catalog.resolve_item_id(&gid));
869 let mut privileges = generate_read_privileges(catalog, items, role_id);
870 if let Some(cluster_id) = target_cluster_id {
871 privileges.push((
872 SystemObjectId::Object(cluster_id.into()),
873 AclMode::USAGE,
874 role_id,
875 ));
876 }
877 RbacRequirements {
878 privileges,
879 ..Default::default()
880 }
881 }
882 Plan::ExplainPlan(plan::ExplainPlanPlan {
883 stage: _,
884 format: _,
885 config: _,
886 explainee,
887 })
888 | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
889 privileges: match explainee {
890 Explainee::View(id)
891 | Explainee::MaterializedView(id)
892 | Explainee::Index(id)
893 | Explainee::ReplanView(id)
894 | Explainee::ReplanMaterializedView(id)
895 | Explainee::ReplanIndex(id) => {
896 let item = catalog.get_item(id);
897 let schema_id: ObjectId = item.name().qualifiers.clone().into();
898 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
899 }
900 Explainee::Statement(stmt) => stmt
901 .depends_on()
902 .into_iter()
903 .map(|id| {
904 let item = catalog.get_item_by_global_id(&id);
905 let schema_id: ObjectId = item.name().qualifiers.clone().into();
906 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
907 })
908 .collect(),
909 },
910 item_usage: match explainee {
911 Explainee::View(..)
912 | Explainee::MaterializedView(..)
913 | Explainee::Index(..)
914 | Explainee::ReplanView(..)
915 | Explainee::ReplanMaterializedView(..)
916 | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
917 Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
918 },
919 ..Default::default()
920 },
921 Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
922 RbacRequirements {
923 privileges: {
924 let item = catalog.get_item_by_global_id(sink_from);
925 let schema_id: ObjectId = item.name().qualifiers.clone().into();
926 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
927 },
928 item_usage: &EMPTY_ITEM_USAGE,
929 ..Default::default()
930 }
931 }
932 Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
933 format: _,
934 raw_plan,
935 when: _,
936 }) => RbacRequirements {
937 privileges: raw_plan
938 .depends_on()
939 .into_iter()
940 .map(|id| {
941 let item = catalog.get_item_by_global_id(&id);
942 let schema_id: ObjectId = item.name().qualifiers.clone().into();
943 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
944 })
945 .collect(),
946 ..Default::default()
947 },
948 Plan::Insert(plan::InsertPlan {
949 id,
950 values,
951 returning,
952 }) => {
953 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
954 let mut privileges = vec![
955 (
956 SystemObjectId::Object(schema_id.clone()),
957 AclMode::USAGE,
958 role_id,
959 ),
960 (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
961 ];
962 let mut seen = BTreeSet::from([(schema_id, role_id)]);
963
964 if returning
967 .iter()
968 .any(|assignment| assignment.contains_column())
969 {
970 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
971 seen.insert((id.into(), role_id));
972 }
973
974 let items = values
975 .depends_on()
976 .into_iter()
977 .map(|gid| catalog.resolve_item_id(&gid));
978 privileges.extend_from_slice(&generate_read_privileges_inner(
979 catalog, items, role_id, &mut seen,
980 ));
981
982 if let Some(privilege) = generate_cluster_usage_privileges(
983 values.as_const().is_some(),
984 target_cluster_id,
985 role_id,
986 ) {
987 privileges.push(privilege);
988 } else if !returning.is_empty() {
989 if let Some(cluster_id) = target_cluster_id {
992 privileges.push((
993 SystemObjectId::Object(cluster_id.into()),
994 AclMode::USAGE,
995 role_id,
996 ));
997 }
998 }
999 RbacRequirements {
1000 privileges,
1001 ..Default::default()
1002 }
1003 }
1004 Plan::AlterCluster(plan::AlterClusterPlan {
1005 id,
1006 name: _,
1007 options: _,
1008 strategy: _,
1009 }) => RbacRequirements {
1010 ownership: vec![ObjectId::Cluster(*id)],
1011 item_usage: &CREATE_ITEM_USAGE,
1012 ..Default::default()
1013 },
1014 Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1015 ownership: vec![ObjectId::Item(*id)],
1016 privileges: vec![(
1017 SystemObjectId::Object(set_cluster.into()),
1018 AclMode::CREATE,
1019 role_id,
1020 )],
1021 item_usage: &CREATE_ITEM_USAGE,
1022 ..Default::default()
1023 },
1024 Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1025 id,
1026 window: _,
1027 value: _,
1028 object_type: _,
1029 }) => RbacRequirements {
1030 ownership: vec![ObjectId::Item(*id)],
1031 item_usage: &CREATE_ITEM_USAGE,
1032 ..Default::default()
1033 },
1034 Plan::AlterSourceTimestampInterval(plan::AlterSourceTimestampIntervalPlan {
1035 id,
1036 value: _,
1037 interval: _,
1038 }) => RbacRequirements {
1039 ownership: vec![ObjectId::Item(*id)],
1040 item_usage: &CREATE_ITEM_USAGE,
1041 ..Default::default()
1042 },
1043 Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1044 ownership: vec![ObjectId::Item(*id)],
1045 ..Default::default()
1046 },
1047 Plan::AlterSource(plan::AlterSourcePlan {
1048 item_id,
1049 ingestion_id: _,
1050 action: _,
1051 }) => RbacRequirements {
1052 ownership: vec![ObjectId::Item(*item_id)],
1053 item_usage: &CREATE_ITEM_USAGE,
1054 ..Default::default()
1055 },
1056 Plan::AlterSink(plan::AlterSinkPlan {
1057 item_id,
1058 global_id: _,
1059 sink,
1060 with_snapshot: _,
1061 in_cluster,
1062 }) => {
1063 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1064 let mut privileges = generate_read_privileges(catalog, items, role_id);
1065 privileges.push((
1066 SystemObjectId::Object(in_cluster.into()),
1067 AclMode::CREATE,
1068 role_id,
1069 ));
1070 RbacRequirements {
1071 ownership: vec![ObjectId::Item(*item_id)],
1072 privileges,
1073 item_usage: &CREATE_ITEM_USAGE,
1074 ..Default::default()
1075 }
1076 }
1077 Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1078 id,
1079 name: _,
1080 to_name: _,
1081 }) => RbacRequirements {
1082 ownership: vec![ObjectId::Cluster(*id)],
1083 ..Default::default()
1084 },
1085 Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1086 id_a,
1087 id_b,
1088 name_a: _,
1089 name_b: _,
1090 name_temp: _,
1091 }) => RbacRequirements {
1092 ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1093 ..Default::default()
1094 },
1095 Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1096 cluster_id,
1097 replica_id,
1098 name: _,
1099 to_name: _,
1100 }) => RbacRequirements {
1101 ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1102 ..Default::default()
1103 },
1104 Plan::AlterItemRename(plan::AlterItemRenamePlan {
1105 id,
1106 current_full_name: _,
1107 to_name: _,
1108 object_type: _,
1109 }) => RbacRequirements {
1110 ownership: vec![ObjectId::Item(*id)],
1111 ..Default::default()
1112 },
1113 Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1114 cur_schema_spec,
1115 new_schema_name: _,
1116 }) => {
1117 let privileges = match cur_schema_spec.0 {
1118 ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1119 SystemObjectId::Object(ObjectId::Database(db_id)),
1120 AclMode::CREATE,
1121 role_id,
1122 )],
1123 ResolvedDatabaseSpecifier::Ambient => vec![],
1124 };
1125
1126 RbacRequirements {
1127 ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1128 privileges,
1129 ..Default::default()
1130 }
1131 }
1132 Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1133 schema_a_spec,
1134 schema_a_name: _,
1135 schema_b_spec,
1136 schema_b_name: _,
1137 name_temp: _,
1138 }) => {
1139 let mut privileges = vec![];
1140 if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1141 privileges.push((
1142 SystemObjectId::Object(ObjectId::Database(id_a)),
1143 AclMode::CREATE,
1144 role_id,
1145 ));
1146 }
1147 if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1148 privileges.push((
1149 SystemObjectId::Object(ObjectId::Database(id_b)),
1150 AclMode::CREATE,
1151 role_id,
1152 ));
1153 }
1154
1155 RbacRequirements {
1156 ownership: vec![
1157 ObjectId::Schema(*schema_a_spec),
1158 ObjectId::Schema(*schema_b_spec),
1159 ],
1160 privileges,
1161 ..Default::default()
1162 }
1163 }
1164 Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1165 ownership: vec![ObjectId::Item(*id)],
1166 item_usage: &CREATE_ITEM_USAGE,
1167 ..Default::default()
1168 },
1169 Plan::AlterRole(plan::AlterRolePlan {
1170 id,
1171 name: _,
1172 option,
1173 }) => match option {
1174 plan::PlannedAlterRoleOption::Attributes(attributes)
1176 if attributes.superuser.is_some() =>
1177 {
1178 RbacRequirements {
1179 superuser_action: Some("alter superuser role".to_string()),
1180 ..Default::default()
1181 }
1182 }
1183 plan::PlannedAlterRoleOption::Attributes(plan::PlannedRoleAttributes {
1186 password,
1187 scram_iterations: _,
1190 nopassword: _,
1191 superuser: None,
1194 inherit: None,
1195 login: None,
1196 }) if password.is_some() && role_id == *id => RbacRequirements::default(),
1197 plan::PlannedAlterRoleOption::Attributes(attributes)
1199 if attributes.password.is_some() && role_id != *id =>
1200 {
1201 RbacRequirements {
1202 superuser_action: Some("alter password of role".to_string()),
1203 ..Default::default()
1204 }
1205 }
1206 plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1208 RbacRequirements::default()
1209 }
1210 _ => RbacRequirements {
1212 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1213 item_usage: &CREATE_ITEM_USAGE,
1214 ..Default::default()
1215 },
1216 },
1217 Plan::AlterOwner(plan::AlterOwnerPlan {
1218 id,
1219 object_type: _,
1220 new_owner,
1221 }) => {
1222 let privileges = match id {
1223 ObjectId::ClusterReplica((cluster_id, _)) => {
1224 vec![(
1225 SystemObjectId::Object(cluster_id.into()),
1226 AclMode::CREATE,
1227 role_id,
1228 )]
1229 }
1230 ObjectId::Schema((database_spec, _)) => match database_spec {
1231 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1232 ResolvedDatabaseSpecifier::Id(database_id) => {
1233 vec![(
1234 SystemObjectId::Object(database_id.into()),
1235 AclMode::CREATE,
1236 role_id,
1237 )]
1238 }
1239 },
1240 ObjectId::Item(item_id) => {
1241 let item = catalog.get_item(item_id);
1242 vec![(
1243 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1244 AclMode::CREATE,
1245 role_id,
1246 )]
1247 }
1248 ObjectId::Cluster(_)
1249 | ObjectId::Database(_)
1250 | ObjectId::Role(_)
1251 | ObjectId::NetworkPolicy(_) => Vec::new(),
1252 };
1253 RbacRequirements {
1254 role_membership: BTreeSet::from([*new_owner]),
1255 ownership: vec![id.clone()],
1256 privileges,
1257 ..Default::default()
1258 }
1259 }
1260 Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1261 ownership: vec![ObjectId::Item(*relation_id)],
1262 item_usage: &CREATE_ITEM_USAGE,
1263 ..Default::default()
1264 },
1265 Plan::AlterMaterializedViewApplyReplacement(
1266 plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1267 ) => RbacRequirements {
1268 ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1269 item_usage: &CREATE_ITEM_USAGE,
1270 ..Default::default()
1271 },
1272 Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1273 ownership: vec![ObjectId::NetworkPolicy(*id)],
1274 item_usage: &CREATE_ITEM_USAGE,
1275 ..Default::default()
1276 },
1277 Plan::ReadThenWrite(plan::ReadThenWritePlan {
1278 id,
1279 selection,
1280 finishing: _,
1281 assignments,
1282 kind,
1283 returning,
1284 }) => {
1285 let acl_mode = match kind {
1286 MutationKind::Insert => AclMode::INSERT,
1287 MutationKind::Update => AclMode::UPDATE,
1288 MutationKind::Delete => AclMode::DELETE,
1289 };
1290 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1291 let mut privileges = vec![
1292 (
1293 SystemObjectId::Object(schema_id.clone()),
1294 AclMode::USAGE,
1295 role_id,
1296 ),
1297 (SystemObjectId::Object(id.into()), acl_mode, role_id),
1298 ];
1299 let mut seen = BTreeSet::from([(schema_id, role_id)]);
1300
1301 if assignments
1304 .values()
1305 .chain(returning.iter())
1306 .any(|assignment| assignment.contains_column())
1307 {
1308 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1309 seen.insert((id.into(), role_id));
1310 }
1311
1312 let items = selection
1319 .depends_on()
1320 .into_iter()
1321 .map(|gid| catalog.resolve_item_id(&gid));
1322 privileges.extend_from_slice(&generate_read_privileges_inner(
1323 catalog, items, role_id, &mut seen,
1324 ));
1325
1326 if let Some(privilege) = generate_cluster_usage_privileges(
1327 selection.as_const().is_some(),
1328 target_cluster_id,
1329 role_id,
1330 ) {
1331 privileges.push(privilege);
1332 }
1333 RbacRequirements {
1334 privileges,
1335 ..Default::default()
1336 }
1337 }
1338 Plan::GrantRole(plan::GrantRolePlan {
1339 role_ids: _,
1340 member_ids: _,
1341 grantor_id: _,
1342 })
1343 | Plan::RevokeRole(plan::RevokeRolePlan {
1344 role_ids: _,
1345 member_ids: _,
1346 grantor_id: _,
1347 }) => RbacRequirements {
1348 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1349 ..Default::default()
1350 },
1351 Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1352 update_privileges,
1353 grantees: _,
1354 })
1355 | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1356 update_privileges,
1357 revokees: _,
1358 }) => {
1359 let mut privileges = Vec::with_capacity(update_privileges.len());
1360 for UpdatePrivilege { target_id, .. } in update_privileges {
1361 match target_id {
1362 SystemObjectId::Object(object_id) => match object_id {
1363 ObjectId::ClusterReplica((cluster_id, _)) => {
1364 privileges.push((
1365 SystemObjectId::Object(cluster_id.into()),
1366 AclMode::USAGE,
1367 role_id,
1368 ));
1369 }
1370 ObjectId::Schema((database_spec, _)) => match database_spec {
1371 ResolvedDatabaseSpecifier::Ambient => {}
1372 ResolvedDatabaseSpecifier::Id(database_id) => {
1373 privileges.push((
1374 SystemObjectId::Object(database_id.into()),
1375 AclMode::USAGE,
1376 role_id,
1377 ));
1378 }
1379 },
1380 ObjectId::Item(item_id) => {
1381 let item = catalog.get_item(item_id);
1382 privileges.push((
1383 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1384 AclMode::USAGE,
1385 role_id,
1386 ))
1387 }
1388 ObjectId::Cluster(_)
1389 | ObjectId::Database(_)
1390 | ObjectId::Role(_)
1391 | ObjectId::NetworkPolicy(_) => {}
1392 },
1393 SystemObjectId::System => {}
1394 }
1395 }
1396 RbacRequirements {
1397 ownership: update_privileges
1398 .iter()
1399 .filter_map(|update_privilege| update_privilege.target_id.object_id())
1400 .cloned()
1401 .collect(),
1402 privileges,
1403 superuser_action: if update_privileges
1408 .iter()
1409 .any(|update_privilege| update_privilege.target_id.is_system())
1410 {
1411 Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1412 } else {
1413 None
1414 },
1415 ..Default::default()
1416 }
1417 }
1418 Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1419 privilege_objects,
1420 privilege_acl_items: _,
1421 is_grant: _,
1422 }) => RbacRequirements {
1423 role_membership: privilege_objects
1424 .iter()
1425 .map(|privilege_object| privilege_object.role_id)
1426 .collect(),
1427 privileges: privilege_objects
1428 .into_iter()
1429 .filter_map(|privilege_object| {
1430 if let (Some(database_id), Some(_)) =
1431 (privilege_object.database_id, privilege_object.schema_id)
1432 {
1433 Some((
1434 SystemObjectId::Object(database_id.into()),
1435 AclMode::USAGE,
1436 role_id,
1437 ))
1438 } else {
1439 None
1440 }
1441 })
1442 .collect(),
1443 superuser_action: if privilege_objects
1449 .iter()
1450 .any(|privilege_object| privilege_object.role_id.is_public())
1451 {
1452 Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1453 } else {
1454 None
1455 },
1456 ..Default::default()
1457 },
1458 Plan::ReassignOwned(plan::ReassignOwnedPlan {
1459 old_roles,
1460 new_role,
1461 reassign_ids: _,
1462 }) => RbacRequirements {
1463 role_membership: old_roles
1464 .into_iter()
1465 .cloned()
1466 .chain(iter::once(*new_role))
1467 .collect(),
1468 ..Default::default()
1469 },
1470 Plan::SideEffectingFunc(func) => {
1471 let role_membership = match func {
1472 SideEffectingFunc::PgCancelBackend { connection_id } => active_conns
1473 .expect("active_conns is required for Plan::SideEffectingFunc")(
1474 *connection_id
1475 )
1476 .map(|x| [x].into())
1477 .unwrap_or_default(),
1478 };
1479 RbacRequirements {
1480 role_membership,
1481 ..Default::default()
1482 }
1483 }
1484 Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1485 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1486 RbacRequirements {
1487 privileges: vec![
1488 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1489 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1490 ],
1491 ..Default::default()
1492 }
1493 }
1494 Plan::DiscardTemp
1495 | Plan::DiscardAll
1496 | Plan::EmptyQuery
1497 | Plan::ShowAllVariables
1498 | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1499 | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1500 | Plan::SetVariable(plan::SetVariablePlan {
1501 name: _,
1502 value: _,
1503 local: _,
1504 })
1505 | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1506 | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1507 | Plan::StartTransaction(plan::StartTransactionPlan {
1508 access: _,
1509 isolation_level: _,
1510 })
1511 | Plan::CommitTransaction(plan::CommitTransactionPlan {
1512 transaction_type: _,
1513 })
1514 | Plan::AbortTransaction(plan::AbortTransactionPlan {
1515 transaction_type: _,
1516 })
1517 | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1518 | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1519 | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1520 | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1521 | Plan::Declare(plan::DeclarePlan {
1522 name: _,
1523 stmt: _,
1524 sql: _,
1525 params: _,
1526 })
1527 | Plan::Fetch(plan::FetchPlan {
1528 name: _,
1529 count: _,
1530 timeout: _,
1531 })
1532 | Plan::Close(plan::ClosePlan { name: _ })
1533 | Plan::Prepare(plan::PreparePlan {
1534 name: _,
1535 stmt: _,
1536 desc: _,
1537 sql: _,
1538 })
1539 | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1540 | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1541 | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1542 }
1543}
1544
1545fn check_owner_roles(
1547 object_id: &ObjectId,
1548 role_ids: &BTreeSet<RoleId>,
1549 catalog: &impl SessionCatalog,
1550) -> bool {
1551 if let Some(owner_id) = catalog.get_owner_id(object_id) {
1552 role_ids.contains(&owner_id)
1553 } else {
1554 true
1555 }
1556}
1557
1558fn ownership_err(
1559 unheld_ownership: Vec<ObjectId>,
1560 catalog: &impl SessionCatalog,
1561) -> Result<(), UnauthorizedError> {
1562 if !unheld_ownership.is_empty() {
1563 let objects = unheld_ownership
1564 .into_iter()
1565 .map(|ownership| match ownership {
1566 ObjectId::Cluster(id) => (
1567 ObjectType::Cluster,
1568 catalog.get_cluster(id).name().to_string(),
1569 ),
1570 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1571 let cluster = catalog.get_cluster(cluster_id);
1572 let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1573 let name = QualifiedReplica {
1576 cluster: Ident::new_unchecked(cluster.name()),
1577 replica: Ident::new_unchecked(replica.name()),
1578 };
1579 (ObjectType::ClusterReplica, name.to_string())
1580 }
1581 ObjectId::Database(id) => (
1582 ObjectType::Database,
1583 catalog.get_database(&id).name().to_string(),
1584 ),
1585 ObjectId::Schema((database_spec, schema_spec)) => {
1586 let schema = catalog.get_schema(&database_spec, &schema_spec);
1587 let name = catalog.resolve_full_schema_name(schema.name());
1588 (ObjectType::Schema, name.to_string())
1589 }
1590 ObjectId::Item(id) => {
1591 let item = catalog.get_item(&id);
1592 let name = catalog.resolve_full_name(item.name());
1593 (item.item_type().into(), name.to_string())
1594 }
1595 ObjectId::NetworkPolicy(id) => (
1596 ObjectType::NetworkPolicy,
1597 catalog.get_network_policy(&id).name().to_string(),
1598 ),
1599 ObjectId::Role(_) => unreachable!("roles have no owner"),
1600 })
1601 .collect();
1602 Err(UnauthorizedError::Ownership { objects })
1603 } else {
1604 Ok(())
1605 }
1606}
1607
1608fn generate_required_source_privileges(
1609 name: &QualifiedItemName,
1610 data_source: &DataSourceDesc,
1611 in_cluster: Option<ClusterId>,
1612 role_id: RoleId,
1613) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1614 let mut privileges = vec![(
1615 SystemObjectId::Object(name.qualifiers.clone().into()),
1616 AclMode::CREATE,
1617 role_id,
1618 )];
1619 match (data_source, in_cluster) {
1620 (_, Some(id)) => {
1621 privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1622 }
1623 (DataSourceDesc::Ingestion(_), None) => {
1624 privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1625 }
1626 (_, None) => {}
1631 }
1632 privileges
1633}
1634
1635fn generate_read_privileges(
1643 catalog: &impl SessionCatalog,
1644 ids: impl Iterator<Item = CatalogItemId>,
1645 role_id: RoleId,
1646) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1647 generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1648}
1649
1650fn generate_read_privileges_inner(
1651 catalog: &impl SessionCatalog,
1652 ids: impl Iterator<Item = CatalogItemId>,
1653 role_id: RoleId,
1654 seen: &mut BTreeSet<(ObjectId, RoleId)>,
1655) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1656 let mut privileges = Vec::new();
1657 let mut views = Vec::new();
1658
1659 for id in ids {
1660 if seen.insert((id.into(), role_id)) {
1661 let item = catalog.get_item(&id);
1662 let schema_id: ObjectId = item.name().qualifiers.clone().into();
1663 if seen.insert((schema_id.clone(), role_id)) {
1664 privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1665 }
1666 match item.item_type() {
1667 CatalogItemType::View | CatalogItemType::MaterializedView => {
1668 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1669 views.push((item.references().items().copied(), item.owner_id()));
1670 }
1671 CatalogItemType::Table | CatalogItemType::Source => {
1672 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1673 }
1674 CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1675 privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1676 }
1677 CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1678 }
1679 }
1680 }
1681
1682 for (view_ids, view_owner) in views {
1683 privileges.extend_from_slice(&generate_read_privileges_inner(
1684 catalog, view_ids, view_owner, seen,
1685 ));
1686 }
1687
1688 privileges
1689}
1690
1691fn generate_usage_privileges(
1692 catalog: &impl SessionCatalog,
1693 ids: &ResolvedIds,
1694 role_id: RoleId,
1695 item_types: &BTreeSet<CatalogItemType>,
1696) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1697 ids.items()
1699 .filter_map(move |id| {
1700 let item = catalog.get_item(id);
1701 if item_types.contains(&item.item_type()) {
1702 let schema_id = item.name().qualifiers.clone().into();
1703 Some([
1704 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1705 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1706 ])
1707 } else {
1708 None
1709 }
1710 })
1711 .flatten()
1712 .collect()
1713}
1714
1715fn generate_cluster_usage_privileges(
1716 expr_is_const: bool,
1717 target_cluster_id: Option<ClusterId>,
1718 role_id: RoleId,
1719) -> Option<(SystemObjectId, AclMode, RoleId)> {
1720 if !expr_is_const {
1723 if let Some(cluster_id) = target_cluster_id {
1724 return Some((
1725 SystemObjectId::Object(cluster_id.into()),
1726 AclMode::USAGE,
1727 role_id,
1728 ));
1729 }
1730 }
1731
1732 None
1733}
1734
1735fn check_object_privileges(
1736 catalog: &impl SessionCatalog,
1737 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1738 role_membership: BTreeSet<RoleId>,
1739 current_role_id: RoleId,
1740) -> Result<(), UnauthorizedError> {
1741 let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1742 role_memberships.insert(current_role_id, role_membership);
1743 for (object_id, acl_mode, role_id) in privileges {
1744 if matches!(
1748 &object_id,
1749 SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary)))
1750 ) {
1751 continue;
1752 }
1753
1754 let role_membership = role_memberships
1755 .entry(role_id)
1756 .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1757 let object_privileges = catalog
1758 .get_privileges(&object_id)
1759 .expect("only object types with privileges will generate required privileges");
1760 let role_privileges = role_membership
1761 .iter()
1762 .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1763 .map(|mz_acl_item| mz_acl_item.acl_mode)
1764 .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1765 if !role_privileges.contains(acl_mode) {
1766 let role_name = catalog.get_role(&role_id).name().to_string();
1767 let privileges = acl_mode.to_error_string();
1768 return Err(UnauthorizedError::Privilege {
1769 object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1770 role_name,
1771 privileges,
1772 });
1773 }
1774 }
1775
1776 Ok(())
1777}
1778
1779pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1780 const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1781 .union(AclMode::SELECT)
1782 .union(AclMode::UPDATE)
1783 .union(AclMode::DELETE);
1784 const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1785 const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1786 .union(AclMode::CREATE_DB)
1787 .union(AclMode::CREATE_CLUSTER)
1788 .union(AclMode::CREATE_NETWORK_POLICY);
1789
1790 const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1791 match object_type {
1792 SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1793 SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1794 SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1795 SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1796 SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1797 SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1798 SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1799 SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1800 SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1801 SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1802 SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1803 SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1804 SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1805 SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1806 SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1807 SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1808 SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1809 }
1810}
1811
1812pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1813 MzAclItem {
1814 grantee: owner_id,
1815 grantor: owner_id,
1816 acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1817 }
1818}
1819
1820const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1821 match object_type {
1822 ObjectType::Table
1823 | ObjectType::View
1824 | ObjectType::MaterializedView
1825 | ObjectType::Source => AclMode::SELECT,
1826 ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1827 ObjectType::Sink
1828 | ObjectType::Index
1829 | ObjectType::Role
1830 | ObjectType::Cluster
1831 | ObjectType::ClusterReplica
1832 | ObjectType::Secret
1833 | ObjectType::Connection
1834 | ObjectType::Database
1835 | ObjectType::Func
1836 | ObjectType::NetworkPolicy => AclMode::empty(),
1837 }
1838}
1839
1840pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1841 let acl_mode = default_builtin_object_acl_mode(object_type);
1842 MzAclItem {
1843 grantee: MZ_SUPPORT_ROLE_ID,
1844 grantor: MZ_SYSTEM_ROLE_ID,
1845 acl_mode,
1846 }
1847}
1848
1849pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1850 let acl_mode = default_builtin_object_acl_mode(object_type);
1851 MzAclItem {
1852 grantee: RoleId::Public,
1853 grantor: MZ_SYSTEM_ROLE_ID,
1854 acl_mode,
1855 }
1856}