mz_sql/plan/statement/
acl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Access control list (ACL).
11//!
12//! This module houses the handlers for statements that modify privileges in the catalog, like
13//! `GRANT`, `REVOKE`, and `REASSIGN OWNED`.
14
15use std::collections::BTreeSet;
16
17use itertools::Itertools;
18use mz_sql_parser::ast::display::AstDisplay;
19
20use crate::ast::{Ident, UnresolvedDatabaseName};
21use crate::catalog::{
22    DefaultPrivilegeAclItem, DefaultPrivilegeObject, ErrorMessageObjectDescription, ObjectType,
23    SystemObjectType,
24};
25use crate::names::{
26    Aug, ObjectId, ResolvedDatabaseSpecifier, ResolvedRoleName, SchemaSpecifier, SystemObjectId,
27};
28use crate::plan::error::PlanError;
29use crate::plan::statement::ddl::{
30    resolve_cluster, resolve_database, resolve_item_or_type, resolve_network_policy, resolve_schema,
31};
32use crate::plan::statement::{StatementContext, StatementDesc};
33use crate::plan::{
34    AlterDefaultPrivilegesPlan, AlterNoopPlan, AlterOwnerPlan, GrantPrivilegesPlan, GrantRolePlan,
35    Plan, PlanNotice, ReassignOwnedPlan, RevokePrivilegesPlan, RevokeRolePlan, UpdatePrivilege,
36};
37use crate::session::user::SYSTEM_USER;
38use mz_ore::str::StrExt;
39use mz_repr::adt::mz_acl_item::AclMode;
40use mz_repr::role_id::RoleId;
41use mz_sql_parser::ast::{
42    AbbreviatedGrantOrRevokeStatement, AlterDefaultPrivilegesStatement, AlterOwnerStatement,
43    GrantPrivilegesStatement, GrantRoleStatement, GrantTargetAllSpecification,
44    GrantTargetSpecification, GrantTargetSpecificationInner, Privilege, PrivilegeSpecification,
45    ReassignOwnedStatement, RevokePrivilegesStatement, RevokeRoleStatement,
46    TargetRoleSpecification, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName,
47};
48
49pub fn describe_alter_owner(
50    _: &StatementContext,
51    _: AlterOwnerStatement<Aug>,
52) -> Result<StatementDesc, PlanError> {
53    Ok(StatementDesc::new(None))
54}
55
56pub fn plan_alter_owner(
57    scx: &StatementContext,
58    AlterOwnerStatement {
59        object_type,
60        if_exists,
61        name,
62        new_owner,
63    }: AlterOwnerStatement<Aug>,
64) -> Result<Plan, PlanError> {
65    let object_type = object_type.into();
66    match (object_type, name) {
67        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
68            plan_alter_cluster_owner(scx, if_exists, name, new_owner.id)
69        }
70        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(_)) => {
71            bail_never_supported!("altering the owner of a cluster replica");
72        }
73        (ObjectType::Database, UnresolvedObjectName::Database(name)) => {
74            plan_alter_database_owner(scx, if_exists, name, new_owner.id)
75        }
76        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
77            plan_alter_schema_owner(scx, if_exists, name, new_owner.id)
78        }
79        (ObjectType::NetworkPolicy, UnresolvedObjectName::NetworkPolicy(name)) => {
80            plan_alter_network_policy_owner(scx, if_exists, name, new_owner.id)
81        }
82        (ObjectType::Role, UnresolvedObjectName::Role(_)) => unreachable!("rejected by the parser"),
83        (
84            object_type @ ObjectType::Cluster
85            | object_type @ ObjectType::ClusterReplica
86            | object_type @ ObjectType::Database
87            | object_type @ ObjectType::Schema
88            | object_type @ ObjectType::Role,
89            name,
90        )
91        | (
92            object_type,
93            name @ UnresolvedObjectName::Cluster(_)
94            | name @ UnresolvedObjectName::ClusterReplica(_)
95            | name @ UnresolvedObjectName::Database(_)
96            | name @ UnresolvedObjectName::Schema(_)
97            | name @ UnresolvedObjectName::NetworkPolicy(_)
98            | name @ UnresolvedObjectName::Role(_),
99        ) => {
100            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
101        }
102        (object_type, UnresolvedObjectName::Item(name)) => {
103            plan_alter_item_owner(scx, object_type, if_exists, name, new_owner.id)
104        }
105    }
106}
107
108fn plan_alter_cluster_owner(
109    scx: &StatementContext,
110    if_exists: bool,
111    name: Ident,
112    new_owner: RoleId,
113) -> Result<Plan, PlanError> {
114    match resolve_cluster(scx, &name, if_exists)? {
115        Some(cluster) => Ok(Plan::AlterOwner(AlterOwnerPlan {
116            id: ObjectId::Cluster(cluster.id()),
117            object_type: ObjectType::Cluster,
118            new_owner,
119        })),
120        None => {
121            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
122                name: name.to_ast_string_simple(),
123                object_type: ObjectType::Cluster,
124            });
125            Ok(Plan::AlterNoop(AlterNoopPlan {
126                object_type: ObjectType::Cluster,
127            }))
128        }
129    }
130}
131
132fn plan_alter_database_owner(
133    scx: &StatementContext,
134    if_exists: bool,
135    name: UnresolvedDatabaseName,
136    new_owner: RoleId,
137) -> Result<Plan, PlanError> {
138    match resolve_database(scx, &name, if_exists)? {
139        Some(database) => Ok(Plan::AlterOwner(AlterOwnerPlan {
140            id: ObjectId::Database(database.id()),
141            object_type: ObjectType::Database,
142            new_owner,
143        })),
144        None => {
145            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
146                name: name.to_ast_string_simple(),
147                object_type: ObjectType::Database,
148            });
149
150            Ok(Plan::AlterNoop(AlterNoopPlan {
151                object_type: ObjectType::Database,
152            }))
153        }
154    }
155}
156
157fn plan_alter_schema_owner(
158    scx: &StatementContext,
159    if_exists: bool,
160    name: UnresolvedSchemaName,
161    new_owner: RoleId,
162) -> Result<Plan, PlanError> {
163    match resolve_schema(scx, name.clone(), if_exists)? {
164        Some((database_spec, schema_spec)) => {
165            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
166                sql_bail!(
167                    "cannot alter schema {name} because it is required by the database system",
168                );
169            }
170            if let SchemaSpecifier::Temporary = schema_spec {
171                sql_bail!("cannot alter schema {name} because it is a temporary schema",)
172            }
173            Ok(Plan::AlterOwner(AlterOwnerPlan {
174                id: ObjectId::Schema((database_spec, schema_spec)),
175                object_type: ObjectType::Schema,
176                new_owner,
177            }))
178        }
179        None => {
180            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
181                name: name.to_ast_string_simple(),
182                object_type: ObjectType::Schema,
183            });
184
185            Ok(Plan::AlterNoop(AlterNoopPlan {
186                object_type: ObjectType::Schema,
187            }))
188        }
189    }
190}
191
192fn plan_alter_item_owner(
193    scx: &StatementContext,
194    object_type: ObjectType,
195    if_exists: bool,
196    name: UnresolvedItemName,
197    new_owner: RoleId,
198) -> Result<Plan, PlanError> {
199    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
200        Ok(r) => r,
201        // Return a more helpful error on `DROP VIEW <materialized-view>`.
202        Err(PlanError::MismatchedObjectType {
203            name,
204            is_type: ObjectType::MaterializedView,
205            expected_type: ObjectType::View,
206        }) => {
207            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
208        }
209        e => e?,
210    };
211
212    match resolved {
213        Some(item) => {
214            if item.id().is_system() {
215                sql_bail!(
216                    "cannot alter item {} because it is required by the database system",
217                    scx.catalog.resolve_full_name(item.name()),
218                );
219            }
220
221            Ok(Plan::AlterOwner(AlterOwnerPlan {
222                id: ObjectId::Item(item.id()),
223                object_type,
224                new_owner,
225            }))
226        }
227        None => {
228            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
229                name: name.to_ast_string_simple(),
230                object_type,
231            });
232
233            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
234        }
235    }
236}
237
238fn plan_alter_network_policy_owner(
239    scx: &StatementContext,
240    if_exists: bool,
241    name: Ident,
242    new_owner: RoleId,
243) -> Result<Plan, PlanError> {
244    match resolve_network_policy(scx, name.clone(), if_exists)? {
245        Some(policy_id) => Ok(Plan::AlterOwner(AlterOwnerPlan {
246            id: ObjectId::NetworkPolicy(policy_id.id),
247            object_type: ObjectType::Schema,
248            new_owner,
249        })),
250        None => {
251            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
252                name: name.to_ast_string_simple(),
253                object_type: ObjectType::NetworkPolicy,
254            });
255
256            Ok(Plan::AlterNoop(AlterNoopPlan {
257                object_type: ObjectType::NetworkPolicy,
258            }))
259        }
260    }
261}
262
263pub fn describe_grant_role(
264    _: &StatementContext,
265    _: GrantRoleStatement<Aug>,
266) -> Result<StatementDesc, PlanError> {
267    Ok(StatementDesc::new(None))
268}
269
270pub fn plan_grant_role(
271    scx: &StatementContext,
272    GrantRoleStatement {
273        role_names,
274        member_names,
275    }: GrantRoleStatement<Aug>,
276) -> Result<Plan, PlanError> {
277    // In PostgreSQL, the grantor must either be a role with ADMIN OPTION on the role being granted,
278    // or the bootstrap superuser. We do not have ADMIN OPTION implemented and 'mz_system' is our
279    // equivalent of the bootstrap superuser. Therefore the grantor is always 'mz_system'.
280    // For more details see:
281    // https://github.com/postgres/postgres/blob/064eb89e83ea0f59426c92906329f1e6c423dfa4/src/backend/commands/user.c#L2180-L2238
282    let grantor_id = scx
283        .catalog
284        .resolve_role(&SYSTEM_USER.name)
285        .expect("system user must exist")
286        .id();
287    Ok(Plan::GrantRole(GrantRolePlan {
288        role_ids: role_names
289            .into_iter()
290            .map(|role_name| role_name.id)
291            .collect(),
292        member_ids: member_names
293            .into_iter()
294            .map(|member_name| member_name.id)
295            .collect(),
296        grantor_id,
297    }))
298}
299
300pub fn describe_revoke_role(
301    _: &StatementContext,
302    _: RevokeRoleStatement<Aug>,
303) -> Result<StatementDesc, PlanError> {
304    Ok(StatementDesc::new(None))
305}
306
307pub fn plan_revoke_role(
308    scx: &StatementContext,
309    RevokeRoleStatement {
310        role_names,
311        member_names,
312    }: RevokeRoleStatement<Aug>,
313) -> Result<Plan, PlanError> {
314    // In PostgreSQL, the same role membership can be granted multiple times by different grantors.
315    // When revoking a role membership, only the membership granted by the specified grantor is
316    // revoked. The grantor must either be a role with ADMIN OPTION on the role being granted,
317    // or the bootstrap superuser. We do not have ADMIN OPTION implemented and 'mz_system' is our
318    // equivalent of the bootstrap superuser. Therefore the grantor is always 'mz_system'.
319    // For more details see:
320    // https://github.com/postgres/postgres/blob/064eb89e83ea0f59426c92906329f1e6c423dfa4/src/backend/commands/user.c#L2180-L2238
321    let grantor_id = scx
322        .catalog
323        .resolve_role(&SYSTEM_USER.name)
324        .expect("system user must exist")
325        .id();
326    Ok(Plan::RevokeRole(RevokeRolePlan {
327        role_ids: role_names
328            .into_iter()
329            .map(|role_name| role_name.id)
330            .collect(),
331        member_ids: member_names
332            .into_iter()
333            .map(|member_name| member_name.id)
334            .collect(),
335        grantor_id,
336    }))
337}
338
339pub fn describe_grant_privileges(
340    _: &StatementContext,
341    _: GrantPrivilegesStatement<Aug>,
342) -> Result<StatementDesc, PlanError> {
343    Ok(StatementDesc::new(None))
344}
345
346pub fn plan_grant_privileges(
347    scx: &StatementContext,
348    GrantPrivilegesStatement {
349        privileges,
350        target,
351        roles,
352    }: GrantPrivilegesStatement<Aug>,
353) -> Result<Plan, PlanError> {
354    let plan = plan_update_privilege(scx, privileges, target, roles)?;
355    Ok(Plan::GrantPrivileges(plan.into()))
356}
357
358pub fn describe_revoke_privileges(
359    _: &StatementContext,
360    _: RevokePrivilegesStatement<Aug>,
361) -> Result<StatementDesc, PlanError> {
362    Ok(StatementDesc::new(None))
363}
364
365pub fn plan_revoke_privileges(
366    scx: &StatementContext,
367    RevokePrivilegesStatement {
368        privileges,
369        target,
370        roles,
371    }: RevokePrivilegesStatement<Aug>,
372) -> Result<Plan, PlanError> {
373    let plan = plan_update_privilege(scx, privileges, target, roles)?;
374    Ok(Plan::RevokePrivileges(plan.into()))
375}
376
/// Intermediate result of `plan_update_privilege`, shared by the grant and revoke paths.
///
/// It is converted into either a `GrantPrivilegesPlan` or a `RevokePrivilegesPlan` through the
/// `From` implementations in this module.
struct UpdatePrivilegesPlan {
    // One entry per targeted object, as pushed by `plan_update_privilege`.
    update_privileges: Vec<UpdatePrivilege>,
    // IDs of the roles named in the statement; the revoke conversion stores these as `revokees`.
    grantees: Vec<RoleId>,
}
381
382impl From<UpdatePrivilegesPlan> for GrantPrivilegesPlan {
383    fn from(
384        UpdatePrivilegesPlan {
385            update_privileges,
386            grantees,
387        }: UpdatePrivilegesPlan,
388    ) -> GrantPrivilegesPlan {
389        GrantPrivilegesPlan {
390            update_privileges,
391            grantees,
392        }
393    }
394}
395
396impl From<UpdatePrivilegesPlan> for RevokePrivilegesPlan {
397    fn from(
398        UpdatePrivilegesPlan {
399            update_privileges,
400            grantees,
401        }: UpdatePrivilegesPlan,
402    ) -> RevokePrivilegesPlan {
403        RevokePrivilegesPlan {
404            update_privileges,
405            revokees: grantees,
406        }
407    }
408}
409
410fn plan_update_privilege(
411    scx: &StatementContext,
412    privileges: PrivilegeSpecification,
413    target: GrantTargetSpecification<Aug>,
414    roles: Vec<ResolvedRoleName>,
415) -> Result<UpdatePrivilegesPlan, PlanError> {
416    let (object_type, target_ids) = match target {
417        GrantTargetSpecification::Object {
418            object_type,
419            object_spec_inner,
420        } => {
421            fn object_type_filter(
422                object_id: &ObjectId,
423                object_type: &ObjectType,
424                scx: &StatementContext,
425            ) -> bool {
426                if object_type == &ObjectType::Table {
427                    scx.get_object_type(object_id).is_relation()
428                } else {
429                    object_type == &scx.get_object_type(object_id)
430                }
431            }
432            let object_type = object_type.into();
433            let object_ids: Vec<ObjectId> = match object_spec_inner {
434                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::All) => {
435                    let cluster_ids = scx
436                        .catalog
437                        .get_clusters()
438                        .into_iter()
439                        .map(|cluster| cluster.id().into());
440                    let database_ids = scx
441                        .catalog
442                        .get_databases()
443                        .into_iter()
444                        .map(|database| database.id().into());
445                    let schema_ids = scx
446                        .catalog
447                        .get_schemas()
448                        .into_iter()
449                        .filter(|schema| !schema.id().is_temporary())
450                        .map(|schema| (schema.database().clone(), schema.id().clone()).into());
451                    let item_ids = scx
452                        .catalog
453                        .get_items()
454                        .into_iter()
455                        .map(|item| item.id().into());
456                    cluster_ids
457                        .chain(database_ids)
458                        .chain(schema_ids)
459                        .chain(item_ids)
460                        .filter(|object_id| object_type_filter(object_id, &object_type, scx))
461                        .filter(|object_id| object_id.is_user())
462                        .collect()
463                }
464                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::AllDatabases {
465                    databases,
466                }) => {
467                    let schema_ids = databases
468                        .iter()
469                        .map(|database| scx.get_database(database.database_id()))
470                        .flat_map(|database| database.schemas().into_iter())
471                        .filter(|schema| !schema.id().is_temporary())
472                        .map(|schema| (schema.database().clone(), schema.id().clone()).into());
473
474                    let item_ids = databases
475                        .iter()
476                        .map(|database| scx.get_database(database.database_id()))
477                        .flat_map(|database| database.schemas().into_iter())
478                        .flat_map(|schema| schema.item_ids())
479                        .map(|item_id| item_id.into());
480
481                    item_ids
482                        .chain(schema_ids)
483                        .filter(|object_id| object_type_filter(object_id, &object_type, scx))
484                        .collect()
485                }
486                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::AllSchemas {
487                    schemas,
488                }) => schemas
489                    .into_iter()
490                    .map(|schema| scx.get_schema(schema.database_spec(), schema.schema_spec()))
491                    .flat_map(|schema| schema.item_ids())
492                    .map(|item_id| item_id.into())
493                    .filter(|object_id| object_type_filter(object_id, &object_type, scx))
494                    .collect(),
495                GrantTargetSpecificationInner::Objects { names } => names
496                    .into_iter()
497                    .map(|name| {
498                        name.try_into()
499                            .expect("name resolution should handle invalid objects")
500                    })
501                    .collect(),
502            };
503            let target_ids = object_ids.into_iter().map(|id| id.into()).collect();
504            (SystemObjectType::Object(object_type), target_ids)
505        }
506        GrantTargetSpecification::System => {
507            (SystemObjectType::System, vec![SystemObjectId::System])
508        }
509    };
510
511    let mut update_privileges = Vec::with_capacity(target_ids.len());
512
513    for target_id in target_ids {
514        // The actual type of the object.
515        let actual_object_type = scx.get_system_object_type(&target_id);
516        // The type used for privileges, for example if the actual type is a view, the reference
517        // type is table.
518        let mut reference_object_type = actual_object_type.clone();
519
520        let acl_mode = privilege_spec_to_acl_mode(scx, &privileges, actual_object_type);
521
522        if let SystemObjectId::Object(ObjectId::Item(id)) = &target_id {
523            let item = scx.get_item(id);
524            let item_type: ObjectType = item.item_type().into();
525            if (item_type == ObjectType::View
526                || item_type == ObjectType::MaterializedView
527                || item_type == ObjectType::Source)
528                && object_type == SystemObjectType::Object(ObjectType::Table)
529            {
530                // This is an expected mis-match to match PostgreSQL semantics.
531                reference_object_type = SystemObjectType::Object(ObjectType::Table);
532            } else if SystemObjectType::Object(item_type) != object_type {
533                let object_name = scx.catalog.resolve_full_name(item.name()).to_string();
534                return Err(PlanError::InvalidObjectType {
535                    expected_type: object_type,
536                    actual_type: actual_object_type,
537                    object_name,
538                });
539            }
540        }
541
542        let all_object_privileges = scx.catalog.all_object_privileges(reference_object_type);
543        let invalid_privileges = acl_mode.difference(all_object_privileges);
544        if !invalid_privileges.is_empty() {
545            let object_description =
546                ErrorMessageObjectDescription::from_sys_id(&target_id, scx.catalog);
547            return Err(PlanError::InvalidPrivilegeTypes {
548                invalid_privileges,
549                object_description,
550            });
551        }
552
553        // In PostgreSQL, the grantor must always be either the object owner or some role that has been
554        // been explicitly granted grant options. In Materialize, we haven't implemented grant options
555        // so the grantor is always the object owner.
556        //
557        // For more details see:
558        // https://github.com/postgres/postgres/blob/78d5952dd0e66afc4447eec07f770991fa406cce/src/backend/utils/adt/acl.c#L5154-L5246
559        let grantor = match &target_id {
560            SystemObjectId::Object(object_id) => scx
561                .catalog
562                .get_owner_id(object_id)
563                .expect("cannot revoke privileges on objects without owners"),
564            SystemObjectId::System => scx.catalog.mz_system_role_id(),
565        };
566
567        update_privileges.push(UpdatePrivilege {
568            acl_mode,
569            target_id,
570            grantor,
571        });
572    }
573
574    let grantees = roles.into_iter().map(|role| role.id).collect();
575
576    Ok(UpdatePrivilegesPlan {
577        update_privileges,
578        grantees,
579    })
580}
581
582fn privilege_spec_to_acl_mode(
583    scx: &StatementContext,
584    privilege_spec: &PrivilegeSpecification,
585    object_type: SystemObjectType,
586) -> AclMode {
587    match privilege_spec {
588        PrivilegeSpecification::All => scx.catalog.all_object_privileges(object_type),
589        PrivilegeSpecification::Privileges(privileges) => privileges
590            .into_iter()
591            .map(|privilege| privilege_to_acl_mode(privilege.clone()))
592            // PostgreSQL doesn't care about duplicate privileges, so we don't either.
593            .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode)),
594    }
595}
596
597fn privilege_to_acl_mode(privilege: Privilege) -> AclMode {
598    match privilege {
599        Privilege::SELECT => AclMode::SELECT,
600        Privilege::INSERT => AclMode::INSERT,
601        Privilege::UPDATE => AclMode::UPDATE,
602        Privilege::DELETE => AclMode::DELETE,
603        Privilege::USAGE => AclMode::USAGE,
604        Privilege::CREATE => AclMode::CREATE,
605        Privilege::CREATEROLE => AclMode::CREATE_ROLE,
606        Privilege::CREATEDB => AclMode::CREATE_DB,
607        Privilege::CREATECLUSTER => AclMode::CREATE_CLUSTER,
608        Privilege::CREATENETWORKPOLICY => AclMode::CREATE_NETWORK_POLICY,
609    }
610}
611
612pub fn describe_alter_default_privileges(
613    _: &StatementContext,
614    _: AlterDefaultPrivilegesStatement<Aug>,
615) -> Result<StatementDesc, PlanError> {
616    Ok(StatementDesc::new(None))
617}
618
619pub fn plan_alter_default_privileges(
620    scx: &StatementContext,
621    AlterDefaultPrivilegesStatement {
622        target_roles,
623        target_objects,
624        grant_or_revoke,
625    }: AlterDefaultPrivilegesStatement<Aug>,
626) -> Result<Plan, PlanError> {
627    let object_type: ObjectType = (*grant_or_revoke.object_type()).into();
628    match object_type {
629        ObjectType::View
630        | ObjectType::MaterializedView
631        | ObjectType::Source
632        | ObjectType::ContinualTask => sql_bail!(
633            "{object_type}S is not valid for ALTER DEFAULT PRIVILEGES, use TABLES instead"
634        ),
635        ObjectType::Sink | ObjectType::ClusterReplica | ObjectType::Role | ObjectType::Func => {
636            sql_bail!("{object_type}S do not have privileges")
637        }
638        ObjectType::Cluster | ObjectType::Database
639            if matches!(
640                target_objects,
641                GrantTargetAllSpecification::AllDatabases { .. }
642            ) =>
643        {
644            sql_bail!("cannot specify {object_type}S and IN DATABASE")
645        }
646
647        ObjectType::Cluster | ObjectType::Database | ObjectType::Schema
648            if matches!(
649                target_objects,
650                GrantTargetAllSpecification::AllSchemas { .. }
651            ) =>
652        {
653            sql_bail!("cannot specify {object_type}S and IN SCHEMA")
654        }
655        ObjectType::Table
656        | ObjectType::Index
657        | ObjectType::Type
658        | ObjectType::Secret
659        | ObjectType::Connection
660        | ObjectType::Cluster
661        | ObjectType::Database
662        | ObjectType::Schema
663        | ObjectType::NetworkPolicy => {}
664    }
665
666    let acl_mode = privilege_spec_to_acl_mode(
667        scx,
668        grant_or_revoke.privileges(),
669        SystemObjectType::Object(object_type),
670    );
671    let all_object_privileges = scx
672        .catalog
673        .all_object_privileges(SystemObjectType::Object(object_type));
674    let invalid_privileges = acl_mode.difference(all_object_privileges);
675    if !invalid_privileges.is_empty() {
676        let object_description =
677            ErrorMessageObjectDescription::from_object_type(SystemObjectType::Object(object_type));
678        return Err(PlanError::InvalidPrivilegeTypes {
679            invalid_privileges,
680            object_description,
681        });
682    }
683
684    let target_roles = match target_roles {
685        TargetRoleSpecification::Roles(roles) => roles.into_iter().map(|role| role.id).collect(),
686        TargetRoleSpecification::AllRoles => vec![RoleId::Public],
687    };
688    let mut privilege_objects = Vec::with_capacity(target_roles.len() * target_objects.len());
689    for target_role in target_roles {
690        match &target_objects {
691            GrantTargetAllSpecification::All => privilege_objects.push(DefaultPrivilegeObject {
692                role_id: target_role,
693                database_id: None,
694                schema_id: None,
695                object_type,
696            }),
697            GrantTargetAllSpecification::AllDatabases { databases } => {
698                for database in databases {
699                    privilege_objects.push(DefaultPrivilegeObject {
700                        role_id: target_role,
701                        database_id: Some(*database.database_id()),
702                        schema_id: None,
703                        object_type,
704                    });
705                }
706            }
707            GrantTargetAllSpecification::AllSchemas { schemas } => {
708                for schema in schemas {
709                    privilege_objects.push(DefaultPrivilegeObject {
710                        role_id: target_role,
711                        database_id: schema.database_spec().id(),
712                        schema_id: Some(schema.schema_spec().into()),
713                        object_type,
714                    });
715                }
716            }
717        }
718    }
719
720    let privilege_acl_items = grant_or_revoke
721        .roles()
722        .into_iter()
723        .map(|grantee| DefaultPrivilegeAclItem {
724            grantee: grantee.id,
725            acl_mode,
726        })
727        .collect();
728
729    let is_grant = match grant_or_revoke {
730        AbbreviatedGrantOrRevokeStatement::Grant(_) => true,
731        AbbreviatedGrantOrRevokeStatement::Revoke(_) => false,
732    };
733
734    Ok(Plan::AlterDefaultPrivileges(AlterDefaultPrivilegesPlan {
735        privilege_objects,
736        privilege_acl_items,
737        is_grant,
738    }))
739}
740
741pub fn describe_reassign_owned(
742    _: &StatementContext,
743    _: ReassignOwnedStatement<Aug>,
744) -> Result<StatementDesc, PlanError> {
745    Ok(StatementDesc::new(None))
746}
747
748pub fn plan_reassign_owned(
749    scx: &StatementContext,
750    ReassignOwnedStatement {
751        old_roles,
752        new_role,
753    }: ReassignOwnedStatement<Aug>,
754) -> Result<Plan, PlanError> {
755    let old_roles: BTreeSet<_> = old_roles.into_iter().map(|role| role.id).collect();
756    let mut reassign_ids: Vec<ObjectId> = Vec::new();
757
758    // Replicas
759    for replica in scx.catalog.get_cluster_replicas() {
760        if old_roles.contains(&replica.owner_id()) {
761            reassign_ids.push((replica.cluster_id(), replica.replica_id()).into());
762        }
763    }
764    // Clusters
765    for cluster in scx.catalog.get_clusters() {
766        if old_roles.contains(&cluster.owner_id()) {
767            reassign_ids.push(cluster.id().into());
768        }
769    }
770    // Items
771    for item in scx.catalog.get_items() {
772        if old_roles.contains(&item.owner_id()) {
773            reassign_ids.push(item.id().into());
774        }
775    }
776    // Schemas
777    for schema in scx.catalog.get_schemas() {
778        if !schema.id().is_temporary() {
779            if old_roles.contains(&schema.owner_id()) {
780                reassign_ids.push((*schema.database(), *schema.id()).into())
781            }
782        }
783    }
784    // Databases
785    for database in scx.catalog.get_databases() {
786        if old_roles.contains(&database.owner_id()) {
787            reassign_ids.push(database.id().into());
788        }
789    }
790
791    let system_ids: Vec<_> = reassign_ids.iter().filter(|id| id.is_system()).collect();
792    if !system_ids.is_empty() {
793        let mut owners = system_ids
794            .into_iter()
795            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
796            .collect::<BTreeSet<_>>()
797            .into_iter()
798            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
799        sql_bail!(
800            "cannot reassign objects owned by role {} because they are required by the database system",
801            owners.join(", "),
802        );
803    }
804
805    Ok(Plan::ReassignOwned(ReassignOwnedPlan {
806        old_roles: old_roles.into_iter().collect(),
807        new_role: new_role.id,
808        reassign_ids,
809    }))
810}