mz_sql/plan/statement/
acl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Access control list (ACL).
11//!
12//! This module houses the handlers for statements that modify privileges in the catalog, like
13//! `GRANT`, `REVOKE`, and `REASSIGN OWNED`.
14
15use std::collections::BTreeSet;
16
17use itertools::Itertools;
18use mz_sql_parser::ast::display::AstDisplay;
19
20use crate::ast::{Ident, UnresolvedDatabaseName};
21use crate::catalog::{
22    DefaultPrivilegeAclItem, DefaultPrivilegeObject, ErrorMessageObjectDescription, ObjectType,
23    SystemObjectType,
24};
25use crate::names::{
26    Aug, ObjectId, ResolvedDatabaseSpecifier, ResolvedRoleName, SchemaSpecifier, SystemObjectId,
27};
28use crate::plan::error::PlanError;
29use crate::plan::statement::ddl::{
30    resolve_cluster, resolve_database, resolve_item_or_type, resolve_network_policy, resolve_schema,
31};
32use crate::plan::statement::{StatementContext, StatementDesc};
33use crate::plan::{
34    AlterDefaultPrivilegesPlan, AlterNoopPlan, AlterOwnerPlan, GrantPrivilegesPlan, GrantRolePlan,
35    Plan, PlanNotice, ReassignOwnedPlan, RevokePrivilegesPlan, RevokeRolePlan, UpdatePrivilege,
36};
37use crate::session::user::SYSTEM_USER;
38use mz_ore::str::StrExt;
39use mz_repr::adt::mz_acl_item::AclMode;
40use mz_repr::role_id::RoleId;
41use mz_sql_parser::ast::{
42    AbbreviatedGrantOrRevokeStatement, AlterDefaultPrivilegesStatement, AlterOwnerStatement,
43    GrantPrivilegesStatement, GrantRoleStatement, GrantTargetAllSpecification,
44    GrantTargetSpecification, GrantTargetSpecificationInner, Privilege, PrivilegeSpecification,
45    ReassignOwnedStatement, RevokePrivilegesStatement, RevokeRoleStatement,
46    TargetRoleSpecification, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName,
47};
48
49pub fn describe_alter_owner(
50    _: &StatementContext,
51    _: AlterOwnerStatement<Aug>,
52) -> Result<StatementDesc, PlanError> {
53    Ok(StatementDesc::new(None))
54}
55
56pub fn plan_alter_owner(
57    scx: &StatementContext,
58    AlterOwnerStatement {
59        object_type,
60        if_exists,
61        name,
62        new_owner,
63    }: AlterOwnerStatement<Aug>,
64) -> Result<Plan, PlanError> {
65    let object_type = object_type.into();
66    match (object_type, name) {
67        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
68            plan_alter_cluster_owner(scx, if_exists, name, new_owner.id)
69        }
70        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(_)) => {
71            bail_never_supported!("altering the owner of a cluster replica");
72        }
73        (ObjectType::Database, UnresolvedObjectName::Database(name)) => {
74            plan_alter_database_owner(scx, if_exists, name, new_owner.id)
75        }
76        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
77            plan_alter_schema_owner(scx, if_exists, name, new_owner.id)
78        }
79        (ObjectType::NetworkPolicy, UnresolvedObjectName::NetworkPolicy(name)) => {
80            plan_alter_network_policy_owner(scx, if_exists, name, new_owner.id)
81        }
82        (ObjectType::Role, UnresolvedObjectName::Role(_)) => unreachable!("rejected by the parser"),
83        (
84            object_type @ ObjectType::Cluster
85            | object_type @ ObjectType::ClusterReplica
86            | object_type @ ObjectType::Database
87            | object_type @ ObjectType::Schema
88            | object_type @ ObjectType::Role,
89            name,
90        )
91        | (
92            object_type,
93            name @ UnresolvedObjectName::Cluster(_)
94            | name @ UnresolvedObjectName::ClusterReplica(_)
95            | name @ UnresolvedObjectName::Database(_)
96            | name @ UnresolvedObjectName::Schema(_)
97            | name @ UnresolvedObjectName::NetworkPolicy(_)
98            | name @ UnresolvedObjectName::Role(_),
99        ) => {
100            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
101        }
102        (object_type, UnresolvedObjectName::Item(name)) => {
103            plan_alter_item_owner(scx, object_type, if_exists, name, new_owner.id)
104        }
105    }
106}
107
108fn plan_alter_cluster_owner(
109    scx: &StatementContext,
110    if_exists: bool,
111    name: Ident,
112    new_owner: RoleId,
113) -> Result<Plan, PlanError> {
114    match resolve_cluster(scx, &name, if_exists)? {
115        Some(cluster) => Ok(Plan::AlterOwner(AlterOwnerPlan {
116            id: ObjectId::Cluster(cluster.id()),
117            object_type: ObjectType::Cluster,
118            new_owner,
119        })),
120        None => {
121            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
122                name: name.to_ast_string_simple(),
123                object_type: ObjectType::Cluster,
124            });
125            Ok(Plan::AlterNoop(AlterNoopPlan {
126                object_type: ObjectType::Cluster,
127            }))
128        }
129    }
130}
131
132fn plan_alter_database_owner(
133    scx: &StatementContext,
134    if_exists: bool,
135    name: UnresolvedDatabaseName,
136    new_owner: RoleId,
137) -> Result<Plan, PlanError> {
138    match resolve_database(scx, &name, if_exists)? {
139        Some(database) => Ok(Plan::AlterOwner(AlterOwnerPlan {
140            id: ObjectId::Database(database.id()),
141            object_type: ObjectType::Database,
142            new_owner,
143        })),
144        None => {
145            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
146                name: name.to_ast_string_simple(),
147                object_type: ObjectType::Database,
148            });
149
150            Ok(Plan::AlterNoop(AlterNoopPlan {
151                object_type: ObjectType::Database,
152            }))
153        }
154    }
155}
156
157fn plan_alter_schema_owner(
158    scx: &StatementContext,
159    if_exists: bool,
160    name: UnresolvedSchemaName,
161    new_owner: RoleId,
162) -> Result<Plan, PlanError> {
163    // Special case for mz_temp: with lazy temporary schema creation, the temp
164    // schema may not exist yet, but we still need to return the correct error.
165    // Check the schema name directly against MZ_TEMP_SCHEMA.
166    let normalized = crate::normalize::unresolved_schema_name(name.clone())?;
167    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
168        sql_bail!("cannot alter schema {name} because it is a temporary schema",)
169    }
170
171    match resolve_schema(scx, name.clone(), if_exists)? {
172        Some((database_spec, schema_spec)) => {
173            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
174                sql_bail!(
175                    "cannot alter schema {name} because it is required by the database system",
176                );
177            }
178            if let SchemaSpecifier::Temporary = schema_spec {
179                sql_bail!("cannot alter schema {name} because it is a temporary schema",)
180            }
181            Ok(Plan::AlterOwner(AlterOwnerPlan {
182                id: ObjectId::Schema((database_spec, schema_spec)),
183                object_type: ObjectType::Schema,
184                new_owner,
185            }))
186        }
187        None => {
188            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
189                name: name.to_ast_string_simple(),
190                object_type: ObjectType::Schema,
191            });
192
193            Ok(Plan::AlterNoop(AlterNoopPlan {
194                object_type: ObjectType::Schema,
195            }))
196        }
197    }
198}
199
200fn plan_alter_item_owner(
201    scx: &StatementContext,
202    object_type: ObjectType,
203    if_exists: bool,
204    name: UnresolvedItemName,
205    new_owner: RoleId,
206) -> Result<Plan, PlanError> {
207    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
208        Ok(r) => r,
209        // Return a more helpful error on `DROP VIEW <materialized-view>`.
210        Err(PlanError::MismatchedObjectType {
211            name,
212            is_type: ObjectType::MaterializedView,
213            expected_type: ObjectType::View,
214        }) => {
215            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
216        }
217        e => e?,
218    };
219
220    match resolved {
221        Some(item) => {
222            if item.id().is_system() {
223                sql_bail!(
224                    "cannot alter item {} because it is required by the database system",
225                    scx.catalog.resolve_full_name(item.name()),
226                );
227            }
228
229            Ok(Plan::AlterOwner(AlterOwnerPlan {
230                id: ObjectId::Item(item.id()),
231                object_type,
232                new_owner,
233            }))
234        }
235        None => {
236            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
237                name: name.to_ast_string_simple(),
238                object_type,
239            });
240
241            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
242        }
243    }
244}
245
246fn plan_alter_network_policy_owner(
247    scx: &StatementContext,
248    if_exists: bool,
249    name: Ident,
250    new_owner: RoleId,
251) -> Result<Plan, PlanError> {
252    match resolve_network_policy(scx, name.clone(), if_exists)? {
253        Some(policy_id) => Ok(Plan::AlterOwner(AlterOwnerPlan {
254            id: ObjectId::NetworkPolicy(policy_id.id),
255            object_type: ObjectType::Schema,
256            new_owner,
257        })),
258        None => {
259            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
260                name: name.to_ast_string_simple(),
261                object_type: ObjectType::NetworkPolicy,
262            });
263
264            Ok(Plan::AlterNoop(AlterNoopPlan {
265                object_type: ObjectType::NetworkPolicy,
266            }))
267        }
268    }
269}
270
271pub fn describe_grant_role(
272    _: &StatementContext,
273    _: GrantRoleStatement<Aug>,
274) -> Result<StatementDesc, PlanError> {
275    Ok(StatementDesc::new(None))
276}
277
278pub fn plan_grant_role(
279    scx: &StatementContext,
280    GrantRoleStatement {
281        role_names,
282        member_names,
283    }: GrantRoleStatement<Aug>,
284) -> Result<Plan, PlanError> {
285    // In PostgreSQL, the grantor must either be a role with ADMIN OPTION on the role being granted,
286    // or the bootstrap superuser. We do not have ADMIN OPTION implemented and 'mz_system' is our
287    // equivalent of the bootstrap superuser. Therefore the grantor is always 'mz_system'.
288    // For more details see:
289    // https://github.com/postgres/postgres/blob/064eb89e83ea0f59426c92906329f1e6c423dfa4/src/backend/commands/user.c#L2180-L2238
290    let grantor_id = scx
291        .catalog
292        .resolve_role(&SYSTEM_USER.name)
293        .expect("system user must exist")
294        .id();
295    Ok(Plan::GrantRole(GrantRolePlan {
296        role_ids: role_names
297            .into_iter()
298            .map(|role_name| role_name.id)
299            .collect(),
300        member_ids: member_names
301            .into_iter()
302            .map(|member_name| member_name.id)
303            .collect(),
304        grantor_id,
305    }))
306}
307
308pub fn describe_revoke_role(
309    _: &StatementContext,
310    _: RevokeRoleStatement<Aug>,
311) -> Result<StatementDesc, PlanError> {
312    Ok(StatementDesc::new(None))
313}
314
315pub fn plan_revoke_role(
316    scx: &StatementContext,
317    RevokeRoleStatement {
318        role_names,
319        member_names,
320    }: RevokeRoleStatement<Aug>,
321) -> Result<Plan, PlanError> {
322    // In PostgreSQL, the same role membership can be granted multiple times by different grantors.
323    // When revoking a role membership, only the membership granted by the specified grantor is
324    // revoked. The grantor must either be a role with ADMIN OPTION on the role being granted,
325    // or the bootstrap superuser. We do not have ADMIN OPTION implemented and 'mz_system' is our
326    // equivalent of the bootstrap superuser. Therefore the grantor is always 'mz_system'.
327    // For more details see:
328    // https://github.com/postgres/postgres/blob/064eb89e83ea0f59426c92906329f1e6c423dfa4/src/backend/commands/user.c#L2180-L2238
329    let grantor_id = scx
330        .catalog
331        .resolve_role(&SYSTEM_USER.name)
332        .expect("system user must exist")
333        .id();
334    Ok(Plan::RevokeRole(RevokeRolePlan {
335        role_ids: role_names
336            .into_iter()
337            .map(|role_name| role_name.id)
338            .collect(),
339        member_ids: member_names
340            .into_iter()
341            .map(|member_name| member_name.id)
342            .collect(),
343        grantor_id,
344    }))
345}
346
347pub fn describe_grant_privileges(
348    _: &StatementContext,
349    _: GrantPrivilegesStatement<Aug>,
350) -> Result<StatementDesc, PlanError> {
351    Ok(StatementDesc::new(None))
352}
353
354pub fn plan_grant_privileges(
355    scx: &StatementContext,
356    GrantPrivilegesStatement {
357        privileges,
358        target,
359        roles,
360    }: GrantPrivilegesStatement<Aug>,
361) -> Result<Plan, PlanError> {
362    let plan = plan_update_privilege(scx, privileges, target, roles)?;
363    Ok(Plan::GrantPrivileges(plan.into()))
364}
365
366pub fn describe_revoke_privileges(
367    _: &StatementContext,
368    _: RevokePrivilegesStatement<Aug>,
369) -> Result<StatementDesc, PlanError> {
370    Ok(StatementDesc::new(None))
371}
372
373pub fn plan_revoke_privileges(
374    scx: &StatementContext,
375    RevokePrivilegesStatement {
376        privileges,
377        target,
378        roles,
379    }: RevokePrivilegesStatement<Aug>,
380) -> Result<Plan, PlanError> {
381    let plan = plan_update_privilege(scx, privileges, target, roles)?;
382    Ok(Plan::RevokePrivileges(plan.into()))
383}
384
/// Intermediate result of `plan_update_privilege`, shared by the `GRANT` and `REVOKE`
/// privilege planners and converted into `GrantPrivilegesPlan` or `RevokePrivilegesPlan`.
struct UpdatePrivilegesPlan {
    /// One entry per targeted object: the privileges to update, the target, and the grantor.
    update_privileges: Vec<UpdatePrivilege>,
    /// Ids of the roles named in the statement. For a revoke these become the `revokees`.
    grantees: Vec<RoleId>,
}
389
390impl From<UpdatePrivilegesPlan> for GrantPrivilegesPlan {
391    fn from(
392        UpdatePrivilegesPlan {
393            update_privileges,
394            grantees,
395        }: UpdatePrivilegesPlan,
396    ) -> GrantPrivilegesPlan {
397        GrantPrivilegesPlan {
398            update_privileges,
399            grantees,
400        }
401    }
402}
403
404impl From<UpdatePrivilegesPlan> for RevokePrivilegesPlan {
405    fn from(
406        UpdatePrivilegesPlan {
407            update_privileges,
408            grantees,
409        }: UpdatePrivilegesPlan,
410    ) -> RevokePrivilegesPlan {
411        RevokePrivilegesPlan {
412            update_privileges,
413            revokees: grantees,
414        }
415    }
416}
417
/// Shared planning logic for granting and revoking privileges.
///
/// The function first expands `target` into a list of concrete target ids, then builds one
/// `UpdatePrivilege` per target containing the requested ACL mode and the grantor (the
/// object's owner, or the `mz_system` role for system-level privileges).
///
/// Target expansion:
/// * `ALL <type>S`: every cluster, database, non-temporary schema and item in the catalog,
///   filtered by object type and restricted to user objects.
/// * `ALL <type>S IN DATABASE ...`: the non-temporary schemas and the items of the listed
///   databases, filtered by object type.
/// * `ALL <type>S IN SCHEMA ...`: the items of the listed schemas, filtered by object type.
/// * Explicit names: each name is converted to an `ObjectId`.
/// * `SYSTEM`: the single `SystemObjectId::System` target.
///
/// # Errors
///
/// * A target is the temporary schema.
/// * `PlanError::InvalidObjectType` if an item's type does not match the requested object
///   type (views, materialized views and sources are accepted where a table is requested).
/// * `PlanError::InvalidPrivilegeTypes` if a requested privilege is not applicable to the
///   target's (reference) object type.
///
/// # Panics
///
/// Panics if an explicitly named object cannot be converted to an `ObjectId`, or if a
/// targeted object has no owner.
fn plan_update_privilege(
    scx: &StatementContext,
    privileges: PrivilegeSpecification,
    target: GrantTargetSpecification<Aug>,
    roles: Vec<ResolvedRoleName>,
) -> Result<UpdatePrivilegesPlan, PlanError> {
    let (object_type, target_ids) = match target {
        GrantTargetSpecification::Object {
            object_type,
            object_spec_inner,
        } => {
            // Decides whether `object_id` belongs to the requested `object_type`. When a table
            // is requested, any object whose type is a relation is accepted; otherwise the
            // types must match exactly.
            fn object_type_filter(
                object_id: &ObjectId,
                object_type: &ObjectType,
                scx: &StatementContext,
            ) -> bool {
                if object_type == &ObjectType::Table {
                    scx.get_object_type(object_id).is_relation()
                } else {
                    object_type == &scx.get_object_type(object_id)
                }
            }
            let object_type = object_type.into();
            let object_ids: Vec<ObjectId> = match object_spec_inner {
                // All objects of the requested type across the whole catalog.
                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::All) => {
                    let cluster_ids = scx
                        .catalog
                        .get_clusters()
                        .into_iter()
                        .map(|cluster| cluster.id().into());
                    let database_ids = scx
                        .catalog
                        .get_databases()
                        .into_iter()
                        .map(|database| database.id().into());
                    let schema_ids = scx
                        .catalog
                        .get_schemas()
                        .into_iter()
                        .filter(|schema| !schema.id().is_temporary())
                        .map(|schema| (schema.database().clone(), schema.id().clone()).into());
                    let item_ids = scx
                        .catalog
                        .get_items()
                        .into_iter()
                        .map(|item| item.id().into());
                    cluster_ids
                        .chain(database_ids)
                        .chain(schema_ids)
                        .chain(item_ids)
                        .filter(|object_id| object_type_filter(object_id, &object_type, scx))
                        .filter(|object_id| object_id.is_user())
                        .collect()
                }
                // All objects of the requested type inside the listed databases.
                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::AllDatabases {
                    databases,
                }) => {
                    let schema_ids = databases
                        .iter()
                        .map(|database| scx.get_database(database.database_id()))
                        .flat_map(|database| database.schemas().into_iter())
                        .filter(|schema| !schema.id().is_temporary())
                        .map(|schema| (schema.database().clone(), schema.id().clone()).into());

                    let item_ids = databases
                        .iter()
                        .map(|database| scx.get_database(database.database_id()))
                        .flat_map(|database| database.schemas().into_iter())
                        .flat_map(|schema| schema.item_ids())
                        .map(|item_id| item_id.into());

                    item_ids
                        .chain(schema_ids)
                        .filter(|object_id| object_type_filter(object_id, &object_type, scx))
                        .collect()
                }
                // All items of the requested type inside the listed schemas.
                GrantTargetSpecificationInner::All(GrantTargetAllSpecification::AllSchemas {
                    schemas,
                }) => schemas
                    .into_iter()
                    .map(|schema| scx.get_schema(schema.database_spec(), schema.schema_spec()))
                    .flat_map(|schema| schema.item_ids())
                    .map(|item_id| item_id.into())
                    .filter(|object_id| object_type_filter(object_id, &object_type, scx))
                    .collect(),
                // Explicitly named objects.
                GrantTargetSpecificationInner::Objects { names } => names
                    .into_iter()
                    .map(|name| {
                        name.try_into()
                            .expect("name resolution should handle invalid objects")
                    })
                    .collect(),
            };
            let target_ids = object_ids.into_iter().map(|id| id.into()).collect();
            (SystemObjectType::Object(object_type), target_ids)
        }
        GrantTargetSpecification::System => {
            (SystemObjectType::System, vec![SystemObjectId::System])
        }
    };

    let mut update_privileges = Vec::with_capacity(target_ids.len());

    for target_id in target_ids {
        // Temporary schemas cannot have privileges granted or revoked - they are
        // connection-specific and transient. With lazy temporary schema creation,
        // the temp schema may not exist yet, but we still need to return the correct error.
        if let SystemObjectId::Object(ObjectId::Schema((_, SchemaSpecifier::Temporary))) =
            &target_id
        {
            sql_bail!(
                "cannot grant or revoke privileges on schema {} because it is a temporary schema",
                mz_repr::namespaces::MZ_TEMP_SCHEMA
            );
        }

        // The actual type of the object.
        let actual_object_type = scx.get_system_object_type(&target_id);
        // The type used for privileges, for example if the actual type is a view, the reference
        // type is table.
        let mut reference_object_type = actual_object_type.clone();

        // Note that `ALL` privileges are expanded against the actual object type, while the
        // validity check below is performed against the reference object type.
        let acl_mode = privilege_spec_to_acl_mode(scx, &privileges, actual_object_type);

        if let SystemObjectId::Object(ObjectId::Item(id)) = &target_id {
            let item = scx.get_item(id);
            let item_type: ObjectType = item.item_type().into();
            if (item_type == ObjectType::View
                || item_type == ObjectType::MaterializedView
                || item_type == ObjectType::Source)
                && object_type == SystemObjectType::Object(ObjectType::Table)
            {
                // This is an expected mis-match to match PostgreSQL semantics.
                reference_object_type = SystemObjectType::Object(ObjectType::Table);
            } else if SystemObjectType::Object(item_type) != object_type {
                let object_name = scx.catalog.resolve_full_name(item.name()).to_string();
                return Err(PlanError::InvalidObjectType {
                    expected_type: object_type,
                    actual_type: actual_object_type,
                    object_name,
                });
            }
        }

        // Reject any requested privilege that does not apply to the reference object type.
        let all_object_privileges = scx.catalog.all_object_privileges(reference_object_type);
        let invalid_privileges = acl_mode.difference(all_object_privileges);
        if !invalid_privileges.is_empty() {
            let object_description =
                ErrorMessageObjectDescription::from_sys_id(&target_id, scx.catalog);
            return Err(PlanError::InvalidPrivilegeTypes {
                invalid_privileges,
                object_description,
            });
        }

        // In PostgreSQL, the grantor must always be either the object owner or some role that has
        // been explicitly granted grant options. In Materialize, we haven't implemented grant options
        // so the grantor is always the object owner.
        //
        // For more details see:
        // https://github.com/postgres/postgres/blob/78d5952dd0e66afc4447eec07f770991fa406cce/src/backend/utils/adt/acl.c#L5154-L5246
        //
        // NOTE(review): the panic message below mentions only revoke, but this function is also
        // used to plan grants.
        let grantor = match &target_id {
            SystemObjectId::Object(object_id) => scx
                .catalog
                .get_owner_id(object_id)
                .expect("cannot revoke privileges on objects without owners"),
            SystemObjectId::System => scx.catalog.mz_system_role_id(),
        };

        update_privileges.push(UpdatePrivilege {
            acl_mode,
            target_id,
            grantor,
        });
    }

    let grantees = roles.into_iter().map(|role| role.id).collect();

    Ok(UpdatePrivilegesPlan {
        update_privileges,
        grantees,
    })
}
601
602fn privilege_spec_to_acl_mode(
603    scx: &StatementContext,
604    privilege_spec: &PrivilegeSpecification,
605    object_type: SystemObjectType,
606) -> AclMode {
607    match privilege_spec {
608        PrivilegeSpecification::All => scx.catalog.all_object_privileges(object_type),
609        PrivilegeSpecification::Privileges(privileges) => privileges
610            .into_iter()
611            .map(|privilege| privilege_to_acl_mode(privilege.clone()))
612            // PostgreSQL doesn't care about duplicate privileges, so we don't either.
613            .fold(AclMode::empty(), |accum, acl_mode| accum.union(acl_mode)),
614    }
615}
616
617fn privilege_to_acl_mode(privilege: Privilege) -> AclMode {
618    match privilege {
619        Privilege::SELECT => AclMode::SELECT,
620        Privilege::INSERT => AclMode::INSERT,
621        Privilege::UPDATE => AclMode::UPDATE,
622        Privilege::DELETE => AclMode::DELETE,
623        Privilege::USAGE => AclMode::USAGE,
624        Privilege::CREATE => AclMode::CREATE,
625        Privilege::CREATEROLE => AclMode::CREATE_ROLE,
626        Privilege::CREATEDB => AclMode::CREATE_DB,
627        Privilege::CREATECLUSTER => AclMode::CREATE_CLUSTER,
628        Privilege::CREATENETWORKPOLICY => AclMode::CREATE_NETWORK_POLICY,
629    }
630}
631
632pub fn describe_alter_default_privileges(
633    _: &StatementContext,
634    _: AlterDefaultPrivilegesStatement<Aug>,
635) -> Result<StatementDesc, PlanError> {
636    Ok(StatementDesc::new(None))
637}
638
/// Plans an `ALTER DEFAULT PRIVILEGES` statement.
///
/// The plan contains:
/// * `privilege_objects`: one `DefaultPrivilegeObject` for every combination of target role
///   and target scope (everything, a database, or a schema),
/// * `privilege_acl_items`: one `DefaultPrivilegeAclItem` per grantee, all sharing the same
///   ACL mode,
/// * `is_grant`: whether the abbreviated statement is a grant or a revoke.
///
/// # Errors
///
/// * The object type is one that must be addressed via `TABLES` (views, materialized views,
///   sources, continual tasks).
/// * The object type has no privileges (sinks, cluster replicas, roles, functions).
/// * Clusters or databases are combined with `IN DATABASE`, or clusters, databases or schemas
///   are combined with `IN SCHEMA`.
/// * `PlanError::InvalidPrivilegeTypes` if a requested privilege does not apply to the
///   object type.
pub fn plan_alter_default_privileges(
    scx: &StatementContext,
    AlterDefaultPrivilegesStatement {
        target_roles,
        target_objects,
        grant_or_revoke,
    }: AlterDefaultPrivilegesStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type: ObjectType = (*grant_or_revoke.object_type()).into();
    // Validate the object type. The guarded arms must come before the final catch-all list,
    // which repeats `Cluster`, `Database` and `Schema` for the combinations that are allowed.
    //
    // NOTE(review): `NetworkPolicy` is not covered by either the `IN DATABASE` or the
    // `IN SCHEMA` guard — confirm whether those combinations should be rejected as well.
    match object_type {
        ObjectType::View
        | ObjectType::MaterializedView
        | ObjectType::Source
        | ObjectType::ContinualTask => sql_bail!(
            "{object_type}S is not valid for ALTER DEFAULT PRIVILEGES, use TABLES instead"
        ),
        ObjectType::Sink | ObjectType::ClusterReplica | ObjectType::Role | ObjectType::Func => {
            sql_bail!("{object_type}S do not have privileges")
        }
        ObjectType::Cluster | ObjectType::Database
            if matches!(
                target_objects,
                GrantTargetAllSpecification::AllDatabases { .. }
            ) =>
        {
            sql_bail!("cannot specify {object_type}S and IN DATABASE")
        }

        ObjectType::Cluster | ObjectType::Database | ObjectType::Schema
            if matches!(
                target_objects,
                GrantTargetAllSpecification::AllSchemas { .. }
            ) =>
        {
            sql_bail!("cannot specify {object_type}S and IN SCHEMA")
        }
        ObjectType::Table
        | ObjectType::Index
        | ObjectType::Type
        | ObjectType::Secret
        | ObjectType::Connection
        | ObjectType::Cluster
        | ObjectType::Database
        | ObjectType::Schema
        | ObjectType::NetworkPolicy => {}
    }

    // Translate the requested privileges and reject any that do not apply to this object type.
    let acl_mode = privilege_spec_to_acl_mode(
        scx,
        grant_or_revoke.privileges(),
        SystemObjectType::Object(object_type),
    );
    let all_object_privileges = scx
        .catalog
        .all_object_privileges(SystemObjectType::Object(object_type));
    let invalid_privileges = acl_mode.difference(all_object_privileges);
    if !invalid_privileges.is_empty() {
        let object_description =
            ErrorMessageObjectDescription::from_object_type(SystemObjectType::Object(object_type));
        return Err(PlanError::InvalidPrivilegeTypes {
            invalid_privileges,
            object_description,
        });
    }

    // `ALL ROLES` is represented by the single `RoleId::Public` entry.
    let target_roles = match target_roles {
        TargetRoleSpecification::Roles(roles) => roles.into_iter().map(|role| role.id).collect(),
        TargetRoleSpecification::AllRoles => vec![RoleId::Public],
    };
    // One privilege object per (target role, target scope) pair.
    let mut privilege_objects = Vec::with_capacity(target_roles.len() * target_objects.len());
    for target_role in target_roles {
        match &target_objects {
            GrantTargetAllSpecification::All => privilege_objects.push(DefaultPrivilegeObject {
                role_id: target_role,
                database_id: None,
                schema_id: None,
                object_type,
            }),
            GrantTargetAllSpecification::AllDatabases { databases } => {
                for database in databases {
                    privilege_objects.push(DefaultPrivilegeObject {
                        role_id: target_role,
                        database_id: Some(*database.database_id()),
                        schema_id: None,
                        object_type,
                    });
                }
            }
            GrantTargetAllSpecification::AllSchemas { schemas } => {
                for schema in schemas {
                    privilege_objects.push(DefaultPrivilegeObject {
                        role_id: target_role,
                        database_id: schema.database_spec().id(),
                        schema_id: Some(schema.schema_spec().into()),
                        object_type,
                    });
                }
            }
        }
    }

    // Every grantee receives the same ACL mode.
    let privilege_acl_items = grant_or_revoke
        .roles()
        .into_iter()
        .map(|grantee| DefaultPrivilegeAclItem {
            grantee: grantee.id,
            acl_mode,
        })
        .collect();

    let is_grant = match grant_or_revoke {
        AbbreviatedGrantOrRevokeStatement::Grant(_) => true,
        AbbreviatedGrantOrRevokeStatement::Revoke(_) => false,
    };

    Ok(Plan::AlterDefaultPrivileges(AlterDefaultPrivilegesPlan {
        privilege_objects,
        privilege_acl_items,
        is_grant,
    }))
}
760
761pub fn describe_reassign_owned(
762    _: &StatementContext,
763    _: ReassignOwnedStatement<Aug>,
764) -> Result<StatementDesc, PlanError> {
765    Ok(StatementDesc::new(None))
766}
767
768pub fn plan_reassign_owned(
769    scx: &StatementContext,
770    ReassignOwnedStatement {
771        old_roles,
772        new_role,
773    }: ReassignOwnedStatement<Aug>,
774) -> Result<Plan, PlanError> {
775    let old_roles: BTreeSet<_> = old_roles.into_iter().map(|role| role.id).collect();
776    let mut reassign_ids: Vec<ObjectId> = Vec::new();
777
778    // Replicas
779    for replica in scx.catalog.get_cluster_replicas() {
780        if old_roles.contains(&replica.owner_id()) {
781            reassign_ids.push((replica.cluster_id(), replica.replica_id()).into());
782        }
783    }
784    // Clusters
785    for cluster in scx.catalog.get_clusters() {
786        if old_roles.contains(&cluster.owner_id()) {
787            reassign_ids.push(cluster.id().into());
788        }
789    }
790    // Items
791    for item in scx.catalog.get_items() {
792        if old_roles.contains(&item.owner_id()) {
793            reassign_ids.push(item.id().into());
794        }
795    }
796    // Schemas
797    for schema in scx.catalog.get_schemas() {
798        if !schema.id().is_temporary() {
799            if old_roles.contains(&schema.owner_id()) {
800                reassign_ids.push((*schema.database(), *schema.id()).into())
801            }
802        }
803    }
804    // Databases
805    for database in scx.catalog.get_databases() {
806        if old_roles.contains(&database.owner_id()) {
807            reassign_ids.push(database.id().into());
808        }
809    }
810
811    let system_ids: Vec<_> = reassign_ids.iter().filter(|id| id.is_system()).collect();
812    if !system_ids.is_empty() {
813        let mut owners = system_ids
814            .into_iter()
815            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
816            .collect::<BTreeSet<_>>()
817            .into_iter()
818            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
819        sql_bail!(
820            "cannot reassign objects owned by role {} because they are required by the database system",
821            owners.join(", "),
822        );
823    }
824
825    Ok(Plan::ReassignOwned(ReassignOwnedPlan {
826        old_roles: old_roles.into_iter().collect(),
827        new_role: new_role.id,
828        reassign_ids,
829    }))
830}