1use std::collections::BTreeSet;
16
17use itertools::Itertools;
18use mz_sql_parser::ast::display::AstDisplay;
19
20use crate::ast::{Ident, UnresolvedDatabaseName};
21use crate::catalog::{
22 DefaultPrivilegeAclItem, DefaultPrivilegeObject, ErrorMessageObjectDescription, ObjectType,
23 SystemObjectType,
24};
25use crate::names::{
26 Aug, ObjectId, ResolvedDatabaseSpecifier, ResolvedRoleName, SchemaSpecifier, SystemObjectId,
27};
28use crate::plan::error::PlanError;
29use crate::plan::statement::ddl::{
30 resolve_cluster, resolve_database, resolve_item_or_type, resolve_network_policy, resolve_schema,
31};
32use crate::plan::statement::{StatementContext, StatementDesc};
33use crate::plan::{
34 AlterDefaultPrivilegesPlan, AlterNoopPlan, AlterOwnerPlan, GrantPrivilegesPlan, GrantRolePlan,
35 Plan, PlanNotice, ReassignOwnedPlan, RevokePrivilegesPlan, RevokeRolePlan, UpdatePrivilege,
36};
37use crate::session::user::SYSTEM_USER;
38use mz_ore::str::StrExt;
39use mz_repr::adt::mz_acl_item::AclMode;
40use mz_repr::role_id::RoleId;
41use mz_sql_parser::ast::{
42 AbbreviatedGrantOrRevokeStatement, AlterDefaultPrivilegesStatement, AlterOwnerStatement,
43 GrantPrivilegesStatement, GrantRoleStatement, GrantTargetAllSpecification,
44 GrantTargetSpecification, GrantTargetSpecificationInner, Privilege, PrivilegeSpecification,
45 ReassignOwnedStatement, RevokePrivilegesStatement, RevokeRoleStatement,
46 TargetRoleSpecification, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName,
47};
48
49pub fn describe_alter_owner(
50 _: &StatementContext,
51 _: AlterOwnerStatement<Aug>,
52) -> Result<StatementDesc, PlanError> {
53 Ok(StatementDesc::new(None))
54}
55
56pub fn plan_alter_owner(
57 scx: &StatementContext,
58 AlterOwnerStatement {
59 object_type,
60 if_exists,
61 name,
62 new_owner,
63 }: AlterOwnerStatement<Aug>,
64) -> Result<Plan, PlanError> {
65 let object_type = object_type.into();
66 match (object_type, name) {
67 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
68 plan_alter_cluster_owner(scx, if_exists, name, new_owner.id)
69 }
70 (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(_)) => {
71 bail_never_supported!("altering the owner of a cluster replica");
72 }
73 (ObjectType::Database, UnresolvedObjectName::Database(name)) => {
74 plan_alter_database_owner(scx, if_exists, name, new_owner.id)
75 }
76 (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
77 plan_alter_schema_owner(scx, if_exists, name, new_owner.id)
78 }
79 (ObjectType::NetworkPolicy, UnresolvedObjectName::NetworkPolicy(name)) => {
80 plan_alter_network_policy_owner(scx, if_exists, name, new_owner.id)
81 }
82 (ObjectType::Role, UnresolvedObjectName::Role(_)) => unreachable!("rejected by the parser"),
83 (
84 object_type @ ObjectType::Cluster
85 | object_type @ ObjectType::ClusterReplica
86 | object_type @ ObjectType::Database
87 | object_type @ ObjectType::Schema
88 | object_type @ ObjectType::Role,
89 name,
90 )
91 | (
92 object_type,
93 name @ UnresolvedObjectName::Cluster(_)
94 | name @ UnresolvedObjectName::ClusterReplica(_)
95 | name @ UnresolvedObjectName::Database(_)
96 | name @ UnresolvedObjectName::Schema(_)
97 | name @ UnresolvedObjectName::NetworkPolicy(_)
98 | name @ UnresolvedObjectName::Role(_),
99 ) => {
100 unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
101 }
102 (object_type, UnresolvedObjectName::Item(name)) => {
103 plan_alter_item_owner(scx, object_type, if_exists, name, new_owner.id)
104 }
105 }
106}
107
108fn plan_alter_cluster_owner(
109 scx: &StatementContext,
110 if_exists: bool,
111 name: Ident,
112 new_owner: RoleId,
113) -> Result<Plan, PlanError> {
114 match resolve_cluster(scx, &name, if_exists)? {
115 Some(cluster) => Ok(Plan::AlterOwner(AlterOwnerPlan {
116 id: ObjectId::Cluster(cluster.id()),
117 object_type: ObjectType::Cluster,
118 new_owner,
119 })),
120 None => {
121 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
122 name: name.to_ast_string_simple(),
123 object_type: ObjectType::Cluster,
124 });
125 Ok(Plan::AlterNoop(AlterNoopPlan {
126 object_type: ObjectType::Cluster,
127 }))
128 }
129 }
130}
131
132fn plan_alter_database_owner(
133 scx: &StatementContext,
134 if_exists: bool,
135 name: UnresolvedDatabaseName,
136 new_owner: RoleId,
137) -> Result<Plan, PlanError> {
138 match resolve_database(scx, &name, if_exists)? {
139 Some(database) => Ok(Plan::AlterOwner(AlterOwnerPlan {
140 id: ObjectId::Database(database.id()),
141 object_type: ObjectType::Database,
142 new_owner,
143 })),
144 None => {
145 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
146 name: name.to_ast_string_simple(),
147 object_type: ObjectType::Database,
148 });
149
150 Ok(Plan::AlterNoop(AlterNoopPlan {
151 object_type: ObjectType::Database,
152 }))
153 }
154 }
155}
156
157fn plan_alter_schema_owner(
158 scx: &StatementContext,
159 if_exists: bool,
160 name: UnresolvedSchemaName,
161 new_owner: RoleId,
162) -> Result<Plan, PlanError> {
163 match resolve_schema(scx, name.clone(), if_exists)? {
164 Some((database_spec, schema_spec)) => {
165 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
166 sql_bail!(
167 "cannot alter schema {name} because it is required by the database system",
168 );
169 }
170 if let SchemaSpecifier::Temporary = schema_spec {
171 sql_bail!("cannot alter schema {name} because it is a temporary schema",)
172 }
173 Ok(Plan::AlterOwner(AlterOwnerPlan {
174 id: ObjectId::Schema((database_spec, schema_spec)),
175 object_type: ObjectType::Schema,
176 new_owner,
177 }))
178 }
179 None => {
180 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
181 name: name.to_ast_string_simple(),
182 object_type: ObjectType::Schema,
183 });
184
185 Ok(Plan::AlterNoop(AlterNoopPlan {
186 object_type: ObjectType::Schema,
187 }))
188 }
189 }
190}
191
192fn plan_alter_item_owner(
193 scx: &StatementContext,
194 object_type: ObjectType,
195 if_exists: bool,
196 name: UnresolvedItemName,
197 new_owner: RoleId,
198) -> Result<Plan, PlanError> {
199 let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
200 Ok(r) => r,
201 Err(PlanError::MismatchedObjectType {
203 name,
204 is_type: ObjectType::MaterializedView,
205 expected_type: ObjectType::View,
206 }) => {
207 return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
208 }
209 e => e?,
210 };
211
212 match resolved {
213 Some(item) => {
214 if item.id().is_system() {
215 sql_bail!(
216 "cannot alter item {} because it is required by the database system",
217 scx.catalog.resolve_full_name(item.name()),
218 );
219 }
220
221 Ok(Plan::AlterOwner(AlterOwnerPlan {
222 id: ObjectId::Item(item.id()),
223 object_type,
224 new_owner,
225 }))
226 }
227 None => {
228 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
229 name: name.to_ast_string_simple(),
230 object_type,
231 });
232
233 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
234 }
235 }
236}
237
238fn plan_alter_network_policy_owner(
239 scx: &StatementContext,
240 if_exists: bool,
241 name: Ident,
242 new_owner: RoleId,
243) -> Result<Plan, PlanError> {
244 match resolve_network_policy(scx, name.clone(), if_exists)? {
245 Some(policy_id) => Ok(Plan::AlterOwner(AlterOwnerPlan {
246 id: ObjectId::NetworkPolicy(policy_id.id),
247 object_type: ObjectType::Schema,
248 new_owner,
249 })),
250 None => {
251 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
252 name: name.to_ast_string_simple(),
253 object_type: ObjectType::NetworkPolicy,
254 });
255
256 Ok(Plan::AlterNoop(AlterNoopPlan {
257 object_type: ObjectType::NetworkPolicy,
258 }))
259 }
260 }
261}
262
263pub fn describe_grant_role(
264 _: &StatementContext,
265 _: GrantRoleStatement<Aug>,
266) -> Result<StatementDesc, PlanError> {
267 Ok(StatementDesc::new(None))
268}
269
270pub fn plan_grant_role(
271 scx: &StatementContext,
272 GrantRoleStatement {
273 role_names,
274 member_names,
275 }: GrantRoleStatement<Aug>,
276) -> Result<Plan, PlanError> {
277 let grantor_id = scx
283 .catalog
284 .resolve_role(&SYSTEM_USER.name)
285 .expect("system user must exist")
286 .id();
287 Ok(Plan::GrantRole(GrantRolePlan {
288 role_ids: role_names
289 .into_iter()
290 .map(|role_name| role_name.id)
291 .collect(),
292 member_ids: member_names
293 .into_iter()
294 .map(|member_name| member_name.id)
295 .collect(),
296 grantor_id,
297 }))
298}
299
300pub fn describe_revoke_role(
301 _: &StatementContext,
302 _: RevokeRoleStatement<Aug>,
303) -> Result<StatementDesc, PlanError> {
304 Ok(StatementDesc::new(None))
305}
306
307pub fn plan_revoke_role(
308 scx: &StatementContext,
309 RevokeRoleStatement {
310 role_names,
311 member_names,
312 }: RevokeRoleStatement<Aug>,
313) -> Result<Plan, PlanError> {
314 let grantor_id = scx
322 .catalog
323 .resolve_role(&SYSTEM_USER.name)
324 .expect("system user must exist")
325 .id();
326 Ok(Plan::RevokeRole(RevokeRolePlan {
327 role_ids: role_names
328 .into_iter()
329 .map(|role_name| role_name.id)
330 .collect(),
331 member_ids: member_names
332 .into_iter()
333 .map(|member_name| member_name.id)
334 .collect(),
335 grantor_id,
336 }))
337}
338
339pub fn describe_grant_privileges(
340 _: &StatementContext,
341 _: GrantPrivilegesStatement<Aug>,
342) -> Result<StatementDesc, PlanError> {
343 Ok(StatementDesc::new(None))
344}
345
346pub fn plan_grant_privileges(
347 scx: &StatementContext,
348 GrantPrivilegesStatement {
349 privileges,
350 target,
351 roles,
352 }: GrantPrivilegesStatement<Aug>,
353) -> Result<Plan, PlanError> {
354 let plan = plan_update_privilege(scx, privileges, target, roles)?;
355 Ok(Plan::GrantPrivileges(plan.into()))
356}
357
358pub fn describe_revoke_privileges(
359 _: &StatementContext,
360 _: RevokePrivilegesStatement<Aug>,
361) -> Result<StatementDesc, PlanError> {
362 Ok(StatementDesc::new(None))
363}
364
365pub fn plan_revoke_privileges(
366 scx: &StatementContext,
367 RevokePrivilegesStatement {
368 privileges,
369 target,
370 roles,
371 }: RevokePrivilegesStatement<Aug>,
372) -> Result<Plan, PlanError> {
373 let plan = plan_update_privilege(scx, privileges, target, roles)?;
374 Ok(Plan::RevokePrivileges(plan.into()))
375}
376
/// Intermediate result shared by the grant and revoke privilege planners.
///
/// It is produced by `plan_update_privilege` and converted into either a
/// `GrantPrivilegesPlan` or a `RevokePrivilegesPlan` through the `From` impls
/// in this file.
struct UpdatePrivilegesPlan {
    /// One entry per targeted object: the ACL mode, the target id and the
    /// grantor.
    update_privileges: Vec<UpdatePrivilege>,
    /// Ids of the roles named in the statement. For a revoke these become the
    /// `revokees` of the resulting plan.
    grantees: Vec<RoleId>,
}
381
382impl From<UpdatePrivilegesPlan> for GrantPrivilegesPlan {
383 fn from(
384 UpdatePrivilegesPlan {
385 update_privileges,
386 grantees,
387 }: UpdatePrivilegesPlan,
388 ) -> GrantPrivilegesPlan {
389 GrantPrivilegesPlan {
390 update_privileges,
391 grantees,
392 }
393 }
394}
395
396impl From<UpdatePrivilegesPlan> for RevokePrivilegesPlan {
397 fn from(
398 UpdatePrivilegesPlan {
399 update_privileges,
400 grantees,
401 }: UpdatePrivilegesPlan,
402 ) -> RevokePrivilegesPlan {
403 RevokePrivilegesPlan {
404 update_privileges,
405 revokees: grantees,
406 }
407 }
408}
409
/// Shared planning logic for `GRANT` and `REVOKE` of privileges.
///
/// Expands `target` into a list of concrete target ids, converts `privileges`
/// into an ACL mode for each target, validates that the requested privileges
/// are applicable to the target, and determines the grantor for each target.
///
/// # Errors
///
/// * `PlanError::InvalidObjectType` when an item target's type does not match
///   the object type named in the statement (views, materialized views and
///   sources are accepted when the statement names tables).
/// * `PlanError::InvalidPrivilegeTypes` when the requested ACL mode contains
///   privileges outside those returned by `all_object_privileges` for the
///   reference object type.
///
/// # Panics
///
/// Panics if an explicitly named object cannot be converted into an
/// `ObjectId`, or if a targeted object has no owner.
fn plan_update_privilege(
    scx: &StatementContext,
    privileges: PrivilegeSpecification,
    target: GrantTargetSpecification<Aug>,
    roles: Vec<ResolvedRoleName>,
) -> Result<UpdatePrivilegesPlan, PlanError> {
    let (object_type, target_ids) = match target {
        GrantTargetSpecification::Object {
            object_type,
            object_spec_inner,
        } => {
            // Decides whether `object_id` should be kept for the requested
            // `object_type`. A request for tables keeps every object whose
            // type reports `is_relation()`; any other request requires the
            // object's type to be exactly equal to the requested one.
            fn object_type_filter(
                object_id: &ObjectId,
                object_type: &ObjectType,
                scx: &StatementContext,
            ) -> bool {
                if object_type == &ObjectType::Table {
                    scx.get_object_type(object_id).is_relation()
                } else {
                    object_type == &scx.get_object_type(object_id)
                }
            }
            let object_type = object_type.into();
            let object_ids: Vec<ObjectId> = match object_spec_inner {
                // No database or schema restriction: consider every cluster,
                // database, non-temporary schema and item in the catalog, then
                // keep those matching the type filter and `is_user()`.
                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::All) => {
                    let cluster_ids = scx
                        .catalog
                        .get_clusters()
                        .into_iter()
                        .map(|cluster| cluster.id().into());
                    let database_ids = scx
                        .catalog
                        .get_databases()
                        .into_iter()
                        .map(|database| database.id().into());
                    let schema_ids = scx
                        .catalog
                        .get_schemas()
                        .into_iter()
                        .filter(|schema| !schema.id().is_temporary())
                        .map(|schema| (schema.database().clone(), schema.id().clone()).into());
                    let item_ids = scx
                        .catalog
                        .get_items()
                        .into_iter()
                        .map(|item| item.id().into());
                    cluster_ids
                        .chain(database_ids)
                        .chain(schema_ids)
                        .chain(item_ids)
                        .filter(|object_id| object_type_filter(object_id, &object_type, scx))
                        .filter(|object_id| object_id.is_user())
                        .collect()
                }
                // Restricted to specific databases: consider the non-temporary
                // schemas of those databases and the items inside their
                // schemas, then keep those matching the type filter.
                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::AllDatabases {
                    databases,
                }) => {
                    let schema_ids = databases
                        .iter()
                        .map(|database| scx.get_database(database.database_id()))
                        .flat_map(|database| database.schemas().into_iter())
                        .filter(|schema| !schema.id().is_temporary())
                        .map(|schema| (schema.database().clone(), schema.id().clone()).into());

                    let item_ids = databases
                        .iter()
                        .map(|database| scx.get_database(database.database_id()))
                        .flat_map(|database| database.schemas().into_iter())
                        .flat_map(|schema| schema.item_ids())
                        .map(|item_id| item_id.into());

                    item_ids
                        .chain(schema_ids)
                        .filter(|object_id| object_type_filter(object_id, &object_type, scx))
                        .collect()
                }
                // Restricted to specific schemas: consider only the items in
                // those schemas, then keep those matching the type filter.
                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::AllSchemas {
                    schemas,
                }) => schemas
                    .into_iter()
                    .map(|schema| scx.get_schema(schema.database_spec(), schema.schema_spec()))
                    .flat_map(|schema| schema.item_ids())
                    .map(|item_id| item_id.into())
                    .filter(|object_id| object_type_filter(object_id, &object_type, scx))
                    .collect(),
                // Explicitly named objects: convert each name into an id. No
                // type filter is applied here; type mismatches are reported by
                // the validation loop below.
                GrantTargetSpecificationInner::Objects { names } => names
                    .into_iter()
                    .map(|name| {
                        name.try_into()
                            .expect("name resolution should handle invalid objects")
                    })
                    .collect(),
            };
            let target_ids = object_ids.into_iter().map(|id| id.into()).collect();
            (SystemObjectType::Object(object_type), target_ids)
        }
        // A system-level grant has exactly one target.
        GrantTargetSpecification::System => {
            (SystemObjectType::System, vec![SystemObjectId::System])
        }
    };

    let mut update_privileges = Vec::with_capacity(target_ids.len());

    for target_id in target_ids {
        // The type the catalog reports for this target.
        let actual_object_type = scx.get_system_object_type(&target_id);
        // The type whose privilege set the request is validated against.
        // It starts out as the actual type and may be replaced by `Table`
        // below.
        let mut reference_object_type = actual_object_type.clone();

        let acl_mode = privilege_spec_to_acl_mode(scx, &privileges, actual_object_type);

        if let SystemObjectId::Object(ObjectId::Item(id)) = &target_id {
            let item = scx.get_item(id);
            let item_type: ObjectType = item.item_type().into();
            if (item_type == ObjectType::View
                || item_type == ObjectType::MaterializedView
                || item_type == ObjectType::Source)
                && object_type == SystemObjectType::Object(ObjectType::Table)
            {
                // Views, materialized views and sources are accepted when the
                // statement names tables; validate against table privileges.
                reference_object_type = SystemObjectType::Object(ObjectType::Table);
            } else if SystemObjectType::Object(item_type) != object_type {
                let object_name = scx.catalog.resolve_full_name(item.name()).to_string();
                return Err(PlanError::InvalidObjectType {
                    expected_type: object_type,
                    actual_type: actual_object_type,
                    object_name,
                });
            }
        }

        // Reject any requested privilege that is not in the set valid for
        // the reference type.
        let all_object_privileges = scx.catalog.all_object_privileges(reference_object_type);
        let invalid_privileges = acl_mode.difference(all_object_privileges);
        if !invalid_privileges.is_empty() {
            let object_description =
                ErrorMessageObjectDescription::from_sys_id(&target_id, scx.catalog);
            return Err(PlanError::InvalidPrivilegeTypes {
                invalid_privileges,
                object_description,
            });
        }

        // The grantor is the owner of the object, or the `mz_system` role id
        // for the system target.
        let grantor = match &target_id {
            SystemObjectId::Object(object_id) => scx
                .catalog
                .get_owner_id(object_id)
                .expect("cannot revoke privileges on objects without owners"),
            SystemObjectId::System => scx.catalog.mz_system_role_id(),
        };

        update_privileges.push(UpdatePrivilege {
            acl_mode,
            target_id,
            grantor,
        });
    }

    let grantees = roles.into_iter().map(|role| role.id).collect();

    Ok(UpdatePrivilegesPlan {
        update_privileges,
        grantees,
    })
}
581
582fn privilege_spec_to_acl_mode(
583 scx: &StatementContext,
584 privilege_spec: &PrivilegeSpecification,
585 object_type: SystemObjectType,
586) -> AclMode {
587 match privilege_spec {
588 PrivilegeSpecification::All => scx.catalog.all_object_privileges(object_type),
589 PrivilegeSpecification::Privileges(privileges) => privileges
590 .into_iter()
591 .map(|privilege| privilege_to_acl_mode(privilege.clone()))
592 .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode)),
594 }
595}
596
597fn privilege_to_acl_mode(privilege: Privilege) -> AclMode {
598 match privilege {
599 Privilege::SELECT => AclMode::SELECT,
600 Privilege::INSERT => AclMode::INSERT,
601 Privilege::UPDATE => AclMode::UPDATE,
602 Privilege::DELETE => AclMode::DELETE,
603 Privilege::USAGE => AclMode::USAGE,
604 Privilege::CREATE => AclMode::CREATE,
605 Privilege::CREATEROLE => AclMode::CREATE_ROLE,
606 Privilege::CREATEDB => AclMode::CREATE_DB,
607 Privilege::CREATECLUSTER => AclMode::CREATE_CLUSTER,
608 Privilege::CREATENETWORKPOLICY => AclMode::CREATE_NETWORK_POLICY,
609 }
610}
611
612pub fn describe_alter_default_privileges(
613 _: &StatementContext,
614 _: AlterDefaultPrivilegesStatement<Aug>,
615) -> Result<StatementDesc, PlanError> {
616 Ok(StatementDesc::new(None))
617}
618
/// Plans an `ALTER DEFAULT PRIVILEGES` statement.
///
/// The function validates the object type named in the statement, validates
/// the requested privileges against that type, and then builds one
/// `DefaultPrivilegeObject` per combination of target role and target
/// database/schema, plus one `DefaultPrivilegeAclItem` per grantee.
///
/// # Errors
///
/// * A SQL error when the object type is not accepted here (views,
///   materialized views, sources, continual tasks, sinks, cluster replicas,
///   roles, functions), or when it is combined with an `IN DATABASE` /
///   `IN SCHEMA` restriction that the guarded match arms reject.
/// * `PlanError::InvalidPrivilegeTypes` when the requested ACL mode contains
///   privileges outside those returned by `all_object_privileges` for the
///   object type.
pub fn plan_alter_default_privileges(
    scx: &StatementContext,
    AlterDefaultPrivilegesStatement {
        target_roles,
        target_objects,
        grant_or_revoke,
    }: AlterDefaultPrivilegesStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type: ObjectType = (*grant_or_revoke.object_type()).into();
    // Arm order matters: the guarded arms must come before the final
    // catch-all arm that accepts the remaining object types.
    match object_type {
        ObjectType::View
        | ObjectType::MaterializedView
        | ObjectType::Source
        | ObjectType::ContinualTask => sql_bail!(
            "{object_type}S is not valid for ALTER DEFAULT PRIVILEGES, use TABLES instead"
        ),
        ObjectType::Sink | ObjectType::ClusterReplica | ObjectType::Role | ObjectType::Func => {
            sql_bail!("{object_type}S do not have privileges")
        }
        // Clusters and databases cannot be restricted to a database.
        ObjectType::Cluster | ObjectType::Database
            if matches!(
                target_objects,
                GrantTargetAllSpecification::AllDatabases { .. }
            ) =>
        {
            sql_bail!("cannot specify {object_type}S and IN DATABASE")
        }

        // Clusters, databases and schemas cannot be restricted to a schema.
        ObjectType::Cluster | ObjectType::Database | ObjectType::Schema
            if matches!(
                target_objects,
                GrantTargetAllSpecification::AllSchemas { .. }
            ) =>
        {
            sql_bail!("cannot specify {object_type}S and IN SCHEMA")
        }
        // Everything else is accepted.
        ObjectType::Table
        | ObjectType::Index
        | ObjectType::Type
        | ObjectType::Secret
        | ObjectType::Connection
        | ObjectType::Cluster
        | ObjectType::Database
        | ObjectType::Schema
        | ObjectType::NetworkPolicy => {}
    }

    // Reject any requested privilege that is not valid for the object type.
    let acl_mode = privilege_spec_to_acl_mode(
        scx,
        grant_or_revoke.privileges(),
        SystemObjectType::Object(object_type),
    );
    let all_object_privileges = scx
        .catalog
        .all_object_privileges(SystemObjectType::Object(object_type));
    let invalid_privileges = acl_mode.difference(all_object_privileges);
    if !invalid_privileges.is_empty() {
        let object_description =
            ErrorMessageObjectDescription::from_object_type(SystemObjectType::Object(object_type));
        return Err(PlanError::InvalidPrivilegeTypes {
            invalid_privileges,
            object_description,
        });
    }

    // "All roles" is represented by the single id `RoleId::Public`.
    let target_roles = match target_roles {
        TargetRoleSpecification::Roles(roles) => roles.into_iter().map(|role| role.id).collect(),
        TargetRoleSpecification::AllRoles => vec![RoleId::Public],
    };
    let mut privilege_objects = Vec::with_capacity(target_roles.len() * target_objects.len());
    for target_role in target_roles {
        match &target_objects {
            // No restriction: neither a database nor a schema id is recorded.
            GrantTargetAllSpecification::All => privilege_objects.push(DefaultPrivilegeObject {
                role_id: target_role,
                database_id: None,
                schema_id: None,
                object_type,
            }),
            // One entry per listed database, with no schema id.
            GrantTargetAllSpecification::AllDatabases { databases } => {
                for database in databases {
                    privilege_objects.push(DefaultPrivilegeObject {
                        role_id: target_role,
                        database_id: Some(*database.database_id()),
                        schema_id: None,
                        object_type,
                    });
                }
            }
            // One entry per listed schema, carrying both the database id
            // taken from the schema's database specifier and the schema id.
            GrantTargetAllSpecification::AllSchemas { schemas } => {
                for schema in schemas {
                    privilege_objects.push(DefaultPrivilegeObject {
                        role_id: target_role,
                        database_id: schema.database_spec().id(),
                        schema_id: Some(schema.schema_spec().into()),
                        object_type,
                    });
                }
            }
        }
    }

    // Every grantee receives the same ACL mode.
    let privilege_acl_items = grant_or_revoke
        .roles()
        .into_iter()
        .map(|grantee| DefaultPrivilegeAclItem {
            grantee: grantee.id,
            acl_mode,
        })
        .collect();

    let is_grant = match grant_or_revoke {
        AbbreviatedGrantOrRevokeStatement::Grant(_) => true,
        AbbreviatedGrantOrRevokeStatement::Revoke(_) => false,
    };

    Ok(Plan::AlterDefaultPrivileges(AlterDefaultPrivilegesPlan {
        privilege_objects,
        privilege_acl_items,
        is_grant,
    }))
}
740
741pub fn describe_reassign_owned(
742 _: &StatementContext,
743 _: ReassignOwnedStatement<Aug>,
744) -> Result<StatementDesc, PlanError> {
745 Ok(StatementDesc::new(None))
746}
747
748pub fn plan_reassign_owned(
749 scx: &StatementContext,
750 ReassignOwnedStatement {
751 old_roles,
752 new_role,
753 }: ReassignOwnedStatement<Aug>,
754) -> Result<Plan, PlanError> {
755 let old_roles: BTreeSet<_> = old_roles.into_iter().map(|role| role.id).collect();
756 let mut reassign_ids: Vec<ObjectId> = Vec::new();
757
758 for replica in scx.catalog.get_cluster_replicas() {
760 if old_roles.contains(&replica.owner_id()) {
761 reassign_ids.push((replica.cluster_id(), replica.replica_id()).into());
762 }
763 }
764 for cluster in scx.catalog.get_clusters() {
766 if old_roles.contains(&cluster.owner_id()) {
767 reassign_ids.push(cluster.id().into());
768 }
769 }
770 for item in scx.catalog.get_items() {
772 if old_roles.contains(&item.owner_id()) {
773 reassign_ids.push(item.id().into());
774 }
775 }
776 for schema in scx.catalog.get_schemas() {
778 if !schema.id().is_temporary() {
779 if old_roles.contains(&schema.owner_id()) {
780 reassign_ids.push((*schema.database(), *schema.id()).into())
781 }
782 }
783 }
784 for database in scx.catalog.get_databases() {
786 if old_roles.contains(&database.owner_id()) {
787 reassign_ids.push(database.id().into());
788 }
789 }
790
791 let system_ids: Vec<_> = reassign_ids.iter().filter(|id| id.is_system()).collect();
792 if !system_ids.is_empty() {
793 let mut owners = system_ids
794 .into_iter()
795 .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
796 .collect::<BTreeSet<_>>()
797 .into_iter()
798 .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
799 sql_bail!(
800 "cannot reassign objects owned by role {} because they are required by the database system",
801 owners.join(", "),
802 );
803 }
804
805 Ok(Plan::ReassignOwned(ReassignOwnedPlan {
806 old_roles: old_roles.into_iter().collect(),
807 new_role: new_role.id,
808 reassign_ids,
809 }))
810}