1use std::collections::BTreeSet;
16
17use itertools::Itertools;
18use mz_sql_parser::ast::display::AstDisplay;
19
20use crate::ast::{Ident, UnresolvedDatabaseName};
21use crate::catalog::{
22 DefaultPrivilegeAclItem, DefaultPrivilegeObject, ErrorMessageObjectDescription, ObjectType,
23 SystemObjectType,
24};
25use crate::names::{
26 Aug, ObjectId, ResolvedDatabaseSpecifier, ResolvedRoleName, SchemaSpecifier, SystemObjectId,
27};
28use crate::plan::error::PlanError;
29use crate::plan::statement::ddl::{
30 resolve_cluster, resolve_database, resolve_item_or_type, resolve_network_policy, resolve_schema,
31};
32use crate::plan::statement::{StatementContext, StatementDesc};
33use crate::plan::{
34 AlterDefaultPrivilegesPlan, AlterNoopPlan, AlterOwnerPlan, GrantPrivilegesPlan, GrantRolePlan,
35 Plan, PlanNotice, ReassignOwnedPlan, RevokePrivilegesPlan, RevokeRolePlan, UpdatePrivilege,
36};
37use crate::session::user::SYSTEM_USER;
38use mz_ore::str::StrExt;
39use mz_repr::adt::mz_acl_item::AclMode;
40use mz_repr::role_id::RoleId;
41use mz_sql_parser::ast::{
42 AbbreviatedGrantOrRevokeStatement, AlterDefaultPrivilegesStatement, AlterOwnerStatement,
43 GrantPrivilegesStatement, GrantRoleStatement, GrantTargetAllSpecification,
44 GrantTargetSpecification, GrantTargetSpecificationInner, Privilege, PrivilegeSpecification,
45 ReassignOwnedStatement, RevokePrivilegesStatement, RevokeRoleStatement,
46 TargetRoleSpecification, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName,
47};
48
49pub fn describe_alter_owner(
50 _: &StatementContext,
51 _: AlterOwnerStatement<Aug>,
52) -> Result<StatementDesc, PlanError> {
53 Ok(StatementDesc::new(None))
54}
55
56pub fn plan_alter_owner(
57 scx: &StatementContext,
58 AlterOwnerStatement {
59 object_type,
60 if_exists,
61 name,
62 new_owner,
63 }: AlterOwnerStatement<Aug>,
64) -> Result<Plan, PlanError> {
65 let object_type = object_type.into();
66 match (object_type, name) {
67 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
68 plan_alter_cluster_owner(scx, if_exists, name, new_owner.id)
69 }
70 (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(_)) => {
71 bail_never_supported!("altering the owner of a cluster replica");
72 }
73 (ObjectType::Database, UnresolvedObjectName::Database(name)) => {
74 plan_alter_database_owner(scx, if_exists, name, new_owner.id)
75 }
76 (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
77 plan_alter_schema_owner(scx, if_exists, name, new_owner.id)
78 }
79 (ObjectType::NetworkPolicy, UnresolvedObjectName::NetworkPolicy(name)) => {
80 plan_alter_network_policy_owner(scx, if_exists, name, new_owner.id)
81 }
82 (ObjectType::Role, UnresolvedObjectName::Role(_)) => unreachable!("rejected by the parser"),
83 (
84 object_type @ ObjectType::Cluster
85 | object_type @ ObjectType::ClusterReplica
86 | object_type @ ObjectType::Database
87 | object_type @ ObjectType::Schema
88 | object_type @ ObjectType::Role,
89 name,
90 )
91 | (
92 object_type,
93 name @ UnresolvedObjectName::Cluster(_)
94 | name @ UnresolvedObjectName::ClusterReplica(_)
95 | name @ UnresolvedObjectName::Database(_)
96 | name @ UnresolvedObjectName::Schema(_)
97 | name @ UnresolvedObjectName::NetworkPolicy(_)
98 | name @ UnresolvedObjectName::Role(_),
99 ) => {
100 unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
101 }
102 (object_type, UnresolvedObjectName::Item(name)) => {
103 plan_alter_item_owner(scx, object_type, if_exists, name, new_owner.id)
104 }
105 }
106}
107
108fn plan_alter_cluster_owner(
109 scx: &StatementContext,
110 if_exists: bool,
111 name: Ident,
112 new_owner: RoleId,
113) -> Result<Plan, PlanError> {
114 match resolve_cluster(scx, &name, if_exists)? {
115 Some(cluster) => Ok(Plan::AlterOwner(AlterOwnerPlan {
116 id: ObjectId::Cluster(cluster.id()),
117 object_type: ObjectType::Cluster,
118 new_owner,
119 })),
120 None => {
121 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
122 name: name.to_ast_string_simple(),
123 object_type: ObjectType::Cluster,
124 });
125 Ok(Plan::AlterNoop(AlterNoopPlan {
126 object_type: ObjectType::Cluster,
127 }))
128 }
129 }
130}
131
132fn plan_alter_database_owner(
133 scx: &StatementContext,
134 if_exists: bool,
135 name: UnresolvedDatabaseName,
136 new_owner: RoleId,
137) -> Result<Plan, PlanError> {
138 match resolve_database(scx, &name, if_exists)? {
139 Some(database) => Ok(Plan::AlterOwner(AlterOwnerPlan {
140 id: ObjectId::Database(database.id()),
141 object_type: ObjectType::Database,
142 new_owner,
143 })),
144 None => {
145 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
146 name: name.to_ast_string_simple(),
147 object_type: ObjectType::Database,
148 });
149
150 Ok(Plan::AlterNoop(AlterNoopPlan {
151 object_type: ObjectType::Database,
152 }))
153 }
154 }
155}
156
157fn plan_alter_schema_owner(
158 scx: &StatementContext,
159 if_exists: bool,
160 name: UnresolvedSchemaName,
161 new_owner: RoleId,
162) -> Result<Plan, PlanError> {
163 let normalized = crate::normalize::unresolved_schema_name(name.clone())?;
167 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
168 sql_bail!("cannot alter schema {name} because it is a temporary schema",)
169 }
170
171 match resolve_schema(scx, name.clone(), if_exists)? {
172 Some((database_spec, schema_spec)) => {
173 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
174 sql_bail!(
175 "cannot alter schema {name} because it is required by the database system",
176 );
177 }
178 if let SchemaSpecifier::Temporary = schema_spec {
179 sql_bail!("cannot alter schema {name} because it is a temporary schema",)
180 }
181 Ok(Plan::AlterOwner(AlterOwnerPlan {
182 id: ObjectId::Schema((database_spec, schema_spec)),
183 object_type: ObjectType::Schema,
184 new_owner,
185 }))
186 }
187 None => {
188 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
189 name: name.to_ast_string_simple(),
190 object_type: ObjectType::Schema,
191 });
192
193 Ok(Plan::AlterNoop(AlterNoopPlan {
194 object_type: ObjectType::Schema,
195 }))
196 }
197 }
198}
199
200fn plan_alter_item_owner(
201 scx: &StatementContext,
202 object_type: ObjectType,
203 if_exists: bool,
204 name: UnresolvedItemName,
205 new_owner: RoleId,
206) -> Result<Plan, PlanError> {
207 let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
208 Ok(r) => r,
209 Err(PlanError::MismatchedObjectType {
211 name,
212 is_type: ObjectType::MaterializedView,
213 expected_type: ObjectType::View,
214 }) => {
215 return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
216 }
217 e => e?,
218 };
219
220 match resolved {
221 Some(item) => {
222 if item.id().is_system() {
223 sql_bail!(
224 "cannot alter item {} because it is required by the database system",
225 scx.catalog.resolve_full_name(item.name()),
226 );
227 }
228
229 Ok(Plan::AlterOwner(AlterOwnerPlan {
230 id: ObjectId::Item(item.id()),
231 object_type,
232 new_owner,
233 }))
234 }
235 None => {
236 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
237 name: name.to_ast_string_simple(),
238 object_type,
239 });
240
241 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
242 }
243 }
244}
245
246fn plan_alter_network_policy_owner(
247 scx: &StatementContext,
248 if_exists: bool,
249 name: Ident,
250 new_owner: RoleId,
251) -> Result<Plan, PlanError> {
252 match resolve_network_policy(scx, name.clone(), if_exists)? {
253 Some(policy_id) => Ok(Plan::AlterOwner(AlterOwnerPlan {
254 id: ObjectId::NetworkPolicy(policy_id.id),
255 object_type: ObjectType::Schema,
256 new_owner,
257 })),
258 None => {
259 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
260 name: name.to_ast_string_simple(),
261 object_type: ObjectType::NetworkPolicy,
262 });
263
264 Ok(Plan::AlterNoop(AlterNoopPlan {
265 object_type: ObjectType::NetworkPolicy,
266 }))
267 }
268 }
269}
270
271pub fn describe_grant_role(
272 _: &StatementContext,
273 _: GrantRoleStatement<Aug>,
274) -> Result<StatementDesc, PlanError> {
275 Ok(StatementDesc::new(None))
276}
277
278pub fn plan_grant_role(
279 scx: &StatementContext,
280 GrantRoleStatement {
281 role_names,
282 member_names,
283 }: GrantRoleStatement<Aug>,
284) -> Result<Plan, PlanError> {
285 let grantor_id = scx
291 .catalog
292 .resolve_role(&SYSTEM_USER.name)
293 .expect("system user must exist")
294 .id();
295 Ok(Plan::GrantRole(GrantRolePlan {
296 role_ids: role_names
297 .into_iter()
298 .map(|role_name| role_name.id)
299 .collect(),
300 member_ids: member_names
301 .into_iter()
302 .map(|member_name| member_name.id)
303 .collect(),
304 grantor_id,
305 }))
306}
307
308pub fn describe_revoke_role(
309 _: &StatementContext,
310 _: RevokeRoleStatement<Aug>,
311) -> Result<StatementDesc, PlanError> {
312 Ok(StatementDesc::new(None))
313}
314
315pub fn plan_revoke_role(
316 scx: &StatementContext,
317 RevokeRoleStatement {
318 role_names,
319 member_names,
320 }: RevokeRoleStatement<Aug>,
321) -> Result<Plan, PlanError> {
322 let grantor_id = scx
330 .catalog
331 .resolve_role(&SYSTEM_USER.name)
332 .expect("system user must exist")
333 .id();
334 Ok(Plan::RevokeRole(RevokeRolePlan {
335 role_ids: role_names
336 .into_iter()
337 .map(|role_name| role_name.id)
338 .collect(),
339 member_ids: member_names
340 .into_iter()
341 .map(|member_name| member_name.id)
342 .collect(),
343 grantor_id,
344 }))
345}
346
347pub fn describe_grant_privileges(
348 _: &StatementContext,
349 _: GrantPrivilegesStatement<Aug>,
350) -> Result<StatementDesc, PlanError> {
351 Ok(StatementDesc::new(None))
352}
353
354pub fn plan_grant_privileges(
355 scx: &StatementContext,
356 GrantPrivilegesStatement {
357 privileges,
358 target,
359 roles,
360 }: GrantPrivilegesStatement<Aug>,
361) -> Result<Plan, PlanError> {
362 let plan = plan_update_privilege(scx, privileges, target, roles)?;
363 Ok(Plan::GrantPrivileges(plan.into()))
364}
365
366pub fn describe_revoke_privileges(
367 _: &StatementContext,
368 _: RevokePrivilegesStatement<Aug>,
369) -> Result<StatementDesc, PlanError> {
370 Ok(StatementDesc::new(None))
371}
372
373pub fn plan_revoke_privileges(
374 scx: &StatementContext,
375 RevokePrivilegesStatement {
376 privileges,
377 target,
378 roles,
379 }: RevokePrivilegesStatement<Aug>,
380) -> Result<Plan, PlanError> {
381 let plan = plan_update_privilege(scx, privileges, target, roles)?;
382 Ok(Plan::RevokePrivileges(plan.into()))
383}
384
385struct UpdatePrivilegesPlan {
386 update_privileges: Vec<UpdatePrivilege>,
387 grantees: Vec<RoleId>,
388}
389
390impl From<UpdatePrivilegesPlan> for GrantPrivilegesPlan {
391 fn from(
392 UpdatePrivilegesPlan {
393 update_privileges,
394 grantees,
395 }: UpdatePrivilegesPlan,
396 ) -> GrantPrivilegesPlan {
397 GrantPrivilegesPlan {
398 update_privileges,
399 grantees,
400 }
401 }
402}
403
404impl From<UpdatePrivilegesPlan> for RevokePrivilegesPlan {
405 fn from(
406 UpdatePrivilegesPlan {
407 update_privileges,
408 grantees,
409 }: UpdatePrivilegesPlan,
410 ) -> RevokePrivilegesPlan {
411 RevokePrivilegesPlan {
412 update_privileges,
413 revokees: grantees,
414 }
415 }
416}
417
418fn plan_update_privilege(
419 scx: &StatementContext,
420 privileges: PrivilegeSpecification,
421 target: GrantTargetSpecification<Aug>,
422 roles: Vec<ResolvedRoleName>,
423) -> Result<UpdatePrivilegesPlan, PlanError> {
424 let (object_type, target_ids) = match target {
425 GrantTargetSpecification::Object {
426 object_type,
427 object_spec_inner,
428 } => {
429 fn object_type_filter(
430 object_id: &ObjectId,
431 object_type: &ObjectType,
432 scx: &StatementContext,
433 ) -> bool {
434 if object_type == &ObjectType::Table {
435 scx.get_object_type(object_id).is_relation()
436 } else {
437 object_type == &scx.get_object_type(object_id)
438 }
439 }
440 let object_type = object_type.into();
441 let object_ids: Vec<ObjectId> = match object_spec_inner {
442 GrantTargetSpecificationInner::All(GrantTargetAllSpecification::All) => {
443 let cluster_ids = scx
444 .catalog
445 .get_clusters()
446 .into_iter()
447 .map(|cluster| cluster.id().into());
448 let database_ids = scx
449 .catalog
450 .get_databases()
451 .into_iter()
452 .map(|database| database.id().into());
453 let schema_ids = scx
454 .catalog
455 .get_schemas()
456 .into_iter()
457 .filter(|schema| !schema.id().is_temporary())
458 .map(|schema| (schema.database().clone(), schema.id().clone()).into());
459 let item_ids = scx
460 .catalog
461 .get_items()
462 .into_iter()
463 .map(|item| item.id().into());
464 cluster_ids
465 .chain(database_ids)
466 .chain(schema_ids)
467 .chain(item_ids)
468 .filter(|object_id| object_type_filter(object_id, &object_type, scx))
469 .filter(|object_id| object_id.is_user())
470 .collect()
471 }
472 GrantTargetSpecificationInner::All(GrantTargetAllSpecification::AllDatabases {
473 databases,
474 }) => {
475 let schema_ids = databases
476 .iter()
477 .map(|database| scx.get_database(database.database_id()))
478 .flat_map(|database| database.schemas().into_iter())
479 .filter(|schema| !schema.id().is_temporary())
480 .map(|schema| (schema.database().clone(), schema.id().clone()).into());
481
482 let item_ids = databases
483 .iter()
484 .map(|database| scx.get_database(database.database_id()))
485 .flat_map(|database| database.schemas().into_iter())
486 .flat_map(|schema| schema.item_ids())
487 .map(|item_id| item_id.into());
488
489 item_ids
490 .chain(schema_ids)
491 .filter(|object_id| object_type_filter(object_id, &object_type, scx))
492 .collect()
493 }
494 GrantTargetSpecificationInner::All(GrantTargetAllSpecification::AllSchemas {
495 schemas,
496 }) => schemas
497 .into_iter()
498 .map(|schema| scx.get_schema(schema.database_spec(), schema.schema_spec()))
499 .flat_map(|schema| schema.item_ids())
500 .map(|item_id| item_id.into())
501 .filter(|object_id| object_type_filter(object_id, &object_type, scx))
502 .collect(),
503 GrantTargetSpecificationInner::Objects { names } => names
504 .into_iter()
505 .map(|name| {
506 name.try_into()
507 .expect("name resolution should handle invalid objects")
508 })
509 .collect(),
510 };
511 let target_ids = object_ids.into_iter().map(|id| id.into()).collect();
512 (SystemObjectType::Object(object_type), target_ids)
513 }
514 GrantTargetSpecification::System => {
515 (SystemObjectType::System, vec![SystemObjectId::System])
516 }
517 };
518
519 let mut update_privileges = Vec::with_capacity(target_ids.len());
520
521 for target_id in target_ids {
522 if let SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary))) =
526 &target_id
527 {
528 sql_bail!(
529 "cannot grant or revoke privileges on schema {} because it is a temporary schema",
530 mz_repr::namespaces::MZ_TEMP_SCHEMA
531 );
532 }
533
534 let actual_object_type = scx.get_system_object_type(&target_id);
536 let mut reference_object_type = actual_object_type.clone();
539
540 let acl_mode = privilege_spec_to_acl_mode(scx, &privileges, actual_object_type);
541
542 if let SystemObjectId::Object(ObjectId::Item(id)) = &target_id {
543 let item = scx.get_item(id);
544 let item_type: ObjectType = item.item_type().into();
545 if (item_type == ObjectType::View
546 || item_type == ObjectType::MaterializedView
547 || item_type == ObjectType::Source)
548 && object_type == SystemObjectType::Object(ObjectType::Table)
549 {
550 reference_object_type = SystemObjectType::Object(ObjectType::Table);
552 } else if SystemObjectType::Object(item_type) != object_type {
553 let object_name = scx.catalog.resolve_full_name(item.name()).to_string();
554 return Err(PlanError::InvalidObjectType {
555 expected_type: object_type,
556 actual_type: actual_object_type,
557 object_name,
558 });
559 }
560 }
561
562 let all_object_privileges = scx.catalog.all_object_privileges(reference_object_type);
563 let invalid_privileges = acl_mode.difference(all_object_privileges);
564 if !invalid_privileges.is_empty() {
565 let object_description =
566 ErrorMessageObjectDescription::from_sys_id(&target_id, scx.catalog);
567 return Err(PlanError::InvalidPrivilegeTypes {
568 invalid_privileges,
569 object_description,
570 });
571 }
572
573 let grantor = match &target_id {
580 SystemObjectId::Object(object_id) => scx
581 .catalog
582 .get_owner_id(object_id)
583 .expect("cannot revoke privileges on objects without owners"),
584 SystemObjectId::System => scx.catalog.mz_system_role_id(),
585 };
586
587 update_privileges.push(UpdatePrivilege {
588 acl_mode,
589 target_id,
590 grantor,
591 });
592 }
593
594 let grantees = roles.into_iter().map(|role| role.id).collect();
595
596 Ok(UpdatePrivilegesPlan {
597 update_privileges,
598 grantees,
599 })
600}
601
602fn privilege_spec_to_acl_mode(
603 scx: &StatementContext,
604 privilege_spec: &PrivilegeSpecification,
605 object_type: SystemObjectType,
606) -> AclMode {
607 match privilege_spec {
608 PrivilegeSpecification::All => scx.catalog.all_object_privileges(object_type),
609 PrivilegeSpecification::Privileges(privileges) => privileges
610 .into_iter()
611 .map(|privilege| privilege_to_acl_mode(privilege.clone()))
612 .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode)),
614 }
615}
616
617fn privilege_to_acl_mode(privilege: Privilege) -> AclMode {
618 match privilege {
619 Privilege::SELECT => AclMode::SELECT,
620 Privilege::INSERT => AclMode::INSERT,
621 Privilege::UPDATE => AclMode::UPDATE,
622 Privilege::DELETE => AclMode::DELETE,
623 Privilege::USAGE => AclMode::USAGE,
624 Privilege::CREATE => AclMode::CREATE,
625 Privilege::CREATEROLE => AclMode::CREATE_ROLE,
626 Privilege::CREATEDB => AclMode::CREATE_DB,
627 Privilege::CREATECLUSTER => AclMode::CREATE_CLUSTER,
628 Privilege::CREATENETWORKPOLICY => AclMode::CREATE_NETWORK_POLICY,
629 }
630}
631
632pub fn describe_alter_default_privileges(
633 _: &StatementContext,
634 _: AlterDefaultPrivilegesStatement<Aug>,
635) -> Result<StatementDesc, PlanError> {
636 Ok(StatementDesc::new(None))
637}
638
639pub fn plan_alter_default_privileges(
640 scx: &StatementContext,
641 AlterDefaultPrivilegesStatement {
642 target_roles,
643 target_objects,
644 grant_or_revoke,
645 }: AlterDefaultPrivilegesStatement<Aug>,
646) -> Result<Plan, PlanError> {
647 let object_type: ObjectType = (*grant_or_revoke.object_type()).into();
648 match object_type {
649 ObjectType::View
650 | ObjectType::MaterializedView
651 | ObjectType::Source
652 | ObjectType::ContinualTask => sql_bail!(
653 "{object_type}S is not valid for ALTER DEFAULT PRIVILEGES, use TABLES instead"
654 ),
655 ObjectType::Sink | ObjectType::ClusterReplica | ObjectType::Role | ObjectType::Func => {
656 sql_bail!("{object_type}S do not have privileges")
657 }
658 ObjectType::Cluster | ObjectType::Database
659 if matches!(
660 target_objects,
661 GrantTargetAllSpecification::AllDatabases { .. }
662 ) =>
663 {
664 sql_bail!("cannot specify {object_type}S and IN DATABASE")
665 }
666
667 ObjectType::Cluster | ObjectType::Database | ObjectType::Schema
668 if matches!(
669 target_objects,
670 GrantTargetAllSpecification::AllSchemas { .. }
671 ) =>
672 {
673 sql_bail!("cannot specify {object_type}S and IN SCHEMA")
674 }
675 ObjectType::Table
676 | ObjectType::Index
677 | ObjectType::Type
678 | ObjectType::Secret
679 | ObjectType::Connection
680 | ObjectType::Cluster
681 | ObjectType::Database
682 | ObjectType::Schema
683 | ObjectType::NetworkPolicy => {}
684 }
685
686 let acl_mode = privilege_spec_to_acl_mode(
687 scx,
688 grant_or_revoke.privileges(),
689 SystemObjectType::Object(object_type),
690 );
691 let all_object_privileges = scx
692 .catalog
693 .all_object_privileges(SystemObjectType::Object(object_type));
694 let invalid_privileges = acl_mode.difference(all_object_privileges);
695 if !invalid_privileges.is_empty() {
696 let object_description =
697 ErrorMessageObjectDescription::from_object_type(SystemObjectType::Object(object_type));
698 return Err(PlanError::InvalidPrivilegeTypes {
699 invalid_privileges,
700 object_description,
701 });
702 }
703
704 let target_roles = match target_roles {
705 TargetRoleSpecification::Roles(roles) => roles.into_iter().map(|role| role.id).collect(),
706 TargetRoleSpecification::AllRoles => vec![RoleId::Public],
707 };
708 let mut privilege_objects = Vec::with_capacity(target_roles.len() * target_objects.len());
709 for target_role in target_roles {
710 match &target_objects {
711 GrantTargetAllSpecification::All => privilege_objects.push(DefaultPrivilegeObject {
712 role_id: target_role,
713 database_id: None,
714 schema_id: None,
715 object_type,
716 }),
717 GrantTargetAllSpecification::AllDatabases { databases } => {
718 for database in databases {
719 privilege_objects.push(DefaultPrivilegeObject {
720 role_id: target_role,
721 database_id: Some(*database.database_id()),
722 schema_id: None,
723 object_type,
724 });
725 }
726 }
727 GrantTargetAllSpecification::AllSchemas { schemas } => {
728 for schema in schemas {
729 privilege_objects.push(DefaultPrivilegeObject {
730 role_id: target_role,
731 database_id: schema.database_spec().id(),
732 schema_id: Some(schema.schema_spec().into()),
733 object_type,
734 });
735 }
736 }
737 }
738 }
739
740 let privilege_acl_items = grant_or_revoke
741 .roles()
742 .into_iter()
743 .map(|grantee| DefaultPrivilegeAclItem {
744 grantee: grantee.id,
745 acl_mode,
746 })
747 .collect();
748
749 let is_grant = match grant_or_revoke {
750 AbbreviatedGrantOrRevokeStatement::Grant(_) => true,
751 AbbreviatedGrantOrRevokeStatement::Revoke(_) => false,
752 };
753
754 Ok(Plan::AlterDefaultPrivileges(AlterDefaultPrivilegesPlan {
755 privilege_objects,
756 privilege_acl_items,
757 is_grant,
758 }))
759}
760
761pub fn describe_reassign_owned(
762 _: &StatementContext,
763 _: ReassignOwnedStatement<Aug>,
764) -> Result<StatementDesc, PlanError> {
765 Ok(StatementDesc::new(None))
766}
767
768pub fn plan_reassign_owned(
769 scx: &StatementContext,
770 ReassignOwnedStatement {
771 old_roles,
772 new_role,
773 }: ReassignOwnedStatement<Aug>,
774) -> Result<Plan, PlanError> {
775 let old_roles: BTreeSet<_> = old_roles.into_iter().map(|role| role.id).collect();
776 let mut reassign_ids: Vec<ObjectId> = Vec::new();
777
778 for replica in scx.catalog.get_cluster_replicas() {
780 if old_roles.contains(&replica.owner_id()) {
781 reassign_ids.push((replica.cluster_id(), replica.replica_id()).into());
782 }
783 }
784 for cluster in scx.catalog.get_clusters() {
786 if old_roles.contains(&cluster.owner_id()) {
787 reassign_ids.push(cluster.id().into());
788 }
789 }
790 for item in scx.catalog.get_items() {
792 if old_roles.contains(&item.owner_id()) {
793 reassign_ids.push(item.id().into());
794 }
795 }
796 for schema in scx.catalog.get_schemas() {
798 if !schema.id().is_temporary() {
799 if old_roles.contains(&schema.owner_id()) {
800 reassign_ids.push((*schema.database(), *schema.id()).into())
801 }
802 }
803 }
804 for database in scx.catalog.get_databases() {
806 if old_roles.contains(&database.owner_id()) {
807 reassign_ids.push(database.id().into());
808 }
809 }
810
811 let system_ids: Vec<_> = reassign_ids.iter().filter(|id| id.is_system()).collect();
812 if !system_ids.is_empty() {
813 let mut owners = system_ids
814 .into_iter()
815 .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
816 .collect::<BTreeSet<_>>()
817 .into_iter()
818 .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
819 sql_bail!(
820 "cannot reassign objects owned by role {} because they are required by the database system",
821 owners.join(", "),
822 );
823 }
824
825 Ok(Plan::ReassignOwned(ReassignOwnedPlan {
826 old_roles: old_roles.into_iter().collect(),
827 new_role: new_role.id,
828 reassign_ids,
829 }))
830}