mz_sql/plan/statement/
acl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Access control list (ACL).
11//!
12//! This module houses the handlers for statements that modify privileges in the catalog, like
13//! `GRANT`, `REVOKE`, and `REASSIGN OWNED`.
14
15use std::collections::BTreeSet;
16
17use itertools::Itertools;
18use mz_sql_parser::ast::display::AstDisplay;
19
20use crate::ast::{Ident, UnresolvedDatabaseName};
21use crate::catalog::{
22    DefaultPrivilegeAclItem, DefaultPrivilegeObject, ErrorMessageObjectDescription, ObjectType,
23    SystemObjectType,
24};
25use crate::names::{
26    Aug, ObjectId, ResolvedDatabaseSpecifier, ResolvedRoleName, SchemaSpecifier, SystemObjectId,
27};
28use crate::plan::error::PlanError;
29use crate::plan::statement::ddl::{
30    resolve_cluster, resolve_database, resolve_item_or_type, resolve_network_policy, resolve_schema,
31};
32use crate::plan::statement::{StatementContext, StatementDesc};
33use crate::plan::{
34    AlterDefaultPrivilegesPlan, AlterNoopPlan, AlterOwnerPlan, GrantPrivilegesPlan, GrantRolePlan,
35    Plan, PlanNotice, ReassignOwnedPlan, RevokePrivilegesPlan, RevokeRolePlan, UpdatePrivilege,
36};
37use crate::session::user::SYSTEM_USER;
38use mz_ore::str::StrExt;
39use mz_repr::adt::mz_acl_item::AclMode;
40use mz_repr::role_id::RoleId;
41use mz_sql_parser::ast::{
42    AbbreviatedGrantOrRevokeStatement, AlterDefaultPrivilegesStatement, AlterOwnerStatement,
43    GrantPrivilegesStatement, GrantRoleStatement, GrantTargetAllSpecification,
44    GrantTargetSpecification, GrantTargetSpecificationInner, Privilege, PrivilegeSpecification,
45    ReassignOwnedStatement, RevokePrivilegesStatement, RevokeRoleStatement,
46    TargetRoleSpecification, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName,
47};
48
49pub fn describe_alter_owner(
50    _: &StatementContext,
51    _: AlterOwnerStatement<Aug>,
52) -> Result<StatementDesc, PlanError> {
53    Ok(StatementDesc::new(None))
54}
55
56pub fn plan_alter_owner(
57    scx: &StatementContext,
58    AlterOwnerStatement {
59        object_type,
60        if_exists,
61        name,
62        new_owner,
63    }: AlterOwnerStatement<Aug>,
64) -> Result<Plan, PlanError> {
65    let object_type = object_type.into();
66    match (object_type, name) {
67        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
68            plan_alter_cluster_owner(scx, if_exists, name, new_owner.id)
69        }
70        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(_)) => {
71            bail_never_supported!("altering the owner of a cluster replica");
72        }
73        (ObjectType::Database, UnresolvedObjectName::Database(name)) => {
74            plan_alter_database_owner(scx, if_exists, name, new_owner.id)
75        }
76        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
77            plan_alter_schema_owner(scx, if_exists, name, new_owner.id)
78        }
79        (ObjectType::NetworkPolicy, UnresolvedObjectName::NetworkPolicy(name)) => {
80            plan_alter_network_policy_owner(scx, if_exists, name, new_owner.id)
81        }
82        (ObjectType::Role, UnresolvedObjectName::Role(_)) => unreachable!("rejected by the parser"),
83        (
84            object_type @ ObjectType::Cluster
85            | object_type @ ObjectType::ClusterReplica
86            | object_type @ ObjectType::Database
87            | object_type @ ObjectType::Schema
88            | object_type @ ObjectType::Role,
89            name,
90        )
91        | (
92            object_type,
93            name @ UnresolvedObjectName::Cluster(_)
94            | name @ UnresolvedObjectName::ClusterReplica(_)
95            | name @ UnresolvedObjectName::Database(_)
96            | name @ UnresolvedObjectName::Schema(_)
97            | name @ UnresolvedObjectName::NetworkPolicy(_)
98            | name @ UnresolvedObjectName::Role(_),
99        ) => {
100            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
101        }
102        (object_type, UnresolvedObjectName::Item(name)) => {
103            plan_alter_item_owner(scx, object_type, if_exists, name, new_owner.id)
104        }
105    }
106}
107
108fn plan_alter_cluster_owner(
109    scx: &StatementContext,
110    if_exists: bool,
111    name: Ident,
112    new_owner: RoleId,
113) -> Result<Plan, PlanError> {
114    match resolve_cluster(scx, &name, if_exists)? {
115        Some(cluster) => Ok(Plan::AlterOwner(AlterOwnerPlan {
116            id: ObjectId::Cluster(cluster.id()),
117            object_type: ObjectType::Cluster,
118            new_owner,
119        })),
120        None => {
121            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
122                name: name.to_ast_string_simple(),
123                object_type: ObjectType::Cluster,
124            });
125            Ok(Plan::AlterNoop(AlterNoopPlan {
126                object_type: ObjectType::Cluster,
127            }))
128        }
129    }
130}
131
132fn plan_alter_database_owner(
133    scx: &StatementContext,
134    if_exists: bool,
135    name: UnresolvedDatabaseName,
136    new_owner: RoleId,
137) -> Result<Plan, PlanError> {
138    match resolve_database(scx, &name, if_exists)? {
139        Some(database) => Ok(Plan::AlterOwner(AlterOwnerPlan {
140            id: ObjectId::Database(database.id()),
141            object_type: ObjectType::Database,
142            new_owner,
143        })),
144        None => {
145            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
146                name: name.to_ast_string_simple(),
147                object_type: ObjectType::Database,
148            });
149
150            Ok(Plan::AlterNoop(AlterNoopPlan {
151                object_type: ObjectType::Database,
152            }))
153        }
154    }
155}
156
157fn plan_alter_schema_owner(
158    scx: &StatementContext,
159    if_exists: bool,
160    name: UnresolvedSchemaName,
161    new_owner: RoleId,
162) -> Result<Plan, PlanError> {
163    match resolve_schema(scx, name.clone(), if_exists)? {
164        Some((database_spec, schema_spec)) => {
165            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
166                sql_bail!(
167                    "cannot alter schema {name} because it is required by the database system",
168                );
169            }
170            if let SchemaSpecifier::Temporary = schema_spec {
171                sql_bail!("cannot alter schema {name} because it is a temporary schema",)
172            }
173            Ok(Plan::AlterOwner(AlterOwnerPlan {
174                id: ObjectId::Schema((database_spec, schema_spec)),
175                object_type: ObjectType::Schema,
176                new_owner,
177            }))
178        }
179        None => {
180            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
181                name: name.to_ast_string_simple(),
182                object_type: ObjectType::Schema,
183            });
184
185            Ok(Plan::AlterNoop(AlterNoopPlan {
186                object_type: ObjectType::Schema,
187            }))
188        }
189    }
190}
191
192fn plan_alter_item_owner(
193    scx: &StatementContext,
194    object_type: ObjectType,
195    if_exists: bool,
196    name: UnresolvedItemName,
197    new_owner: RoleId,
198) -> Result<Plan, PlanError> {
199    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
200        Ok(r) => r,
201        // Return a more helpful error on `DROP VIEW <materialized-view>`.
202        Err(PlanError::MismatchedObjectType {
203            name,
204            is_type: ObjectType::MaterializedView,
205            expected_type: ObjectType::View,
206        }) => {
207            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
208        }
209        e => e?,
210    };
211
212    match resolved {
213        Some(item) => {
214            if item.id().is_system() {
215                sql_bail!(
216                    "cannot alter item {} because it is required by the database system",
217                    scx.catalog.resolve_full_name(item.name()),
218                );
219            }
220
221            // Progress subsources cannot be altered directly.
222            if item.is_progress_source() {
223                sql_bail!("cannot ALTER this type of source");
224            }
225
226            Ok(Plan::AlterOwner(AlterOwnerPlan {
227                id: ObjectId::Item(item.id()),
228                object_type,
229                new_owner,
230            }))
231        }
232        None => {
233            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
234                name: name.to_ast_string_simple(),
235                object_type,
236            });
237
238            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
239        }
240    }
241}
242
243fn plan_alter_network_policy_owner(
244    scx: &StatementContext,
245    if_exists: bool,
246    name: Ident,
247    new_owner: RoleId,
248) -> Result<Plan, PlanError> {
249    match resolve_network_policy(scx, name.clone(), if_exists)? {
250        Some(policy_id) => Ok(Plan::AlterOwner(AlterOwnerPlan {
251            id: ObjectId::NetworkPolicy(policy_id.id),
252            object_type: ObjectType::Schema,
253            new_owner,
254        })),
255        None => {
256            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
257                name: name.to_ast_string_simple(),
258                object_type: ObjectType::NetworkPolicy,
259            });
260
261            Ok(Plan::AlterNoop(AlterNoopPlan {
262                object_type: ObjectType::NetworkPolicy,
263            }))
264        }
265    }
266}
267
268pub fn describe_grant_role(
269    _: &StatementContext,
270    _: GrantRoleStatement<Aug>,
271) -> Result<StatementDesc, PlanError> {
272    Ok(StatementDesc::new(None))
273}
274
275pub fn plan_grant_role(
276    scx: &StatementContext,
277    GrantRoleStatement {
278        role_names,
279        member_names,
280    }: GrantRoleStatement<Aug>,
281) -> Result<Plan, PlanError> {
282    // In PostgreSQL, the grantor must either be a role with ADMIN OPTION on the role being granted,
283    // or the bootstrap superuser. We do not have ADMIN OPTION implemented and 'mz_system' is our
284    // equivalent of the bootstrap superuser. Therefore the grantor is always 'mz_system'.
285    // For more details see:
286    // https://github.com/postgres/postgres/blob/064eb89e83ea0f59426c92906329f1e6c423dfa4/src/backend/commands/user.c#L2180-L2238
287    let grantor_id = scx
288        .catalog
289        .resolve_role(&SYSTEM_USER.name)
290        .expect("system user must exist")
291        .id();
292    Ok(Plan::GrantRole(GrantRolePlan {
293        role_ids: role_names
294            .into_iter()
295            .map(|role_name| role_name.id)
296            .collect(),
297        member_ids: member_names
298            .into_iter()
299            .map(|member_name| member_name.id)
300            .collect(),
301        grantor_id,
302    }))
303}
304
305pub fn describe_revoke_role(
306    _: &StatementContext,
307    _: RevokeRoleStatement<Aug>,
308) -> Result<StatementDesc, PlanError> {
309    Ok(StatementDesc::new(None))
310}
311
312pub fn plan_revoke_role(
313    scx: &StatementContext,
314    RevokeRoleStatement {
315        role_names,
316        member_names,
317    }: RevokeRoleStatement<Aug>,
318) -> Result<Plan, PlanError> {
319    // In PostgreSQL, the same role membership can be granted multiple times by different grantors.
320    // When revoking a role membership, only the membership granted by the specified grantor is
321    // revoked. The grantor must either be a role with ADMIN OPTION on the role being granted,
322    // or the bootstrap superuser. We do not have ADMIN OPTION implemented and 'mz_system' is our
323    // equivalent of the bootstrap superuser. Therefore the grantor is always 'mz_system'.
324    // For more details see:
325    // https://github.com/postgres/postgres/blob/064eb89e83ea0f59426c92906329f1e6c423dfa4/src/backend/commands/user.c#L2180-L2238
326    let grantor_id = scx
327        .catalog
328        .resolve_role(&SYSTEM_USER.name)
329        .expect("system user must exist")
330        .id();
331    Ok(Plan::RevokeRole(RevokeRolePlan {
332        role_ids: role_names
333            .into_iter()
334            .map(|role_name| role_name.id)
335            .collect(),
336        member_ids: member_names
337            .into_iter()
338            .map(|member_name| member_name.id)
339            .collect(),
340        grantor_id,
341    }))
342}
343
344pub fn describe_grant_privileges(
345    _: &StatementContext,
346    _: GrantPrivilegesStatement<Aug>,
347) -> Result<StatementDesc, PlanError> {
348    Ok(StatementDesc::new(None))
349}
350
351pub fn plan_grant_privileges(
352    scx: &StatementContext,
353    GrantPrivilegesStatement {
354        privileges,
355        target,
356        roles,
357    }: GrantPrivilegesStatement<Aug>,
358) -> Result<Plan, PlanError> {
359    let plan = plan_update_privilege(scx, privileges, target, roles)?;
360    Ok(Plan::GrantPrivileges(plan.into()))
361}
362
363pub fn describe_revoke_privileges(
364    _: &StatementContext,
365    _: RevokePrivilegesStatement<Aug>,
366) -> Result<StatementDesc, PlanError> {
367    Ok(StatementDesc::new(None))
368}
369
370pub fn plan_revoke_privileges(
371    scx: &StatementContext,
372    RevokePrivilegesStatement {
373        privileges,
374        target,
375        roles,
376    }: RevokePrivilegesStatement<Aug>,
377) -> Result<Plan, PlanError> {
378    let plan = plan_update_privilege(scx, privileges, target, roles)?;
379    Ok(Plan::RevokePrivileges(plan.into()))
380}
381
/// Intermediate result produced by `plan_update_privilege`.
///
/// It is converted via `From` into either a [`GrantPrivilegesPlan`] or a
/// [`RevokePrivilegesPlan`], so that `GRANT` and `REVOKE` can share one planning routine.
struct UpdatePrivilegesPlan {
    /// One entry per target, carrying the ACL mode, the target ID, and the grantor.
    update_privileges: Vec<UpdatePrivilege>,
    /// IDs of the roles named in the statement. These become `revokees` when this value is
    /// converted into a [`RevokePrivilegesPlan`].
    grantees: Vec<RoleId>,
}
386
387impl From<UpdatePrivilegesPlan> for GrantPrivilegesPlan {
388    fn from(
389        UpdatePrivilegesPlan {
390            update_privileges,
391            grantees,
392        }: UpdatePrivilegesPlan,
393    ) -> GrantPrivilegesPlan {
394        GrantPrivilegesPlan {
395            update_privileges,
396            grantees,
397        }
398    }
399}
400
401impl From<UpdatePrivilegesPlan> for RevokePrivilegesPlan {
402    fn from(
403        UpdatePrivilegesPlan {
404            update_privileges,
405            grantees,
406        }: UpdatePrivilegesPlan,
407    ) -> RevokePrivilegesPlan {
408        RevokePrivilegesPlan {
409            update_privileges,
410            revokees: grantees,
411        }
412    }
413}
414
/// Shared planning logic for granting and revoking privileges.
///
/// Expands `target` into the concrete list of target IDs (either catalog objects or the system
/// itself), then, for each target, computes the ACL mode requested by `privileges`, checks that
/// the target's type is compatible with the type named in the statement, checks that every
/// requested privilege is valid for that type, and determines the grantor. The IDs of `roles`
/// are returned alongside the per-target updates.
///
/// # Errors
///
/// - [`PlanError::InvalidObjectType`] if an item target's type differs from the object type named
///   in the statement. Views, materialized views, and sources are accepted when the statement
///   names `ObjectType::Table`.
/// - [`PlanError::InvalidPrivilegeTypes`] if any requested privilege is not among the privileges
///   the catalog reports for the target's reference object type.
///
/// # Panics
///
/// Panics if an explicitly named object cannot be converted into an `ObjectId`, or if a target
/// object has no owner.
fn plan_update_privilege(
    scx: &StatementContext,
    privileges: PrivilegeSpecification,
    target: GrantTargetSpecification<Aug>,
    roles: Vec<ResolvedRoleName>,
) -> Result<UpdatePrivilegesPlan, PlanError> {
    let (object_type, target_ids) = match target {
        GrantTargetSpecification::Object {
            object_type,
            object_spec_inner,
        } => {
            // Decides whether `object_id` should be kept when expanding an `ALL ...` target for
            // `object_type`. When the requested type is `Table`, anything whose type reports
            // `is_relation()` is kept; otherwise the types must match exactly.
            fn object_type_filter(
                object_id: &ObjectId,
                object_type: &ObjectType,
                scx: &StatementContext,
            ) -> bool {
                if object_type == &ObjectType::Table {
                    scx.get_object_type(object_id).is_relation()
                } else {
                    object_type == &scx.get_object_type(object_id)
                }
            }
            let object_type = object_type.into();
            let object_ids: Vec<ObjectId> = match object_spec_inner {
                // No database or schema restriction: consider every cluster, database,
                // non-temporary schema, and item in the catalog, then keep those of the
                // requested type for which `is_user()` holds.
                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::All) => {
                    let cluster_ids = scx
                        .catalog
                        .get_clusters()
                        .into_iter()
                        .map(|cluster| cluster.id().into());
                    let database_ids = scx
                        .catalog
                        .get_databases()
                        .into_iter()
                        .map(|database| database.id().into());
                    let schema_ids = scx
                        .catalog
                        .get_schemas()
                        .into_iter()
                        .filter(|schema| !schema.id().is_temporary())
                        .map(|schema| (schema.database().clone(), schema.id().clone()).into());
                    let item_ids = scx
                        .catalog
                        .get_items()
                        .into_iter()
                        .map(|item| item.id().into());
                    cluster_ids
                        .chain(database_ids)
                        .chain(schema_ids)
                        .chain(item_ids)
                        .filter(|object_id| object_type_filter(object_id, &object_type, scx))
                        .filter(|object_id| object_id.is_user())
                        .collect()
                }
                // Restricted to the listed databases: consider their non-temporary schemas and
                // the items of all their schemas, then keep those of the requested type.
                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::AllDatabases {
                    databases,
                }) => {
                    let schema_ids = databases
                        .iter()
                        .map(|database| scx.get_database(database.database_id()))
                        .flat_map(|database| database.schemas().into_iter())
                        .filter(|schema| !schema.id().is_temporary())
                        .map(|schema| (schema.database().clone(), schema.id().clone()).into());

                    let item_ids = databases
                        .iter()
                        .map(|database| scx.get_database(database.database_id()))
                        .flat_map(|database| database.schemas().into_iter())
                        .flat_map(|schema| schema.item_ids())
                        .map(|item_id| item_id.into());

                    item_ids
                        .chain(schema_ids)
                        .filter(|object_id| object_type_filter(object_id, &object_type, scx))
                        .collect()
                }
                // Restricted to the listed schemas: consider only the items they contain.
                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::AllSchemas {
                    schemas,
                }) => schemas
                    .into_iter()
                    .map(|schema| scx.get_schema(schema.database_spec(), schema.schema_spec()))
                    .flat_map(|schema| schema.item_ids())
                    .map(|item_id| item_id.into())
                    .filter(|object_id| object_type_filter(object_id, &object_type, scx))
                    .collect(),
                // Explicitly named objects: taken as-is, with no type filtering here. Type
                // mismatches for items are reported by the validation loop below.
                GrantTargetSpecificationInner::Objects { names } => names
                    .into_iter()
                    .map(|name| {
                        name.try_into()
                            .expect("name resolution should handle invalid objects")
                    })
                    .collect(),
            };
            let target_ids = object_ids.into_iter().map(|id| id.into()).collect();
            (SystemObjectType::Object(object_type), target_ids)
        }
        GrantTargetSpecification::System => {
            (SystemObjectType::System, vec![SystemObjectId::System])
        }
    };

    let mut update_privileges = Vec::with_capacity(target_ids.len());

    for target_id in target_ids {
        // The actual type of the object.
        let actual_object_type = scx.get_system_object_type(&target_id);
        // The type used for privileges, for example if the actual type is a view, the reference
        // type is table.
        let mut reference_object_type = actual_object_type.clone();

        // Note that an `ALL` privilege specification is expanded using the actual type, while
        // validity below is checked against the reference type.
        let acl_mode = privilege_spec_to_acl_mode(scx, &privileges, actual_object_type);

        if let SystemObjectId::Object(ObjectId::Item(id)) = &target_id {
            let item = scx.get_item(id);
            let item_type: ObjectType = item.item_type().into();
            if (item_type == ObjectType::View
                || item_type == ObjectType::MaterializedView
                || item_type == ObjectType::Source)
                && object_type == SystemObjectType::Object(ObjectType::Table)
            {
                // This is an expected mis-match to match PostgreSQL semantics.
                reference_object_type = SystemObjectType::Object(ObjectType::Table);
            } else if SystemObjectType::Object(item_type) != object_type {
                let object_name = scx.catalog.resolve_full_name(item.name()).to_string();
                return Err(PlanError::InvalidObjectType {
                    expected_type: object_type,
                    actual_type: actual_object_type,
                    object_name,
                });
            }
        }

        // Reject any requested privilege that the catalog does not list for the reference type.
        let all_object_privileges = scx.catalog.all_object_privileges(reference_object_type);
        let invalid_privileges = acl_mode.difference(all_object_privileges);
        if !invalid_privileges.is_empty() {
            let object_description =
                ErrorMessageObjectDescription::from_sys_id(&target_id, scx.catalog);
            return Err(PlanError::InvalidPrivilegeTypes {
                invalid_privileges,
                object_description,
            });
        }

        // In PostgreSQL, the grantor must always be either the object owner or some role that has
        // been explicitly granted grant options. In Materialize, we haven't implemented grant
        // options so the grantor is always the object owner.
        //
        // For more details see:
        // https://github.com/postgres/postgres/blob/78d5952dd0e66afc4447eec07f770991fa406cce/src/backend/utils/adt/acl.c#L5154-L5246
        let grantor = match &target_id {
            // NOTE(review): this path is shared by grant and revoke planning, although the panic
            // message below only mentions revoking.
            SystemObjectId::Object(object_id) => scx
                .catalog
                .get_owner_id(object_id)
                .expect("cannot revoke privileges on objects without owners"),
            SystemObjectId::System => scx.catalog.mz_system_role_id(),
        };

        update_privileges.push(UpdatePrivilege {
            acl_mode,
            target_id,
            grantor,
        });
    }

    let grantees = roles.into_iter().map(|role| role.id).collect();

    Ok(UpdatePrivilegesPlan {
        update_privileges,
        grantees,
    })
}
586
587fn privilege_spec_to_acl_mode(
588    scx: &StatementContext,
589    privilege_spec: &PrivilegeSpecification,
590    object_type: SystemObjectType,
591) -> AclMode {
592    match privilege_spec {
593        PrivilegeSpecification::All => scx.catalog.all_object_privileges(object_type),
594        PrivilegeSpecification::Privileges(privileges) => privileges
595            .into_iter()
596            .map(|privilege| privilege_to_acl_mode(privilege.clone()))
597            // PostgreSQL doesn't care about duplicate privileges, so we don't either.
598            .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode)),
599    }
600}
601
602fn privilege_to_acl_mode(privilege: Privilege) -> AclMode {
603    match privilege {
604        Privilege::SELECT => AclMode::SELECT,
605        Privilege::INSERT => AclMode::INSERT,
606        Privilege::UPDATE => AclMode::UPDATE,
607        Privilege::DELETE => AclMode::DELETE,
608        Privilege::USAGE => AclMode::USAGE,
609        Privilege::CREATE => AclMode::CREATE,
610        Privilege::CREATEROLE => AclMode::CREATE_ROLE,
611        Privilege::CREATEDB => AclMode::CREATE_DB,
612        Privilege::CREATECLUSTER => AclMode::CREATE_CLUSTER,
613        Privilege::CREATENETWORKPOLICY => AclMode::CREATE_NETWORK_POLICY,
614    }
615}
616
617pub fn describe_alter_default_privileges(
618    _: &StatementContext,
619    _: AlterDefaultPrivilegesStatement<Aug>,
620) -> Result<StatementDesc, PlanError> {
621    Ok(StatementDesc::new(None))
622}
623
624pub fn plan_alter_default_privileges(
625    scx: &StatementContext,
626    AlterDefaultPrivilegesStatement {
627        target_roles,
628        target_objects,
629        grant_or_revoke,
630    }: AlterDefaultPrivilegesStatement<Aug>,
631) -> Result<Plan, PlanError> {
632    let object_type: ObjectType = (*grant_or_revoke.object_type()).into();
633    match object_type {
634        ObjectType::View
635        | ObjectType::MaterializedView
636        | ObjectType::Source
637        | ObjectType::ContinualTask => sql_bail!(
638            "{object_type}S is not valid for ALTER DEFAULT PRIVILEGES, use TABLES instead"
639        ),
640        ObjectType::Sink | ObjectType::ClusterReplica | ObjectType::Role | ObjectType::Func => {
641            sql_bail!("{object_type}S do not have privileges")
642        }
643        ObjectType::Cluster | ObjectType::Database
644            if matches!(
645                target_objects,
646                GrantTargetAllSpecification::AllDatabases { .. }
647            ) =>
648        {
649            sql_bail!("cannot specify {object_type}S and IN DATABASE")
650        }
651
652        ObjectType::Cluster | ObjectType::Database | ObjectType::Schema
653            if matches!(
654                target_objects,
655                GrantTargetAllSpecification::AllSchemas { .. }
656            ) =>
657        {
658            sql_bail!("cannot specify {object_type}S and IN SCHEMA")
659        }
660        ObjectType::Table
661        | ObjectType::Index
662        | ObjectType::Type
663        | ObjectType::Secret
664        | ObjectType::Connection
665        | ObjectType::Cluster
666        | ObjectType::Database
667        | ObjectType::Schema
668        | ObjectType::NetworkPolicy => {}
669    }
670
671    let acl_mode = privilege_spec_to_acl_mode(
672        scx,
673        grant_or_revoke.privileges(),
674        SystemObjectType::Object(object_type),
675    );
676    let all_object_privileges = scx
677        .catalog
678        .all_object_privileges(SystemObjectType::Object(object_type));
679    let invalid_privileges = acl_mode.difference(all_object_privileges);
680    if !invalid_privileges.is_empty() {
681        let object_description =
682            ErrorMessageObjectDescription::from_object_type(SystemObjectType::Object(object_type));
683        return Err(PlanError::InvalidPrivilegeTypes {
684            invalid_privileges,
685            object_description,
686        });
687    }
688
689    let target_roles = match target_roles {
690        TargetRoleSpecification::Roles(roles) => roles.into_iter().map(|role| role.id).collect(),
691        TargetRoleSpecification::AllRoles => vec![RoleId::Public],
692    };
693    let mut privilege_objects = Vec::with_capacity(target_roles.len() * target_objects.len());
694    for target_role in target_roles {
695        match &target_objects {
696            GrantTargetAllSpecification::All => privilege_objects.push(DefaultPrivilegeObject {
697                role_id: target_role,
698                database_id: None,
699                schema_id: None,
700                object_type,
701            }),
702            GrantTargetAllSpecification::AllDatabases { databases } => {
703                for database in databases {
704                    privilege_objects.push(DefaultPrivilegeObject {
705                        role_id: target_role,
706                        database_id: Some(*database.database_id()),
707                        schema_id: None,
708                        object_type,
709                    });
710                }
711            }
712            GrantTargetAllSpecification::AllSchemas { schemas } => {
713                for schema in schemas {
714                    privilege_objects.push(DefaultPrivilegeObject {
715                        role_id: target_role,
716                        database_id: schema.database_spec().id(),
717                        schema_id: Some(schema.schema_spec().into()),
718                        object_type,
719                    });
720                }
721            }
722        }
723    }
724
725    let privilege_acl_items = grant_or_revoke
726        .roles()
727        .into_iter()
728        .map(|grantee| DefaultPrivilegeAclItem {
729            grantee: grantee.id,
730            acl_mode,
731        })
732        .collect();
733
734    let is_grant = match grant_or_revoke {
735        AbbreviatedGrantOrRevokeStatement::Grant(_) => true,
736        AbbreviatedGrantOrRevokeStatement::Revoke(_) => false,
737    };
738
739    Ok(Plan::AlterDefaultPrivileges(AlterDefaultPrivilegesPlan {
740        privilege_objects,
741        privilege_acl_items,
742        is_grant,
743    }))
744}
745
746pub fn describe_reassign_owned(
747    _: &StatementContext,
748    _: ReassignOwnedStatement<Aug>,
749) -> Result<StatementDesc, PlanError> {
750    Ok(StatementDesc::new(None))
751}
752
753pub fn plan_reassign_owned(
754    scx: &StatementContext,
755    ReassignOwnedStatement {
756        old_roles,
757        new_role,
758    }: ReassignOwnedStatement<Aug>,
759) -> Result<Plan, PlanError> {
760    let old_roles: BTreeSet<_> = old_roles.into_iter().map(|role| role.id).collect();
761    let mut reassign_ids: Vec<ObjectId> = Vec::new();
762
763    // Replicas
764    for replica in scx.catalog.get_cluster_replicas() {
765        if old_roles.contains(&replica.owner_id()) {
766            reassign_ids.push((replica.cluster_id(), replica.replica_id()).into());
767        }
768    }
769    // Clusters
770    for cluster in scx.catalog.get_clusters() {
771        if old_roles.contains(&cluster.owner_id()) {
772            reassign_ids.push(cluster.id().into());
773        }
774    }
775    // Items
776    for item in scx.catalog.get_items() {
777        if old_roles.contains(&item.owner_id()) {
778            reassign_ids.push(item.id().into());
779        }
780    }
781    // Schemas
782    for schema in scx.catalog.get_schemas() {
783        if !schema.id().is_temporary() {
784            if old_roles.contains(&schema.owner_id()) {
785                reassign_ids.push((*schema.database(), *schema.id()).into())
786            }
787        }
788    }
789    // Databases
790    for database in scx.catalog.get_databases() {
791        if old_roles.contains(&database.owner_id()) {
792            reassign_ids.push(database.id().into());
793        }
794    }
795
796    let system_ids: Vec<_> = reassign_ids.iter().filter(|id| id.is_system()).collect();
797    if !system_ids.is_empty() {
798        let mut owners = system_ids
799            .into_iter()
800            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
801            .collect::<BTreeSet<_>>()
802            .into_iter()
803            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
804        sql_bail!(
805            "cannot reassign objects owned by role {} because they are required by the database system",
806            owners.join(", "),
807        );
808    }
809
810    Ok(Plan::ReassignOwned(ReassignOwnedPlan {
811        old_roles: old_roles.into_iter().collect(),
812        new_role: new_role.id,
813        reassign_ids,
814    }))
815}