1use std::collections::{BTreeMap, BTreeSet};
11use std::iter;
12use std::sync::LazyLock;
13
14use itertools::Itertools;
15use maplit::btreeset;
16use mz_controller_types::ClusterId;
17use mz_expr::CollectionPlan;
18use mz_ore::str::StrExt;
19use mz_repr::CatalogItemId;
20use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
21use mz_repr::role_id::RoleId;
22use mz_sql_parser::ast::{Ident, QualifiedReplica};
23use tracing::debug;
24
25use crate::catalog::{
26 CatalogItemType, ErrorMessageObjectDescription, ObjectType, SessionCatalog, SystemObjectType,
27};
28use crate::names::{
29 CommentObjectId, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds,
30 SchemaSpecifier, SystemObjectId,
31};
32use crate::plan::{self, PlanKind};
33use crate::plan::{
34 DataSourceDesc, Explainee, MutationKind, Plan, SideEffectingFunc, UpdatePrivilege,
35};
36use crate::session::metadata::SessionMetadata;
37use crate::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
38use crate::session::vars::SystemVars;
39
40fn rbac_check_preamble(
42 catalog: &impl SessionCatalog,
43 session_meta: &dyn SessionMetadata,
44) -> Result<(), UnauthorizedError> {
45 if catalog
50 .try_get_role(&session_meta.role_metadata().current_role)
51 .is_none()
52 {
53 return Err(UnauthorizedError::ConcurrentRoleDrop(
54 session_meta.role_metadata().current_role.clone(),
55 ));
56 };
57 if catalog
58 .try_get_role(&session_meta.role_metadata().session_role)
59 .is_none()
60 {
61 return Err(UnauthorizedError::ConcurrentRoleDrop(
62 session_meta.role_metadata().session_role.clone(),
63 ));
64 };
65 if catalog
66 .try_get_role(&session_meta.role_metadata().authenticated_role)
67 .is_none()
68 {
69 return Err(UnauthorizedError::ConcurrentRoleDrop(
70 session_meta.role_metadata().authenticated_role.clone(),
71 ));
72 };
73
74 Ok(())
75}
76
77fn filter_requirements(
79 catalog: &impl SessionCatalog,
80 session_meta: &dyn SessionMetadata,
81 rbac_requirements: RbacRequirements,
82) -> RbacRequirements {
83 let is_rbac_disabled = !is_rbac_enabled_for_session(catalog.system_vars(), session_meta)
86 && !session_meta.role_metadata().current_role.is_system()
87 && !session_meta.role_metadata().session_role.is_system();
88 let is_superuser = session_meta.is_superuser();
90 if is_rbac_disabled || is_superuser {
91 return rbac_requirements.filter_to_mandatory_requirements();
92 }
93
94 rbac_requirements
95}
96
97static DEFAULT_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
99 btreeset! {CatalogItemType::Secret, CatalogItemType::Connection}
100});
101pub static CREATE_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(|| {
104 let mut items = DEFAULT_ITEM_USAGE.clone();
105 items.insert(CatalogItemType::Type);
106 items
107});
108pub static EMPTY_ITEM_USAGE: LazyLock<BTreeSet<CatalogItemType>> = LazyLock::new(BTreeSet::new);
109
110#[derive(Debug, thiserror::Error)]
112pub enum UnauthorizedError {
113 #[error("permission denied to {action}")]
115 Superuser { action: String },
116 #[error("must be owner of {}", objects.iter().map(|(object_type, object_name)| format!("{object_type} {object_name}")).join(", "))]
118 Ownership { objects: Vec<(ObjectType, String)> },
119 #[error("must be a member of {}", role_names.iter().map(|role| role.quoted()).join(", "))]
121 RoleMembership { role_names: Vec<String> },
122 #[error("permission denied for {object_description}")]
124 Privilege {
125 object_description: ErrorMessageObjectDescription,
126 role_name: String,
127 privileges: String,
128 },
129 #[error("permission denied to {action}")]
133 MzSystem { action: String },
134 #[error("permission denied to {action}")]
136 MzSupport { action: String },
137 #[error("role {0} was concurrently dropped")]
139 ConcurrentRoleDrop(RoleId),
140}
141
142impl UnauthorizedError {
143 pub fn detail(&self) -> Option<String> {
144 match &self {
145 UnauthorizedError::Superuser { action } => {
146 Some(format!("You must be a superuser to {}", action))
147 }
148 UnauthorizedError::Privilege {
149 object_description,
150 role_name,
151 privileges,
152 } => Some(format!(
153 "The '{role_name}' role needs {privileges} privileges on {object_description}"
154 )),
155 UnauthorizedError::MzSystem { .. } => {
156 Some(format!("You must be the '{}' role", SYSTEM_USER.name))
157 }
158 UnauthorizedError::MzSupport { .. } => Some(format!(
159 "The '{}' role has very limited privileges",
160 SUPPORT_USER.name
161 )),
162 UnauthorizedError::ConcurrentRoleDrop(_) => {
163 Some("Please disconnect and re-connect with a valid role.".into())
164 }
165 UnauthorizedError::Ownership { .. } | UnauthorizedError::RoleMembership { .. } => None,
166 }
167 }
168}
169
170#[derive(Debug)]
172struct RbacRequirements {
173 role_membership: BTreeSet<RoleId>,
175 ownership: Vec<ObjectId>,
177 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
180 item_usage: &'static BTreeSet<CatalogItemType>,
185 superuser_action: Option<String>,
187}
188
189impl RbacRequirements {
190 fn empty() -> RbacRequirements {
191 RbacRequirements {
192 role_membership: BTreeSet::new(),
193 ownership: Vec::new(),
194 privileges: Vec::new(),
195 item_usage: &EMPTY_ITEM_USAGE,
196 superuser_action: None,
197 }
198 }
199
200 fn validate(
201 self,
202 catalog: &impl SessionCatalog,
203 session: &dyn SessionMetadata,
204 resolved_ids: &ResolvedIds,
205 ) -> Result<(), UnauthorizedError> {
206 let role_membership =
208 catalog.collect_role_membership(&session.role_metadata().current_role);
209
210 check_usage(catalog, session, resolved_ids, self.item_usage)?;
211
212 let unheld_membership: Vec<_> = self.role_membership.difference(&role_membership).collect();
215 if !unheld_membership.is_empty() {
216 let role_names = unheld_membership
217 .into_iter()
218 .map(|role_id| {
219 catalog
221 .try_get_role(role_id)
222 .map(|role| role.name().to_string())
223 .unwrap_or_else(|| role_id.to_string())
224 })
225 .collect();
226 return Err(UnauthorizedError::RoleMembership { role_names });
227 }
228
229 let unheld_ownership = self
232 .ownership
233 .into_iter()
234 .filter(|ownership| !check_owner_roles(ownership, &role_membership, catalog))
235 .collect();
236 ownership_err(unheld_ownership, catalog)?;
237
238 check_object_privileges(
239 catalog,
240 self.privileges,
241 role_membership,
242 session.role_metadata().current_role,
243 )?;
244
245 if let Some(action) = self.superuser_action {
246 return Err(UnauthorizedError::Superuser { action });
247 }
248
249 Ok(())
250 }
251
252 fn filter_to_mandatory_requirements(self) -> RbacRequirements {
253 let RbacRequirements {
254 role_membership,
255 ownership,
256 privileges,
257 item_usage,
258 superuser_action: _,
259 } = self;
260 let role_membership = role_membership
261 .into_iter()
262 .filter(|id| id.is_system())
263 .collect();
264 let ownership = ownership.into_iter().filter(|id| id.is_system()).collect();
265 let privileges = privileges
266 .into_iter()
267 .filter(|(id, _, _)| matches!(id, SystemObjectId::Object(oid) if oid.is_system()))
268 .update(|(_, acl_mode, _)| acl_mode.remove(AclMode::SELECT))
270 .filter(|(_, acl_mode, _)| !acl_mode.is_empty())
271 .collect();
272 let superuser_action = None;
273 RbacRequirements {
274 role_membership,
275 ownership,
276 privileges,
277 item_usage,
278 superuser_action,
279 }
280 }
281}
282
283impl Default for RbacRequirements {
284 fn default() -> Self {
285 RbacRequirements {
286 role_membership: BTreeSet::new(),
287 ownership: Vec::new(),
288 privileges: Vec::new(),
289 item_usage: &DEFAULT_ITEM_USAGE,
290 superuser_action: None,
291 }
292 }
293}
294
295pub fn check_usage(
297 catalog: &impl SessionCatalog,
298 session: &dyn SessionMetadata,
299 resolved_ids: &ResolvedIds,
300 item_types: &BTreeSet<CatalogItemType>,
301) -> Result<(), UnauthorizedError> {
302 rbac_check_preamble(catalog, session)?;
303
304 let role_membership = catalog.collect_role_membership(&session.role_metadata().current_role);
306
307 let existing_resolved_ids =
310 resolved_ids.retain_items(|item_id| catalog.try_get_item(item_id).is_some());
311
312 let required_privileges = generate_usage_privileges(
313 catalog,
314 &existing_resolved_ids,
315 session.role_metadata().current_role,
316 item_types,
317 )
318 .into_iter()
319 .collect();
320
321 let mut rbac_requirements = RbacRequirements::empty();
322 rbac_requirements.privileges = required_privileges;
323 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
324 let required_privileges = rbac_requirements.privileges;
325
326 check_object_privileges(
327 catalog,
328 required_privileges,
329 role_membership,
330 session.role_metadata().current_role,
331 )?;
332
333 Ok(())
334}
335
336pub fn check_plan(
338 catalog: &impl SessionCatalog,
339 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
345 session: &dyn SessionMetadata,
346 plan: &Plan,
347 target_cluster_id: Option<ClusterId>,
348 resolved_ids: &ResolvedIds,
349) -> Result<(), UnauthorizedError> {
350 rbac_check_preamble(catalog, session)?;
351
352 let rbac_requirements = generate_rbac_requirements(
353 catalog,
354 plan,
355 active_conns,
356 target_cluster_id,
357 session.role_metadata().current_role,
358 );
359 let rbac_requirements = filter_requirements(catalog, session, rbac_requirements);
360 debug!(
361 "rbac requirements {rbac_requirements:?} for plan {:?}",
362 PlanKind::from(plan)
363 );
364 rbac_requirements.validate(catalog, session, resolved_ids)
365}
366
367pub fn is_rbac_enabled_for_session(
369 system_vars: &SystemVars,
370 session: &dyn SessionMetadata,
371) -> bool {
372 let server_enabled = system_vars.enable_rbac_checks();
373 let session_enabled = session.enable_session_rbac_checks();
374
375 server_enabled || session_enabled
378}
379
380fn generate_rbac_requirements(
382 catalog: &impl SessionCatalog,
383 plan: &Plan,
384 active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
385 target_cluster_id: Option<ClusterId>,
386 role_id: RoleId,
387) -> RbacRequirements {
388 match plan {
389 Plan::CreateConnection(plan::CreateConnectionPlan {
390 name,
391 if_not_exists: _,
392 connection: _,
393 validate: _,
394 }) => RbacRequirements {
395 privileges: vec![(
396 SystemObjectId::Object(name.qualifiers.clone().into()),
397 AclMode::CREATE,
398 role_id,
399 )],
400 item_usage: &CREATE_ITEM_USAGE,
401 ..Default::default()
402 },
403 Plan::CreateDatabase(plan::CreateDatabasePlan {
404 name: _,
405 if_not_exists: _,
406 }) => RbacRequirements {
407 privileges: vec![(SystemObjectId::System, AclMode::CREATE_DB, role_id)],
408 item_usage: &CREATE_ITEM_USAGE,
409 ..Default::default()
410 },
411 Plan::CreateSchema(plan::CreateSchemaPlan {
412 database_spec,
413 schema_name: _,
414 if_not_exists: _,
415 }) => {
416 let privileges = match database_spec {
417 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
418 ResolvedDatabaseSpecifier::Id(database_id) => {
419 vec![(
420 SystemObjectId::Object(database_id.into()),
421 AclMode::CREATE,
422 role_id,
423 )]
424 }
425 };
426 RbacRequirements {
427 privileges,
428 item_usage: &CREATE_ITEM_USAGE,
429 ..Default::default()
430 }
431 }
432 Plan::CreateRole(plan::CreateRolePlan {
433 name: _,
434 attributes,
435 }) => {
436 if attributes.superuser.unwrap_or(false) {
437 RbacRequirements {
438 superuser_action: Some("create superuser role".to_string()),
439 ..Default::default()
440 }
441 } else {
442 RbacRequirements {
443 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
444 item_usage: &CREATE_ITEM_USAGE,
445 ..Default::default()
446 }
447 }
448 }
449 Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements {
450 privileges: vec![(
451 SystemObjectId::System,
452 AclMode::CREATE_NETWORK_POLICY,
453 role_id,
454 )],
455 item_usage: &CREATE_ITEM_USAGE,
456 ..Default::default()
457 },
458 Plan::CreateCluster(plan::CreateClusterPlan {
459 name: _,
460 variant: _,
461 workload_class: _,
462 }) => RbacRequirements {
463 privileges: vec![(SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id)],
464 item_usage: &CREATE_ITEM_USAGE,
465 ..Default::default()
466 },
467 Plan::CreateClusterReplica(plan::CreateClusterReplicaPlan {
468 cluster_id,
469 name: _,
470 config: _,
471 }) => RbacRequirements {
472 ownership: vec![ObjectId::Cluster(*cluster_id)],
473 item_usage: &CREATE_ITEM_USAGE,
474 ..Default::default()
475 },
476 Plan::CreateSource(plan::CreateSourcePlan {
477 name,
478 source,
479 if_not_exists: _,
480 timeline: _,
481 in_cluster,
482 }) => RbacRequirements {
483 privileges: generate_required_source_privileges(
484 name,
485 &source.data_source,
486 *in_cluster,
487 role_id,
488 ),
489 item_usage: &CREATE_ITEM_USAGE,
490 ..Default::default()
491 },
492 Plan::CreateSources(plans) => RbacRequirements {
493 privileges: plans
494 .iter()
495 .flat_map(
496 |plan::CreateSourcePlanBundle {
497 item_id: _,
498 global_id: _,
499 plan:
500 plan::CreateSourcePlan {
501 name,
502 source,
503 if_not_exists: _,
504 timeline: _,
505 in_cluster,
506 },
507 resolved_ids: _,
508 available_source_references: _,
509 }| {
510 generate_required_source_privileges(
511 name,
512 &source.data_source,
513 *in_cluster,
514 role_id,
515 )
516 .into_iter()
517 },
518 )
519 .collect(),
520 item_usage: &CREATE_ITEM_USAGE,
521 ..Default::default()
522 },
523 Plan::CreateSecret(plan::CreateSecretPlan {
524 name,
525 secret: _,
526 if_not_exists: _,
527 }) => RbacRequirements {
528 privileges: vec![(
529 SystemObjectId::Object(name.qualifiers.clone().into()),
530 AclMode::CREATE,
531 role_id,
532 )],
533 item_usage: &CREATE_ITEM_USAGE,
534 ..Default::default()
535 },
536 Plan::CreateSink(plan::CreateSinkPlan {
537 name,
538 sink,
539 with_snapshot: _,
540 if_not_exists: _,
541 in_cluster,
542 }) => {
543 let mut privileges = vec![(
544 SystemObjectId::Object(name.qualifiers.clone().into()),
545 AclMode::CREATE,
546 role_id,
547 )];
548 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
549 privileges.extend_from_slice(&generate_read_privileges(catalog, items, role_id));
550 privileges.push((
551 SystemObjectId::Object(in_cluster.into()),
552 AclMode::CREATE,
553 role_id,
554 ));
555 RbacRequirements {
556 privileges,
557 item_usage: &CREATE_ITEM_USAGE,
558 ..Default::default()
559 }
560 }
561 Plan::CreateTable(plan::CreateTablePlan {
562 name,
563 table: _,
564 if_not_exists: _,
565 }) => RbacRequirements {
566 privileges: vec![(
567 SystemObjectId::Object(name.qualifiers.clone().into()),
568 AclMode::CREATE,
569 role_id,
570 )],
571 item_usage: &CREATE_ITEM_USAGE,
572 ..Default::default()
573 },
574 Plan::CreateView(plan::CreateViewPlan {
575 name,
576 view: _,
577 replace,
578 drop_ids: _,
579 if_not_exists: _,
580 ambiguous_columns: _,
581 }) => RbacRequirements {
582 ownership: replace
583 .map(|id| vec![ObjectId::Item(id)])
584 .unwrap_or_default(),
585 privileges: vec![(
586 SystemObjectId::Object(name.qualifiers.clone().into()),
587 AclMode::CREATE,
588 role_id,
589 )],
590 item_usage: &CREATE_ITEM_USAGE,
591 ..Default::default()
592 },
593 Plan::CreateMaterializedView(plan::CreateMaterializedViewPlan {
594 name,
595 materialized_view,
596 replace,
597 drop_ids: _,
598 if_not_exists: _,
599 ambiguous_columns: _,
600 }) => RbacRequirements {
601 ownership: replace
602 .map(|id| vec![ObjectId::Item(id)])
603 .unwrap_or_default(),
604 privileges: vec![
605 (
606 SystemObjectId::Object(name.qualifiers.clone().into()),
607 AclMode::CREATE,
608 role_id,
609 ),
610 (
611 SystemObjectId::Object(materialized_view.cluster_id.into()),
612 AclMode::CREATE,
613 role_id,
614 ),
615 ],
616 item_usage: &CREATE_ITEM_USAGE,
617 ..Default::default()
618 },
619 Plan::CreateContinualTask(plan::CreateContinualTaskPlan {
620 name,
621 placeholder_id: _,
622 desc: _,
623 input_id: _,
624 with_snapshot: _,
625 continual_task,
626 }) => RbacRequirements {
627 privileges: vec![
628 (
629 SystemObjectId::Object(name.qualifiers.clone().into()),
630 AclMode::CREATE,
631 role_id,
632 ),
633 (
634 SystemObjectId::Object(continual_task.cluster_id.into()),
635 AclMode::CREATE,
636 role_id,
637 ),
638 ],
639 item_usage: &CREATE_ITEM_USAGE,
640 ..Default::default()
641 },
642 Plan::CreateIndex(plan::CreateIndexPlan {
643 name,
644 index,
645 if_not_exists: _,
646 }) => {
647 let index_on_item = catalog.resolve_item_id(&index.on);
648 RbacRequirements {
649 ownership: vec![ObjectId::Item(index_on_item)],
650 privileges: vec![
651 (
652 SystemObjectId::Object(name.qualifiers.clone().into()),
653 AclMode::CREATE,
654 role_id,
655 ),
656 (
657 SystemObjectId::Object(index.cluster_id.into()),
658 AclMode::CREATE,
659 role_id,
660 ),
661 ],
662 item_usage: &CREATE_ITEM_USAGE,
663 ..Default::default()
664 }
665 }
666 Plan::CreateType(plan::CreateTypePlan { name, typ: _ }) => RbacRequirements {
667 privileges: vec![(
668 SystemObjectId::Object(name.qualifiers.clone().into()),
669 AclMode::CREATE,
670 role_id,
671 )],
672 item_usage: &CREATE_ITEM_USAGE,
673 ..Default::default()
674 },
675 Plan::Comment(plan::CommentPlan {
676 object_id,
677 sub_component: _,
678 comment: _,
679 }) => {
680 let (ownership, privileges) = match object_id {
681 CommentObjectId::Role(_) => (
684 Vec::new(),
685 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
686 ),
687 _ => (vec![ObjectId::from(*object_id)], Vec::new()),
688 };
689 RbacRequirements {
690 ownership,
691 privileges,
692 ..Default::default()
693 }
694 }
695 Plan::DropObjects(plan::DropObjectsPlan {
696 referenced_ids,
697 drop_ids: _,
698 object_type,
699 }) => {
700 let privileges = if object_type == &ObjectType::Role {
701 vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)]
702 } else {
703 referenced_ids
704 .iter()
705 .filter_map(|id| match id {
706 ObjectId::ClusterReplica((cluster_id, _)) => Some((
707 SystemObjectId::Object(cluster_id.into()),
708 AclMode::USAGE,
709 role_id,
710 )),
711 ObjectId::Schema((database_spec, _)) => match database_spec {
712 ResolvedDatabaseSpecifier::Ambient => None,
713 ResolvedDatabaseSpecifier::Id(database_id) => Some((
714 SystemObjectId::Object(database_id.into()),
715 AclMode::USAGE,
716 role_id,
717 )),
718 },
719 ObjectId::Item(item_id) => {
720 let item = catalog.get_item(item_id);
721 Some((
722 SystemObjectId::Object(item.name().qualifiers.clone().into()),
723 AclMode::USAGE,
724 role_id,
725 ))
726 }
727 ObjectId::Cluster(_)
728 | ObjectId::Database(_)
729 | ObjectId::Role(_)
730 | ObjectId::NetworkPolicy(_) => None,
731 })
732 .collect()
733 };
734 RbacRequirements {
735 ownership: referenced_ids.clone(),
737 privileges,
738 ..Default::default()
739 }
740 }
741 Plan::DropOwned(plan::DropOwnedPlan {
742 role_ids,
743 drop_ids: _,
744 privilege_revokes: _,
745 default_privilege_revokes: _,
746 }) => RbacRequirements {
747 role_membership: role_ids.into_iter().cloned().collect(),
748 ..Default::default()
749 },
750 Plan::ShowCreate(plan::ShowCreatePlan { id, row: _ }) => {
751 let container_id = match id {
752 ObjectId::Item(id) => Some(SystemObjectId::Object(
753 catalog.get_item(id).name().qualifiers.clone().into(),
754 )),
755 ObjectId::Schema((database_id, _schema_id)) => match database_id {
756 ResolvedDatabaseSpecifier::Ambient => None,
757 ResolvedDatabaseSpecifier::Id(id) => Some(SystemObjectId::Object(id.into())),
758 },
759 ObjectId::Cluster(_)
760 | ObjectId::ClusterReplica(_)
761 | ObjectId::Database(_)
762 | ObjectId::Role(_)
763 | ObjectId::NetworkPolicy(_) => None,
764 };
765 let privileges = match container_id {
766 Some(id) => vec![(id, AclMode::USAGE, role_id)],
767 None => Vec::new(),
768 };
769 RbacRequirements {
770 privileges,
771 item_usage: &EMPTY_ITEM_USAGE,
772 ..Default::default()
773 }
774 }
775 Plan::ShowColumns(plan::ShowColumnsPlan {
776 id,
777 select_plan,
778 new_resolved_ids: _,
779 }) => {
780 let mut privileges = vec![(
781 SystemObjectId::Object(catalog.get_item(id).name().qualifiers.clone().into()),
782 AclMode::USAGE,
783 role_id,
784 )];
785
786 for privilege in generate_rbac_requirements(
787 catalog,
788 &Plan::Select(select_plan.clone()),
789 active_conns,
790 target_cluster_id,
791 role_id,
792 )
793 .privileges
794 {
795 privileges.push(privilege);
796 }
797 RbacRequirements {
798 privileges,
799 ..Default::default()
800 }
801 }
802 Plan::Select(plan::SelectPlan {
803 source,
804 select: _,
805 when: _,
806 finishing: _,
807 copy_to: _,
808 }) => {
809 let items = source
810 .depends_on()
811 .into_iter()
812 .map(|gid| catalog.resolve_item_id(&gid));
813 let mut privileges = generate_read_privileges(catalog, items, role_id);
814 if let Some(privilege) = generate_cluster_usage_privileges(
815 source.as_const().is_some(),
816 target_cluster_id,
817 role_id,
818 ) {
819 privileges.push(privilege);
820 }
821 RbacRequirements {
822 privileges,
823 ..Default::default()
824 }
825 }
826 Plan::Subscribe(plan::SubscribePlan {
827 from,
828 with_snapshot: _,
829 when: _,
830 up_to: _,
831 copy_to: _,
832 emit_progress: _,
833 output: _,
834 }) => {
835 let items = from
836 .depends_on()
837 .into_iter()
838 .map(|gid| catalog.resolve_item_id(&gid));
839 let mut privileges = generate_read_privileges(catalog, items, role_id);
840 if let Some(cluster_id) = target_cluster_id {
841 privileges.push((
842 SystemObjectId::Object(cluster_id.into()),
843 AclMode::USAGE,
844 role_id,
845 ));
846 }
847 RbacRequirements {
848 privileges,
849 ..Default::default()
850 }
851 }
852 Plan::CopyFrom(plan::CopyFromPlan {
853 target_name: _,
854 target_id,
855 source: _,
856 columns: _,
857 source_desc: _,
858 mfp: _,
859 params: _,
860 filter: _,
861 }) => RbacRequirements {
862 privileges: vec![
863 (
864 SystemObjectId::Object(
865 catalog.get_item(target_id).name().qualifiers.clone().into(),
866 ),
867 AclMode::USAGE,
868 role_id,
869 ),
870 (
871 SystemObjectId::Object(target_id.into()),
872 AclMode::INSERT,
873 role_id,
874 ),
875 ],
876 ..Default::default()
877 },
878 Plan::CopyTo(plan::CopyToPlan {
879 select_plan,
880 desc: _,
881 to: _,
882 connection: _,
883 connection_id: _,
884 format: _,
885 max_file_size: _,
886 }) => {
887 let items = select_plan
888 .source
889 .depends_on()
890 .into_iter()
891 .map(|gid| catalog.resolve_item_id(&gid));
892 let mut privileges = generate_read_privileges(catalog, items, role_id);
893 if let Some(cluster_id) = target_cluster_id {
894 privileges.push((
895 SystemObjectId::Object(cluster_id.into()),
896 AclMode::USAGE,
897 role_id,
898 ));
899 }
900 RbacRequirements {
901 privileges,
902 ..Default::default()
903 }
904 }
905 Plan::ExplainPlan(plan::ExplainPlanPlan {
906 stage: _,
907 format: _,
908 config: _,
909 explainee,
910 })
911 | Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => RbacRequirements {
912 privileges: match explainee {
913 Explainee::View(id)
914 | Explainee::MaterializedView(id)
915 | Explainee::Index(id)
916 | Explainee::ReplanView(id)
917 | Explainee::ReplanMaterializedView(id)
918 | Explainee::ReplanIndex(id) => {
919 let item = catalog.get_item(id);
920 let schema_id: ObjectId = item.name().qualifiers.clone().into();
921 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
922 }
923 Explainee::Statement(stmt) => stmt
924 .depends_on()
925 .into_iter()
926 .map(|id| {
927 let item = catalog.get_item_by_global_id(&id);
928 let schema_id: ObjectId = item.name().qualifiers.clone().into();
929 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
930 })
931 .collect(),
932 },
933 item_usage: match explainee {
934 Explainee::View(..)
935 | Explainee::MaterializedView(..)
936 | Explainee::Index(..)
937 | Explainee::ReplanView(..)
938 | Explainee::ReplanMaterializedView(..)
939 | Explainee::ReplanIndex(..) => &EMPTY_ITEM_USAGE,
940 Explainee::Statement(_) => &DEFAULT_ITEM_USAGE,
941 },
942 ..Default::default()
943 },
944 Plan::ExplainSinkSchema(plan::ExplainSinkSchemaPlan { sink_from, .. }) => {
945 RbacRequirements {
946 privileges: {
947 let item = catalog.get_item_by_global_id(sink_from);
948 let schema_id: ObjectId = item.name().qualifiers.clone().into();
949 vec![(SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)]
950 },
951 item_usage: &EMPTY_ITEM_USAGE,
952 ..Default::default()
953 }
954 }
955 Plan::ExplainTimestamp(plan::ExplainTimestampPlan {
956 format: _,
957 raw_plan,
958 when: _,
959 }) => RbacRequirements {
960 privileges: raw_plan
961 .depends_on()
962 .into_iter()
963 .map(|id| {
964 let item = catalog.get_item_by_global_id(&id);
965 let schema_id: ObjectId = item.name().qualifiers.clone().into();
966 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id)
967 })
968 .collect(),
969 ..Default::default()
970 },
971 Plan::Insert(plan::InsertPlan {
972 id,
973 values,
974 returning,
975 }) => {
976 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
977 let mut privileges = vec![
978 (
979 SystemObjectId::Object(schema_id.clone()),
980 AclMode::USAGE,
981 role_id,
982 ),
983 (SystemObjectId::Object(id.into()), AclMode::INSERT, role_id),
984 ];
985 let mut seen = BTreeSet::from([(schema_id, role_id)]);
986
987 if returning
990 .iter()
991 .any(|assignment| assignment.contains_column())
992 {
993 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
994 seen.insert((id.into(), role_id));
995 }
996
997 let items = values
998 .depends_on()
999 .into_iter()
1000 .map(|gid| catalog.resolve_item_id(&gid));
1001 privileges.extend_from_slice(&generate_read_privileges_inner(
1002 catalog, items, role_id, &mut seen,
1003 ));
1004
1005 if let Some(privilege) = generate_cluster_usage_privileges(
1006 values.as_const().is_some(),
1007 target_cluster_id,
1008 role_id,
1009 ) {
1010 privileges.push(privilege);
1011 } else if !returning.is_empty() {
1012 if let Some(cluster_id) = target_cluster_id {
1015 privileges.push((
1016 SystemObjectId::Object(cluster_id.into()),
1017 AclMode::USAGE,
1018 role_id,
1019 ));
1020 }
1021 }
1022 RbacRequirements {
1023 privileges,
1024 ..Default::default()
1025 }
1026 }
1027 Plan::AlterCluster(plan::AlterClusterPlan {
1028 id,
1029 name: _,
1030 options: _,
1031 strategy: _,
1032 }) => RbacRequirements {
1033 ownership: vec![ObjectId::Cluster(*id)],
1034 item_usage: &CREATE_ITEM_USAGE,
1035 ..Default::default()
1036 },
1037 Plan::AlterSetCluster(plan::AlterSetClusterPlan { id, set_cluster }) => RbacRequirements {
1038 ownership: vec![ObjectId::Item(*id)],
1039 privileges: vec![(
1040 SystemObjectId::Object(set_cluster.into()),
1041 AclMode::CREATE,
1042 role_id,
1043 )],
1044 item_usage: &CREATE_ITEM_USAGE,
1045 ..Default::default()
1046 },
1047 Plan::AlterRetainHistory(plan::AlterRetainHistoryPlan {
1048 id,
1049 window: _,
1050 value: _,
1051 object_type: _,
1052 }) => RbacRequirements {
1053 ownership: vec![ObjectId::Item(*id)],
1054 item_usage: &CREATE_ITEM_USAGE,
1055 ..Default::default()
1056 },
1057 Plan::AlterSourceTimestampInterval(plan::AlterSourceTimestampIntervalPlan {
1058 id,
1059 value: _,
1060 interval: _,
1061 }) => RbacRequirements {
1062 ownership: vec![ObjectId::Item(*id)],
1063 item_usage: &CREATE_ITEM_USAGE,
1064 ..Default::default()
1065 },
1066 Plan::AlterConnection(plan::AlterConnectionPlan { id, action: _ }) => RbacRequirements {
1067 ownership: vec![ObjectId::Item(*id)],
1068 ..Default::default()
1069 },
1070 Plan::AlterSource(plan::AlterSourcePlan {
1071 item_id,
1072 ingestion_id: _,
1073 action: _,
1074 }) => RbacRequirements {
1075 ownership: vec![ObjectId::Item(*item_id)],
1076 item_usage: &CREATE_ITEM_USAGE,
1077 ..Default::default()
1078 },
1079 Plan::AlterSink(plan::AlterSinkPlan {
1080 item_id,
1081 global_id: _,
1082 sink,
1083 with_snapshot: _,
1084 in_cluster,
1085 }) => {
1086 let items = iter::once(sink.from).map(|gid| catalog.resolve_item_id(&gid));
1087 let mut privileges = generate_read_privileges(catalog, items, role_id);
1088 privileges.push((
1089 SystemObjectId::Object(in_cluster.into()),
1090 AclMode::CREATE,
1091 role_id,
1092 ));
1093 RbacRequirements {
1094 ownership: vec![ObjectId::Item(*item_id)],
1095 privileges,
1096 item_usage: &CREATE_ITEM_USAGE,
1097 ..Default::default()
1098 }
1099 }
1100 Plan::AlterClusterRename(plan::AlterClusterRenamePlan {
1101 id,
1102 name: _,
1103 to_name: _,
1104 }) => RbacRequirements {
1105 ownership: vec![ObjectId::Cluster(*id)],
1106 ..Default::default()
1107 },
1108 Plan::AlterClusterSwap(plan::AlterClusterSwapPlan {
1109 id_a,
1110 id_b,
1111 name_a: _,
1112 name_b: _,
1113 name_temp: _,
1114 }) => RbacRequirements {
1115 ownership: vec![ObjectId::Cluster(*id_a), ObjectId::Cluster(*id_b)],
1116 ..Default::default()
1117 },
1118 Plan::AlterClusterReplicaRename(plan::AlterClusterReplicaRenamePlan {
1119 cluster_id,
1120 replica_id,
1121 name: _,
1122 to_name: _,
1123 }) => RbacRequirements {
1124 ownership: vec![ObjectId::ClusterReplica((*cluster_id, *replica_id))],
1125 ..Default::default()
1126 },
1127 Plan::AlterItemRename(plan::AlterItemRenamePlan {
1128 id,
1129 current_full_name: _,
1130 to_name: _,
1131 object_type: _,
1132 }) => RbacRequirements {
1133 ownership: vec![ObjectId::Item(*id)],
1134 ..Default::default()
1135 },
1136 Plan::AlterSchemaRename(plan::AlterSchemaRenamePlan {
1137 cur_schema_spec,
1138 new_schema_name: _,
1139 }) => {
1140 let privileges = match cur_schema_spec.0 {
1141 ResolvedDatabaseSpecifier::Id(db_id) => vec![(
1142 SystemObjectId::Object(ObjectId::Database(db_id)),
1143 AclMode::CREATE,
1144 role_id,
1145 )],
1146 ResolvedDatabaseSpecifier::Ambient => vec![],
1147 };
1148
1149 RbacRequirements {
1150 ownership: vec![ObjectId::Schema(*cur_schema_spec)],
1151 privileges,
1152 ..Default::default()
1153 }
1154 }
1155 Plan::AlterSchemaSwap(plan::AlterSchemaSwapPlan {
1156 schema_a_spec,
1157 schema_a_name: _,
1158 schema_b_spec,
1159 schema_b_name: _,
1160 name_temp: _,
1161 }) => {
1162 let mut privileges = vec![];
1163 if let ResolvedDatabaseSpecifier::Id(id_a) = schema_a_spec.0 {
1164 privileges.push((
1165 SystemObjectId::Object(ObjectId::Database(id_a)),
1166 AclMode::CREATE,
1167 role_id,
1168 ));
1169 }
1170 if let ResolvedDatabaseSpecifier::Id(id_b) = schema_b_spec.0 {
1171 privileges.push((
1172 SystemObjectId::Object(ObjectId::Database(id_b)),
1173 AclMode::CREATE,
1174 role_id,
1175 ));
1176 }
1177
1178 RbacRequirements {
1179 ownership: vec![
1180 ObjectId::Schema(*schema_a_spec),
1181 ObjectId::Schema(*schema_b_spec),
1182 ],
1183 privileges,
1184 ..Default::default()
1185 }
1186 }
1187 Plan::AlterSecret(plan::AlterSecretPlan { id, secret_as: _ }) => RbacRequirements {
1188 ownership: vec![ObjectId::Item(*id)],
1189 item_usage: &CREATE_ITEM_USAGE,
1190 ..Default::default()
1191 },
1192 Plan::AlterRole(plan::AlterRolePlan {
1193 id,
1194 name: _,
1195 option,
1196 }) => match option {
1197 plan::PlannedAlterRoleOption::Attributes(attributes)
1199 if attributes.superuser.is_some() =>
1200 {
1201 RbacRequirements {
1202 superuser_action: Some("alter superuser role".to_string()),
1203 ..Default::default()
1204 }
1205 }
1206 plan::PlannedAlterRoleOption::Attributes(plan::PlannedRoleAttributes {
1209 password,
1210 scram_iterations: _,
1213 nopassword: _,
1214 superuser: None,
1217 inherit: None,
1218 login: None,
1219 }) if password.is_some() && role_id == *id => RbacRequirements::default(),
1220 plan::PlannedAlterRoleOption::Attributes(attributes)
1222 if attributes.password.is_some() && role_id != *id =>
1223 {
1224 RbacRequirements {
1225 superuser_action: Some("alter password of role".to_string()),
1226 ..Default::default()
1227 }
1228 }
1229 plan::PlannedAlterRoleOption::Variable(_) if role_id == *id => {
1231 RbacRequirements::default()
1232 }
1233 _ => RbacRequirements {
1235 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1236 item_usage: &CREATE_ITEM_USAGE,
1237 ..Default::default()
1238 },
1239 },
1240 Plan::AlterOwner(plan::AlterOwnerPlan {
1241 id,
1242 object_type: _,
1243 new_owner,
1244 }) => {
1245 let privileges = match id {
1246 ObjectId::ClusterReplica((cluster_id, _)) => {
1247 vec![(
1248 SystemObjectId::Object(cluster_id.into()),
1249 AclMode::CREATE,
1250 role_id,
1251 )]
1252 }
1253 ObjectId::Schema((database_spec, _)) => match database_spec {
1254 ResolvedDatabaseSpecifier::Ambient => Vec::new(),
1255 ResolvedDatabaseSpecifier::Id(database_id) => {
1256 vec![(
1257 SystemObjectId::Object(database_id.into()),
1258 AclMode::CREATE,
1259 role_id,
1260 )]
1261 }
1262 },
1263 ObjectId::Item(item_id) => {
1264 let item = catalog.get_item(item_id);
1265 vec![(
1266 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1267 AclMode::CREATE,
1268 role_id,
1269 )]
1270 }
1271 ObjectId::Cluster(_)
1272 | ObjectId::Database(_)
1273 | ObjectId::Role(_)
1274 | ObjectId::NetworkPolicy(_) => Vec::new(),
1275 };
1276 RbacRequirements {
1277 role_membership: BTreeSet::from([*new_owner]),
1278 ownership: vec![id.clone()],
1279 privileges,
1280 ..Default::default()
1281 }
1282 }
1283 Plan::AlterTableAddColumn(plan::AlterTablePlan { relation_id, .. }) => RbacRequirements {
1284 ownership: vec![ObjectId::Item(*relation_id)],
1285 item_usage: &CREATE_ITEM_USAGE,
1286 ..Default::default()
1287 },
1288 Plan::AlterMaterializedViewApplyReplacement(
1289 plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id },
1290 ) => RbacRequirements {
1291 ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)],
1292 item_usage: &CREATE_ITEM_USAGE,
1293 ..Default::default()
1294 },
1295 Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements {
1296 ownership: vec![ObjectId::NetworkPolicy(*id)],
1297 item_usage: &CREATE_ITEM_USAGE,
1298 ..Default::default()
1299 },
1300 Plan::ReadThenWrite(plan::ReadThenWritePlan {
1301 id,
1302 selection,
1303 finishing: _,
1304 assignments,
1305 kind,
1306 returning,
1307 }) => {
1308 let acl_mode = match kind {
1309 MutationKind::Insert => AclMode::INSERT,
1310 MutationKind::Update => AclMode::UPDATE,
1311 MutationKind::Delete => AclMode::DELETE,
1312 };
1313 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1314 let mut privileges = vec![
1315 (
1316 SystemObjectId::Object(schema_id.clone()),
1317 AclMode::USAGE,
1318 role_id,
1319 ),
1320 (SystemObjectId::Object(id.into()), acl_mode, role_id),
1321 ];
1322 let mut seen = BTreeSet::from([(schema_id, role_id)]);
1323
1324 if assignments
1327 .values()
1328 .chain(returning.iter())
1329 .any(|assignment| assignment.contains_column())
1330 {
1331 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1332 seen.insert((id.into(), role_id));
1333 }
1334
1335 let items = selection
1342 .depends_on()
1343 .into_iter()
1344 .map(|gid| catalog.resolve_item_id(&gid));
1345 privileges.extend_from_slice(&generate_read_privileges_inner(
1346 catalog, items, role_id, &mut seen,
1347 ));
1348
1349 if let Some(privilege) = generate_cluster_usage_privileges(
1350 selection.as_const().is_some(),
1351 target_cluster_id,
1352 role_id,
1353 ) {
1354 privileges.push(privilege);
1355 }
1356 RbacRequirements {
1357 privileges,
1358 ..Default::default()
1359 }
1360 }
1361 Plan::GrantRole(plan::GrantRolePlan {
1362 role_ids: _,
1363 member_ids: _,
1364 grantor_id: _,
1365 })
1366 | Plan::RevokeRole(plan::RevokeRolePlan {
1367 role_ids: _,
1368 member_ids: _,
1369 grantor_id: _,
1370 }) => RbacRequirements {
1371 privileges: vec![(SystemObjectId::System, AclMode::CREATE_ROLE, role_id)],
1372 ..Default::default()
1373 },
1374 Plan::GrantPrivileges(plan::GrantPrivilegesPlan {
1375 update_privileges,
1376 grantees: _,
1377 })
1378 | Plan::RevokePrivileges(plan::RevokePrivilegesPlan {
1379 update_privileges,
1380 revokees: _,
1381 }) => {
1382 let mut privileges = Vec::with_capacity(update_privileges.len());
1383 for UpdatePrivilege { target_id, .. } in update_privileges {
1384 match target_id {
1385 SystemObjectId::Object(object_id) => match object_id {
1386 ObjectId::ClusterReplica((cluster_id, _)) => {
1387 privileges.push((
1388 SystemObjectId::Object(cluster_id.into()),
1389 AclMode::USAGE,
1390 role_id,
1391 ));
1392 }
1393 ObjectId::Schema((database_spec, _)) => match database_spec {
1394 ResolvedDatabaseSpecifier::Ambient => {}
1395 ResolvedDatabaseSpecifier::Id(database_id) => {
1396 privileges.push((
1397 SystemObjectId::Object(database_id.into()),
1398 AclMode::USAGE,
1399 role_id,
1400 ));
1401 }
1402 },
1403 ObjectId::Item(item_id) => {
1404 let item = catalog.get_item(item_id);
1405 privileges.push((
1406 SystemObjectId::Object(item.name().qualifiers.clone().into()),
1407 AclMode::USAGE,
1408 role_id,
1409 ))
1410 }
1411 ObjectId::Cluster(_)
1412 | ObjectId::Database(_)
1413 | ObjectId::Role(_)
1414 | ObjectId::NetworkPolicy(_) => {}
1415 },
1416 SystemObjectId::System => {}
1417 }
1418 }
1419 RbacRequirements {
1420 ownership: update_privileges
1421 .iter()
1422 .filter_map(|update_privilege| update_privilege.target_id.object_id())
1423 .cloned()
1424 .collect(),
1425 privileges,
1426 superuser_action: if update_privileges
1431 .iter()
1432 .any(|update_privilege| update_privilege.target_id.is_system())
1433 {
1434 Some("GRANT/REVOKE SYSTEM PRIVILEGES".to_string())
1435 } else {
1436 None
1437 },
1438 ..Default::default()
1439 }
1440 }
1441 Plan::AlterDefaultPrivileges(plan::AlterDefaultPrivilegesPlan {
1442 privilege_objects,
1443 privilege_acl_items: _,
1444 is_grant: _,
1445 }) => RbacRequirements {
1446 role_membership: privilege_objects
1447 .iter()
1448 .map(|privilege_object| privilege_object.role_id)
1449 .collect(),
1450 privileges: privilege_objects
1451 .into_iter()
1452 .filter_map(|privilege_object| {
1453 if let (Some(database_id), Some(_)) =
1454 (privilege_object.database_id, privilege_object.schema_id)
1455 {
1456 Some((
1457 SystemObjectId::Object(database_id.into()),
1458 AclMode::USAGE,
1459 role_id,
1460 ))
1461 } else {
1462 None
1463 }
1464 })
1465 .collect(),
1466 superuser_action: if privilege_objects
1472 .iter()
1473 .any(|privilege_object| privilege_object.role_id.is_public())
1474 {
1475 Some("ALTER DEFAULT PRIVILEGES FOR ALL ROLES".to_string())
1476 } else {
1477 None
1478 },
1479 ..Default::default()
1480 },
1481 Plan::ReassignOwned(plan::ReassignOwnedPlan {
1482 old_roles,
1483 new_role,
1484 reassign_ids: _,
1485 }) => RbacRequirements {
1486 role_membership: old_roles
1487 .into_iter()
1488 .cloned()
1489 .chain(iter::once(*new_role))
1490 .collect(),
1491 ..Default::default()
1492 },
1493 Plan::SideEffectingFunc(func) => {
1494 let role_membership = match func {
1495 SideEffectingFunc::PgCancelBackend { connection_id } => active_conns
1496 .expect("active_conns is required for Plan::SideEffectingFunc")(
1497 *connection_id
1498 )
1499 .map(|x| [x].into())
1500 .unwrap_or_default(),
1501 };
1502 RbacRequirements {
1503 role_membership,
1504 ..Default::default()
1505 }
1506 }
1507 Plan::ValidateConnection(plan::ValidateConnectionPlan { id, connection: _ }) => {
1508 let schema_id: ObjectId = catalog.get_item(id).name().qualifiers.clone().into();
1509 RbacRequirements {
1510 privileges: vec![
1511 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1512 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1513 ],
1514 ..Default::default()
1515 }
1516 }
1517 Plan::DiscardTemp
1518 | Plan::DiscardAll
1519 | Plan::EmptyQuery
1520 | Plan::ShowAllVariables
1521 | Plan::ShowVariable(plan::ShowVariablePlan { name: _ })
1522 | Plan::InspectShard(plan::InspectShardPlan { id: _ })
1523 | Plan::SetVariable(plan::SetVariablePlan {
1524 name: _,
1525 value: _,
1526 local: _,
1527 })
1528 | Plan::ResetVariable(plan::ResetVariablePlan { name: _ })
1529 | Plan::SetTransaction(plan::SetTransactionPlan { local: _, modes: _ })
1530 | Plan::StartTransaction(plan::StartTransactionPlan {
1531 access: _,
1532 isolation_level: _,
1533 })
1534 | Plan::CommitTransaction(plan::CommitTransactionPlan {
1535 transaction_type: _,
1536 })
1537 | Plan::AbortTransaction(plan::AbortTransactionPlan {
1538 transaction_type: _,
1539 })
1540 | Plan::AlterNoop(plan::AlterNoopPlan { object_type: _ })
1541 | Plan::AlterSystemSet(plan::AlterSystemSetPlan { name: _, value: _ })
1542 | Plan::AlterSystemReset(plan::AlterSystemResetPlan { name: _ })
1543 | Plan::AlterSystemResetAll(plan::AlterSystemResetAllPlan {})
1544 | Plan::Declare(plan::DeclarePlan {
1545 name: _,
1546 stmt: _,
1547 sql: _,
1548 params: _,
1549 })
1550 | Plan::Fetch(plan::FetchPlan {
1551 name: _,
1552 count: _,
1553 timeout: _,
1554 })
1555 | Plan::Close(plan::ClosePlan { name: _ })
1556 | Plan::Prepare(plan::PreparePlan {
1557 name: _,
1558 stmt: _,
1559 desc: _,
1560 sql: _,
1561 })
1562 | Plan::Execute(plan::ExecutePlan { name: _, params: _ })
1563 | Plan::Deallocate(plan::DeallocatePlan { name: _ })
1564 | Plan::Raise(plan::RaisePlan { severity: _ }) => Default::default(),
1565 }
1566}
1567
1568fn check_owner_roles(
1570 object_id: &ObjectId,
1571 role_ids: &BTreeSet<RoleId>,
1572 catalog: &impl SessionCatalog,
1573) -> bool {
1574 if let Some(owner_id) = catalog.get_owner_id(object_id) {
1575 role_ids.contains(&owner_id)
1576 } else {
1577 true
1578 }
1579}
1580
1581fn ownership_err(
1582 unheld_ownership: Vec<ObjectId>,
1583 catalog: &impl SessionCatalog,
1584) -> Result<(), UnauthorizedError> {
1585 if !unheld_ownership.is_empty() {
1586 let objects = unheld_ownership
1587 .into_iter()
1588 .map(|ownership| match ownership {
1589 ObjectId::Cluster(id) => (
1590 ObjectType::Cluster,
1591 catalog.get_cluster(id).name().to_string(),
1592 ),
1593 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1594 let cluster = catalog.get_cluster(cluster_id);
1595 let replica = catalog.get_cluster_replica(cluster_id, replica_id);
1596 let name = QualifiedReplica {
1599 cluster: Ident::new_unchecked(cluster.name()),
1600 replica: Ident::new_unchecked(replica.name()),
1601 };
1602 (ObjectType::ClusterReplica, name.to_string())
1603 }
1604 ObjectId::Database(id) => (
1605 ObjectType::Database,
1606 catalog.get_database(&id).name().to_string(),
1607 ),
1608 ObjectId::Schema((database_spec, schema_spec)) => {
1609 let schema = catalog.get_schema(&database_spec, &schema_spec);
1610 let name = catalog.resolve_full_schema_name(schema.name());
1611 (ObjectType::Schema, name.to_string())
1612 }
1613 ObjectId::Item(id) => {
1614 let item = catalog.get_item(&id);
1615 let name = catalog.resolve_full_name(item.name());
1616 (item.item_type().into(), name.to_string())
1617 }
1618 ObjectId::NetworkPolicy(id) => (
1619 ObjectType::NetworkPolicy,
1620 catalog.get_network_policy(&id).name().to_string(),
1621 ),
1622 ObjectId::Role(_) => unreachable!("roles have no owner"),
1623 })
1624 .collect();
1625 Err(UnauthorizedError::Ownership { objects })
1626 } else {
1627 Ok(())
1628 }
1629}
1630
1631fn generate_required_source_privileges(
1632 name: &QualifiedItemName,
1633 data_source: &DataSourceDesc,
1634 in_cluster: Option<ClusterId>,
1635 role_id: RoleId,
1636) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1637 let mut privileges = vec![(
1638 SystemObjectId::Object(name.qualifiers.clone().into()),
1639 AclMode::CREATE,
1640 role_id,
1641 )];
1642 match (data_source, in_cluster) {
1643 (_, Some(id)) => {
1644 privileges.push((SystemObjectId::Object(id.into()), AclMode::CREATE, role_id))
1645 }
1646 (DataSourceDesc::Ingestion(_), None) => {
1647 privileges.push((SystemObjectId::System, AclMode::CREATE_CLUSTER, role_id))
1648 }
1649 (_, None) => {}
1654 }
1655 privileges
1656}
1657
1658fn generate_read_privileges(
1666 catalog: &impl SessionCatalog,
1667 ids: impl Iterator<Item = CatalogItemId>,
1668 role_id: RoleId,
1669) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1670 generate_read_privileges_inner(catalog, ids, role_id, &mut BTreeSet::new())
1671}
1672
1673fn generate_read_privileges_inner(
1674 catalog: &impl SessionCatalog,
1675 ids: impl Iterator<Item = CatalogItemId>,
1676 role_id: RoleId,
1677 seen: &mut BTreeSet<(ObjectId, RoleId)>,
1678) -> Vec<(SystemObjectId, AclMode, RoleId)> {
1679 let mut privileges = Vec::new();
1680 let mut views = Vec::new();
1681
1682 for id in ids {
1683 if seen.insert((id.into(), role_id)) {
1684 let item = catalog.get_item(&id);
1685 let schema_id: ObjectId = item.name().qualifiers.clone().into();
1686 if seen.insert((schema_id.clone(), role_id)) {
1687 privileges.push((SystemObjectId::Object(schema_id), AclMode::USAGE, role_id))
1688 }
1689 match item.item_type() {
1690 CatalogItemType::View
1691 | CatalogItemType::MaterializedView
1692 | CatalogItemType::ContinualTask => {
1693 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1694 views.push((item.references().items().copied(), item.owner_id()));
1695 }
1696 CatalogItemType::Table | CatalogItemType::Source => {
1697 privileges.push((SystemObjectId::Object(id.into()), AclMode::SELECT, role_id));
1698 }
1699 CatalogItemType::Type | CatalogItemType::Secret | CatalogItemType::Connection => {
1700 privileges.push((SystemObjectId::Object(id.into()), AclMode::USAGE, role_id));
1701 }
1702 CatalogItemType::Sink | CatalogItemType::Index | CatalogItemType::Func => {}
1703 }
1704 }
1705 }
1706
1707 for (view_ids, view_owner) in views {
1708 privileges.extend_from_slice(&generate_read_privileges_inner(
1709 catalog, view_ids, view_owner, seen,
1710 ));
1711 }
1712
1713 privileges
1714}
1715
1716fn generate_usage_privileges(
1717 catalog: &impl SessionCatalog,
1718 ids: &ResolvedIds,
1719 role_id: RoleId,
1720 item_types: &BTreeSet<CatalogItemType>,
1721) -> BTreeSet<(SystemObjectId, AclMode, RoleId)> {
1722 ids.items()
1724 .filter_map(move |id| {
1725 let item = catalog.get_item(id);
1726 if item_types.contains(&item.item_type()) {
1727 let schema_id = item.name().qualifiers.clone().into();
1728 Some([
1729 (SystemObjectId::Object(schema_id), AclMode::USAGE, role_id),
1730 (SystemObjectId::Object(id.into()), AclMode::USAGE, role_id),
1731 ])
1732 } else {
1733 None
1734 }
1735 })
1736 .flatten()
1737 .collect()
1738}
1739
1740fn generate_cluster_usage_privileges(
1741 expr_is_const: bool,
1742 target_cluster_id: Option<ClusterId>,
1743 role_id: RoleId,
1744) -> Option<(SystemObjectId, AclMode, RoleId)> {
1745 if !expr_is_const {
1748 if let Some(cluster_id) = target_cluster_id {
1749 return Some((
1750 SystemObjectId::Object(cluster_id.into()),
1751 AclMode::USAGE,
1752 role_id,
1753 ));
1754 }
1755 }
1756
1757 None
1758}
1759
1760fn check_object_privileges(
1761 catalog: &impl SessionCatalog,
1762 privileges: Vec<(SystemObjectId, AclMode, RoleId)>,
1763 role_membership: BTreeSet<RoleId>,
1764 current_role_id: RoleId,
1765) -> Result<(), UnauthorizedError> {
1766 let mut role_memberships: BTreeMap<RoleId, BTreeSet<RoleId>> = BTreeMap::new();
1767 role_memberships.insert(current_role_id, role_membership);
1768 for (object_id, acl_mode, role_id) in privileges {
1769 if matches!(
1773 &object_id,
1774 SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary)))
1775 ) {
1776 continue;
1777 }
1778
1779 let role_membership = role_memberships
1780 .entry(role_id)
1781 .or_insert_with_key(|role_id| catalog.collect_role_membership(role_id));
1782 let object_privileges = catalog
1783 .get_privileges(&object_id)
1784 .expect("only object types with privileges will generate required privileges");
1785 let role_privileges = role_membership
1786 .iter()
1787 .flat_map(|role_id| object_privileges.get_acl_items_for_grantee(role_id))
1788 .map(|mz_acl_item| mz_acl_item.acl_mode)
1789 .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode));
1790 if !role_privileges.contains(acl_mode) {
1791 let role_name = catalog.get_role(&role_id).name().to_string();
1792 let privileges = acl_mode.to_error_string();
1793 return Err(UnauthorizedError::Privilege {
1794 object_description: ErrorMessageObjectDescription::from_sys_id(&object_id, catalog),
1795 role_name,
1796 privileges,
1797 });
1798 }
1799 }
1800
1801 Ok(())
1802}
1803
1804pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode {
1805 const TABLE_ACL_MODE: AclMode = AclMode::INSERT
1806 .union(AclMode::SELECT)
1807 .union(AclMode::UPDATE)
1808 .union(AclMode::DELETE);
1809 const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE);
1810 const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE
1811 .union(AclMode::CREATE_DB)
1812 .union(AclMode::CREATE_CLUSTER)
1813 .union(AclMode::CREATE_NETWORK_POLICY);
1814
1815 const EMPTY_ACL_MODE: AclMode = AclMode::empty();
1816 match object_type {
1817 SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE,
1818 SystemObjectType::Object(ObjectType::View) => AclMode::SELECT,
1819 SystemObjectType::Object(ObjectType::MaterializedView) => AclMode::SELECT,
1820 SystemObjectType::Object(ObjectType::Source) => AclMode::SELECT,
1821 SystemObjectType::Object(ObjectType::Sink) => EMPTY_ACL_MODE,
1822 SystemObjectType::Object(ObjectType::Index) => EMPTY_ACL_MODE,
1823 SystemObjectType::Object(ObjectType::Type) => AclMode::USAGE,
1824 SystemObjectType::Object(ObjectType::Role) => EMPTY_ACL_MODE,
1825 SystemObjectType::Object(ObjectType::Cluster) => USAGE_CREATE_ACL_MODE,
1826 SystemObjectType::Object(ObjectType::ClusterReplica) => EMPTY_ACL_MODE,
1827 SystemObjectType::Object(ObjectType::Secret) => AclMode::USAGE,
1828 SystemObjectType::Object(ObjectType::NetworkPolicy) => AclMode::USAGE,
1829 SystemObjectType::Object(ObjectType::Connection) => AclMode::USAGE,
1830 SystemObjectType::Object(ObjectType::Database) => USAGE_CREATE_ACL_MODE,
1831 SystemObjectType::Object(ObjectType::Schema) => USAGE_CREATE_ACL_MODE,
1832 SystemObjectType::Object(ObjectType::Func) => EMPTY_ACL_MODE,
1833 SystemObjectType::Object(ObjectType::ContinualTask) => AclMode::SELECT,
1834 SystemObjectType::System => ALL_SYSTEM_PRIVILEGES,
1835 }
1836}
1837
1838pub const fn owner_privilege(object_type: ObjectType, owner_id: RoleId) -> MzAclItem {
1839 MzAclItem {
1840 grantee: owner_id,
1841 grantor: owner_id,
1842 acl_mode: all_object_privileges(SystemObjectType::Object(object_type)),
1843 }
1844}
1845
1846const fn default_builtin_object_acl_mode(object_type: ObjectType) -> AclMode {
1847 match object_type {
1848 ObjectType::Table
1849 | ObjectType::View
1850 | ObjectType::MaterializedView
1851 | ObjectType::Source
1852 | ObjectType::ContinualTask => AclMode::SELECT,
1853 ObjectType::Type | ObjectType::Schema => AclMode::USAGE,
1854 ObjectType::Sink
1855 | ObjectType::Index
1856 | ObjectType::Role
1857 | ObjectType::Cluster
1858 | ObjectType::ClusterReplica
1859 | ObjectType::Secret
1860 | ObjectType::Connection
1861 | ObjectType::Database
1862 | ObjectType::Func
1863 | ObjectType::NetworkPolicy => AclMode::empty(),
1864 }
1865}
1866
1867pub const fn support_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1868 let acl_mode = default_builtin_object_acl_mode(object_type);
1869 MzAclItem {
1870 grantee: MZ_SUPPORT_ROLE_ID,
1871 grantor: MZ_SYSTEM_ROLE_ID,
1872 acl_mode,
1873 }
1874}
1875
1876pub const fn default_builtin_object_privilege(object_type: ObjectType) -> MzAclItem {
1877 let acl_mode = default_builtin_object_acl_mode(object_type);
1878 MzAclItem {
1879 grantee: RoleId::Public,
1880 grantor: MZ_SYSTEM_ROLE_ID,
1881 acl_mode,
1882 }
1883}